Learning Platform
Глоссарий Troubleshooting
Урок 15.02 · 35 мин
Начальный
httpxtenacitypyarrowPydanticasyncioimplementation

Capstone: реализация ETL — httpx, tenacity, Pydantic, pyarrow

В прошлом уроке мы построили дизайн pipeline. Теперь — реальный код. Не псевдокод, а то, что можно запустить, протестировать, поставить в cron и забыть на месяц. Этот урок — самый «инженерный» во всём курсе. Если вы прошли его внимательно, вы знаете, как выглядит production-grade Python ETL в 2026 году.

В уроке: pyproject.toml, конфиг через pydantic-settings, async-клиенты на httpx, retry через tenacity, валидация через Pydantic, запись в Parquet через pyarrow, orchestrator в run.py.


Type hints в Python: аннотации, mypy, pydantic

pyproject.toml — корень проекта

Начинаем с описания зависимостей. PEP 621 + PEP 660 — современный способ задания пакета.

[project]
name = "crypto-etl"
version = "0.1.0"
description = "Multi-source ETL pipeline: CoinGecko + GitHub -> Parquet"
requires-python = ">=3.13"
dependencies = [
    "httpx>=0.28,<0.29",
    "tenacity>=9.0,<10.0",
    "pydantic>=2.9,<3.0",
    "pydantic-settings>=2.5,<3.0",
    "pyarrow>=17.0,<18.0",
    "loguru>=0.7,<1.0",
    "pybreaker>=1.2,<2.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.3,<9.0",
    "pytest-asyncio>=0.24,<1.0",
    "respx>=0.21,<1.0",
    "vcrpy>=6.0,<7.0",
    "pytest-recording>=0.13,<1.0",
    "ruff>=0.7,<1.0",
    "mypy>=1.13,<2.0",
    "coverage>=7.6,<8.0",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/crypto_etl"]

[tool.ruff]
line-length = 100
target-version = "py313"

[tool.ruff.lint]
select = ["E", "F", "I", "N", "UP", "B", "A", "C4", "SIM"]

[tool.mypy]
python_version = "3.13"
strict = true
files = ["src/crypto_etl"]

[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"

Установка: pip install -e ".[dev]". После этого crypto_etl импортируется отовсюду, изменения подхватываются без переустановки (editable install).


Конфигурация: src/crypto_etl/config.py

from functools import lru_cache
from pathlib import Path
from pydantic import Field, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )

    coingecko_api_key: SecretStr
    github_token: SecretStr
    data_dir: Path = Path("./data")
    log_level: str = "INFO"

    coingecko_base_url: str = "https://api.coingecko.com/api/v3"
    github_base_url: str = "https://api.github.com"

    coins_to_track: list[str] = Field(default_factory=lambda: [
        "bitcoin", "ethereum", "solana", "cardano",
    ])
    repos_to_track: list[str] = Field(default_factory=lambda: [
        "bitcoin/bitcoin",
        "ethereum/go-ethereum",
        "solana-labs/solana",
    ])

    request_timeout_seconds: float = 30.0
    max_retries: int = 5
    backoff_base_seconds: float = 2.0
    backoff_max_seconds: float = 60.0


@lru_cache(maxsize=1)
def get_settings() -> Settings:
    return Settings()

Ключевые детали:

  • SecretStr для токенов — Pydantic не печатает секрет в repr/str. Если случайно залогируете объект Settings, токен останется скрыт.
  • lru_cache на get_settings — чтобы не парсить .env каждый раз.
  • Дефолты для всего, кроме секретов — чтобы можно было запустить без .env (упадёт только на validation секретов).

Pydantic-модели: src/crypto_etl/models.py

from datetime import datetime
from decimal import Decimal
from pydantic import BaseModel, ConfigDict, Field


class PriceRecord(BaseModel):
    model_config = ConfigDict(frozen=True)

    coin_id: str = Field(..., min_length=1, max_length=50)
    ts: datetime
    price_usd: Decimal = Field(..., gt=0)
    market_cap_usd: Decimal | None = Field(None, ge=0)
    volume_24h_usd: Decimal | None = Field(None, ge=0)
    fetched_at: datetime


class RepoRecord(BaseModel):
    model_config = ConfigDict(frozen=True)

    full_name: str = Field(..., pattern=r"^[\w.-]+/[\w.-]+$")
    stargazers: int = Field(..., ge=0)
    forks: int = Field(..., ge=0)
    open_issues: int = Field(..., ge=0)
    default_branch: str = Field(..., min_length=1)
    fetched_at: datetime


class ActivityRecord(BaseModel):
    model_config = ConfigDict(frozen=True)

    full_name: str = Field(..., pattern=r"^[\w.-]+/[\w.-]+$")
    week_start: datetime
    commits: int = Field(..., ge=0)
    fetched_at: datetime

pattern=r"^[\w.-]+/[\w.-]+$" для full_name — если кто-то передаст bitcoin без /, валидация упадёт. Это ловит ошибки рано.


HTTP-клиенты с retry

Базовый retry-декоратор: src/crypto_etl/clients/_retry.py

import httpx
from tenacity import (
    AsyncRetrying,
    RetryError,
    retry,
    stop_after_attempt,
    wait_exponential,
    wait_random,
    retry_if_exception_type,
    before_sleep_log,
)
from loguru import logger


class RateLimitError(Exception):
    """API сообщил, что мы упёрлись в лимит."""

    def __init__(self, retry_after: int):
        self.retry_after = retry_after
        super().__init__(f"Rate limit, retry after {retry_after}s")


class TransientError(Exception):
    """Временная ошибка, имеет смысл повторить."""


def is_rate_limit(response: httpx.Response) -> bool:
    """CoinGecko: 429. GitHub: 403 + 'rate limit' в body."""
    if response.status_code == 429:
        return True
    if response.status_code == 403:
        body = response.text.lower()
        return "rate limit" in body or "abuse detection" in body
    return False


def should_retry_status(status_code: int) -> bool:
    """5xx + 429 -- повторяем. Остальное -- не наша головная боль."""
    return status_code >= 500 or status_code == 429


retry_decorator = retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=2, min=2, max=60) + wait_random(0, 1),
    retry=retry_if_exception_type(
        (httpx.ConnectError, httpx.ReadTimeout, httpx.RemoteProtocolError, TransientError)
    ),
    before_sleep=before_sleep_log(logger, "WARNING"),
    reraise=True,
)

