Learning Platform
Глоссарий Troubleshooting
Урок 15.01 · 30 мин
Начальный
ETLdesignarchitectureParquetrate-limitsschema

Capstone: дизайн multi-source ETL pipeline

Курс подходит к финалу. За 12 модулей вы прошли путь от первого HTTP-запроса до contract testing. Теперь — capstone-проект. Цель: собрать всё знание в один production-ready pipeline.

В трёх уроках capstone мы пройдём полный цикл: дизайн (этот урок), реализация и resilience-тесты. Это первый урок — без кода, с архитектурными решениями. На юниор-роли это самая частая ошибка: сесть писать код, не подумав о структуре. Двадцать минут на schema и компоненты сэкономят несколько дней в будущем.


Постановка задачи

Бизнес-сценарий (упрощённый, но реалистичный): аналитической команде нужны почасовые срезы данных по криптовалютам и связанной активности на GitHub. Они хотят ответить на вопросы: «есть ли корреляция между активностью разработчиков ключевых проектов (Bitcoin, Ethereum) и движением цены?», «какие репозитории резко набрали звёзд в момент пампа?». Сырые данные должны лежать в Parquet, разделённые по дате — позже их подхватит DuckDB или Spark в аналитическом слое.

Что должен делать pipeline

Каждый час забрать новые данные из двух API и сложить в Parquet

CoinGecko APIПубличное REST API. /simple/price для текущих цен, /coins/{id}/market_chart для исторических. API-key в headers. Free tier: 30 calls/min, 10 000/day
GitHub APIPublic REST. /repos/{owner}/{name} для метаданных репо, /repos/{owner}/{name}/stats/participation для активности. OAuth2 token. 5000 calls/hour authenticated
PipelineЛокальный Python-скрипт run.py. В будущем -- Airflow DAG. Запуск раз в час по cron
data/raw/prices/dt=2026-05-15/Parquet-файл с ценами, partition by date. Schema: coin_id, ts, price_usd, market_cap, volume_24h
data/raw/repos/dt=2026-05-15/Parquet с метаданными репозиториев. Schema: full_name, stargazers, forks, open_issues, fetched_at
data/raw/activity/dt=2026-05-15/Parquet с участием в коде. Schema: full_name, week_start, commits, fetched_at

Требования:

  • Расписание: pull раз в час, идемпотентно (повтор запуска в один час не должен задвоить данные).
  • Источники: CoinGecko (API-key), GitHub (OAuth2).
  • Output: Parquet, partition by date (dt=YYYY-MM-DD).
  • Resilience: retry с экспоненциальным backoff, обработка 429 (rate limit) и 5xx, timeout на каждый запрос.
  • Тестируемость: unit на трансформации, mock на HTTP-вызовы (respx + VCR).
  • Конфигурация: токены через .env, ничего в git.

requests и httpx: HTTP-клиенты для ETL

Архитектура: Extract — Transform — Load

ETL — классическая абстракция, родом из 1970-х, но всё ещё работающая. Каждый этап разделён, имеет свой контракт.

ETL слои в нашем pipeline

Каждый слой имеет один input, один output, одну ответственность

Extract
Transform
Load
raw_prices: list[dict]validated_prices: list[PriceRecord]write parquet to data/raw/prices/dt=2026-05-15/_partition.tmp.parquet -> rename

Почему разделение важно:

  • Тестируемость: transform можно тестировать без HTTP (передаём dict, проверяем модель). Extract — без write (мокаем HTTP, проверяем что get_prices вернул).
  • Замена источников: если CoinGecko станет платным и мы переедем на CoinMarketCap — меняется только Extract, Transform/Load остаются.
  • Замена backend: если решим писать в S3 вместо локального диска — меняется только Load.

Структура проекта

Стандартная структура Python-проекта для ETL:

crypto-etl/
├── pyproject.toml          # зависимости и настройки инструментов
├── .env.example            # шаблон переменных окружения
├── .gitignore              # .env, data/, __pycache__/, .pytest_cache/
├── README.md
├── run.py                  # точка входа: orchestrator
├── src/
│   └── crypto_etl/
│       ├── __init__.py
│       ├── config.py       # загрузка .env, константы
│       ├── clients/
│       │   ├── __init__.py
│       │   ├── coingecko.py
│       │   └── github.py
│       ├── models.py       # Pydantic модели: PriceRecord, RepoRecord, ActivityRecord
│       ├── transforms/
│       │   ├── __init__.py
│       │   ├── prices.py
│       │   ├── repos.py
│       │   └── activity.py
│       └── storage/
│           ├── __init__.py
│           └── parquet_writer.py
└── tests/
    ├── conftest.py
    ├── cassettes/
    ├── test_clients_coingecko.py
    ├── test_clients_github.py
    ├── test_transforms_prices.py
    ├── test_storage.py
    └── test_e2e.py

