Learning Platform
Урок 11.01 · 40 мин
Начальный
ETLCapstonePipelineArchitectureProduction
Capstone DE: end-to-end pipeline — ingestion to serving SQL Capstone: аналитические запросы к данным, которые загрузил DE REST API Capstone: multi-source ETL pipeline через REST API Git Capstone: первый день DE — clone, ticket, PR cycle

Зачем этот урок

Восемь модулей назад мы открыли пустую папку и поставили uv. С тех пор каждый урок добавлял по одному инструменту:

dependency manager
, типы, генераторы, валидация, форматы файлов, HTTP, БД, pandas, тесты. По отдельности каждый — полезный, но в реальной работе вас никогда не попросят «напиши, пожалуйста, генератор». Вас попросят: «вот публичный API, нам нужны данные оттуда в нашем хранилище каждый день, идемпотентно, с тестами и CI».

Этот урок — про то, как все восемь модулей складываются в одно. Мы не учим ничего нового. Мы берём всё, что уже есть в голове, и собираем end-to-end

ETL
-пайплайн, который реально работает и который не стыдно положить в GitHub-профиль.

Сценарий — типичный. Есть публичный API (мы возьмём

GitHub Issues API
), есть локальный Postgres, между ними нужно построить inkrementellный pipeline: каждый запуск дотягивает новые issues с момента последнего успешного запуска, складирует их сначала как raw JSONL.gz (landing zone), потом нормализует в Parquet (analytical layer), потом мержит в Postgres-таблицу через staging + upsert (serving layer). Сверху — тесты, CI, упаковка в pip-installable пакет с console-script.

Этот урок — теоретический обзор и code-snippets. Реальный код, который вы соберёте руками — в labs/ (отдельная лаба после курса).

Что мы строим

Архитектура pipeline

Один прогон команды etl-pipeline проходит шесть этапов слева направо. Каждый этап — отдельный модуль Python-пакета. Между этапами — артефакты на диске (JSONL.gz, Parquet) и таблицы в Postgres.

ИсточникGitHub APIREST API, pagination через Link header
api_client.pyhttpx + tenacityМ06 урок 02 и 04
extract.pypaginate generatorМ03 урок 01 + М06 урок 03
ValidatePydantic v2М04 урок 03
models.pyIssue / Repo / LabelСхема ответа API
raw inputdict[str, Any]Что прилетело по сети
LandJSONL.gzМ05 уроки 02 и 05
archive layerdata/raw/dt=.../Hive-partitioning по дате
свойствоimmutableСырьё, никогда не правим
TransformpandasМ07 уроки 01-04
parquet outputdata/curated/Partition by event_date
dtypesфикс через PyArrowМ07 урок 01
LoadPostgres upsertМ06 уроки 05-06
staging tableCOPY FROM parquetpsycopg3 COPY, M06 урок 05
mergeON CONFLICT DO UPDATEИдемпотентный upsert

Параллельно сквозь все этапы идёт три «обвязки», которые тоже разбирали:

  • Logging:
    structlog
    на каждом этапе пишет JSON-строки в stdout (М03 урок 06). Все поля (этап, repo, page, count) — отдельные ключи, не вкомпиленные в текст. На production это значит: можно в Grafana построить «среднее время extract по дням» одним запросом.
  • Settings: один объект Settings через
    pydantic-settings
    (М01 урок 04, М06 урок 07). Все секреты — SecretStr, никогда не логируются. Pipeline стартует с одной валидной конфигурации или с понятной ошибкой.
  • Tests: на каждый модуль — pytest-тесты с моками (М08). HTTP-клиент гоняется через httpx.MockTransport, Postgres-load — через pytest-postgresql. CI на GitHub Actions запускает ruff + mypy + pytest на каждый PR.

Структура проекта

Раскладка строго по src-layout, который мы делали в М01/05. Это даёт pip-installable пакет с console-script, который ничем не отличается от того, что ставят инженеры на работе:

