Capstone: дизайн multi-source ETL pipeline
Курс подходит к финалу. За 12 модулей вы прошли путь от первого HTTP-запроса до contract testing. Теперь — capstone-проект. Цель: собрать всё знание в один production-ready pipeline.
В трёх уроках capstone мы пройдём полный цикл: дизайн (этот урок), реализация и resilience-тесты. Это первый урок — без кода, с архитектурными решениями. На юниор-роли это самая частая ошибка: сесть писать код, не подумав о структуре. Двадцать минут на schema и компоненты сэкономят несколько дней в будущем.
Постановка задачи
Бизнес-сценарий (упрощённый, но реалистичный): аналитической команде нужны почасовые срезы данных по криптовалютам и связанной активности на GitHub. Они хотят ответить на вопросы: «есть ли корреляция между активностью разработчиков ключевых проектов (Bitcoin, Ethereum) и движением цены?», «какие репозитории резко набрали звёзд в момент пампа?». Сырые данные должны лежать в Parquet, разделённые по дате — позже их подхватит DuckDB или Spark в аналитическом слое.
Каждый час забрать новые данные из двух API и сложить в Parquet
Требования:
- Расписание: pull раз в час, идемпотентно (повтор запуска в один час не должен задвоить данные).
- Источники: CoinGecko (API-key), GitHub (OAuth2).
- Output: Parquet, partition by date (
dt=YYYY-MM-DD). - Resilience: retry с экспоненциальным backoff, обработка 429 (rate limit) и 5xx, timeout на каждый запрос.
- Тестируемость: unit на трансформации, mock на HTTP-вызовы (respx + VCR).
- Конфигурация: токены через
.env, ничего в git.
requests и httpx: HTTP-клиенты для ETL
Архитектура: Extract — Transform — Load
ETL — классическая абстракция, родом из 1970-х, но всё ещё работающая. Каждый этап разделён, имеет свой контракт.
Каждый слой имеет один input, один output, одну ответственность
Почему разделение важно:
- Тестируемость: transform можно тестировать без HTTP (передаём dict, проверяем модель). Extract — без write (мокаем HTTP, проверяем что get_prices вернул).
- Замена источников: если CoinGecko станет платным и мы переедем на CoinMarketCap — меняется только Extract, Transform/Load остаются.
- Замена backend: если решим писать в S3 вместо локального диска — меняется только Load.
Структура проекта
Стандартная структура Python-проекта для ETL:
crypto-etl/
├── pyproject.toml # зависимости и настройки инструментов
├── .env.example # шаблон переменных окружения
├── .gitignore # .env, data/, __pycache__/, .pytest_cache/
├── README.md
├── run.py # точка входа: orchestrator
├── src/
│ └── crypto_etl/
│ ├── __init__.py
│ ├── config.py # загрузка .env, константы
│ ├── clients/
│ │ ├── __init__.py
│ │ ├── coingecko.py
│ │ └── github.py
│ ├── models.py # Pydantic модели: PriceRecord, RepoRecord, ActivityRecord
│ ├── transforms/
│ │ ├── __init__.py
│ │ ├── prices.py
│ │ ├── repos.py
│ │ └── activity.py
│ └── storage/
│ ├── __init__.py
│ └── parquet_writer.py
└── tests/
├── conftest.py
├── cassettes/
├── test_clients_coingecko.py
├── test_clients_github.py
├── test_transforms_prices.py
├── test_storage.py
└── test_e2e.py
Это src-layout (src/crypto_etl/) — рекомендованная Python Packaging Authority схема. Преимущество: тесты не могут случайно импортировать пакет напрямую — только установленный (pip install -e .), что соответствует поведению в проде.
Схема данных
Прежде чем писать код — фиксируем schema. Pydantic-модели как single source of truth.
# src/crypto_etl/models.py
from datetime import datetime
from decimal import Decimal
from pydantic import BaseModel, Field, ConfigDict
class PriceRecord(BaseModel):
"""Срез цены криптовалюты в момент времени."""
model_config = ConfigDict(frozen=True)
coin_id: str = Field(..., min_length=1, max_length=50)
ts: datetime # UTC
price_usd: Decimal = Field(..., gt=0)
market_cap_usd: Decimal | None = Field(None, ge=0)
volume_24h_usd: Decimal | None = Field(None, ge=0)
fetched_at: datetime # UTC, момент HTTP-вызова
class RepoRecord(BaseModel):
"""Метаданные GitHub-репозитория на момент pull."""
model_config = ConfigDict(frozen=True)
full_name: str # "bitcoin/bitcoin"
stargazers: int = Field(..., ge=0)
forks: int = Field(..., ge=0)
open_issues: int = Field(..., ge=0)
default_branch: str
fetched_at: datetime # UTC
class ActivityRecord(BaseModel):
"""Недельная активность коммитов в репозитории."""
model_config = ConfigDict(frozen=True)
full_name: str
week_start: datetime # начало недели UTC
commits: int = Field(..., ge=0)
fetched_at: datetime
frozen=True делает модели иммутабельными. Это важно: после валидации никто не должен менять поля. Кроме того, frozen-модели хешируемы — можно класть в set/dict, что иногда полезно для дедупликации.
Decimal для цен — потому что float теряет точность. Цена биткоина 65043.7891234567 во float станет приблизительной; для аналитики это раздражающие ошибки.
fetched_at — ключевой технический столбец. Без него непонятно, когда снят срез данных. При повторе ETL за тот же час — fetched_at другой, легко увидеть «свежесть».
Партиционирование Parquet
Parquet — формат, в котором мы пишем данные. Partition by date — это структура каталогов:
data/raw/prices/
├── dt=2026-05-13/
│ └── prices_2026-05-13_10.parquet
├── dt=2026-05-14/
│ └── prices_2026-05-14_10.parquet
│ └── prices_2026-05-14_11.parquet
│ └── ...
└── dt=2026-05-15/
└── prices_2026-05-15_10.parquet
Когда DuckDB или Spark читает с фильтром WHERE dt = '2026-05-15', они видят названия каталогов и читают только нужные файлы — partition pruning. Это ускоряет чтение в десятки раз для time-series данных.
Имя файла внутри партиции включает время (prices_2026-05-15_10.parquet) — это даёт идемпотентность: повторный запуск pipeline в 10:00 запишет в тот же файл (overwrite), а не создаст дубль.
Партиционирование “by date” — стандарт для time-series. Но партиции не должны быть слишком мелкими. Если делать партицию на каждый час (dt=YYYY-MM-DD/hr=HH), у вас за год получится 8760 каталогов с маленькими файлами — это убивает производительность чтения (small files problem). Один файл-в-день для умеренного объёма — золотая середина.
Расписание и идемпотентность
Pipeline запускается раз в час. Минимально — через системный cron:
# /etc/cron.d/crypto-etl
0 * * * * python -m crypto_etl.run >> /var/log/crypto-etl.log 2>&1
Идемпотентность: что произойдёт, если запуск в 10:00 упал на середине, и cron запустит его снова в 11:00? Нужно, чтобы:
- Данные за 10:00 не задвоились (если 10:00-prices написались, не дозаписать).
- Данные за 11:00 написались полноценно.
- Если 10:00 не написался вовсе — при повторе записать.
Стратегии:
Несколько подходов к избеганию дублей
Для простого capstone берём overwrite + state file. State file (data/state.json) хранит последний успешно обработанный час. На старте: читаем state, пропускаем завершённые часы, ищем «дыры» (если 10:00 не сделан, 11:00 ещё не наступил — пишем 10:00).
# Псевдокод
state = load_state() # {'last_completed_hour': '2026-05-15T09:00:00Z'}
now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
last = state.get('last_completed_hour', now - timedelta(hours=1))
current = last + timedelta(hours=1)
while current <= now:
pull_hour(current)
state['last_completed_hour'] = current.isoformat()
save_state(state)
current += timedelta(hours=1)
Так покрываются три кейса: упал на середине (state не обновился — повторим), пропустил час (cron не отработал — догоним), штатно отработал (next run возьмёт следующий час).
Rate limits каждого API
Это критическая часть дизайна. Без учёта rate limits ваш pipeline сломается на третьем запуске.
CoinGecko Free Tier
- 30 calls/min (rolling window).
- 10,000 calls/month.
- При превышении — 429 +
Retry-Afterheader.
В нашем pipeline мы тянем цены ~10 монет в час. Это 240 calls/day = ~7200 calls/month. Под лимитом.
GitHub API
- 5,000 calls/hour для authenticated requests (с OAuth-токеном).
- 60 calls/hour для anonymous.
- Headers:
X-RateLimit-Limit,X-RateLimit-Remaining,X-RateLimit-Reset(UTC timestamp). - При превышении — 403 + body
{"message": "API rate limit exceeded..."}.
В нашем pipeline ~10 репо × 2 endpoint (metadata + activity) × 24 hours = 480 calls/day. Под лимитом.
GitHub при rate limit возвращает не 429, а 403. Это нестандартно — большинство API используют 429. Если retry-логика смотрит только на 429 — пропустит rate limit от GitHub. Проверяйте оба статуса + содержимое тела сообщения.
Стратегия обработки
# Псевдокод retry-стратегии
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=2, min=2, max=60) + wait_random(0, 1),
retry=retry_if_exception_type((HTTPStatusError, ConnectError, ReadTimeout)),
reraise=True,
)
def call_with_retry(client, url, **kwargs):
response = client.get(url, **kwargs)
if response.status_code == 429 or (
response.status_code == 403 and "rate limit" in response.text.lower()
):
retry_after = int(response.headers.get("Retry-After", 60))
raise RateLimitError(retry_after=retry_after)
response.raise_for_status()
return response
Подробнее retry будет в уроке implementation. Здесь важно: на этапе дизайна явно сказать «у нас два API с разными rate-limit-статусами», и заложить эту логику в архитектуру.
Auth: API-key + OAuth2
CoinGecko использует API-key в header (x-cg-pro-api-key: <key>). Простой случай: положили в .env, читаем при старте, добавляем в каждый запрос.
GitHub использует OAuth2. Для нашего случая (read-only, server-side) подходит personal access token (PAT) или OAuth App с client_credentials. PAT проще: пользователь генерирует токен в settings GitHub, кладёт в .env. Без refresh, без сложного flow.
# .env (НЕ в git)
COINGECKO_API_KEY=cg-XXX
GITHUB_TOKEN=ghp_XXX
DATA_DIR=./data
LOG_LEVEL=INFO
# .env.example (в git)
COINGECKO_API_KEY=your-coingecko-key-here
GITHUB_TOKEN=your-github-token-here
DATA_DIR=./data
LOG_LEVEL=INFO
# src/crypto_etl/config.py
from pathlib import Path
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
coingecko_api_key: str
github_token: str
data_dir: Path = Path("./data")
log_level: str = "INFO"
coins_to_track: list[str] = [
"bitcoin", "ethereum", "solana", "cardano",
"polkadot", "chainlink", "litecoin", "monero",
]
repos_to_track: list[str] = [
"bitcoin/bitcoin",
"ethereum/go-ethereum",
"solana-labs/solana",
"input-output-hk/cardano-node",
]
settings = Settings()
pydantic-settings автоматически читает .env, валидирует типы, кидает ошибку, если обязательное поле отсутствует. Это предотвращает классический баг «забыл выставить переменную, скрипт упал на 50-м запросе с непонятной ошибкой».
План кода
Чтобы перейти к implementation, нужен outline кода. Вот он, в виде интерфейсов классов:
# src/crypto_etl/clients/coingecko.py
class CoinGeckoClient:
def __init__(self, api_key: str, base_url: str = "https://api.coingecko.com/api/v3"):
...
async def get_prices(self, coin_ids: list[str]) -> list[dict]:
"""Pull current prices for coins. Returns raw dicts from API."""
...
# src/crypto_etl/clients/github.py
class GitHubClient:
def __init__(self, token: str, base_url: str = "https://api.github.com"):
...
async def get_repo(self, full_name: str) -> dict:
"""GET /repos/{full_name}. Raw dict."""
...
async def get_participation(self, full_name: str) -> dict:
"""GET /repos/{full_name}/stats/participation. Raw dict."""
...
# src/crypto_etl/transforms/prices.py
def transform_prices(raw: list[dict], fetched_at: datetime) -> list[PriceRecord]:
"""Pure function. Raw CoinGecko response -> validated PriceRecord list."""
...
# src/crypto_etl/transforms/repos.py
def transform_repo(raw: dict, fetched_at: datetime) -> RepoRecord:
...
def transform_participation(raw: dict, full_name: str, fetched_at: datetime) -> list[ActivityRecord]:
...
# src/crypto_etl/storage/parquet_writer.py
def write_partition(
records: list[BaseModel],
base_dir: Path,
table_name: str,
partition_date: date,
file_suffix: str,
) -> Path:
"""Write to data/raw/{table}/dt={date}/{table}_{date}_{suffix}.parquet (atomic)."""
...
# run.py
async def main():
settings = get_settings()
state = load_state(settings.data_dir / "state.json")
cg = CoinGeckoClient(settings.coingecko_api_key)
gh = GitHubClient(settings.github_token)
for hour in hours_to_process(state):
async with cg, gh:
await pull_hour(hour, cg, gh, settings)
state["last_completed_hour"] = hour.isoformat()
save_state(state, settings.data_dir / "state.json")
Это минимально достаточный outline. Видно, где границы, что пишется отдельным файлом, что куда передаётся.
Чек-лист дизайна
Перед тем как сесть писать код, убедитесь:
- Schema зафиксирована — Pydantic-модели для всех output-таблиц.
- Каждый API изучен — endpoints, auth, rate limits, формат ошибок.
- Партиционирование выбрано — не слишком мелкое, не слишком крупное.
- Идемпотентность продумана — что делает pipeline при повторном запуске того же часа.
- Структура проекта набросана — модули, тесты, конфиг.
- Resilience-стратегия описана — retry, timeouts, circuit breaker (если нужен).
- Конфигурация через .env — никаких секретов в коде.
Если что-то не понятно — это место, где вы споткнётесь во время implementation. Лучше прояснить сейчас.
Итог
В этом уроке мы спроектировали ETL-pipeline до того, как написали хотя бы одну строчку кода. Зафиксировали:
- Источники и их особенности (CoinGecko + GitHub, разные auth, разные rate-limit-статусы).
- ETL-слои (Extract -> Transform -> Load) с явными контрактами.
- Структуру проекта (src-layout с разделением на clients/transforms/storage).
- Schema через Pydantic с frozen-моделями и Decimal для денег.
- Партиционирование Parquet by date с overwrite-стратегией.
- Идемпотентность через state file.
- Конфигурацию через
.env+ pydantic-settings.
В следующем уроке — implementation: реальный код, async httpx, tenacity для retry, pyarrow для Parquet. Без сюрпризов, потому что план есть.