wait_exponential(multiplier=2, min=2, max=60) означает паузы 2s, 4s, 8s, 16s, 32s (capped at 60s). + wait_random(0, 1) добавляет jitter — случайные 0-1 секунды поверх. Без jitter все клиенты, упавшие одновременно, будут стучаться к API в одни и те же моменты — типичный thundering herd.

reraise=True — после исчерпания попыток tenacity пробросит последнее исключение, а не свой RetryError. Это важно: вызывающий код видит реальную причину, а не обёртку.


CoinGecko-клиент: src/crypto_etl/clients/coingecko.py

from typing import Self

import httpx
from loguru import logger

from crypto_etl.clients._retry import (
    RateLimitError,
    TransientError,
    is_rate_limit,
    retry_decorator,
    should_retry_status,
)


class CoinGeckoClient:
    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.coingecko.com/api/v3",
        timeout: float = 30.0,
    ):
        self._api_key = api_key
        self._base_url = base_url.rstrip("/")
        self._timeout = timeout
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self) -> Self:
        self._client = httpx.AsyncClient(
            base_url=self._base_url,
            headers={
                "x-cg-pro-api-key": self._api_key,
                "Accept": "application/json",
                "User-Agent": "crypto-etl/0.1",
            },
            timeout=self._timeout,
        )
        return self

    async def __aexit__(self, *exc_info) -> None:
        if self._client:
            await self._client.aclose()
            self._client = None

    @retry_decorator
    async def _get(self, path: str, params: dict | None = None) -> dict:
        assert self._client is not None, "Use as async context manager"
        response = await self._client.get(path, params=params)
        if is_rate_limit(response):
            retry_after = int(response.headers.get("Retry-After", 60))
            logger.warning("CoinGecko rate limit, retry after {}s", retry_after)
            raise RateLimitError(retry_after=retry_after)
        if should_retry_status(response.status_code):
            raise TransientError(
                f"CoinGecko {response.status_code}: {response.text[:200]}"
            )
        response.raise_for_status()
        return response.json()

    async def get_prices(self, coin_ids: list[str]) -> dict:
        return await self._get(
            "/simple/price",
            params={
                "ids": ",".join(coin_ids),
                "vs_currencies": "usd",
                "include_market_cap": "true",
                "include_24hr_vol": "true",
            },
        )

