Learning Platform
Глоссарий Troubleshooting
Урок 15.03 · 32 мин
Начальный
resiliencecircuit-breakerpybreakertestscoverageCIpre-commit

Capstone: resilience и тесты — circuit breaker, идемпотентность, тесты, CI

В прошлом уроке мы написали ETL и научили его повторять запросы при ошибках. Этого недостаточно для прода. Если внешний API лёг на час — наш pipeline будет упорно стучаться к нему 5 раз × 60 секунд = 5 минут на каждый запрос. На 10 монетах × 4 репозиториях это часы потерянного CPU и риск получить бан за «упорство».

Решение — circuit breaker: после N подряд ошибок «открыть» цепь и не отправлять новые запросы, пока сервер не восстановится. И, конечно, тесты — без них любой нетривиальный pipeline становится legacy на третий день.

В этом уроке: circuit breaker через pybreaker, идемпотентность через state, полная стратегия тестирования (unit/mock/VCR/e2e), coverage, pre-commit hook, CI на GitHub Actions.


Mocking HTTP-запросов: responses, respx, VCR

Resilience: timeout, retry, circuit breaker

Три уровня защиты от внешних сбоев. Каждый решает свою задачу.

Три слоя resilience

Timeout -- первый. Retry -- второй. Circuit breaker -- третий

TimeoutКаждый HTTP-запрос с явным timeout (connect 5s, read 30s). Без него зависший сервер блокирует поток на минуты. Базовый уровень -- без него остальное бессмысленно
ловит зависаниеСервер принял TCP, но не отвечает. Без timeout клиент висит до OS-default -- обычно 2 часа
RetryПри временной ошибке (5xx, network) повторить с экспоненциальным backoff. Идемпотентные методы безопасны для retry. POST -- осторожно
ловит флакиСетевой пакет потерялся, сервер кратко перегрузился. Через секунду уже работает. Retry скрывает такие флуктуации
Circuit breakerПосле N подряд ошибок цепь 'открывается' и все следующие вызовы fail-fast без HTTP. Через cooldown цепь 'half-open' -- пробует один запрос. Успех -- закрывает цепь, неудача -- снова открывает
ловит длинный outageСервер упал не на секунду, а на час. Без breaker мы будем 5 минут × N запросов биться в стену. С breaker -- fail-fast, освобождаем CPU, не злим API

Circuit breaker через pybreaker

pybreaker — реализация паттерна circuit breaker в Python. Версия 1.x на 2026 год.

# src/crypto_etl/clients/_breaker.py
from pybreaker import CircuitBreaker, CircuitBreakerError, CircuitBreakerListener
from loguru import logger


class LoggingListener(CircuitBreakerListener):
    """Логирует переходы breaker'а -- чтобы видеть в логах, когда сорвался."""

    def state_change(self, cb, old_state, new_state):
        logger.warning(
            "Circuit breaker '{}' state: {} -> {}",
            cb.name, old_state.name, new_state.name,
        )


coingecko_breaker = CircuitBreaker(
    fail_max=5,
    reset_timeout=60,
    name="coingecko",
    listeners=[LoggingListener()],
)

github_breaker = CircuitBreaker(
    fail_max=10,
    reset_timeout=120,
    name="github",
    listeners=[LoggingListener()],
)

fail_max=5 — после 5 подряд провалов breaker откроется. reset_timeout=60 — через 60 секунд breaker перейдёт в half-open: пропустит один пробный запрос; успех закрывает цепь, неудача снова открывает на 60 секунд.

Использование (обернуть _get в client):

# src/crypto_etl/clients/coingecko.py (фрагмент)
from crypto_etl.clients._breaker import coingecko_breaker

class CoinGeckoClient:
    @retry_decorator
    async def _get(self, path: str, params: dict | None = None) -> dict:
        return await coingecko_breaker.call_async(self._raw_get, path, params)

    async def _raw_get(self, path: str, params: dict | None) -> dict:
        # ... реальный HTTP-запрос как раньше
        ...

Когда breaker открыт, call_async мгновенно выкидывает CircuitBreakerError без HTTP. Retry-декоратор видит это исключение и… по умолчанию tenacity его повторит, что неправильно. Нужно явно исключить CircuitBreakerError из retry:

from pybreaker import CircuitBreakerError

retry_decorator = retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=2, min=2, max=60) + wait_random(0, 1),
    retry=retry_if_exception_type(
        (httpx.ConnectError, httpx.ReadTimeout, httpx.RemoteProtocolError, TransientError)
    ),
    retry_error_callback=lambda rs: rs.outcome.result(),
    reraise=True,
)