etl-project/
├── pyproject.toml              # манифест: deps, scripts, ruff, mypy, pytest config
├── uv.lock                     # locked deps (commit it!)
├── .env.example                # шаблон с пустыми значениями
├── .gitignore
├── README.md
├── docker-compose.yml          # локальный Postgres для разработки
├── .github/
│   └── workflows/
│       └── ci.yml              # ruff + mypy + pytest на каждый push
├── src/
│   └── etl/
│       ├── __init__.py
│       ├── __main__.py         # `python -m etl` точка входа
│       ├── settings.py         # pydantic-settings конфиг
│       ├── api_client.py       # httpx.Client + retries
│       ├── models.py           # Pydantic-модели Issue / Repo / Label
│       ├── extract.py          # paginate-generator API → dict
│       ├── transform.py        # pandas: JSONL → Parquet
│       ├── load.py             # Postgres staging + upsert
│       ├── pipeline.py         # orchestration: соединяет все этапы
│       └── logging_config.py   # structlog config
├── tests/
│   ├── __init__.py
│   ├── conftest.py             # фикстуры: settings, tmp_path, MockTransport
│   ├── test_api_client.py
│   ├── test_models.py
│   ├── test_extract.py
│   ├── test_transform.py
│   ├── test_load.py
│   └── test_pipeline_smoke.py  # end-to-end на мокированном API
└── data/                       # gitignored
    ├── raw/                    # JSONL.gz, archive layer
    └── curated/                # Parquet, analytical layer

В этой структуре нет ничего лишнего. Каждая папка обоснована, каждый файл — отдельная ответственность. В production-командах раскладка отличается только размером: добавляют dags/ для Airflow, dbt/ для SQL-трансформаций, infra/ для terraform. Но «ядро» Python-пакета выглядит ровно так.

Ключевые файлы

Дальше — реальные production-quality сниппеты, не «for demo only». Имеет смысл прочитать как образец того, как ваш собственный код должен выглядеть к концу курса.

settings.py

Одна точка валидации конфигурации, как в уроке М06/07:

from pathlib import Path

from pydantic import Field, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        env_prefix="ETL_",
        case_sensitive=False,
        extra="ignore",
    )

    # GitHub API
    github_token: SecretStr = Field(..., description="GitHub PAT с repo:read")
    github_repo: str = Field(..., description="owner/repo, например python/cpython")
    github_per_page: int = Field(100, ge=1, le=100)

    # Storage
    raw_dir: Path = Path("data/raw")
    curated_dir: Path = Path("data/curated")

    # Postgres
    pg_dsn: SecretStr = Field(..., description="postgresql://user:pw@host:5432/db")
    pg_target_table: str = "github_issues"

    # Watermark — с какого момента дотягивать данные
    initial_since: str = "2020-01-01T00:00:00Z"


def get_settings() -> Settings:
    return Settings()  # type: ignore[call-arg]

Здесь видны все правила сразу: префикс ETL_ отделяет переменные от чужих, SecretStr для токена и DSN (не попадёт в repr/логи), валидация диапазонов через Field(ge=, le=), Path вместо строки для папок, явный default только там, где это безопасно.

api_client.py

HTTP-клиент по канону М06/02 + М06/04:

from collections.abc import Iterator
from typing import Any

import httpx
import structlog
from tenacity import (
    retry,
    retry_if_exception,
    stop_after_attempt,
    wait_exponential_jitter,
)

from etl.settings import Settings

log = structlog.get_logger(__name__)


def _is_retriable(exc: BaseException) -> bool:
    if isinstance(exc, httpx.HTTPStatusError):
        return exc.response.status_code in {429, 500, 502, 503, 504}
    return isinstance(
        exc,
        (httpx.ConnectError, httpx.ReadTimeout, httpx.RemoteProtocolError),
    )