Заметили Self из typing? Это правильный возврат для __aenter__. До Python 3.11 пришлось бы писать строковую аннотацию "CoinGeckoClient" или T = TypeVar(...).

Ассерт assert self._client is not None, "Use as async context manager" — runtime-проверка для случая, когда клиент попытались использовать без async with. Лучше упасть с понятным сообщением, чем с AttributeError: NoneType has no attribute 'get'.


GitHub-клиент: src/crypto_etl/clients/github.py

from typing import Self

import httpx
from loguru import logger

from crypto_etl.clients._retry import (
    RateLimitError,
    TransientError,
    is_rate_limit,
    retry_decorator,
    should_retry_status,
)


class GitHubClient:
    def __init__(
        self,
        token: str,
        base_url: str = "https://api.github.com",
        timeout: float = 30.0,
    ):
        self._token = token
        self._base_url = base_url.rstrip("/")
        self._timeout = timeout
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self) -> Self:
        self._client = httpx.AsyncClient(
            base_url=self._base_url,
            headers={
                "Authorization": f"Bearer {self._token}",
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
                "User-Agent": "crypto-etl/0.1",
            },
            timeout=self._timeout,
            http2=True,
        )
        return self

    async def __aexit__(self, *exc_info) -> None:
        if self._client:
            await self._client.aclose()
            self._client = None

    @retry_decorator
    async def _get(self, path: str) -> dict:
        assert self._client is not None
        response = await self._client.get(path)
        if is_rate_limit(response):
            reset = int(response.headers.get("X-RateLimit-Reset", "0"))
            logger.warning("GitHub rate limit, reset at unix={}", reset)
            raise RateLimitError(retry_after=60)
        if should_retry_status(response.status_code):
            raise TransientError(
                f"GitHub {response.status_code}: {response.text[:200]}"
            )
        response.raise_for_status()
        return response.json()

    async def get_repo(self, full_name: str) -> dict:
        return await self._get(f"/repos/{full_name}")

    async def get_participation(self, full_name: str) -> dict:
        return await self._get(f"/repos/{full_name}/stats/participation")

http2=True — GitHub поддерживает HTTP/2; включение даёт мультиплексирование запросов в одном соединении. Это важно, когда тянешь данные по 10 репо подряд: вместо 10 отдельных соединений — одно с 10 параллельными streams.


Pagination loop с cursor-based

CoinGecko в /coins/markets возвращает страницы по 250 записей. GitHub во многих endpoint использует Link header (RFC 5988). Для capstone мы пагинацию явно не используем (тянем фиксированный список монет/репо), но добавим helper для будущего.

# src/crypto_etl/clients/_pagination.py
from typing import AsyncIterator

import httpx


async def paginate_link(
    client: httpx.AsyncClient,
    initial_path: str,
    params: dict | None = None,
) -> AsyncIterator[list[dict]]:
    """Итератор по страницам GitHub-стиля (Link header)."""
    next_url: str | None = initial_path
    next_params = params

    while next_url:
        response = await client.get(next_url, params=next_params)
        response.raise_for_status()
        page = response.json()
        if not isinstance(page, list):
            yield [page]
            return
        yield page

        link = response.headers.get("Link", "")
        next_url = _parse_next_link(link)
        next_params = None  # Link уже содержит query string


def _parse_next_link(link_header: str) -> str | None:
    """Парсит Link: <url>; rel='next', <url>; rel='last'."""
    if not link_header:
        return None
    for part in link_header.split(","):
        if 'rel="next"' in part:
            url = part.split(";", 1)[0].strip()
            return url.strip("<>")
    return None

Использование:

async for page in paginate_link(client, "/users/octocat/repos", {"per_page": 100}):
    for repo in page:
        print(repo["name"])

Transforms — pure-функции