retry_if_exception_type уже исключает CircuitBreakerError, потому что мы не указали его в кортеже. Это правильно: открытый breaker — это сигнал «не пытайся прямо сейчас», не сигнал «попробуй ещё раз через секунду».

WARNING

Главная ошибка с breaker — оборачивать им то, что не должно. Breaker имеет смысл только для одного external endpoint per breaker. Нельзя один breaker на весь pipeline — иначе одна сбойная зависимость отключит всё. Идеал: один breaker = одна зависимость.


Идемпотентность через state

Мы уже обсудили в дизайне. Напомним суть и разберём edge cases.

State file data/state.json хранит last_completed_hour. Pipeline на старте читает state, обрабатывает только нужные часы, обновляет state ПОСЛЕ успешной записи Parquet. При повторе:

  • Если успели записать Parquet, но не сохранили state — повторим час, перезапишем Parquet с тем же fetched_at. Не критично.
  • Если не успели записать Parquet — state не обновился, при следующем запуске повторим, всё запишется.
  • Если процесс убили посередине записи Parquet — атомарная запись (write-to-tmp + rename) гарантирует, что в final-пути либо старый файл, либо новый, никогда не битый.

Risk: одновременный запуск двух экземпляров pipeline. Что произойдёт:

# Псевдокод race condition:
# t=0:  process A: load_state() -> last="09:00"
# t=0:  process B: load_state() -> last="09:00"
# t=1:  process A: pull_hour(10:00) -> write Parquet
# t=1:  process B: pull_hour(10:00) -> write Parquet (перезапишет файл A)
# t=2:  process A: save_state(last="10:00")
# t=2:  process B: save_state(last="10:00")

Файл будет перезаписан, но данные те же. Не катастрофа, но 2x трафика к API и риск 429. В проде защита — distributed lock (Redis SETNX, file lock через fcntl, или — что чаще — Airflow guarantee single concurrency).

Для capstone — file lock через fcntl:

# src/crypto_etl/state.py (фрагмент)
import fcntl
from contextlib import contextmanager

@contextmanager
def file_lock(path: Path):
    """Эксклюзивный лок на файл. Кто пришёл вторым -- упадёт сразу."""
    path.parent.mkdir(parents=True, exist_ok=True)
    f = open(path, "w")
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        yield
    except BlockingIOError:
        f.close()
        raise RuntimeError(f"Another instance is running (lock {path} held)")
    else:
        fcntl.flock(f, fcntl.LOCK_UN)
        f.close()
# run.py (main):
async def main():
    settings = get_settings()
    lock_path = settings.data_dir / ".lock"
    try:
        with file_lock(lock_path):
            # ... весь pipeline
            pass
    except RuntimeError as e:
        logger.error("{}", e)
        return

Теперь параллельный запуск второго экземпляра упадёт с понятным сообщением и не нагрузит API.


Стратегия тестирования: четыре уровня

Полный test pyramid для нашего ETL:

Test pyramid для ETL pipeline

Снизу -- много дешёвых, сверху -- мало дорогих

Unit (transforms)Pure-функции trasform_prices, transform_repo. Передали dict, проверили модель. Сотни тестов, каждый порядка 10ms. Не требуют HTTP, моков, дисков
Mock (clients via respx)Тесты CoinGeckoClient/GitHubClient с замоканным HTTP. Проверяют: retry на 500, обработка 429, корректные headers, правильный URL
VCR (cassettes от реального API)Один раз сделали реальный запрос к CoinGecko/GitHub, записали YAML cassette, теперь повторно проигрываем. Проверяют, что схема ответа от API не сломалась
E2E dry-runЗапуск всего pipeline на тестовых данных в isolated tmp_path. Проверяет, что pieces правильно склеены: clients -> transforms -> storage. Минимум, один-два теста

Unit-тесты transforms

# tests/test_transforms_prices.py
from datetime import datetime, timezone
from decimal import Decimal

import pytest

from crypto_etl.transforms.prices import transform_prices


@pytest.fixture
def fetched_at():
    return datetime(2026, 5, 15, 10, 0, 5, tzinfo=timezone.utc)


@pytest.fixture
def hour_ts():
    return datetime(2026, 5, 15, 10, 0, 0, tzinfo=timezone.utc)


