Зачем этот урок
Восемь модулей назад мы открыли пустую папку и поставили uv. С тех пор каждый урок добавлял по одному инструменту:
Этот урок — про то, как все восемь модулей складываются в одно. Мы не учим ничего нового. Мы берём всё, что уже есть в голове, и собираем end-to-end
Сценарий — типичный. Есть публичный API (мы возьмём
Этот урок — теоретический обзор и code-snippets. Реальный код, который вы соберёте руками — в labs/ (отдельная лаба после курса).
Что мы строим
Один прогон команды etl-pipeline проходит шесть этапов слева направо. Каждый этап — отдельный модуль Python-пакета. Между этапами — артефакты на диске (JSONL.gz, Parquet) и таблицы в Postgres.
Параллельно сквозь все этапы идёт три «обвязки», которые тоже разбирали:
- Logging: на каждом этапе пишет JSON-строки в stdout (М03 урок 06). Все поля (этап, repo, page, count) — отдельные ключи, не вкомпиленные в текст. На production это значит: можно в Grafana построить «среднее время extract по дням» одним запросом.structlog
- Settings: один объект
Settingsчерез(М01 урок 04, М06 урок 07). Все секреты —pydantic-settingsSecretStr, никогда не логируются. Pipeline стартует с одной валидной конфигурации или с понятной ошибкой. - Tests: на каждый модуль — pytest-тесты с моками (М08). HTTP-клиент гоняется через
httpx.MockTransport, Postgres-load — черезpytest-postgresql. CI на GitHub Actions запускает ruff + mypy + pytest на каждый PR.
Структура проекта
Раскладка строго по src-layout, который мы делали в М01/05. Это даёт pip-installable пакет с console-script, который ничем не отличается от того, что ставят инженеры на работе:
etl-project/
├── pyproject.toml # манифест: deps, scripts, ruff, mypy, pytest config
├── uv.lock # locked deps (commit it!)
├── .env.example # шаблон с пустыми значениями
├── .gitignore
├── README.md
├── docker-compose.yml # локальный Postgres для разработки
├── .github/
│ └── workflows/
│ └── ci.yml # ruff + mypy + pytest на каждый push
├── src/
│ └── etl/
│ ├── __init__.py
│ ├── __main__.py # `python -m etl` точка входа
│ ├── settings.py # pydantic-settings конфиг
│ ├── api_client.py # httpx.Client + retries
│ ├── models.py # Pydantic-модели Issue / Repo / Label
│ ├── extract.py # paginate-generator API → dict
│ ├── transform.py # pandas: JSONL → Parquet
│ ├── load.py # Postgres staging + upsert
│ ├── pipeline.py # orchestration: соединяет все этапы
│ └── logging_config.py # structlog config
├── tests/
│ ├── __init__.py
│ ├── conftest.py # фикстуры: settings, tmp_path, MockTransport
│ ├── test_api_client.py
│ ├── test_models.py
│ ├── test_extract.py
│ ├── test_transform.py
│ ├── test_load.py
│ └── test_pipeline_smoke.py # end-to-end на мокированном API
└── data/ # gitignored
├── raw/ # JSONL.gz, archive layer
└── curated/ # Parquet, analytical layer
В этой структуре нет ничего лишнего. Каждая папка обоснована, каждый файл — отдельная ответственность. В production-командах раскладка отличается только размером: добавляют dags/ для Airflow, dbt/ для SQL-трансформаций, infra/ для terraform. Но «ядро» Python-пакета выглядит ровно так.
Ключевые файлы
Дальше — реальные production-quality сниппеты, не «for demo only». Имеет смысл прочитать как образец того, как ваш собственный код должен выглядеть к концу курса.
settings.py
Одна точка валидации конфигурации, как в уроке М06/07:
from pathlib import Path
from pydantic import Field, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
env_prefix="ETL_",
case_sensitive=False,
extra="ignore",
)
# GitHub API
github_token: SecretStr = Field(..., description="GitHub PAT с repo:read")
github_repo: str = Field(..., description="owner/repo, например python/cpython")
github_per_page: int = Field(100, ge=1, le=100)
# Storage
raw_dir: Path = Path("data/raw")
curated_dir: Path = Path("data/curated")
# Postgres
pg_dsn: SecretStr = Field(..., description="postgresql://user:pw@host:5432/db")
pg_target_table: str = "github_issues"
# Watermark — с какого момента дотягивать данные
initial_since: str = "2020-01-01T00:00:00Z"
def get_settings() -> Settings:
return Settings() # type: ignore[call-arg]
Здесь видны все правила сразу: префикс ETL_ отделяет переменные от чужих, SecretStr для токена и DSN (не попадёт в repr/логи), валидация диапазонов через Field(ge=, le=), Path вместо строки для папок, явный default только там, где это безопасно.
api_client.py
HTTP-клиент по канону М06/02 + М06/04:
from collections.abc import Iterator
from typing import Any
import httpx
import structlog
from tenacity import (
retry,
retry_if_exception,
stop_after_attempt,
wait_exponential_jitter,
)
from etl.settings import Settings
log = structlog.get_logger(__name__)
def _is_retriable(exc: BaseException) -> bool:
if isinstance(exc, httpx.HTTPStatusError):
return exc.response.status_code in {429, 500, 502, 503, 504}
return isinstance(
exc,
(httpx.ConnectError, httpx.ReadTimeout, httpx.RemoteProtocolError),
)
class GitHubClient:
def __init__(self, settings: Settings) -> None:
self._settings = settings
self._client = httpx.Client(
base_url="https://api.github.com",
headers={
"Authorization": f"Bearer {settings.github_token.get_secret_value()}",
"Accept": "application/vnd.github+json",
"X-GitHub-Api-Version": "2022-11-28",
"User-Agent": "etl-pipeline/0.1",
},
timeout=httpx.Timeout(30.0, connect=5.0),
)
def __enter__(self) -> "GitHubClient":
return self
def __exit__(self, *args: object) -> None:
self._client.close()
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential_jitter(initial=1, max=16),
retry=retry_if_exception(_is_retriable),
reraise=True,
)
def _get(self, url: str, params: dict[str, Any] | None = None) -> httpx.Response:
r = self._client.get(url, params=params)
r.raise_for_status()
return r
def paginate_issues(self, since: str) -> Iterator[dict[str, Any]]:
url = f"/repos/{self._settings.github_repo}/issues"
params: dict[str, Any] = {
"since": since,
"state": "all",
"per_page": self._settings.github_per_page,
"sort": "updated",
"direction": "asc",
}
page = 1
while True:
r = self._get(url, params=params)
batch = r.json()
log.info("page_fetched", page=page, count=len(batch))
if not batch:
return
yield from batch
next_url = r.links.get("next", {}).get("url")
if not next_url:
return
url = next_url
params = None
page += 1
Один класс — все правила вместе: контекст-менеджер (М03/02), генератор для pagination (М03/01, М06/03), retry с фильтрацией по типу ошибки (М06/04), structlog в каждой странице (М03/06), типизация на каждом методе (М04/01). Это то самое, что М06/INDEX обещал «остаться в голове».
models.py
Граница доверия между API и нашим кодом. Pydantic-валидация — как в М04/03:
from datetime import datetime
from pydantic import BaseModel, ConfigDict, Field
class Label(BaseModel):
model_config = ConfigDict(extra="ignore")
id: int
name: str
color: str
class User(BaseModel):
model_config = ConfigDict(extra="ignore")
id: int
login: str
class Issue(BaseModel):
model_config = ConfigDict(extra="ignore")
id: int
number: int
title: str
state: str = Field(pattern=r"^(open|closed)$")
user: User
labels: list[Label] = Field(default_factory=list)
comments: int = Field(ge=0)
created_at: datetime
updated_at: datetime
closed_at: datetime | None = None
body: str | None = None
extra="ignore" — GitHub в одном response отдаёт 60 полей, нам нужны 10, остальные просто молча игнорируем (а не падаем на новых полях). pattern для state, ge=0 для comments — пайплайн не пустит мусорные данные на этап трансформации.
extract.py
Связка двух предыдущих — клиент даёт сырые dict, мы их валидируем и пишем в landing-zone:
import gzip
import json
from collections.abc import Iterator
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import structlog
from etl.api_client import GitHubClient
from etl.settings import Settings
log = structlog.get_logger(__name__)
def _land_path(raw_dir: Path, run_dt: datetime) -> Path:
dt = run_dt.strftime("%Y-%m-%d")
return raw_dir / f"dt={dt}" / f"issues-{run_dt:%Y%m%dT%H%M%SZ}.jsonl.gz"
def extract_issues(
settings: Settings,
since: str,
run_dt: datetime | None = None,
) -> Path:
run_dt = run_dt or datetime.now(UTC)
out = _land_path(settings.raw_dir, run_dt)
out.parent.mkdir(parents=True, exist_ok=True)
count = 0
with (
GitHubClient(settings) as gh,
gzip.open(out, "wt", encoding="utf-8") as f,
):
for raw in gh.paginate_issues(since=since):
f.write(json.dumps(raw, ensure_ascii=False) + "\n")
count += 1
log.info("extract_done", count=count, file=str(out), bytes=out.stat().st_size)
return out
def iter_jsonl_gz(path: Path) -> Iterator[dict[str, Any]]:
with gzip.open(path, "rt", encoding="utf-8") as f:
for line in f:
yield json.loads(line)
Архив пишем сырым, без валидации — это сознательное решение. Landing zone — это «фотография мира на момент запроса». Если GitHub в новой версии API добавил поле и наш Pydantic ему не рад, мы не должны терять данные, мы должны иметь возможность через неделю прийти, поправить модель и переиграть всё с нуля.
Валидация (через models.Issue) случится на следующем этапе — в transform.py. И тогда ошибки логируются, плохие строки складываются в quarantine-папку, но архив остаётся нетронутым.
transform.py
JSONL.gz → Parquet через pandas, по правилам М07/01 и М07/04:
from collections.abc import Iterable
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import pandas as pd
import structlog
from pydantic import ValidationError
from etl.extract import iter_jsonl_gz
from etl.models import Issue
from etl.settings import Settings
log = structlog.get_logger(__name__)
def _validate(rows: Iterable[dict[str, Any]]) -> Iterable[dict[str, Any]]:
ok = bad = 0
for raw in rows:
try:
issue = Issue.model_validate(raw)
except ValidationError as e:
bad += 1
log.warning("issue_invalid", id=raw.get("id"), errors=e.errors())
continue
ok += 1
# Flatten для табличной модели
yield {
"id": issue.id,
"number": issue.number,
"title": issue.title,
"state": issue.state,
"user_id": issue.user.id,
"user_login": issue.user.login,
"labels": ",".join(l.name for l in issue.labels) or None,
"comments": issue.comments,
"created_at": issue.created_at,
"updated_at": issue.updated_at,
"closed_at": issue.closed_at,
}
log.info("validation_done", ok=ok, bad=bad)
def transform(raw_path: Path, settings: Settings) -> Path:
df = pd.DataFrame(_validate(iter_jsonl_gz(raw_path)))
if df.empty:
log.warning("transform_empty", raw=str(raw_path))
return raw_path # placeholder
df = df.astype(
{
"id": "int64",
"number": "int64",
"state": "category",
"user_id": "int64",
"comments": "int64",
}
)
df["event_date"] = df["updated_at"].dt.tz_convert("UTC").dt.date
run_dt = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")
out = settings.curated_dir / f"issues-{run_dt}.parquet"
out.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(
out,
engine="pyarrow",
compression="zstd",
index=False,
)
log.info("transform_done", file=str(out), rows=len(df))
return out
Если в архиве 100k issues, валидация и трансформация не держат всё в памяти потоково — _validate это генератор. В DataFrame материализуется один раз только на финальный to_parquet. Для миллионов строк есть chunksize-вариант (М07/04), здесь упрощено для читаемости.
load.py
Загрузка в Postgres по М06/05 + М06/06:
from pathlib import Path
import pandas as pd
import psycopg
import structlog
from psycopg import sql
from etl.settings import Settings
log = structlog.get_logger(__name__)
DDL = """
CREATE TABLE IF NOT EXISTS {table} (
id BIGINT PRIMARY KEY,
number BIGINT NOT NULL,
title TEXT NOT NULL,
state TEXT NOT NULL,
user_id BIGINT NOT NULL,
user_login TEXT NOT NULL,
labels TEXT,
comments BIGINT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
closed_at TIMESTAMPTZ,
event_date DATE NOT NULL,
loaded_at TIMESTAMPTZ DEFAULT NOW()
)
"""
UPSERT = """
INSERT INTO {table} (
id, number, title, state, user_id, user_login, labels,
comments, created_at, updated_at, closed_at, event_date
)
SELECT id, number, title, state, user_id, user_login, labels,
comments, created_at, updated_at, closed_at, event_date
FROM {staging}
ON CONFLICT (id) DO UPDATE SET
title = EXCLUDED.title,
state = EXCLUDED.state,
labels = EXCLUDED.labels,
comments = EXCLUDED.comments,
updated_at = EXCLUDED.updated_at,
closed_at = EXCLUDED.closed_at,
event_date = EXCLUDED.event_date,
loaded_at = NOW()
WHERE {table}.updated_at < EXCLUDED.updated_at
"""
def load(parquet_path: Path, settings: Settings) -> int:
df = pd.read_parquet(parquet_path)
target = sql.Identifier(settings.pg_target_table)
staging = sql.Identifier(f"_stg_{settings.pg_target_table}")
with psycopg.connect(settings.pg_dsn.get_secret_value()) as conn:
with conn.cursor() as cur:
cur.execute(sql.SQL(DDL).format(table=target))
cur.execute(
sql.SQL("CREATE TEMP TABLE {staging} (LIKE {table} INCLUDING ALL)")
.format(staging=staging, table=target)
)
cols = [
"id", "number", "title", "state", "user_id", "user_login",
"labels", "comments", "created_at", "updated_at", "closed_at",
"event_date",
]
copy_sql = sql.SQL("COPY {staging} ({cols}) FROM STDIN").format(
staging=staging,
cols=sql.SQL(", ").join(map(sql.Identifier, cols)),
)
with cur.copy(copy_sql) as copy:
for row in df[cols].itertuples(index=False, name=None):
copy.write_row(row)
cur.execute(sql.SQL(UPSERT).format(table=target, staging=staging))
affected = cur.rowcount
conn.commit()
log.info("load_done", affected=affected, file=str(parquet_path))
return affected
Идемпотентность через staging + ON CONFLICT DO UPDATE WHERE updated_at < EXCLUDED.updated_at — если pipeline дёрнули дважды, второй раз ничего не сломает и не задвоит. Условие на updated_at дополнительно защищает от race condition: если две параллельные джобы попытаются обновить ту же запись, выиграет та, у которой данные новее.
pipeline.py
Оркестрация — соединяем три этапа в один прогон:
from datetime import UTC, datetime
import structlog
from etl.extract import extract_issues
from etl.load import load
from etl.logging_config import setup_logging
from etl.settings import get_settings
from etl.transform import transform
log = structlog.get_logger(__name__)
def run() -> None:
setup_logging()
settings = get_settings()
started_at = datetime.now(UTC)
log.info(
"pipeline_start",
repo=settings.github_repo,
since=settings.initial_since,
)
try:
raw = extract_issues(settings, since=settings.initial_since, run_dt=started_at)
curated = transform(raw, settings)
affected = load(curated, settings)
except Exception:
log.exception("pipeline_failed")
raise
log.info(
"pipeline_done",
affected=affected,
duration_sec=(datetime.now(UTC) - started_at).total_seconds(),
)
if __name__ == "__main__":
run()
В production вместо settings.initial_since будет водомерный знак — последний успешно обработанный updated_at из таблицы или из отдельного _etl_state. Это даст инкрементальную загрузку: каждый запуск дотягивает только новые/изменённые issues. Подробно про watermark — в М03/05.
И последнее — точка входа console-script в pyproject.toml:
[project]
name = "etl"
version = "0.1.0"
requires-python = ">=3.13"
dependencies = [
"httpx>=0.27",
"pandas>=2.2",
"psycopg[binary]>=3.2",
"pyarrow>=17",
"pydantic>=2.8",
"pydantic-settings>=2.4",
"structlog>=24.4",
"tenacity>=9.0",
]
[project.optional-dependencies]
dev = [
"mypy>=1.11",
"pytest>=8.3",
"pytest-cov>=5.0",
"pytest-postgresql>=6.1",
"ruff>=0.6",
]
[project.scripts]
etl-pipeline = "etl.pipeline:run"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
После uv sync в окружении доступна команда etl-pipeline — как git, psql или любая другая CLI. Это и есть «упаковали в Python-пакет» из обещания во вступительном уроке.
test_extract.py
Тестирование HTTP-клиента через httpx.MockTransport (М08):
import gzip
import json
from pathlib import Path
import httpx
import pytest
from etl.api_client import GitHubClient
from etl.extract import extract_issues
from etl.settings import Settings
@pytest.fixture
def settings(tmp_path: Path) -> Settings:
return Settings(
github_token="x", # type: ignore[arg-type]
github_repo="python/cpython",
raw_dir=tmp_path / "raw",
curated_dir=tmp_path / "curated",
pg_dsn="postgresql://x@localhost/x", # type: ignore[arg-type]
)
def _make_handler(pages: list[list[dict]]) -> httpx.MockTransport:
state = {"i": 0}
def handler(request: httpx.Request) -> httpx.Response:
i = state["i"]
state["i"] += 1
headers = {}
if i + 1 < len(pages):
headers["Link"] = (
f'<https://api.github.com/next?page={i + 2}>; rel="next"'
)
return httpx.Response(200, json=pages[i], headers=headers)
return httpx.MockTransport(handler)
def test_extract_writes_jsonl_gz(
settings: Settings, monkeypatch: pytest.MonkeyPatch
) -> None:
pages = [
[{"id": 1, "number": 1, "title": "a", "state": "open"}],
[{"id": 2, "number": 2, "title": "b", "state": "closed"}],
]
original_init = GitHubClient.__init__
def patched_init(self: GitHubClient, s: Settings) -> None:
original_init(self, s)
self._client = httpx.Client(transport=_make_handler(pages))
monkeypatch.setattr(GitHubClient, "__init__", patched_init)
out = extract_issues(settings, since="2020-01-01T00:00:00Z")
lines = gzip.decompress(out.read_bytes()).decode().splitlines()
assert [json.loads(l)["id"] for l in lines] == [1, 2]
Без реальной сети, без токена, без таймаутов. Тест запускается на CI за 100 миллисекунд, проверяет именно ту логику, которую написали — pagination, запись в JSONL.gz, ленивость генератора.
.github/workflows/ci.yml
Качество кода на каждом push’е:
name: CI
on:
push:
branches: [main]
pull_request:
jobs:
quality:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v3
with:
enable-cache: true
- name: Install Python
run: uv python install 3.13
- name: Install deps
run: uv sync --frozen --extra dev
- name: Lint
run: uv run ruff check src tests
- name: Format check
run: uv run ruff format --check src tests
- name: Type check
run: uv run mypy src
- name: Tests
run: uv run pytest -v --cov=etl --cov-report=term-missing
Три гейта — ruff, mypy, pytest. Без них PR не сольётся. Junior, который умеет настроить такой CI в новом репо за 15 минут, уже отличается от среднего junior’а из бутлега.
Какой модуль какую часть закрывает
Каждый компонент пайплайна — это упражнение, которое мы уже разобрали в одном из модулей. Capstone-проект — это интегральный экзамен по всему курсу.
Каждая строчка из этой таблицы — упражнение, которое вы уже сделали. Capstone — это просто сборка их в одно.
Как запустить локально
Полная пошаговая инструкция «с нуля», без рукомашества. Делается за пятнадцать минут.
1. Клонируем и заходим в проект.
git clone https://github.com/your-org/etl-project.git
cd etl-project
2. Ставим зависимости через uv.
uv нужно поставить один раз на машину — curl -LsSf https://astral.sh/uv/install.sh | sh (детали в М01/01). Дальше:
uv sync --extra dev
Эта команда сама поставит Python 3.13, создаст .venv/ и заполнит её ровно тем, что в uv.lock — детерминированно, как в CI.
3. Готовим .env.
cp .env.example .env
Файл .env сейчас выглядит так:
ETL_GITHUB_TOKEN=ghp_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
ETL_GITHUB_REPO=python/cpython
ETL_PG_DSN=postgresql://etl:etl@localhost:5432/etl
GitHub PAT (Personal Access Token) делается в Settings → Developer settings → Tokens, права public_repo достаточно для публичных репозиториев. Без токена API ограничит вас 60 запросами в час — на «cpython» закончится на первой странице.
4. Поднимаем Postgres локально через docker-compose.
docker compose up -d postgres
docker-compose.yml рядом с проектом — самый базовый: контейнер postgres:17-alpine, открытый порт 5432, пользователь etl/etl, база etl. На production здесь, конечно, будет managed Postgres (RDS, CloudSQL, neon), но локально это даёт идентичную среду.
5. Запускаем pipeline.
uv run etl-pipeline
В stdout полетят JSON-логи в реальном времени:
{"event": "pipeline_start", "repo": "python/cpython", "since": "2020-01-01T00:00:00Z", "level": "info", "timestamp": "2026-05-13T10:23:01Z"}
{"event": "page_fetched", "page": 1, "count": 100, "level": "info", "timestamp": "2026-05-13T10:23:02Z"}
{"event": "page_fetched", "page": 2, "count": 100, ...}
...
{"event": "extract_done", "count": 4823, "file": "data/raw/dt=2026-05-13/issues-20260513T102301Z.jsonl.gz", ...}
{"event": "validation_done", "ok": 4823, "bad": 0, ...}
{"event": "transform_done", "file": "data/curated/issues-20260513T102612Z.parquet", "rows": 4823, ...}
{"event": "load_done", "affected": 4823, ...}
{"event": "pipeline_done", "affected": 4823, "duration_sec": 412.7, ...}
6. Проверяем результат в Postgres.
docker compose exec postgres psql -U etl -d etl -c "
SELECT state, COUNT(*) FROM github_issues GROUP BY state ORDER BY state;
"
Получаем что-то вроде:
state | count
--------+-------
closed | 4621
open | 202
7. Запускаем тесты.
uv run pytest -v
Все зелёные — pipeline production-ready.
Запустите pipeline дважды подряд. Второй раз он не задвоит ни одной записи и в логах вы увидите affected: 0 — это и есть идемпотентность через ON CONFLICT DO UPDATE WHERE updated_at < EXCLUDED.updated_at. Идемпотентность — главное свойство production ETL, без которого Airflow-перезапуски убивают данные.
Зачем это всё в портфолио
Прямо сейчас, после восьми модулей и этого capstone, у вас на руках готовый артефакт для собеседования на Junior Data Engineer. Это не «hello world» и не «todo-app в Django» — это тот самый код, который ваш будущий тимлид пишет каждый день.
Что конкретно вы можете показать на интервью:
- На вопрос «как ты управляешь зависимостями» — открываете
pyproject.toml, объясняете per-pin философию,uv.lock, console-scripts. Это сильнее, чем 90% других кандидатов уровня junior. - На вопрос «как ты обкладываешь HTTP-вызовы» — показываете
api_client.py. Перечисляете: timeouts, raise_for_status, retry-фильтр по типу ошибки, exponential backoff с jitter, structlog в каждом запросе. Уже сильнее middle. - На вопрос «как ты валидируешь данные на входе» —
models.pyс Pydantic. Объясняете границу «внутри / на границе»,extra="ignore", паттерны для state, что плохая запись логируется и идёт в quarantine, а не валит пайплайн. - На вопрос «как сделать идемпотентным» — staging table + ON CONFLICT DO UPDATE WHERE updated_at-based. Объясняете, почему
INSERT ... ON CONFLICT DO NOTHINGне годится для slowly changing dimensions. - На вопрос «а тесты у тебя есть» —
tests/с MockTransport, pytest-postgresql, conftest. CI запускается на каждый push. Coverage — больше 80%.
Это всё уже у вас есть после курса. Главное — не закопать репозиторий в private, а сделать публичным и положить ссылку в резюме. Один работающий ETL на GitHub стоит десяти строк «знаю Python» в CV.
Куда расти после курса
Capstone-проект — это потолок Junior. Дальше начинаются области, которые в этом курсе принципиально не разбирались, потому что без основы они не имеют смысла.
Airflow — оркестрация. Сейчас наш pipeline запускается руками или по cron, без зависимостей между задачами, без retry на уровне task’а, без UI и без истории запусков. На реальной работе DAG в Airflow дёргает etl-pipeline каждый день в 5 утра, ждёт upstream-таблицу, ретраит при падении, аллертит в Slack при сбое. Следующий шаг — курс по Airflow.
dbt — transformation as code. Наш transform.py делает простой flatten Issue → DataFrame. На production трансформации измеряются десятками SQL-моделей с зависимостями, тестами на колонки (unique, not null, accepted_values), документацией, lineage-графом. Это всё делает dbt, и он стандарт индустрии.
Spark / Flink — большие данные. Pandas хороший до десятков гигабайт. Когда у вас сотни терабайт ивентов в Kafka — приходят Spark Streaming или Flink, они масштабируются горизонтально на сотни worker’ов. Но синтаксис там сильно похож на pandas/SQL — основа из этого курса остаётся.
Python 02 — Middle-уровень. Здесь были основы языка для DE-задач. На middle: внутренности CPython (refcount, GIL, GIL-free 3.13t, bytecode), async (asyncio, trio, httpx async), метаклассы, дескрипторы, Protocol/TypeVar/Generic, performance-профилирование, написание pandas-плагинов, polars в глубину. Без основы из Python 01 туда лезть рано — будет «вкусные паттерны без понимания, зачем».
Что параллельно: SQL 01 (если ещё не пройден), SQL 02 / internals — внутренности Postgres и индексов. DE-инженер без SQL не существует — половина проблем решается одним хорошо написанным запросом, минуя весь Python.
Финал
Восемь модулей назад мы открыли пустую папку. Сейчас у вас — pip-installable пакет, который дёргает API с retry-логикой, валидирует через Pydantic, складывает архив в JSONL.gz, конвертит в Parquet, загружает в Postgres идемпотентно, покрыт тестами, запускается из CI. Каждая строчка обоснована — вы знаете, почему она написана именно так, а не иначе.
Этого достаточно для оффера на Junior Data Engineer. Дальше — практика. Сделайте свою копию этого pipeline’а, но для другого API (Stack Overflow, Hacker News, GitLab, Telegram). Положите в GitHub. Через месяц поправьте, что вам не понравится. Через три — посмотрите на код, который писали в начале, и поймёте, насколько вы выросли.
В лабораторной части курса — labs/ — будет несколько больших задач именно по этому шаблону. Туда и идём.
Спасибо за курс. До встречи на Python 02 — когда захотите узнать, как этот же pipeline ускорить в десять раз через asyncio, или зачем __slots__, или почему GIL в Python 3.13 уже не такой страшный, как был.