class GitHubClient:
    def __init__(self, settings: Settings) -> None:
        self._settings = settings
        self._client = httpx.Client(
            base_url="https://api.github.com",
            headers={
                "Authorization": f"Bearer {settings.github_token.get_secret_value()}",
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
                "User-Agent": "etl-pipeline/0.1",
            },
            timeout=httpx.Timeout(30.0, connect=5.0),
        )

    def __enter__(self) -> "GitHubClient":
        return self

    def __exit__(self, *args: object) -> None:
        self._client.close()

    @retry(
        stop=stop_after_attempt(5),
        wait=wait_exponential_jitter(initial=1, max=16),
        retry=retry_if_exception(_is_retriable),
        reraise=True,
    )
    def _get(self, url: str, params: dict[str, Any] | None = None) -> httpx.Response:
        r = self._client.get(url, params=params)
        r.raise_for_status()
        return r

    def paginate_issues(self, since: str) -> Iterator[dict[str, Any]]:
        url = f"/repos/{self._settings.github_repo}/issues"
        params: dict[str, Any] = {
            "since": since,
            "state": "all",
            "per_page": self._settings.github_per_page,
            "sort": "updated",
            "direction": "asc",
        }
        page = 1
        while True:
            r = self._get(url, params=params)
            batch = r.json()
            log.info("page_fetched", page=page, count=len(batch))
            if not batch:
                return
            yield from batch
            next_url = r.links.get("next", {}).get("url")
            if not next_url:
                return
            url = next_url
            params = None
            page += 1

Один класс — все правила вместе: контекст-менеджер (М03/02), генератор для pagination (М03/01, М06/03), retry с фильтрацией по типу ошибки (М06/04), structlog в каждой странице (М03/06), типизация на каждом методе (М04/01). Это то самое, что М06/INDEX обещал «остаться в голове».

models.py

Граница доверия между API и нашим кодом. Pydantic-валидация — как в М04/03:

from datetime import datetime

from pydantic import BaseModel, ConfigDict, Field


class Label(BaseModel):
    model_config = ConfigDict(extra="ignore")

    id: int
    name: str
    color: str


class User(BaseModel):
    model_config = ConfigDict(extra="ignore")

    id: int
    login: str


class Issue(BaseModel):
    model_config = ConfigDict(extra="ignore")

    id: int
    number: int
    title: str
    state: str = Field(pattern=r"^(open|closed)$")
    user: User
    labels: list[Label] = Field(default_factory=list)
    comments: int = Field(ge=0)
    created_at: datetime
    updated_at: datetime
    closed_at: datetime | None = None
    body: str | None = None

extra="ignore" — GitHub в одном response отдаёт 60 полей, нам нужны 10, остальные просто молча игнорируем (а не падаем на новых полях). pattern для state, ge=0 для comments — пайплайн не пустит мусорные данные на этап трансформации.

extract.py

Связка двух предыдущих — клиент даёт сырые dict, мы их валидируем и пишем в landing-zone:

import gzip
import json
from collections.abc import Iterator
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

import structlog

from etl.api_client import GitHubClient
from etl.settings import Settings

log = structlog.get_logger(__name__)


def _land_path(raw_dir: Path, run_dt: datetime) -> Path:
    dt = run_dt.strftime("%Y-%m-%d")
    return raw_dir / f"dt={dt}" / f"issues-{run_dt:%Y%m%dT%H%M%SZ}.jsonl.gz"


def extract_issues(
    settings: Settings,
    since: str,
    run_dt: datetime | None = None,
) -> Path:
    run_dt = run_dt or datetime.now(UTC)
    out = _land_path(settings.raw_dir, run_dt)
    out.parent.mkdir(parents=True, exist_ok=True)

    count = 0
    with (
        GitHubClient(settings) as gh,
        gzip.open(out, "wt", encoding="utf-8") as f,
    ):
        for raw in gh.paginate_issues(since=since):
            f.write(json.dumps(raw, ensure_ascii=False) + "\n")
            count += 1

    log.info("extract_done", count=count, file=str(out), bytes=out.stat().st_size)
    return out


def iter_jsonl_gz(path: Path) -> Iterator[dict[str, Any]]:
    with gzip.open(path, "rt", encoding="utf-8") as f:
        for line in f:
            yield json.loads(line)

Архив пишем сырым, без валидации — это сознательное решение. Landing zone — это «фотография мира на момент запроса». Если GitHub в новой версии API добавил поле и наш Pydantic ему не рад, мы не должны терять данные, мы должны иметь возможность через неделю прийти, поправить модель и переиграть всё с нуля.