def test_transform_prices_basic(hour_ts, fetched_at):
    raw = {
        "bitcoin": {"usd": 65000.5, "usd_market_cap": 1.2e12, "usd_24h_vol": 2.5e10},
        "ethereum": {"usd": 3500.25, "usd_market_cap": 4.2e11, "usd_24h_vol": 1.5e10},
    }
    records = transform_prices(raw, ts=hour_ts, fetched_at=fetched_at)

    assert len(records) == 2
    btc = next(r for r in records if r.coin_id == "bitcoin")
    assert btc.price_usd == Decimal("65000.5")
    assert btc.ts == hour_ts
    assert btc.fetched_at == fetched_at


def test_transform_prices_decimal_precision(hour_ts, fetched_at):
    raw = {"bitcoin": {"usd": 65000.123456789}}
    records = transform_prices(raw, ts=hour_ts, fetched_at=fetched_at)
    assert records[0].price_usd == Decimal("65000.123456789")


def test_transform_prices_missing_optional_fields(hour_ts, fetched_at):
    raw = {"bitcoin": {"usd": 65000.0}}
    records = transform_prices(raw, ts=hour_ts, fetched_at=fetched_at)

    assert records[0].market_cap_usd is None
    assert records[0].volume_24h_usd is None


def test_transform_prices_empty_input(hour_ts, fetched_at):
    records = transform_prices({}, ts=hour_ts, fetched_at=fetched_at)
    assert records == []


@pytest.mark.parametrize("invalid_price", [0, -100, "not-a-number"])
def test_transform_prices_invalid_price_raises(invalid_price, hour_ts, fetched_at):
    raw = {"bitcoin": {"usd": invalid_price}}
    with pytest.raises(Exception):
        transform_prices(raw, ts=hour_ts, fetched_at=fetched_at)

Mock-тесты для clients (respx)

# tests/test_clients_coingecko.py
import httpx
import pytest
import respx

from crypto_etl.clients.coingecko import CoinGeckoClient


@pytest.fixture
def respx_mock():
    with respx.mock(base_url="https://api.coingecko.com/api/v3") as router:
        yield router


@pytest.mark.asyncio
async def test_get_prices_basic(respx_mock):
    respx_mock.get("/simple/price").mock(
        return_value=httpx.Response(
            200,
            json={"bitcoin": {"usd": 65000.0, "usd_market_cap": 1.2e12, "usd_24h_vol": 2.5e10}},
        )
    )

    async with CoinGeckoClient(api_key="test-key") as cg:
        result = await cg.get_prices(["bitcoin"])

    assert result["bitcoin"]["usd"] == 65000.0


@pytest.mark.asyncio
async def test_get_prices_sends_api_key(respx_mock):
    route = respx_mock.get("/simple/price").mock(
        return_value=httpx.Response(200, json={"bitcoin": {"usd": 65000.0}})
    )

    async with CoinGeckoClient(api_key="my-secret-key") as cg:
        await cg.get_prices(["bitcoin"])

    request = route.calls.last.request
    assert request.headers["x-cg-pro-api-key"] == "my-secret-key"


@pytest.mark.asyncio
async def test_get_prices_429_retries(respx_mock):
    respx_mock.get("/simple/price").mock(
        side_effect=[
            httpx.Response(429, headers={"Retry-After": "1"}, json={"error": "rate limit"}),
            httpx.Response(429, headers={"Retry-After": "1"}, json={"error": "rate limit"}),
            httpx.Response(200, json={"bitcoin": {"usd": 65000.0}}),
        ]
    )

    async with CoinGeckoClient(api_key="test") as cg:
        result = await cg.get_prices(["bitcoin"])

    assert result["bitcoin"]["usd"] == 65000.0
    assert respx_mock.calls.call_count == 3


@pytest.mark.asyncio
async def test_get_prices_500_retries(respx_mock):
    respx_mock.get("/simple/price").mock(
        side_effect=[
            httpx.Response(500),
            httpx.Response(503),
            httpx.Response(200, json={"bitcoin": {"usd": 65000.0}}),
        ]
    )

    async with CoinGeckoClient(api_key="test") as cg:
        await cg.get_prices(["bitcoin"])

    assert respx_mock.calls.call_count == 3


@pytest.mark.asyncio
async def test_get_prices_404_does_not_retry(respx_mock):
    respx_mock.get("/simple/price").mock(
        return_value=httpx.Response(404, json={"error": "Not found"})
    )

    async with CoinGeckoClient(api_key="test") as cg:
        with pytest.raises(httpx.HTTPStatusError):
            await cg.get_prices(["nonexistent"])

    assert respx_mock.calls.call_count == 1