Это src-layout (src/crypto_etl/) — рекомендованная Python Packaging Authority схема. Преимущество: тесты не могут случайно импортировать пакет напрямую — только установленный (pip install -e .), что соответствует поведению в проде.


Схема данных

Прежде чем писать код — фиксируем schema. Pydantic-модели как single source of truth.

# src/crypto_etl/models.py
from datetime import datetime
from decimal import Decimal
from pydantic import BaseModel, Field, ConfigDict

class PriceRecord(BaseModel):
    """Срез цены криптовалюты в момент времени."""
    model_config = ConfigDict(frozen=True)

    coin_id: str = Field(..., min_length=1, max_length=50)
    ts: datetime  # UTC
    price_usd: Decimal = Field(..., gt=0)
    market_cap_usd: Decimal | None = Field(None, ge=0)
    volume_24h_usd: Decimal | None = Field(None, ge=0)
    fetched_at: datetime  # UTC, момент HTTP-вызова


class RepoRecord(BaseModel):
    """Метаданные GitHub-репозитория на момент pull."""
    model_config = ConfigDict(frozen=True)

    full_name: str  # "bitcoin/bitcoin"
    stargazers: int = Field(..., ge=0)
    forks: int = Field(..., ge=0)
    open_issues: int = Field(..., ge=0)
    default_branch: str
    fetched_at: datetime  # UTC


class ActivityRecord(BaseModel):
    """Недельная активность коммитов в репозитории."""
    model_config = ConfigDict(frozen=True)

    full_name: str
    week_start: datetime  # начало недели UTC
    commits: int = Field(..., ge=0)
    fetched_at: datetime

frozen=True делает модели иммутабельными. Это важно: после валидации никто не должен менять поля. Кроме того, frozen-модели хешируемы — можно класть в set/dict, что иногда полезно для дедупликации.

Decimal для цен — потому что float теряет точность. Цена биткоина 65043.7891234567 во float станет приблизительной; для аналитики это раздражающие ошибки.

fetched_at — ключевой технический столбец. Без него непонятно, когда снят срез данных. При повторе ETL за тот же час — fetched_at другой, легко увидеть «свежесть».


Партиционирование Parquet

Parquet — формат, в котором мы пишем данные. Partition by date — это структура каталогов:

data/raw/prices/
├── dt=2026-05-13/
│   └── prices_2026-05-13_10.parquet
├── dt=2026-05-14/
│   └── prices_2026-05-14_10.parquet
│   └── prices_2026-05-14_11.parquet
│   └── ...
└── dt=2026-05-15/
    └── prices_2026-05-15_10.parquet

Когда DuckDB или Spark читает с фильтром WHERE dt = '2026-05-15', они видят названия каталогов и читают только нужные файлы — partition pruning. Это ускоряет чтение в десятки раз для time-series данных.

Имя файла внутри партиции включает время (prices_2026-05-15_10.parquet) — это даёт идемпотентность: повторный запуск pipeline в 10:00 запишет в тот же файл (overwrite), а не создаст дубль.

TIP

Партиционирование “by date” — стандарт для time-series. Но партиции не должны быть слишком мелкими. Если делать партицию на каждый час (dt=YYYY-MM-DD/hr=HH), у вас за год получится 8760 каталогов с маленькими файлами — это убивает производительность чтения (small files problem). Один файл-в-день для умеренного объёма — золотая середина.


Расписание и идемпотентность

Pipeline запускается раз в час. Минимально — через системный cron:

# /etc/cron.d/crypto-etl
0 * * * * python -m crypto_etl.run >> /var/log/crypto-etl.log 2>&1

Идемпотентность: что произойдёт, если запуск в 10:00 упал на середине, и cron запустит его снова в 11:00? Нужно, чтобы:

  1. Данные за 10:00 не задвоились (если 10:00-prices написались, не дозаписать).
  2. Данные за 11:00 написались полноценно.
  3. Если 10:00 не написался вовсе — при повторе записать.

Стратегии:

Идемпотентность ETL

Несколько подходов к избеганию дублей