Валидация (через models.Issue) случится на следующем этапе — в transform.py. И тогда ошибки логируются, плохие строки складываются в quarantine-папку, но архив остаётся нетронутым.

transform.py

JSONL.gz → Parquet через pandas, по правилам М07/01 и М07/04:

from collections.abc import Iterable
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

import pandas as pd
import structlog
from pydantic import ValidationError

from etl.extract import iter_jsonl_gz
from etl.models import Issue
from etl.settings import Settings

log = structlog.get_logger(__name__)


def _validate(rows: Iterable[dict[str, Any]]) -> Iterable[dict[str, Any]]:
    ok = bad = 0
    for raw in rows:
        try:
            issue = Issue.model_validate(raw)
        except ValidationError as e:
            bad += 1
            log.warning("issue_invalid", id=raw.get("id"), errors=e.errors())
            continue
        ok += 1
        # Flatten для табличной модели
        yield {
            "id": issue.id,
            "number": issue.number,
            "title": issue.title,
            "state": issue.state,
            "user_id": issue.user.id,
            "user_login": issue.user.login,
            "labels": ",".join(l.name for l in issue.labels) or None,
            "comments": issue.comments,
            "created_at": issue.created_at,
            "updated_at": issue.updated_at,
            "closed_at": issue.closed_at,
        }
    log.info("validation_done", ok=ok, bad=bad)


def transform(raw_path: Path, settings: Settings) -> Path:
    df = pd.DataFrame(_validate(iter_jsonl_gz(raw_path)))
    if df.empty:
        log.warning("transform_empty", raw=str(raw_path))
        return raw_path  # placeholder

    df = df.astype(
        {
            "id": "int64",
            "number": "int64",
            "state": "category",
            "user_id": "int64",
            "comments": "int64",
        }
    )
    df["event_date"] = df["updated_at"].dt.tz_convert("UTC").dt.date

    run_dt = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")
    out = settings.curated_dir / f"issues-{run_dt}.parquet"
    out.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(
        out,
        engine="pyarrow",
        compression="zstd",
        index=False,
    )
    log.info("transform_done", file=str(out), rows=len(df))
    return out

Если в архиве 100k issues, валидация и трансформация не держат всё в памяти потоково — _validate это генератор. В DataFrame материализуется один раз только на финальный to_parquet. Для миллионов строк есть chunksize-вариант (М07/04), здесь упрощено для читаемости.

load.py

Загрузка в Postgres по М06/05 + М06/06:

from pathlib import Path

import pandas as pd
import psycopg
import structlog
from psycopg import sql

from etl.settings import Settings

log = structlog.get_logger(__name__)


DDL = """
CREATE TABLE IF NOT EXISTS {table} (
    id          BIGINT PRIMARY KEY,
    number      BIGINT NOT NULL,
    title       TEXT NOT NULL,
    state       TEXT NOT NULL,
    user_id     BIGINT NOT NULL,
    user_login  TEXT NOT NULL,
    labels      TEXT,
    comments    BIGINT NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL,
    updated_at  TIMESTAMPTZ NOT NULL,
    closed_at   TIMESTAMPTZ,
    event_date  DATE NOT NULL,
    loaded_at   TIMESTAMPTZ DEFAULT NOW()
)
"""

UPSERT = """
INSERT INTO {table} (
    id, number, title, state, user_id, user_login, labels,
    comments, created_at, updated_at, closed_at, event_date
)
SELECT id, number, title, state, user_id, user_login, labels,
       comments, created_at, updated_at, closed_at, event_date
FROM {staging}
ON CONFLICT (id) DO UPDATE SET
    title       = EXCLUDED.title,
    state       = EXCLUDED.state,
    labels      = EXCLUDED.labels,
    comments    = EXCLUDED.comments,
    updated_at  = EXCLUDED.updated_at,
    closed_at   = EXCLUDED.closed_at,
    event_date  = EXCLUDED.event_date,
    loaded_at   = NOW()
WHERE {table}.updated_at < EXCLUDED.updated_at
"""