# src/crypto_etl/transforms/prices.py
from datetime import datetime
from decimal import Decimal

from crypto_etl.models import PriceRecord


def transform_prices(raw: dict, ts: datetime, fetched_at: datetime) -> list[PriceRecord]:
    """CoinGecko response -> PriceRecord list.

    Raw shape:
    {
        "bitcoin": {"usd": 65000.0, "usd_market_cap": 1.2e12, "usd_24h_vol": 2.5e10},
        "ethereum": {"usd": 3500.0, ...}
    }
    """
    records: list[PriceRecord] = []
    for coin_id, data in raw.items():
        records.append(PriceRecord(
            coin_id=coin_id,
            ts=ts,
            price_usd=Decimal(str(data["usd"])),
            market_cap_usd=(
                Decimal(str(data["usd_market_cap"])) if data.get("usd_market_cap") else None
            ),
            volume_24h_usd=(
                Decimal(str(data["usd_24h_vol"])) if data.get("usd_24h_vol") else None
            ),
            fetched_at=fetched_at,
        ))
    return records

Decimal(str(data["usd"])) — сначала str, потом Decimal. Если делать Decimal(data["usd"]) напрямую, получите Decimal от float со всем шумом представления (Decimal('65000.000000000005820766')). Через str — точное значение Decimal('65000.0').

# src/crypto_etl/transforms/repos.py
from datetime import datetime, timedelta, timezone

from crypto_etl.models import ActivityRecord, RepoRecord


def transform_repo(raw: dict, fetched_at: datetime) -> RepoRecord:
    return RepoRecord(
        full_name=raw["full_name"],
        stargazers=raw["stargazers_count"],
        forks=raw["forks_count"],
        open_issues=raw["open_issues_count"],
        default_branch=raw["default_branch"],
        fetched_at=fetched_at,
    )


def transform_participation(
    raw: dict,
    full_name: str,
    fetched_at: datetime,
) -> list[ActivityRecord]:
    """Raw shape: {'all': [int]*52, 'owner': [int]*52}.

    Каждый элемент -- кол-во коммитов за неделю (52 недели назад -> последняя).
    """
    records: list[ActivityRecord] = []
    weeks_back = 52
    now = fetched_at.replace(hour=0, minute=0, second=0, microsecond=0)
    weekday = now.weekday()
    last_week_start = now - timedelta(days=weekday)

    for i, commits in enumerate(raw.get("all", [])):
        week_start = last_week_start - timedelta(weeks=(weeks_back - 1 - i))
        records.append(ActivityRecord(
            full_name=full_name,
            week_start=week_start.replace(tzinfo=timezone.utc) if week_start.tzinfo is None else week_start,
            commits=commits,
            fetched_at=fetched_at,
        ))
    return records

Storage: запись Parquet

# src/crypto_etl/storage/parquet_writer.py
import os
from datetime import date
from pathlib import Path
from typing import Sequence

import pyarrow as pa
import pyarrow.parquet as pq
from loguru import logger
from pydantic import BaseModel


def _records_to_table(records: Sequence[BaseModel]) -> pa.Table:
    """Pydantic models -> pyarrow.Table через model_dump."""
    if not records:
        raise ValueError("Empty records")
    rows = [r.model_dump(mode="python") for r in records]
    columns: dict[str, list] = {key: [] for key in rows[0]}
    for row in rows:
        for key, value in row.items():
            columns[key].append(value)
    return pa.table(columns)


def write_partition(
    records: Sequence[BaseModel],
    base_dir: Path,
    table_name: str,
    partition_date: date,
    file_suffix: str,
) -> Path:
    """Атомарная запись Parquet в data/raw/{table}/dt={date}/{table}_{date}_{suffix}.parquet.

    Атомарность через write-to-tmp + rename: если запись упадёт посередине,
    в финальном пути не появится битый Parquet.
    """
    if not records:
        logger.warning("write_partition called with empty records for {}", table_name)
        return base_dir

    table = _records_to_table(records)

    partition_dir = base_dir / "raw" / table_name / f"dt={partition_date.isoformat()}"
    partition_dir.mkdir(parents=True, exist_ok=True)

    final_path = partition_dir / f"{table_name}_{partition_date.isoformat()}_{file_suffix}.parquet"
    tmp_path = partition_dir / f".{final_path.name}.tmp"

    pq.write_table(
        table,
        tmp_path,
        compression="zstd",
        compression_level=3,
    )
    os.replace(tmp_path, final_path)

    logger.info(
        "Wrote {} rows to {} ({} bytes)",
        len(records),
        final_path,
        final_path.stat().st_size,
    )
    return final_path

