Capstone: реализация ETL — httpx, tenacity, Pydantic, pyarrow
В прошлом уроке мы построили дизайн pipeline. Теперь — реальный код. Не псевдокод, а то, что можно запустить, протестировать, поставить в cron и забыть на месяц. Этот урок — самый «инженерный» во всём курсе. Если вы прошли его внимательно, вы знаете, как выглядит production-grade Python ETL в 2026 году.
В уроке: pyproject.toml, конфиг через pydantic-settings, async-клиенты на httpx, retry через tenacity, валидация через Pydantic, запись в Parquet через pyarrow, orchestrator в run.py.
Type hints в Python: аннотации, mypy, pydantic
pyproject.toml — корень проекта
Начинаем с описания зависимостей. PEP 621 + PEP 660 — современный способ задания пакета.
[project]
name = "crypto-etl"
version = "0.1.0"
description = "Multi-source ETL pipeline: CoinGecko + GitHub -> Parquet"
requires-python = ">=3.13"
dependencies = [
"httpx>=0.28,<0.29",
"tenacity>=9.0,<10.0",
"pydantic>=2.9,<3.0",
"pydantic-settings>=2.5,<3.0",
"pyarrow>=17.0,<18.0",
"loguru>=0.7,<1.0",
"pybreaker>=1.2,<2.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.3,<9.0",
"pytest-asyncio>=0.24,<1.0",
"respx>=0.21,<1.0",
"vcrpy>=6.0,<7.0",
"pytest-recording>=0.13,<1.0",
"ruff>=0.7,<1.0",
"mypy>=1.13,<2.0",
"coverage>=7.6,<8.0",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/crypto_etl"]
[tool.ruff]
line-length = 100
target-version = "py313"
[tool.ruff.lint]
select = ["E", "F", "I", "N", "UP", "B", "A", "C4", "SIM"]
[tool.mypy]
python_version = "3.13"
strict = true
files = ["src/crypto_etl"]
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"
Установка: pip install -e ".[dev]". После этого crypto_etl импортируется отовсюду, изменения подхватываются без переустановки (editable install).
Конфигурация: src/crypto_etl/config.py
from functools import lru_cache
from pathlib import Path
from pydantic import Field, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
)
coingecko_api_key: SecretStr
github_token: SecretStr
data_dir: Path = Path("./data")
log_level: str = "INFO"
coingecko_base_url: str = "https://api.coingecko.com/api/v3"
github_base_url: str = "https://api.github.com"
coins_to_track: list[str] = Field(default_factory=lambda: [
"bitcoin", "ethereum", "solana", "cardano",
])
repos_to_track: list[str] = Field(default_factory=lambda: [
"bitcoin/bitcoin",
"ethereum/go-ethereum",
"solana-labs/solana",
])
request_timeout_seconds: float = 30.0
max_retries: int = 5
backoff_base_seconds: float = 2.0
backoff_max_seconds: float = 60.0
@lru_cache(maxsize=1)
def get_settings() -> Settings:
return Settings()
Ключевые детали:
SecretStrдля токенов — Pydantic не печатает секрет в repr/str. Если случайно залогируете объект Settings, токен останется скрыт.lru_cacheна get_settings — чтобы не парсить .env каждый раз.- Дефолты для всего, кроме секретов — чтобы можно было запустить без .env (упадёт только на validation секретов).
Pydantic-модели: src/crypto_etl/models.py
from datetime import datetime
from decimal import Decimal
from pydantic import BaseModel, ConfigDict, Field
class PriceRecord(BaseModel):
model_config = ConfigDict(frozen=True)
coin_id: str = Field(..., min_length=1, max_length=50)
ts: datetime
price_usd: Decimal = Field(..., gt=0)
market_cap_usd: Decimal | None = Field(None, ge=0)
volume_24h_usd: Decimal | None = Field(None, ge=0)
fetched_at: datetime
class RepoRecord(BaseModel):
model_config = ConfigDict(frozen=True)
full_name: str = Field(..., pattern=r"^[\w.-]+/[\w.-]+$")
stargazers: int = Field(..., ge=0)
forks: int = Field(..., ge=0)
open_issues: int = Field(..., ge=0)
default_branch: str = Field(..., min_length=1)
fetched_at: datetime
class ActivityRecord(BaseModel):
model_config = ConfigDict(frozen=True)
full_name: str = Field(..., pattern=r"^[\w.-]+/[\w.-]+$")
week_start: datetime
commits: int = Field(..., ge=0)
fetched_at: datetime
pattern=r"^[\w.-]+/[\w.-]+$" для full_name — если кто-то передаст bitcoin без /, валидация упадёт. Это ловит ошибки рано.
HTTP-клиенты с retry
Базовый retry-декоратор: src/crypto_etl/clients/_retry.py
import httpx
from tenacity import (
AsyncRetrying,
RetryError,
retry,
stop_after_attempt,
wait_exponential,
wait_random,
retry_if_exception_type,
before_sleep_log,
)
from loguru import logger
class RateLimitError(Exception):
"""API сообщил, что мы упёрлись в лимит."""
def __init__(self, retry_after: int):
self.retry_after = retry_after
super().__init__(f"Rate limit, retry after {retry_after}s")
class TransientError(Exception):
"""Временная ошибка, имеет смысл повторить."""
def is_rate_limit(response: httpx.Response) -> bool:
"""CoinGecko: 429. GitHub: 403 + 'rate limit' в body."""
if response.status_code == 429:
return True
if response.status_code == 403:
body = response.text.lower()
return "rate limit" in body or "abuse detection" in body
return False
def should_retry_status(status_code: int) -> bool:
"""5xx + 429 -- повторяем. Остальное -- не наша головная боль."""
return status_code >= 500 or status_code == 429
retry_decorator = retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=2, min=2, max=60) + wait_random(0, 1),
retry=retry_if_exception_type(
(httpx.ConnectError, httpx.ReadTimeout, httpx.RemoteProtocolError, TransientError)
),
before_sleep=before_sleep_log(logger, "WARNING"),
reraise=True,
)
wait_exponential(multiplier=2, min=2, max=60) означает паузы 2s, 4s, 8s, 16s, 32s (capped at 60s). + wait_random(0, 1) добавляет jitter — случайные 0-1 секунды поверх. Без jitter все клиенты, упавшие одновременно, будут стучаться к API в одни и те же моменты — типичный thundering herd.
reraise=True — после исчерпания попыток tenacity пробросит последнее исключение, а не свой RetryError. Это важно: вызывающий код видит реальную причину, а не обёртку.
CoinGecko-клиент: src/crypto_etl/clients/coingecko.py
from typing import Self
import httpx
from loguru import logger
from crypto_etl.clients._retry import (
RateLimitError,
TransientError,
is_rate_limit,
retry_decorator,
should_retry_status,
)
class CoinGeckoClient:
def __init__(
self,
api_key: str,
base_url: str = "https://api.coingecko.com/api/v3",
timeout: float = 30.0,
):
self._api_key = api_key
self._base_url = base_url.rstrip("/")
self._timeout = timeout
self._client: httpx.AsyncClient | None = None
async def __aenter__(self) -> Self:
self._client = httpx.AsyncClient(
base_url=self._base_url,
headers={
"x-cg-pro-api-key": self._api_key,
"Accept": "application/json",
"User-Agent": "crypto-etl/0.1",
},
timeout=self._timeout,
)
return self
async def __aexit__(self, *exc_info) -> None:
if self._client:
await self._client.aclose()
self._client = None
@retry_decorator
async def _get(self, path: str, params: dict | None = None) -> dict:
assert self._client is not None, "Use as async context manager"
response = await self._client.get(path, params=params)
if is_rate_limit(response):
retry_after = int(response.headers.get("Retry-After", 60))
logger.warning("CoinGecko rate limit, retry after {}s", retry_after)
raise RateLimitError(retry_after=retry_after)
if should_retry_status(response.status_code):
raise TransientError(
f"CoinGecko {response.status_code}: {response.text[:200]}"
)
response.raise_for_status()
return response.json()
async def get_prices(self, coin_ids: list[str]) -> dict:
return await self._get(
"/simple/price",
params={
"ids": ",".join(coin_ids),
"vs_currencies": "usd",
"include_market_cap": "true",
"include_24hr_vol": "true",
},
)
Заметили Self из typing? Это правильный возврат для __aenter__. До Python 3.11 пришлось бы писать строковую аннотацию "CoinGeckoClient" или T = TypeVar(...).
Ассерт assert self._client is not None, "Use as async context manager" — runtime-проверка для случая, когда клиент попытались использовать без async with. Лучше упасть с понятным сообщением, чем с AttributeError: NoneType has no attribute 'get'.
GitHub-клиент: src/crypto_etl/clients/github.py
from typing import Self
import httpx
from loguru import logger
from crypto_etl.clients._retry import (
RateLimitError,
TransientError,
is_rate_limit,
retry_decorator,
should_retry_status,
)
class GitHubClient:
def __init__(
self,
token: str,
base_url: str = "https://api.github.com",
timeout: float = 30.0,
):
self._token = token
self._base_url = base_url.rstrip("/")
self._timeout = timeout
self._client: httpx.AsyncClient | None = None
async def __aenter__(self) -> Self:
self._client = httpx.AsyncClient(
base_url=self._base_url,
headers={
"Authorization": f"Bearer {self._token}",
"Accept": "application/vnd.github+json",
"X-GitHub-Api-Version": "2022-11-28",
"User-Agent": "crypto-etl/0.1",
},
timeout=self._timeout,
http2=True,
)
return self
async def __aexit__(self, *exc_info) -> None:
if self._client:
await self._client.aclose()
self._client = None
@retry_decorator
async def _get(self, path: str) -> dict:
assert self._client is not None
response = await self._client.get(path)
if is_rate_limit(response):
reset = int(response.headers.get("X-RateLimit-Reset", "0"))
logger.warning("GitHub rate limit, reset at unix={}", reset)
raise RateLimitError(retry_after=60)
if should_retry_status(response.status_code):
raise TransientError(
f"GitHub {response.status_code}: {response.text[:200]}"
)
response.raise_for_status()
return response.json()
async def get_repo(self, full_name: str) -> dict:
return await self._get(f"/repos/{full_name}")
async def get_participation(self, full_name: str) -> dict:
return await self._get(f"/repos/{full_name}/stats/participation")
http2=True — GitHub поддерживает HTTP/2; включение даёт мультиплексирование запросов в одном соединении. Это важно, когда тянешь данные по 10 репо подряд: вместо 10 отдельных соединений — одно с 10 параллельными streams.
Pagination loop с cursor-based
CoinGecko в /coins/markets возвращает страницы по 250 записей. GitHub во многих endpoint использует Link header (RFC 5988). Для capstone мы пагинацию явно не используем (тянем фиксированный список монет/репо), но добавим helper для будущего.
# src/crypto_etl/clients/_pagination.py
from typing import AsyncIterator
import httpx
async def paginate_link(
client: httpx.AsyncClient,
initial_path: str,
params: dict | None = None,
) -> AsyncIterator[list[dict]]:
"""Итератор по страницам GitHub-стиля (Link header)."""
next_url: str | None = initial_path
next_params = params
while next_url:
response = await client.get(next_url, params=next_params)
response.raise_for_status()
page = response.json()
if not isinstance(page, list):
yield [page]
return
yield page
link = response.headers.get("Link", "")
next_url = _parse_next_link(link)
next_params = None # Link уже содержит query string
def _parse_next_link(link_header: str) -> str | None:
"""Парсит Link: <url>; rel='next', <url>; rel='last'."""
if not link_header:
return None
for part in link_header.split(","):
if 'rel="next"' in part:
url = part.split(";", 1)[0].strip()
return url.strip("<>")
return None
Использование:
async for page in paginate_link(client, "/users/octocat/repos", {"per_page": 100}):
for repo in page:
print(repo["name"])
Transforms — pure-функции
# src/crypto_etl/transforms/prices.py
from datetime import datetime
from decimal import Decimal
from crypto_etl.models import PriceRecord
def transform_prices(raw: dict, ts: datetime, fetched_at: datetime) -> list[PriceRecord]:
"""CoinGecko response -> PriceRecord list.
Raw shape:
{
"bitcoin": {"usd": 65000.0, "usd_market_cap": 1.2e12, "usd_24h_vol": 2.5e10},
"ethereum": {"usd": 3500.0, ...}
}
"""
records: list[PriceRecord] = []
for coin_id, data in raw.items():
records.append(PriceRecord(
coin_id=coin_id,
ts=ts,
price_usd=Decimal(str(data["usd"])),
market_cap_usd=(
Decimal(str(data["usd_market_cap"])) if data.get("usd_market_cap") else None
),
volume_24h_usd=(
Decimal(str(data["usd_24h_vol"])) if data.get("usd_24h_vol") else None
),
fetched_at=fetched_at,
))
return records
Decimal(str(data["usd"])) — сначала str, потом Decimal. Если делать Decimal(data["usd"]) напрямую, получите Decimal от float со всем шумом представления (Decimal('65000.000000000005820766')). Через str — точное значение Decimal('65000.0').
# src/crypto_etl/transforms/repos.py
from datetime import datetime, timedelta, timezone
from crypto_etl.models import ActivityRecord, RepoRecord
def transform_repo(raw: dict, fetched_at: datetime) -> RepoRecord:
return RepoRecord(
full_name=raw["full_name"],
stargazers=raw["stargazers_count"],
forks=raw["forks_count"],
open_issues=raw["open_issues_count"],
default_branch=raw["default_branch"],
fetched_at=fetched_at,
)
def transform_participation(
raw: dict,
full_name: str,
fetched_at: datetime,
) -> list[ActivityRecord]:
"""Raw shape: {'all': [int]*52, 'owner': [int]*52}.
Каждый элемент -- кол-во коммитов за неделю (52 недели назад -> последняя).
"""
records: list[ActivityRecord] = []
weeks_back = 52
now = fetched_at.replace(hour=0, minute=0, second=0, microsecond=0)
weekday = now.weekday()
last_week_start = now - timedelta(days=weekday)
for i, commits in enumerate(raw.get("all", [])):
week_start = last_week_start - timedelta(weeks=(weeks_back - 1 - i))
records.append(ActivityRecord(
full_name=full_name,
week_start=week_start.replace(tzinfo=timezone.utc) if week_start.tzinfo is None else week_start,
commits=commits,
fetched_at=fetched_at,
))
return records
Storage: запись Parquet
# src/crypto_etl/storage/parquet_writer.py
import os
from datetime import date
from pathlib import Path
from typing import Sequence
import pyarrow as pa
import pyarrow.parquet as pq
from loguru import logger
from pydantic import BaseModel
def _records_to_table(records: Sequence[BaseModel]) -> pa.Table:
"""Pydantic models -> pyarrow.Table через model_dump."""
if not records:
raise ValueError("Empty records")
rows = [r.model_dump(mode="python") for r in records]
columns: dict[str, list] = {key: [] for key in rows[0]}
for row in rows:
for key, value in row.items():
columns[key].append(value)
return pa.table(columns)
def write_partition(
records: Sequence[BaseModel],
base_dir: Path,
table_name: str,
partition_date: date,
file_suffix: str,
) -> Path:
"""Атомарная запись Parquet в data/raw/{table}/dt={date}/{table}_{date}_{suffix}.parquet.
Атомарность через write-to-tmp + rename: если запись упадёт посередине,
в финальном пути не появится битый Parquet.
"""
if not records:
logger.warning("write_partition called with empty records for {}", table_name)
return base_dir
table = _records_to_table(records)
partition_dir = base_dir / "raw" / table_name / f"dt={partition_date.isoformat()}"
partition_dir.mkdir(parents=True, exist_ok=True)
final_path = partition_dir / f"{table_name}_{partition_date.isoformat()}_{file_suffix}.parquet"
tmp_path = partition_dir / f".{final_path.name}.tmp"
pq.write_table(
table,
tmp_path,
compression="zstd",
compression_level=3,
)
os.replace(tmp_path, final_path)
logger.info(
"Wrote {} rows to {} ({} bytes)",
len(records),
final_path,
final_path.stat().st_size,
)
return final_path
os.replace — атомарная операция на одной файловой системе (POSIX rename(2)). Если процесс упадёт между pq.write_table и os.replace, в final_path останется старый файл (или ничего), но битого Parquet там не будет. Аналитический слой всегда видит либо полный, либо никакой файл.
compression="zstd" — современный стандарт. Жмёт лучше gzip, быстрее по CPU. Для time-series с повторами хорошо подходит.
State management
# src/crypto_etl/state.py
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Iterator
def load_state(path: Path) -> dict:
if not path.exists():
return {}
with open(path, encoding="utf-8") as f:
return json.load(f)
def save_state(state: dict, path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
with open(tmp, "w", encoding="utf-8") as f:
json.dump(state, f, indent=2, sort_keys=True)
tmp.replace(path)
def hours_to_process(state: dict, max_lookback: int = 24) -> Iterator[datetime]:
"""Часы, которые нужно обработать: с last_completed_hour+1 до текущего часа.
max_lookback ограничивает дозаполнение пропусков (если pipeline стоял неделю,
не пытаемся догнать всё -- берём только последние 24 часа).
"""
now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
last_str = state.get("last_completed_hour")
if last_str is None:
yield now
return
last = datetime.fromisoformat(last_str)
next_hour = last + timedelta(hours=1)
earliest = now - timedelta(hours=max_lookback)
if next_hour < earliest:
next_hour = earliest
while next_hour <= now:
yield next_hour
next_hour += timedelta(hours=1)
Orchestrator: run.py
import asyncio
from datetime import date, datetime, timezone
from loguru import logger
from crypto_etl.clients.coingecko import CoinGeckoClient
from crypto_etl.clients.github import GitHubClient
from crypto_etl.config import get_settings
from crypto_etl.state import hours_to_process, load_state, save_state
from crypto_etl.storage.parquet_writer import write_partition
from crypto_etl.transforms.prices import transform_prices
from crypto_etl.transforms.repos import transform_participation, transform_repo
async def pull_hour(
hour: datetime,
cg: CoinGeckoClient,
gh: GitHubClient,
settings,
) -> None:
fetched_at = datetime.now(timezone.utc)
suffix = hour.strftime("%H")
raw_prices = await cg.get_prices(settings.coins_to_track)
prices = transform_prices(raw_prices, ts=hour, fetched_at=fetched_at)
write_partition(
prices,
base_dir=settings.data_dir,
table_name="prices",
partition_date=hour.date(),
file_suffix=suffix,
)
repo_tasks = [gh.get_repo(name) for name in settings.repos_to_track]
participation_tasks = [gh.get_participation(name) for name in settings.repos_to_track]
raw_repos, raw_participations = await asyncio.gather(
asyncio.gather(*repo_tasks),
asyncio.gather(*participation_tasks),
)
repos = [transform_repo(r, fetched_at) for r in raw_repos]
write_partition(
repos,
base_dir=settings.data_dir,
table_name="repos",
partition_date=hour.date(),
file_suffix=suffix,
)
activity: list = []
for full_name, raw in zip(settings.repos_to_track, raw_participations):
activity.extend(transform_participation(raw, full_name, fetched_at))
write_partition(
activity,
base_dir=settings.data_dir,
table_name="activity",
partition_date=hour.date(),
file_suffix=suffix,
)
async def main() -> None:
settings = get_settings()
logger.remove()
logger.add(lambda m: print(m, end=""), level=settings.log_level)
state_path = settings.data_dir / "state.json"
state = load_state(state_path)
logger.info("Starting ETL, state={}", state)
async with (
CoinGeckoClient(
settings.coingecko_api_key.get_secret_value(),
settings.coingecko_base_url,
settings.request_timeout_seconds,
) as cg,
GitHubClient(
settings.github_token.get_secret_value(),
settings.github_base_url,
settings.request_timeout_seconds,
) as gh,
):
for hour in hours_to_process(state):
logger.info("Processing hour={}", hour.isoformat())
try:
await pull_hour(hour, cg, gh, settings)
except Exception:
logger.exception("Failed hour={}, will retry next run", hour)
break
state["last_completed_hour"] = hour.isoformat()
save_state(state, state_path)
logger.info("Hour {} completed", hour.isoformat())
logger.info("ETL run complete")
if __name__ == "__main__":
asyncio.run(main())
Запуск: python run.py. Cron: 0 * * * * cd /opt/crypto-etl && /opt/crypto-etl/.venv/bin/python run.py >> /var/log/crypto-etl.log 2>&1.
Поток выполнения orchestrator
Один важный момент: save_state происходит после write_partition. Если write упал — state не обновился, при следующем запуске тот же час будет повторён. Это и есть идемпотентность через state file.
Логирование
loguru — современная альтернатива стандартному logging. Без boilerplate, с человеческим API.
from loguru import logger
logger.info("Starting ETL")
logger.warning("Rate limit, retry after {}s", retry_after)
logger.exception("Failed hour={}, will retry next run", hour)
Дефолтный формат — цветной, с timestamp, level, file:line, message. Для prod добавляется JSON-output:
logger.add(
"logs/crypto-etl-{time:YYYY-MM-DD}.json",
serialize=True,
rotation="1 day",
retention="14 days",
level="INFO",
)
Логи в JSON удобно скармливать в Loki/ELK/Datadog. Ротация — автоматически.
Чего НЕ делает наш код (намеренно)
Capstone — учебный, поэтому несколько вещей упрощены:
- Нет реальных метрик (Prometheus). В проде —
prometheus_client, экспорт счётчиков «успешных запросов», «retry-ев», «ошибок 5xx». - Нет distributed locks. Если cron запустит pipeline дважды одновременно (что не должно случиться, но…) — будет race на state file. В проде — Redis-lock или Airflow-runtime гарантирует single-instance.
- Нет dead-letter queue. Если час упал и за день не починили — данные пропадают. В проде — DLQ с попыткой replay.
- Нет S3-backend. Мы пишем в локальный диск. В проде —
pyarrow.fs.S3FileSystemили явная загрузка в S3 через boto3.
Все эти расширения — линейная работа поверх существующей структуры. Главное, что слои уже разделены: добавить S3 — поменять только parquet_writer.py, всё остальное не трогается.
Итог
В этом уроке мы реализовали полный pipeline. Пройдено:
- pyproject.toml с правильными зависимостями (httpx 0.28, tenacity 9, pydantic 2.9, pyarrow 17).
- Конфигурация через pydantic-settings + SecretStr для токенов.
- Async-клиенты на httpx с retry-декоратором tenacity.
- Различия в обработке rate-limit между CoinGecko (429) и GitHub (403 + body).
- Pydantic-модели с валидацией и Decimal для денег.
- Pure-функции transform — готовы к unit-тестированию.
- Атомарная запись Parquet через write-to-tmp + os.replace.
- State management для идемпотентности.
- Orchestrator с asyncio.gather для параллелизма.
- Логирование через loguru.
Pipeline можно запускать. Но прежде чем катить в прод — нужны тесты. О них — в следующем уроке: unit-тесты для transforms, mock через respx для clients, VCR для интеграционных, e2e dry-run, pre-commit и CI.