def load(parquet_path: Path, settings: Settings) -> int:
    df = pd.read_parquet(parquet_path)
    target = sql.Identifier(settings.pg_target_table)
    staging = sql.Identifier(f"_stg_{settings.pg_target_table}")

    with psycopg.connect(settings.pg_dsn.get_secret_value()) as conn:
        with conn.cursor() as cur:
            cur.execute(sql.SQL(DDL).format(table=target))
            cur.execute(
                sql.SQL("CREATE TEMP TABLE {staging} (LIKE {table} INCLUDING ALL)")
                .format(staging=staging, table=target)
            )
            cols = [
                "id", "number", "title", "state", "user_id", "user_login",
                "labels", "comments", "created_at", "updated_at", "closed_at",
                "event_date",
            ]
            copy_sql = sql.SQL("COPY {staging} ({cols}) FROM STDIN").format(
                staging=staging,
                cols=sql.SQL(", ").join(map(sql.Identifier, cols)),
            )
            with cur.copy(copy_sql) as copy:
                for row in df[cols].itertuples(index=False, name=None):
                    copy.write_row(row)

            cur.execute(sql.SQL(UPSERT).format(table=target, staging=staging))
            affected = cur.rowcount
        conn.commit()

    log.info("load_done", affected=affected, file=str(parquet_path))
    return affected

Идемпотентность через staging + ON CONFLICT DO UPDATE WHERE updated_at < EXCLUDED.updated_at — если pipeline дёрнули дважды, второй раз ничего не сломает и не задвоит. Условие на updated_at дополнительно защищает от race condition: если две параллельные джобы попытаются обновить ту же запись, выиграет та, у которой данные новее.

pipeline.py

Оркестрация — соединяем три этапа в один прогон:

from datetime import UTC, datetime

import structlog

from etl.extract import extract_issues
from etl.load import load
from etl.logging_config import setup_logging
from etl.settings import get_settings
from etl.transform import transform

log = structlog.get_logger(__name__)


def run() -> None:
    setup_logging()
    settings = get_settings()
    started_at = datetime.now(UTC)

    log.info(
        "pipeline_start",
        repo=settings.github_repo,
        since=settings.initial_since,
    )
    try:
        raw = extract_issues(settings, since=settings.initial_since, run_dt=started_at)
        curated = transform(raw, settings)
        affected = load(curated, settings)
    except Exception:
        log.exception("pipeline_failed")
        raise

    log.info(
        "pipeline_done",
        affected=affected,
        duration_sec=(datetime.now(UTC) - started_at).total_seconds(),
    )


if __name__ == "__main__":
    run()

В production вместо settings.initial_since будет водомерный знак — последний успешно обработанный updated_at из таблицы или из отдельного _etl_state. Это даст инкрементальную загрузку: каждый запуск дотягивает только новые/изменённые issues. Подробно про watermark — в М03/05.

И последнее — точка входа console-script в pyproject.toml:

[project]
name = "etl"
version = "0.1.0"
requires-python = ">=3.13"
dependencies = [
    "httpx>=0.27",
    "pandas>=2.2",
    "psycopg[binary]>=3.2",
    "pyarrow>=17",
    "pydantic>=2.8",
    "pydantic-settings>=2.4",
    "structlog>=24.4",
    "tenacity>=9.0",
]

[project.optional-dependencies]
dev = [
    "mypy>=1.11",
    "pytest>=8.3",
    "pytest-cov>=5.0",
    "pytest-postgresql>=6.1",
    "ruff>=0.6",
]

[project.scripts]
etl-pipeline = "etl.pipeline:run"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

После uv sync в окружении доступна команда etl-pipeline — как git, psql или любая другая CLI. Это и есть «упаковали в Python-пакет» из обещания во вступительном уроке.

test_extract.py

Тестирование HTTP-клиента через httpx.MockTransport (М08):

import gzip
import json
from pathlib import Path

import httpx
import pytest

from etl.api_client import GitHubClient
from etl.extract import extract_issues
from etl.settings import Settings