os.replace — атомарная операция на одной файловой системе (POSIX rename(2)). Если процесс упадёт между pq.write_table и os.replace, в final_path останется старый файл (или ничего), но битого Parquet там не будет. Аналитический слой всегда видит либо полный, либо никакой файл.

compression="zstd" — современный стандарт. Жмёт лучше gzip, быстрее по CPU. Для time-series с повторами хорошо подходит.


State management

# src/crypto_etl/state.py
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Iterator


def load_state(path: Path) -> dict:
    if not path.exists():
        return {}
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def save_state(state: dict, path: Path) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2, sort_keys=True)
    tmp.replace(path)


def hours_to_process(state: dict, max_lookback: int = 24) -> Iterator[datetime]:
    """Часы, которые нужно обработать: с last_completed_hour+1 до текущего часа.

    max_lookback ограничивает дозаполнение пропусков (если pipeline стоял неделю,
    не пытаемся догнать всё -- берём только последние 24 часа).
    """
    now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
    last_str = state.get("last_completed_hour")
    if last_str is None:
        yield now
        return
    last = datetime.fromisoformat(last_str)
    next_hour = last + timedelta(hours=1)
    earliest = now - timedelta(hours=max_lookback)
    if next_hour < earliest:
        next_hour = earliest

    while next_hour <= now:
        yield next_hour
        next_hour += timedelta(hours=1)

Orchestrator: run.py

import asyncio
from datetime import date, datetime, timezone

from loguru import logger

from crypto_etl.clients.coingecko import CoinGeckoClient
from crypto_etl.clients.github import GitHubClient
from crypto_etl.config import get_settings
from crypto_etl.state import hours_to_process, load_state, save_state
from crypto_etl.storage.parquet_writer import write_partition
from crypto_etl.transforms.prices import transform_prices
from crypto_etl.transforms.repos import transform_participation, transform_repo


async def pull_hour(
    hour: datetime,
    cg: CoinGeckoClient,
    gh: GitHubClient,
    settings,
) -> None:
    fetched_at = datetime.now(timezone.utc)
    suffix = hour.strftime("%H")

    raw_prices = await cg.get_prices(settings.coins_to_track)
    prices = transform_prices(raw_prices, ts=hour, fetched_at=fetched_at)
    write_partition(
        prices,
        base_dir=settings.data_dir,
        table_name="prices",
        partition_date=hour.date(),
        file_suffix=suffix,
    )

    repo_tasks = [gh.get_repo(name) for name in settings.repos_to_track]
    participation_tasks = [gh.get_participation(name) for name in settings.repos_to_track]
    raw_repos, raw_participations = await asyncio.gather(
        asyncio.gather(*repo_tasks),
        asyncio.gather(*participation_tasks),
    )

    repos = [transform_repo(r, fetched_at) for r in raw_repos]
    write_partition(
        repos,
        base_dir=settings.data_dir,
        table_name="repos",
        partition_date=hour.date(),
        file_suffix=suffix,
    )

    activity: list = []
    for full_name, raw in zip(settings.repos_to_track, raw_participations):
        activity.extend(transform_participation(raw, full_name, fetched_at))
    write_partition(
        activity,
        base_dir=settings.data_dir,
        table_name="activity",
        partition_date=hour.date(),
        file_suffix=suffix,
    )


