Capstone: resilience и тесты — circuit breaker, идемпотентность, тесты, CI
В прошлом уроке мы написали ETL и научили его повторять запросы при ошибках. Этого недостаточно для прода. Если внешний API лёг на час — наш pipeline будет упорно стучаться к нему 5 раз × 60 секунд = 5 минут на каждый запрос. На 10 монетах × 4 репозиториях это часы потерянного CPU и риск получить бан за «упорство».
Решение — circuit breaker: после N подряд ошибок «открыть» цепь и не отправлять новые запросы, пока сервер не восстановится. И, конечно, тесты — без них любой нетривиальный pipeline становится legacy на третий день.
В этом уроке: circuit breaker через pybreaker, идемпотентность через state, полная стратегия тестирования (unit/mock/VCR/e2e), coverage, pre-commit hook, CI на GitHub Actions.
Mocking HTTP-запросов: responses, respx, VCR
Resilience: timeout, retry, circuit breaker
Три уровня защиты от внешних сбоев. Каждый решает свою задачу.
Timeout -- первый. Retry -- второй. Circuit breaker -- третий
Circuit breaker через pybreaker
pybreaker — реализация паттерна circuit breaker в Python. Версия 1.x на 2026 год.
# src/crypto_etl/clients/_breaker.py
from pybreaker import CircuitBreaker, CircuitBreakerError, CircuitBreakerListener
from loguru import logger
class LoggingListener(CircuitBreakerListener):
"""Логирует переходы breaker'а -- чтобы видеть в логах, когда сорвался."""
def state_change(self, cb, old_state, new_state):
logger.warning(
"Circuit breaker '{}' state: {} -> {}",
cb.name, old_state.name, new_state.name,
)
coingecko_breaker = CircuitBreaker(
fail_max=5,
reset_timeout=60,
name="coingecko",
listeners=[LoggingListener()],
)
github_breaker = CircuitBreaker(
fail_max=10,
reset_timeout=120,
name="github",
listeners=[LoggingListener()],
)
fail_max=5 — после 5 подряд провалов breaker откроется. reset_timeout=60 — через 60 секунд breaker перейдёт в half-open: пропустит один пробный запрос; успех закрывает цепь, неудача снова открывает на 60 секунд.
Использование (обернуть _get в client):
# src/crypto_etl/clients/coingecko.py (фрагмент)
from crypto_etl.clients._breaker import coingecko_breaker
class CoinGeckoClient:
@retry_decorator
async def _get(self, path: str, params: dict | None = None) -> dict:
return await coingecko_breaker.call_async(self._raw_get, path, params)
async def _raw_get(self, path: str, params: dict | None) -> dict:
# ... реальный HTTP-запрос как раньше
...
Когда breaker открыт, call_async мгновенно выкидывает CircuitBreakerError без HTTP. Retry-декоратор видит это исключение и… по умолчанию tenacity его повторит, что неправильно. Нужно явно исключить CircuitBreakerError из retry:
from pybreaker import CircuitBreakerError
retry_decorator = retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=2, min=2, max=60) + wait_random(0, 1),
retry=retry_if_exception_type(
(httpx.ConnectError, httpx.ReadTimeout, httpx.RemoteProtocolError, TransientError)
),
retry_error_callback=lambda rs: rs.outcome.result(),
reraise=True,
)
retry_if_exception_type уже исключает CircuitBreakerError, потому что мы не указали его в кортеже. Это правильно: открытый breaker — это сигнал «не пытайся прямо сейчас», не сигнал «попробуй ещё раз через секунду».
Главная ошибка с breaker — оборачивать им то, что не должно. Breaker имеет смысл только для одного external endpoint per breaker. Нельзя один breaker на весь pipeline — иначе одна сбойная зависимость отключит всё. Идеал: один breaker = одна зависимость.
Идемпотентность через state
Мы уже обсудили в дизайне. Напомним суть и разберём edge cases.
State file data/state.json хранит last_completed_hour. Pipeline на старте читает state, обрабатывает только нужные часы, обновляет state ПОСЛЕ успешной записи Parquet. При повторе:
- Если успели записать Parquet, но не сохранили state — повторим час, перезапишем Parquet с тем же
fetched_at. Не критично. - Если не успели записать Parquet — state не обновился, при следующем запуске повторим, всё запишется.
- Если процесс убили посередине записи Parquet — атомарная запись (write-to-tmp + rename) гарантирует, что в final-пути либо старый файл, либо новый, никогда не битый.
Risk: одновременный запуск двух экземпляров pipeline. Что произойдёт:
# Псевдокод race condition:
# t=0: process A: load_state() -> last="09:00"
# t=0: process B: load_state() -> last="09:00"
# t=1: process A: pull_hour(10:00) -> write Parquet
# t=1: process B: pull_hour(10:00) -> write Parquet (перезапишет файл A)
# t=2: process A: save_state(last="10:00")
# t=2: process B: save_state(last="10:00")
Файл будет перезаписан, но данные те же. Не катастрофа, но 2x трафика к API и риск 429. В проде защита — distributed lock (Redis SETNX, file lock через fcntl, или — что чаще — Airflow guarantee single concurrency).
Для capstone — file lock через fcntl:
# src/crypto_etl/state.py (фрагмент)
import fcntl
from contextlib import contextmanager
@contextmanager
def file_lock(path: Path):
"""Эксклюзивный лок на файл. Кто пришёл вторым -- упадёт сразу."""
path.parent.mkdir(parents=True, exist_ok=True)
f = open(path, "w")
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
yield
except BlockingIOError:
f.close()
raise RuntimeError(f"Another instance is running (lock {path} held)")
else:
fcntl.flock(f, fcntl.LOCK_UN)
f.close()
# run.py (main):
async def main():
settings = get_settings()
lock_path = settings.data_dir / ".lock"
try:
with file_lock(lock_path):
# ... весь pipeline
pass
except RuntimeError as e:
logger.error("{}", e)
return
Теперь параллельный запуск второго экземпляра упадёт с понятным сообщением и не нагрузит API.
Стратегия тестирования: четыре уровня
Полный test pyramid для нашего ETL:
Снизу -- много дешёвых, сверху -- мало дорогих
Unit-тесты transforms
# tests/test_transforms_prices.py
from datetime import datetime, timezone
from decimal import Decimal
import pytest
from crypto_etl.transforms.prices import transform_prices
@pytest.fixture
def fetched_at():
return datetime(2026, 5, 15, 10, 0, 5, tzinfo=timezone.utc)
@pytest.fixture
def hour_ts():
return datetime(2026, 5, 15, 10, 0, 0, tzinfo=timezone.utc)
def test_transform_prices_basic(hour_ts, fetched_at):
raw = {
"bitcoin": {"usd": 65000.5, "usd_market_cap": 1.2e12, "usd_24h_vol": 2.5e10},
"ethereum": {"usd": 3500.25, "usd_market_cap": 4.2e11, "usd_24h_vol": 1.5e10},
}
records = transform_prices(raw, ts=hour_ts, fetched_at=fetched_at)
assert len(records) == 2
btc = next(r for r in records if r.coin_id == "bitcoin")
assert btc.price_usd == Decimal("65000.5")
assert btc.ts == hour_ts
assert btc.fetched_at == fetched_at
def test_transform_prices_decimal_precision(hour_ts, fetched_at):
raw = {"bitcoin": {"usd": 65000.123456789}}
records = transform_prices(raw, ts=hour_ts, fetched_at=fetched_at)
assert records[0].price_usd == Decimal("65000.123456789")
def test_transform_prices_missing_optional_fields(hour_ts, fetched_at):
raw = {"bitcoin": {"usd": 65000.0}}
records = transform_prices(raw, ts=hour_ts, fetched_at=fetched_at)
assert records[0].market_cap_usd is None
assert records[0].volume_24h_usd is None
def test_transform_prices_empty_input(hour_ts, fetched_at):
records = transform_prices({}, ts=hour_ts, fetched_at=fetched_at)
assert records == []
@pytest.mark.parametrize("invalid_price", [0, -100, "not-a-number"])
def test_transform_prices_invalid_price_raises(invalid_price, hour_ts, fetched_at):
raw = {"bitcoin": {"usd": invalid_price}}
with pytest.raises(Exception):
transform_prices(raw, ts=hour_ts, fetched_at=fetched_at)
Mock-тесты для clients (respx)
# tests/test_clients_coingecko.py
import httpx
import pytest
import respx
from crypto_etl.clients.coingecko import CoinGeckoClient
@pytest.fixture
def respx_mock():
with respx.mock(base_url="https://api.coingecko.com/api/v3") as router:
yield router
@pytest.mark.asyncio
async def test_get_prices_basic(respx_mock):
respx_mock.get("/simple/price").mock(
return_value=httpx.Response(
200,
json={"bitcoin": {"usd": 65000.0, "usd_market_cap": 1.2e12, "usd_24h_vol": 2.5e10}},
)
)
async with CoinGeckoClient(api_key="test-key") as cg:
result = await cg.get_prices(["bitcoin"])
assert result["bitcoin"]["usd"] == 65000.0
@pytest.mark.asyncio
async def test_get_prices_sends_api_key(respx_mock):
route = respx_mock.get("/simple/price").mock(
return_value=httpx.Response(200, json={"bitcoin": {"usd": 65000.0}})
)
async with CoinGeckoClient(api_key="my-secret-key") as cg:
await cg.get_prices(["bitcoin"])
request = route.calls.last.request
assert request.headers["x-cg-pro-api-key"] == "my-secret-key"
@pytest.mark.asyncio
async def test_get_prices_429_retries(respx_mock):
respx_mock.get("/simple/price").mock(
side_effect=[
httpx.Response(429, headers={"Retry-After": "1"}, json={"error": "rate limit"}),
httpx.Response(429, headers={"Retry-After": "1"}, json={"error": "rate limit"}),
httpx.Response(200, json={"bitcoin": {"usd": 65000.0}}),
]
)
async with CoinGeckoClient(api_key="test") as cg:
result = await cg.get_prices(["bitcoin"])
assert result["bitcoin"]["usd"] == 65000.0
assert respx_mock.calls.call_count == 3
@pytest.mark.asyncio
async def test_get_prices_500_retries(respx_mock):
respx_mock.get("/simple/price").mock(
side_effect=[
httpx.Response(500),
httpx.Response(503),
httpx.Response(200, json={"bitcoin": {"usd": 65000.0}}),
]
)
async with CoinGeckoClient(api_key="test") as cg:
await cg.get_prices(["bitcoin"])
assert respx_mock.calls.call_count == 3
@pytest.mark.asyncio
async def test_get_prices_404_does_not_retry(respx_mock):
respx_mock.get("/simple/price").mock(
return_value=httpx.Response(404, json={"error": "Not found"})
)
async with CoinGeckoClient(api_key="test") as cg:
with pytest.raises(httpx.HTTPStatusError):
await cg.get_prices(["nonexistent"])
assert respx_mock.calls.call_count == 1
@pytest.mark.asyncio
async def test_get_prices_connect_error_retries(respx_mock):
respx_mock.get("/simple/price").mock(
side_effect=[
httpx.ConnectError("network down"),
httpx.ConnectError("network down"),
httpx.Response(200, json={"bitcoin": {"usd": 65000.0}}),
]
)
async with CoinGeckoClient(api_key="test") as cg:
await cg.get_prices(["bitcoin"])
assert respx_mock.calls.call_count == 3
VCR-тесты для real responses
# tests/test_clients_coingecko_vcr.py
import pytest
from crypto_etl.clients.coingecko import CoinGeckoClient
@pytest.fixture(scope="session")
def vcr_config():
return {
"record_mode": "once",
"filter_headers": [
("x-cg-pro-api-key", "REDACTED"),
("Authorization", "REDACTED"),
],
"filter_query_parameters": [("api_key", "REDACTED")],
}
@pytest.mark.vcr
@pytest.mark.asyncio
async def test_get_prices_real_response_shape():
"""Что реально возвращает CoinGecko в /simple/price."""
async with CoinGeckoClient(api_key="dummy") as cg:
result = await cg.get_prices(["bitcoin", "ethereum"])
assert "bitcoin" in result
assert "ethereum" in result
assert "usd" in result["bitcoin"]
assert isinstance(result["bitcoin"]["usd"], (int, float))
assert result["bitcoin"]["usd"] > 0
Локально (с настоящим API-ключом): запускаем тест -> cassette записывается. Открываем cassette, проверяем что нет реального ключа. Коммитим в репо. На CI всё работает оффлайн.
E2E тест на dry-run
# tests/test_e2e.py
from datetime import datetime, timezone
from pathlib import Path
import httpx
import pytest
import pyarrow.parquet as pq
import respx
from crypto_etl.clients.coingecko import CoinGeckoClient
from crypto_etl.clients.github import GitHubClient
from crypto_etl.transforms.prices import transform_prices
from crypto_etl.storage.parquet_writer import write_partition
@pytest.mark.asyncio
async def test_e2e_prices_pipeline(tmp_path: Path):
"""Pull -> transform -> write. Один час, одна монета."""
hour = datetime(2026, 5, 15, 10, 0, 0, tzinfo=timezone.utc)
fetched_at = datetime(2026, 5, 15, 10, 0, 5, tzinfo=timezone.utc)
with respx.mock(base_url="https://api.coingecko.com/api/v3") as router:
router.get("/simple/price").mock(
return_value=httpx.Response(
200,
json={"bitcoin": {"usd": 65000.0, "usd_market_cap": 1.2e12, "usd_24h_vol": 2.5e10}},
)
)
async with CoinGeckoClient(api_key="test") as cg:
raw = await cg.get_prices(["bitcoin"])
records = transform_prices(raw, ts=hour, fetched_at=fetched_at)
path = write_partition(
records,
base_dir=tmp_path,
table_name="prices",
partition_date=hour.date(),
file_suffix="10",
)
assert path.exists()
assert "dt=2026-05-15" in str(path)
table = pq.read_table(path)
assert table.num_rows == 1
df = table.to_pydict()
assert df["coin_id"] == ["bitcoin"]
assert float(df["price_usd"][0]) == 65000.0
tmp_path — встроенная фикстура pytest, возвращает временный каталог, очищается после теста. Никогда не пишите тесты в реальный data/ — будут засорять репо и ломать другие тесты.
Coverage
pip install coverage
# или установлено через optional-dependencies
coverage run -m pytest tests/
coverage report -m
coverage html
Целевой coverage для core логики (transforms, storage) — 95%+. Для clients — 80%+. Для orchestrator (run.py) — 60-70% достаточно (там в основном glue-код).
# pyproject.toml -- фрагмент
[tool.coverage.run]
source = ["src/crypto_etl"]
branch = true
omit = ["*/tests/*"]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"if __name__ == .__main__.:",
"raise NotImplementedError",
]
Pre-commit hook
Pre-commit запускает линтеры до того, как код уйдёт в коммит. Не даёт случайно закоммитить криво отформатированный код или код с типовыми багами.
# .pre-commit-config.yaml
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.7.4
hooks:
- id: ruff
args: [--fix]
- id: ruff-format
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.13.0
hooks:
- id: mypy
additional_dependencies:
- pydantic>=2.9
- types-requests
args: [--strict]
- repo: local
hooks:
- id: pytest-fast
name: pytest (fast tests only)
entry: pytest tests/ -m "not slow" --tb=short -q
language: system
pass_filenames: false
stages: [pre-commit]
- id: no-secrets-in-cassettes
name: check cassettes for secrets
entry: |
bash -c '
if grep -rE "Bearer ghp_|Bearer cg-|api_key=[a-zA-Z0-9]{16,}" tests/cassettes/; then
echo "SECRET DETECTED IN CASSETTE"
exit 1
fi
'
language: system
pass_filenames: false
Установка: pre-commit install. После этого при каждом git commit запустятся хуки.
Главный — no-secrets-in-cassettes: grep по cassette-файлам на типичные паттерны токенов. Если разработчик забыл настроить filter_headers и закоммитил cassette с реальным токеном — хук блокирует commit.
CI на GitHub Actions
# .github/workflows/ci.yml
name: CI
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.13"]
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip
- name: Install
run: |
python -m pip install --upgrade pip
pip install -e ".[dev]"
- name: Lint with ruff
run: ruff check src tests
- name: Format check
run: ruff format --check src tests
- name: Type check with mypy
run: mypy src
- name: Test with coverage
env:
COINGECKO_API_KEY: dummy
GITHUB_TOKEN: dummy
run: |
coverage run -m pytest tests/ --tb=short
coverage report -m --fail-under=80
- name: VCR mode check (CI must not write new cassettes)
env:
COINGECKO_API_KEY: dummy
GITHUB_TOKEN: dummy
run: pytest tests/ --record-mode=none --tb=short
Ключевые моменты:
coverage report --fail-under=80— если суммарный coverage упал ниже 80%, CI падает. Защита от деградации тестов.--record-mode=none— гарантия, что VCR-тесты не пойдут в реальный API из CI.COINGECKO_API_KEY: dummy— тесты не должны зависеть от реальных секретов; секреты нужны только для re-record cassettes локально.
Чек-лист production-ready клиента
Все слои готовы. Если бы вас спросили на собеседовании «что нужно, чтобы клиент был production-ready», ответ:
Ровно столько, сколько нужно. Ни больше ни меньше
Итог
В этом уроке мы добавили в pipeline три уровня resilience (timeout, retry, circuit breaker) и полную стратегию тестирования (unit, mock, VCR, e2e). Покрытие через coverage, защита от ошибок в коде через ruff/mypy/pre-commit, защита от утечки секретов через cassette-grep хук, гарантия зелёного билда через CI на GitHub Actions.
Pipeline готов к проду. Можно ставить в cron, можно поднимать в Airflow как DAG, можно деплоить в Kubernetes как CronJob. Внутренности одни и те же.
В следующем (последнем) уроке — wrap-up: что повторить, куда расти дальше, какой mindset уносить с курса.