@pytest.fixture
def settings(tmp_path: Path) -> Settings:
    return Settings(
        github_token="x",  # type: ignore[arg-type]
        github_repo="python/cpython",
        raw_dir=tmp_path / "raw",
        curated_dir=tmp_path / "curated",
        pg_dsn="postgresql://x@localhost/x",  # type: ignore[arg-type]
    )


def _make_handler(pages: list[list[dict]]) -> httpx.MockTransport:
    state = {"i": 0}

    def handler(request: httpx.Request) -> httpx.Response:
        i = state["i"]
        state["i"] += 1
        headers = {}
        if i + 1 < len(pages):
            headers["Link"] = (
                f'<https://api.github.com/next?page={i + 2}>; rel="next"'
            )
        return httpx.Response(200, json=pages[i], headers=headers)

    return httpx.MockTransport(handler)


def test_extract_writes_jsonl_gz(
    settings: Settings, monkeypatch: pytest.MonkeyPatch
) -> None:
    pages = [
        [{"id": 1, "number": 1, "title": "a", "state": "open"}],
        [{"id": 2, "number": 2, "title": "b", "state": "closed"}],
    ]

    original_init = GitHubClient.__init__

    def patched_init(self: GitHubClient, s: Settings) -> None:
        original_init(self, s)
        self._client = httpx.Client(transport=_make_handler(pages))

    monkeypatch.setattr(GitHubClient, "__init__", patched_init)

    out = extract_issues(settings, since="2020-01-01T00:00:00Z")
    lines = gzip.decompress(out.read_bytes()).decode().splitlines()
    assert [json.loads(l)["id"] for l in lines] == [1, 2]

Без реальной сети, без токена, без таймаутов. Тест запускается на CI за 100 миллисекунд, проверяет именно ту логику, которую написали — pagination, запись в JSONL.gz, ленивость генератора.

.github/workflows/ci.yml

Качество кода на каждом push’е:

name: CI

on:
  push:
    branches: [main]
  pull_request:

jobs:
  quality:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v3
        with:
          enable-cache: true
      - name: Install Python
        run: uv python install 3.13
      - name: Install deps
        run: uv sync --frozen --extra dev
      - name: Lint
        run: uv run ruff check src tests
      - name: Format check
        run: uv run ruff format --check src tests
      - name: Type check
        run: uv run mypy src
      - name: Tests
        run: uv run pytest -v --cov=etl --cov-report=term-missing

Три гейта — ruff, mypy, pytest. Без них PR не сольётся. Junior, который умеет настроить такой CI в новом репо за 15 минут, уже отличается от среднего junior’а из бутлега.

Какой модуль какую часть закрывает

Pipeline → Lessons traceability

Каждый компонент пайплайна — это упражнение, которое мы уже разобрали в одном из модулей. Capstone-проект — это интегральный экзамен по всему курсу.

Компонентpyproject.toml + uv syncdep manager, lockfile, console-script
УрокиМ01/01-02uv-deep + pyproject-toml
Компонентruff + mypy + pre-commitlint, format, type check
УрокиМ01/03 + М04/04ruff-precommit + mypy
Компонентsettings.py через pydantic-settings.env, SecretStr, loud-fail
УрокиМ01/04 + М06/07env-settings + secrets
Компонентsrc/etl/ layout + console-scriptpip-installable пакет
УрокиМ01/05src-layout
Компонентmodels.py (Issue, User, Label)Pydantic v2 на границе
УрокиМ04/01-03type-hints, dataclasses, pydantic
Компонентextract.py (paginate generator)ленивый итератор
УрокиМ03/01 + М06/03generators + pagination
Компонентapi_client.py (retries, timeouts)HTTP-обвязка
УрокиМ06/02 + М06/04httpx + tenacity
КомпонентJSONL.gz landing layerarchive, immutable
УрокиМ05/02 + М05/05JSONL + gzip
Компонентtransform.py (pandas → Parquet)вычисления + columnar storage
УрокиМ07/01-04 + М05/04pandas + pyarrow
Компонентload.py (Postgres + COPY + upsert)staging + ON CONFLICT
УрокиМ06/05-06psycopg3 + SQLAlchemy
Компонентlogging_config.py (structlog)JSON-логи, 12-factor
УрокиМ03/06structlog
Компонентtests/ + MockTransport + pytest-postgresqlpure unit + integration
УрокиМ08/01-04testing module
Компонентci.yml на GitHub Actionslint + types + tests на каждый PR
УрокиМ08 + М04/04CI настройка