Overwrite по timestampИмя файла включает час: prices_2026-05-15_10.parquet. Повторный запуск в 10:00 перезапишет файл с теми же свежими данными. Минус: если запуск был частичным, данные потеряются
State fileПосле каждого успешного hour-pull записать в state.json: {'last_completed_hour': '2026-05-15T10:00:00Z'}. На старте -- читать state, пропускать уже завершённые часы. Минус: state -- отдельная точка отказа
Incremental upsertПри записи проверять, есть ли уже строки с такими (coin_id, ts) -- если да, перезаписать. Сложно с Parquet (нет UPDATE), требует чтения старого файла
Append + dedup at readПросто аппендить, дедуплицировать на чтении (DuckDB DISTINCT). Минус: размер растёт, медленные чтения

Для простого capstone берём overwrite + state file. State file (data/state.json) хранит последний успешно обработанный час. На старте: читаем state, пропускаем завершённые часы, ищем «дыры» (если 10:00 не сделан, 11:00 ещё не наступил — пишем 10:00).

# Псевдокод
state = load_state()  # {'last_completed_hour': '2026-05-15T09:00:00Z'}
now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
last = state.get('last_completed_hour', now - timedelta(hours=1))

current = last + timedelta(hours=1)
while current <= now:
    pull_hour(current)
    state['last_completed_hour'] = current.isoformat()
    save_state(state)
    current += timedelta(hours=1)

Так покрываются три кейса: упал на середине (state не обновился — повторим), пропустил час (cron не отработал — догоним), штатно отработал (next run возьмёт следующий час).


Rate limits каждого API

Это критическая часть дизайна. Без учёта rate limits ваш pipeline сломается на третьем запуске.

CoinGecko Free Tier

  • 30 calls/min (rolling window).
  • 10,000 calls/month.
  • При превышении — 429 + Retry-After header.

В нашем pipeline мы тянем цены ~10 монет в час. Это 240 calls/day = ~7200 calls/month. Под лимитом.

GitHub API

  • 5,000 calls/hour для authenticated requests (с OAuth-токеном).
  • 60 calls/hour для anonymous.
  • Headers: X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Reset (UTC timestamp).
  • При превышении — 403 + body {"message": "API rate limit exceeded..."}.

В нашем pipeline ~10 репо × 2 endpoint (metadata + activity) × 24 hours = 480 calls/day. Под лимитом.

WARNING

GitHub при rate limit возвращает не 429, а 403. Это нестандартно — большинство API используют 429. Если retry-логика смотрит только на 429 — пропустит rate limit от GitHub. Проверяйте оба статуса + содержимое тела сообщения.

Стратегия обработки

# Псевдокод retry-стратегии
@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=2, min=2, max=60) + wait_random(0, 1),
    retry=retry_if_exception_type((HTTPStatusError, ConnectError, ReadTimeout)),
    reraise=True,
)
def call_with_retry(client, url, **kwargs):
    response = client.get(url, **kwargs)
    if response.status_code == 429 or (
        response.status_code == 403 and "rate limit" in response.text.lower()
    ):
        retry_after = int(response.headers.get("Retry-After", 60))
        raise RateLimitError(retry_after=retry_after)
    response.raise_for_status()
    return response

Подробнее retry будет в уроке implementation. Здесь важно: на этапе дизайна явно сказать «у нас два API с разными rate-limit-статусами», и заложить эту логику в архитектуру.


Auth: API-key + OAuth2

CoinGecko использует API-key в header (x-cg-pro-api-key: <key>). Простой случай: положили в .env, читаем при старте, добавляем в каждый запрос.

GitHub использует OAuth2. Для нашего случая (read-only, server-side) подходит personal access token (PAT) или OAuth App с client_credentials. PAT проще: пользователь генерирует токен в settings GitHub, кладёт в .env. Без refresh, без сложного flow.

# .env (НЕ в git)
COINGECKO_API_KEY=cg-XXX
GITHUB_TOKEN=ghp_XXX
DATA_DIR=./data
LOG_LEVEL=INFO
# .env.example (в git)
COINGECKO_API_KEY=your-coingecko-key-here
GITHUB_TOKEN=your-github-token-here
DATA_DIR=./data
LOG_LEVEL=INFO
# src/crypto_etl/config.py
from pathlib import Path
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    coingecko_api_key: str
    github_token: str
    data_dir: Path = Path("./data")
    log_level: str = "INFO"

    coins_to_track: list[str] = [
        "bitcoin", "ethereum", "solana", "cardano",
        "polkadot", "chainlink", "litecoin", "monero",
    ]
    repos_to_track: list[str] = [
        "bitcoin/bitcoin",
        "ethereum/go-ethereum",
        "solana-labs/solana",
        "input-output-hk/cardano-node",
    ]