@pytest.mark.asyncio
async def test_get_prices_connect_error_retries(respx_mock):
    respx_mock.get("/simple/price").mock(
        side_effect=[
            httpx.ConnectError("network down"),
            httpx.ConnectError("network down"),
            httpx.Response(200, json={"bitcoin": {"usd": 65000.0}}),
        ]
    )

    async with CoinGeckoClient(api_key="test") as cg:
        await cg.get_prices(["bitcoin"])

    assert respx_mock.calls.call_count == 3

VCR-тесты для real responses

# tests/test_clients_coingecko_vcr.py
import pytest
from crypto_etl.clients.coingecko import CoinGeckoClient


@pytest.fixture(scope="session")
def vcr_config():
    return {
        "record_mode": "once",
        "filter_headers": [
            ("x-cg-pro-api-key", "REDACTED"),
            ("Authorization", "REDACTED"),
        ],
        "filter_query_parameters": [("api_key", "REDACTED")],
    }


@pytest.mark.vcr
@pytest.mark.asyncio
async def test_get_prices_real_response_shape():
    """Что реально возвращает CoinGecko в /simple/price."""
    async with CoinGeckoClient(api_key="dummy") as cg:
        result = await cg.get_prices(["bitcoin", "ethereum"])

    assert "bitcoin" in result
    assert "ethereum" in result
    assert "usd" in result["bitcoin"]
    assert isinstance(result["bitcoin"]["usd"], (int, float))
    assert result["bitcoin"]["usd"] > 0

Локально (с настоящим API-ключом): запускаем тест -> cassette записывается. Открываем cassette, проверяем что нет реального ключа. Коммитим в репо. На CI всё работает оффлайн.

E2E тест на dry-run

# tests/test_e2e.py
from datetime import datetime, timezone
from pathlib import Path

import httpx
import pytest
import pyarrow.parquet as pq
import respx

from crypto_etl.clients.coingecko import CoinGeckoClient
from crypto_etl.clients.github import GitHubClient
from crypto_etl.transforms.prices import transform_prices
from crypto_etl.storage.parquet_writer import write_partition


@pytest.mark.asyncio
async def test_e2e_prices_pipeline(tmp_path: Path):
    """Pull -> transform -> write. Один час, одна монета."""
    hour = datetime(2026, 5, 15, 10, 0, 0, tzinfo=timezone.utc)
    fetched_at = datetime(2026, 5, 15, 10, 0, 5, tzinfo=timezone.utc)

    with respx.mock(base_url="https://api.coingecko.com/api/v3") as router:
        router.get("/simple/price").mock(
            return_value=httpx.Response(
                200,
                json={"bitcoin": {"usd": 65000.0, "usd_market_cap": 1.2e12, "usd_24h_vol": 2.5e10}},
            )
        )
        async with CoinGeckoClient(api_key="test") as cg:
            raw = await cg.get_prices(["bitcoin"])
        records = transform_prices(raw, ts=hour, fetched_at=fetched_at)
        path = write_partition(
            records,
            base_dir=tmp_path,
            table_name="prices",
            partition_date=hour.date(),
            file_suffix="10",
        )

    assert path.exists()
    assert "dt=2026-05-15" in str(path)

    table = pq.read_table(path)
    assert table.num_rows == 1
    df = table.to_pydict()
    assert df["coin_id"] == ["bitcoin"]
    assert float(df["price_usd"][0]) == 65000.0

tmp_path — встроенная фикстура pytest, возвращает временный каталог, очищается после теста. Никогда не пишите тесты в реальный data/ — будут засорять репо и ломать другие тесты.


Coverage

pip install coverage
# или установлено через optional-dependencies
coverage run -m pytest tests/
coverage report -m
coverage html

Целевой coverage для core логики (transforms, storage) — 95%+. Для clients — 80%+. Для orchestrator (run.py) — 60-70% достаточно (там в основном glue-код).

# pyproject.toml -- фрагмент
[tool.coverage.run]
source = ["src/crypto_etl"]
branch = true
omit = ["*/tests/*"]

[tool.coverage.report]
exclude_lines = [
    "pragma: no cover",
    "if __name__ == .__main__.:",
    "raise NotImplementedError",
]

Pre-commit hook

Pre-commit запускает линтеры до того, как код уйдёт в коммит. Не даёт случайно закоммитить криво отформатированный код или код с типовыми багами.