Каждая строчка из этой таблицы — упражнение, которое вы уже сделали. Capstone — это просто сборка их в одно.

Как запустить локально

Полная пошаговая инструкция «с нуля», без рукомашества. Делается за пятнадцать минут.

1. Клонируем и заходим в проект.

git clone https://github.com/your-org/etl-project.git
cd etl-project

2. Ставим зависимости через uv.

uv нужно поставить один раз на машину — curl -LsSf https://astral.sh/uv/install.sh | sh (детали в М01/01). Дальше:

uv sync --extra dev

Эта команда сама поставит Python 3.13, создаст .venv/ и заполнит её ровно тем, что в uv.lock — детерминированно, как в CI.

3. Готовим .env.

cp .env.example .env

Файл .env сейчас выглядит так:

ETL_GITHUB_TOKEN=ghp_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
ETL_GITHUB_REPO=python/cpython
ETL_PG_DSN=postgresql://etl:etl@localhost:5432/etl

GitHub PAT (Personal Access Token) делается в Settings → Developer settings → Tokens, права public_repo достаточно для публичных репозиториев. Без токена API ограничит вас 60 запросами в час — на «cpython» закончится на первой странице.

4. Поднимаем Postgres локально через docker-compose.

docker compose up -d postgres

docker-compose.yml рядом с проектом — самый базовый: контейнер postgres:17-alpine, открытый порт 5432, пользователь etl/etl, база etl. На production здесь, конечно, будет managed Postgres (RDS, CloudSQL, neon), но локально это даёт идентичную среду.

5. Запускаем pipeline.

uv run etl-pipeline

В stdout полетят JSON-логи в реальном времени:

{"event": "pipeline_start", "repo": "python/cpython", "since": "2020-01-01T00:00:00Z", "level": "info", "timestamp": "2026-05-13T10:23:01Z"}
{"event": "page_fetched", "page": 1, "count": 100, "level": "info", "timestamp": "2026-05-13T10:23:02Z"}
{"event": "page_fetched", "page": 2, "count": 100, ...}
...
{"event": "extract_done", "count": 4823, "file": "data/raw/dt=2026-05-13/issues-20260513T102301Z.jsonl.gz", ...}
{"event": "validation_done", "ok": 4823, "bad": 0, ...}
{"event": "transform_done", "file": "data/curated/issues-20260513T102612Z.parquet", "rows": 4823, ...}
{"event": "load_done", "affected": 4823, ...}
{"event": "pipeline_done", "affected": 4823, "duration_sec": 412.7, ...}

6. Проверяем результат в Postgres.

docker compose exec postgres psql -U etl -d etl -c "
  SELECT state, COUNT(*) FROM github_issues GROUP BY state ORDER BY state;
"

Получаем что-то вроде:

 state  | count
--------+-------
 closed |  4621
 open   |   202

7. Запускаем тесты.

uv run pytest -v

Все зелёные — pipeline production-ready.

TIP

Запустите pipeline дважды подряд. Второй раз он не задвоит ни одной записи и в логах вы увидите affected: 0 — это и есть идемпотентность через ON CONFLICT DO UPDATE WHERE updated_at < EXCLUDED.updated_at. Идемпотентность — главное свойство production ETL, без которого Airflow-перезапуски убивают данные.

Зачем это всё в портфолио

Прямо сейчас, после восьми модулей и этого capstone, у вас на руках готовый артефакт для собеседования на Junior Data Engineer. Это не «hello world» и не «todo-app в Django» — это тот самый код, который ваш будущий тимлид пишет каждый день.