async def main() -> None:
    settings = get_settings()
    logger.remove()
    logger.add(lambda m: print(m, end=""), level=settings.log_level)

    state_path = settings.data_dir / "state.json"
    state = load_state(state_path)
    logger.info("Starting ETL, state={}", state)

    async with (
        CoinGeckoClient(
            settings.coingecko_api_key.get_secret_value(),
            settings.coingecko_base_url,
            settings.request_timeout_seconds,
        ) as cg,
        GitHubClient(
            settings.github_token.get_secret_value(),
            settings.github_base_url,
            settings.request_timeout_seconds,
        ) as gh,
    ):
        for hour in hours_to_process(state):
            logger.info("Processing hour={}", hour.isoformat())
            try:
                await pull_hour(hour, cg, gh, settings)
            except Exception:
                logger.exception("Failed hour={}, will retry next run", hour)
                break
            state["last_completed_hour"] = hour.isoformat()
            save_state(state, state_path)
            logger.info("Hour {} completed", hour.isoformat())

    logger.info("ETL run complete")


if __name__ == "__main__":
    asyncio.run(main())

Запуск: python run.py. Cron: 0 * * * * cd /opt/crypto-etl && /opt/crypto-etl/.venv/bin/python run.py >> /var/log/crypto-etl.log 2>&1.

Что делает run.py

Поток выполнения orchestrator

run.py
state
Clients
Writer
load_statehours_to_process(state)get_prices, get_repo, get_participationwrite_partition x3 (prices, repos, activity)save_state(last_completed_hour)

Один важный момент: save_state происходит после write_partition. Если write упал — state не обновился, при следующем запуске тот же час будет повторён. Это и есть идемпотентность через state file.


Логирование

loguru — современная альтернатива стандартному logging. Без boilerplate, с человеческим API.

from loguru import logger

logger.info("Starting ETL")
logger.warning("Rate limit, retry after {}s", retry_after)
logger.exception("Failed hour={}, will retry next run", hour)

Дефолтный формат — цветной, с timestamp, level, file:line, message. Для prod добавляется JSON-output:

logger.add(
    "logs/crypto-etl-{time:YYYY-MM-DD}.json",
    serialize=True,
    rotation="1 day",
    retention="14 days",
    level="INFO",
)

Логи в JSON удобно скармливать в Loki/ELK/Datadog. Ротация — автоматически.


Чего НЕ делает наш код (намеренно)

Capstone — учебный, поэтому несколько вещей упрощены:

  • Нет реальных метрик (Prometheus). В проде — prometheus_client, экспорт счётчиков «успешных запросов», «retry-ев», «ошибок 5xx».
  • Нет distributed locks. Если cron запустит pipeline дважды одновременно (что не должно случиться, но…) — будет race на state file. В проде — Redis-lock или Airflow-runtime гарантирует single-instance.
  • Нет dead-letter queue. Если час упал и за день не починили — данные пропадают. В проде — DLQ с попыткой replay.
  • Нет S3-backend. Мы пишем в локальный диск. В проде — pyarrow.fs.S3FileSystem или явная загрузка в S3 через boto3.

Все эти расширения — линейная работа поверх существующей структуры. Главное, что слои уже разделены: добавить S3 — поменять только parquet_writer.py, всё остальное не трогается.


Проверка знанийKnowledge check
В коде `Decimal(str(price_float))` на первый взгляд избыточно -- кажется, что можно просто `Decimal(price_float)`. Почему первый вариант корректнее?
ОтветAnswer

Итог

В этом уроке мы реализовали полный pipeline. Пройдено:

  • pyproject.toml с правильными зависимостями (httpx 0.28, tenacity 9, pydantic 2.9, pyarrow 17).
  • Конфигурация через pydantic-settings + SecretStr для токенов.
  • Async-клиенты на httpx с retry-декоратором tenacity.
  • Различия в обработке rate-limit между CoinGecko (429) и GitHub (403 + body).
  • Pydantic-модели с валидацией и Decimal для денег.
  • Pure-функции transform — готовы к unit-тестированию.
  • Атомарная запись Parquet через write-to-tmp + os.replace.
  • State management для идемпотентности.
  • Orchestrator с asyncio.gather для параллелизма.
  • Логирование через loguru.

Pipeline можно запускать. Но прежде чем катить в прод — нужны тесты. О них — в следующем уроке: unit-тесты для transforms, mock через respx для clients, VCR для интеграционных, e2e dry-run, pre-commit и CI.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 6. В коде Decimal(str(price_float)) на первый взгляд избыточно. Почему этот вариант корректнее, чем Decimal(price_float)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4