# .pre-commit-config.yaml
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.7.4
    hooks:
      - id: ruff
        args: [--fix]
      - id: ruff-format

  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.13.0
    hooks:
      - id: mypy
        additional_dependencies:
          - pydantic>=2.9
          - types-requests
        args: [--strict]

  - repo: local
    hooks:
      - id: pytest-fast
        name: pytest (fast tests only)
        entry: pytest tests/ -m "not slow" --tb=short -q
        language: system
        pass_filenames: false
        stages: [pre-commit]

      - id: no-secrets-in-cassettes
        name: check cassettes for secrets
        entry: |
          bash -c '
            if grep -rE "Bearer ghp_|Bearer cg-|api_key=[a-zA-Z0-9]{16,}" tests/cassettes/; then
              echo "SECRET DETECTED IN CASSETTE"
              exit 1
            fi
          '
        language: system
        pass_filenames: false

Установка: pre-commit install. После этого при каждом git commit запустятся хуки.

Главный — no-secrets-in-cassettes: grep по cassette-файлам на типичные паттерны токенов. Если разработчик забыл настроить filter_headers и закоммитил cassette с реальным токеном — хук блокирует commit.


CI на GitHub Actions

# .github/workflows/ci.yml
name: CI

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.13"]
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
          cache: pip

      - name: Install
        run: |
          python -m pip install --upgrade pip
          pip install -e ".[dev]"

      - name: Lint with ruff
        run: ruff check src tests

      - name: Format check
        run: ruff format --check src tests

      - name: Type check with mypy
        run: mypy src

      - name: Test with coverage
        env:
          COINGECKO_API_KEY: dummy
          GITHUB_TOKEN: dummy
        run: |
          coverage run -m pytest tests/ --tb=short
          coverage report -m --fail-under=80

      - name: VCR mode check (CI must not write new cassettes)
        env:
          COINGECKO_API_KEY: dummy
          GITHUB_TOKEN: dummy
        run: pytest tests/ --record-mode=none --tb=short

Ключевые моменты:

  • coverage report --fail-under=80 — если суммарный coverage упал ниже 80%, CI падает. Защита от деградации тестов.
  • --record-mode=none — гарантия, что VCR-тесты не пойдут в реальный API из CI.
  • COINGECKO_API_KEY: dummy — тесты не должны зависеть от реальных секретов; секреты нужны только для re-record cassettes локально.

Чек-лист production-ready клиента

Все слои готовы. Если бы вас спросили на собеседовании «что нужно, чтобы клиент был production-ready», ответ:

Production-ready API client checklist

Ровно столько, сколько нужно. Ни больше ни меньше

TimeoutsConnect + read. Без них клиент висит до OS-default (часы)
Retry с backoffExponential + jitter. Идемпотентные методы безопасны для retry. Stop after N
Circuit breakerPer external dependency. Защищает от длинных outage
Доменные исключенияAuthError, RateLimitError, NotFoundError. Не raw HTTPError
ЛогированиеStructured (JSON) + correlation IDs если есть. Достаточно info для debug в проде
МетрикиCounter on (endpoint, status). Histogram на duration. В Prometheus/Datadog
Секреты вне кодаENV vars, secret manager. Никогда в git, никогда в логах
ТестыUnit (transforms), mock (clients), VCR (real shapes), e2e (smoke)
Schema validationPydantic на каждый response. Поломанная схема = понятная ошибка, а не KeyError на 5-м уровне
ИдемпотентностьПовторный запуск не задваивает данные. State file или uniqueness в БД

Проверка знанийKnowledge check
Команда добавила circuit breaker через pybreaker. После деплоя в проде breaker открывается несколько раз в день для CoinGecko, потом закрывается. В логах видно, что reset_timeout=60s срабатывает, half-open пропускает запрос, тот успешен. Это нормально или сигнал проблемы?
ОтветAnswer

Итог

В этом уроке мы добавили в pipeline три уровня resilience (timeout, retry, circuit breaker) и полную стратегию тестирования (unit, mock, VCR, e2e). Покрытие через coverage, защита от ошибок в коде через ruff/mypy/pre-commit, защита от утечки секретов через cassette-grep хук, гарантия зелёного билда через CI на GitHub Actions.

Pipeline готов к проду. Можно ставить в cron, можно поднимать в Airflow как DAG, можно деплоить в Kubernetes как CronJob. Внутренности одни и те же.

В следующем (последнем) уроке — wrap-up: что повторить, куда расти дальше, какой mindset уносить с курса.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 6. В проде circuit breaker для CoinGecko открывается несколько раз в день, потом через reset_timeout закрывается, half-open пропускает запрос, тот успешен. Это нормально?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4