settings = Settings()

pydantic-settings автоматически читает .env, валидирует типы, кидает ошибку, если обязательное поле отсутствует. Это предотвращает классический баг «забыл выставить переменную, скрипт упал на 50-м запросе с непонятной ошибкой».


План кода

Чтобы перейти к implementation, нужен outline кода. Вот он, в виде интерфейсов классов:

# src/crypto_etl/clients/coingecko.py
class CoinGeckoClient:
    def __init__(self, api_key: str, base_url: str = "https://api.coingecko.com/api/v3"):
        ...

    async def get_prices(self, coin_ids: list[str]) -> list[dict]:
        """Pull current prices for coins. Returns raw dicts from API."""
        ...


# src/crypto_etl/clients/github.py
class GitHubClient:
    def __init__(self, token: str, base_url: str = "https://api.github.com"):
        ...

    async def get_repo(self, full_name: str) -> dict:
        """GET /repos/{full_name}. Raw dict."""
        ...

    async def get_participation(self, full_name: str) -> dict:
        """GET /repos/{full_name}/stats/participation. Raw dict."""
        ...


# src/crypto_etl/transforms/prices.py
def transform_prices(raw: list[dict], fetched_at: datetime) -> list[PriceRecord]:
    """Pure function. Raw CoinGecko response -> validated PriceRecord list."""
    ...


# src/crypto_etl/transforms/repos.py
def transform_repo(raw: dict, fetched_at: datetime) -> RepoRecord:
    ...

def transform_participation(raw: dict, full_name: str, fetched_at: datetime) -> list[ActivityRecord]:
    ...


# src/crypto_etl/storage/parquet_writer.py
def write_partition(
    records: list[BaseModel],
    base_dir: Path,
    table_name: str,
    partition_date: date,
    file_suffix: str,
) -> Path:
    """Write to data/raw/{table}/dt={date}/{table}_{date}_{suffix}.parquet (atomic)."""
    ...


# run.py
async def main():
    settings = get_settings()
    state = load_state(settings.data_dir / "state.json")
    cg = CoinGeckoClient(settings.coingecko_api_key)
    gh = GitHubClient(settings.github_token)

    for hour in hours_to_process(state):
        async with cg, gh:
            await pull_hour(hour, cg, gh, settings)
        state["last_completed_hour"] = hour.isoformat()
        save_state(state, settings.data_dir / "state.json")

Это минимально достаточный outline. Видно, где границы, что пишется отдельным файлом, что куда передаётся.


Чек-лист дизайна

Перед тем как сесть писать код, убедитесь:

  1. Schema зафиксирована — Pydantic-модели для всех output-таблиц.
  2. Каждый API изучен — endpoints, auth, rate limits, формат ошибок.
  3. Партиционирование выбрано — не слишком мелкое, не слишком крупное.
  4. Идемпотентность продумана — что делает pipeline при повторном запуске того же часа.
  5. Структура проекта набросана — модули, тесты, конфиг.
  6. Resilience-стратегия описана — retry, timeouts, circuit breaker (если нужен).
  7. Конфигурация через .env — никаких секретов в коде.

Если что-то не понятно — это место, где вы споткнётесь во время implementation. Лучше прояснить сейчас.

Проверка знанийKnowledge check
Junior разработчик предлагает партиционировать Parquet по часу: data/raw/prices/dt=2026-05-15/hr=10/prices.parquet. Аргумент: 'так быстрее искать данные за конкретный час'. Какой главный риск этого решения?
ОтветAnswer

Итог

В этом уроке мы спроектировали ETL-pipeline до того, как написали хотя бы одну строчку кода. Зафиксировали:

  • Источники и их особенности (CoinGecko + GitHub, разные auth, разные rate-limit-статусы).
  • ETL-слои (Extract -> Transform -> Load) с явными контрактами.
  • Структуру проекта (src-layout с разделением на clients/transforms/storage).
  • Schema через Pydantic с frozen-моделями и Decimal для денег.
  • Партиционирование Parquet by date с overwrite-стратегией.
  • Идемпотентность через state file.
  • Конфигурацию через .env + pydantic-settings.

В следующем уроке — implementation: реальный код, async httpx, tenacity для retry, pyarrow для Parquet. Без сюрпризов, потому что план есть.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 6. Junior разработчик предлагает партиционировать Parquet по часу: data/raw/prices/dt=2026-05-15/hr=10/prices.parquet. В чём главный риск этого подхода?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4