Что конкретно вы можете показать на интервью:

  • На вопрос «как ты управляешь зависимостями» — открываете pyproject.toml, объясняете per-pin философию, uv.lock, console-scripts. Это сильнее, чем 90% других кандидатов уровня junior.
  • На вопрос «как ты обкладываешь HTTP-вызовы» — показываете api_client.py. Перечисляете: timeouts, raise_for_status, retry-фильтр по типу ошибки, exponential backoff с jitter, structlog в каждом запросе. Уже сильнее middle.
  • На вопрос «как ты валидируешь данные на входе»models.py с Pydantic. Объясняете границу «внутри / на границе», extra="ignore", паттерны для state, что плохая запись логируется и идёт в quarantine, а не валит пайплайн.
  • На вопрос «как сделать идемпотентным» — staging table + ON CONFLICT DO UPDATE WHERE updated_at-based. Объясняете, почему INSERT ... ON CONFLICT DO NOTHING не годится для slowly changing dimensions.
  • На вопрос «а тесты у тебя есть»tests/ с MockTransport, pytest-postgresql, conftest. CI запускается на каждый push. Coverage — больше 80%.

Это всё уже у вас есть после курса. Главное — не закопать репозиторий в private, а сделать публичным и положить ссылку в резюме. Один работающий ETL на GitHub стоит десяти строк «знаю Python» в CV.

Куда расти после курса

Capstone-проект — это потолок Junior. Дальше начинаются области, которые в этом курсе принципиально не разбирались, потому что без основы они не имеют смысла.

Airflow — оркестрация. Сейчас наш pipeline запускается руками или по cron, без зависимостей между задачами, без retry на уровне task’а, без UI и без истории запусков. На реальной работе DAG в Airflow дёргает etl-pipeline каждый день в 5 утра, ждёт upstream-таблицу, ретраит при падении, аллертит в Slack при сбое. Следующий шаг — курс по Airflow.

dbt — transformation as code. Наш transform.py делает простой flatten Issue → DataFrame. На production трансформации измеряются десятками SQL-моделей с зависимостями, тестами на колонки (unique, not null, accepted_values), документацией, lineage-графом. Это всё делает dbt, и он стандарт индустрии.

Spark / Flink — большие данные. Pandas хороший до десятков гигабайт. Когда у вас сотни терабайт ивентов в Kafka — приходят Spark Streaming или Flink, они масштабируются горизонтально на сотни worker’ов. Но синтаксис там сильно похож на pandas/SQL — основа из этого курса остаётся.

Python 02 — Middle-уровень. Здесь были основы языка для DE-задач. На middle: внутренности CPython (refcount, GIL, GIL-free 3.13t, bytecode), async (asyncio, trio, httpx async), метаклассы, дескрипторы, Protocol/TypeVar/Generic, performance-профилирование, написание pandas-плагинов, polars в глубину. Без основы из Python 01 туда лезть рано — будет «вкусные паттерны без понимания, зачем».

Что параллельно: SQL 01 (если ещё не пройден), SQL 02 / internals — внутренности Postgres и индексов. DE-инженер без SQL не существует — половина проблем решается одним хорошо написанным запросом, минуя весь Python.

Финал

Восемь модулей назад мы открыли пустую папку. Сейчас у вас — pip-installable пакет, который дёргает API с retry-логикой, валидирует через Pydantic, складывает архив в JSONL.gz, конвертит в Parquet, загружает в Postgres идемпотентно, покрыт тестами, запускается из CI. Каждая строчка обоснована — вы знаете, почему она написана именно так, а не иначе.

Этого достаточно для оффера на Junior Data Engineer. Дальше — практика. Сделайте свою копию этого pipeline’а, но для другого API (Stack Overflow, Hacker News, GitLab, Telegram). Положите в GitHub. Через месяц поправьте, что вам не понравится. Через три — посмотрите на код, который писали в начале, и поймёте, насколько вы выросли.

В лабораторной части курса — labs/ — будет несколько больших задач именно по этому шаблону. Туда и идём.

Спасибо за курс. До встречи на Python 02 — когда захотите узнать, как этот же pipeline ускорить в десять раз через asyncio, или зачем __slots__, или почему GIL в Python 3.13 уже не такой страшный, как был.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 1