Clone, branch, implement: первые три шага capstone
Ticket DE-1234 открыт. У тебя 4-6 часов. Поехали — реализуем шаги 1-5 из overview.
Шаг 1: Clone и осмотр репо (15 минут)
Clone
Tech lead Алиса вчера дала тебе доступ. URL репо — [email protected]:acme-corp/analytics-dags.git. SSH-key уже в GitHub.
$ cd ~/projects
$ git clone [email protected]:acme-corp/analytics-dags.git
Cloning into 'analytics-dags'...
remote: Enumerating objects: 2347, done.
remote: Counting objects: 100% (2347/2347), done.
remote: Compressing objects: 100% (1234/1234), done.
remote: Total 2347 (delta 1123), reused 2200 (delta 1000), pack-reused 0
Receiving objects: 100% (2347/2347), 4.5 MiB | 12.3 MiB/s, done.
Resolving deltas: 100% (1123/1123), done.
$ cd analytics-dags
origin уже настроен на ssh URL:
$ git remote -v
origin [email protected]:acme-corp/analytics-dags.git (fetch)
origin [email protected]:acme-corp/analytics-dags.git (push)
Осмотр: 15-минутный sweep
Это самая важная часть onboarding. Не пиши код, сначала понимай.
1. Прочитай README.md:
$ cat README.md
Ищи: что за проект, как запускать, кто работает, contributing guidelines.
2. Структура корня:
$ ls -la
.git
.github/ # CI workflows, CODEOWNERS
.gitignore
.gitleaks.toml
.pre-commit-config.yaml
README.md
dags/ # Airflow DAGs
plugins/ # custom operators
tests/ # pytest
pyproject.toml # dependencies
uv.lock
3. Существующие DAG-и:
$ ls dags/
README.md
analytics_main_etl_dag.py
campaign_data_dag.py
fact_orders_dag.py
shared/ # shared helpers
__init__.py
s3_utils.py
snowflake_loaders.py
secret_helpers.py
Открой один DAG, изучай convention — как пишут импорты, какой docstring, как используют hooks:
$ cat dags/fact_orders_dag.py
Найди похожий по теме DAG. У нас — fact_orders_dag.py тоже грузит из S3 в Snowflake. Это будет шаблон для твоей реализации.
4. CI workflows:
$ ls .github/workflows/
ci.yml
deploy.yml
$ cat .github/workflows/ci.yml
Понимай что прогонит CI — какие checks нужны зелёными.
5. CODEOWNERS:
$ cat .github/CODEOWNERS
* @acme-corp/tech-leads
/dags/ @acme-corp/data-engineering
Кто будет твой reviewer — tech-leads + data-engineering. В капстоуне — твой reviewer Алиса.
6. Recent commits:
$ git log --oneline --no-merges -20
abc1234 feat(dags): add campaign_data_dag for marketing attribution
def5678 fix(snowflake_loaders): handle empty parquet edge case
ghi9012 chore: bump dbt-snowflake to 1.9.0
jkl3456 docs: update README for onboarding
...
Учим style commit messages — все используют feat, fix, chore, docs prefix. Это Conventional Commits (модуль 5). Применяй ту же конвенцию в своих commits.
7. Существующие тесты:
$ ls tests/
conftest.py
test_fact_orders.py
test_campaign_data.py
test_s3_utils.py
Каждый DAG имеет тест. У нас будет test_user_events.py.
8. pyproject.toml:
$ cat pyproject.toml | head -30
[project]
name = "analytics-dags"
version = "0.1.0"
requires-python = ">=3.13"
dependencies = [
"apache-airflow[postgres,amazon,snowflake]==2.10.0",
"pandas==2.2.0",
...
]
[dependency-groups]
dev = [
"pytest==8.0.0",
"pytest-mock==3.12.0",
"mypy==1.13.0",
"ruff==0.7.0",
]
Знаешь dependencies — можно начать.
Setup local env
$ uv sync
Resolved 89 packages...
Installed 89 packages in 4.2s
$ source .venv/bin/activate # or use uv run
$ pre-commit install
pre-commit installed at .git/hooks/pre-commit
Pre-commit поставил gitleaks (модуль 18) — теперь все commits будут проверяться.
Шаг 2: Создай feature branch (1 минута)
Conventions для имени branch — посмотрел в README или из открытых PR. Стандарт:
feat/<ticket-id>-<short-slug>
fix/<ticket-id>-<short-slug>
docs/<short-slug>
hotfix/<short-slug>
Для нашего тикета DE-1234:
$ git switch -c feat/de-1234-user-events-dag
Switched to a new branch 'feat/de-1234-user-events-dag'
$ git branch --show-current
feat/de-1234-user-events-dag
git switch -c (модуль 5) — новая команда для create + checkout.
Имя ветки — это первое впечатление PR. feat/de-1234-user-events-dag понятно: feature, related ticket, what for. fix-bug, wip, test — anti-patterns: некрасиво в git log, не grep-аются.
Шаг 3: Реализация — пиши код (2-4 часа)
Это 80% твоего времени capstone. Не Git, а Python + Airflow.
Скелет DAG
Создаём dags/user_events_dag.py. Используем fact_orders_dag.py как шаблон:
"""User events ingestion DAG.
Loads user behavior events from S3 (Parquet, partitioned by date) into
Snowflake table ANALYTICS.RAW.USER_EVENTS.
Schedule: daily at 04:00 UTC (after main ETL).
Idempotent: re-runs delete existing partition before re-insert.
Ticket: DE-1234.
"""
from __future__ import annotations
from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator
from dags.shared.snowflake_loaders import build_copy_into_sql
DAG_ID = "user_events_ingestion"
S3_BUCKET = "company-events-bucket"
S3_KEY_PREFIX = "user_events/year={ds_y}/month={ds_m}/day={ds_d}/"
SNOWFLAKE_TABLE = "ANALYTICS.RAW.USER_EVENTS"
SNOWFLAKE_STAGE = "@ANALYTICS.RAW.S3_STAGE"
with DAG(
dag_id=DAG_ID,
description="Load user events from S3 to Snowflake daily.",
schedule="0 4 * * *", # 04:00 UTC
start_date=datetime(2026, 1, 1),
catchup=False,
max_active_runs=1,
tags=["analytics", "user-events", "marketing"],
default_args={
"owner": "data-engineering",
"retries": 2,
"retry_delay": 300, # 5 minutes
},
) as dag:
delete_existing = SnowflakeOperator(
task_id="delete_existing_partition",
snowflake_conn_id="snowflake_default",
sql=(
f"DELETE FROM {SNOWFLAKE_TABLE} "
"WHERE event_date = TO_DATE('{{ ds }}', 'YYYY-MM-DD');"
),
)
load_to_snowflake = S3ToSnowflakeOperator(
task_id="copy_into_snowflake",
s3_keys=[
S3_KEY_PREFIX.format(
ds_y="{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}",
ds_m="{{ macros.ds_format(ds, '%Y-%m-%d', '%m') }}",
ds_d="{{ macros.ds_format(ds, '%Y-%m-%d', '%d') }}",
)
],
table=SNOWFLAKE_TABLE,
schema="RAW",
stage=SNOWFLAKE_STAGE,
file_format="(TYPE = PARQUET)",
snowflake_conn_id="snowflake_default",
aws_conn_id="aws_default",
)
delete_existing >> load_to_snowflake
Замечания:
- Idempotent:
DELETEпартиции перед загрузкой. Re-run = same result. - Connections:
snowflake_defaultиaws_default— на сервере resolve-ятся через secrets backend (модуль 18 урок 04). - Conventional: type hints, docstring, no hardcoded secrets, retries.
Commit #1: первый commit DAG
$ git add dags/user_events_dag.py
$ git status
On branch feat/de-1234-user-events-dag
Changes to be committed:
new file: dags/user_events_dag.py
$ git commit -m "feat(dags): add user_events ingestion DAG
Daily load of S3 user_events parquet partitions to
Snowflake ANALYTICS.RAW.USER_EVENTS. Idempotent: pre-deletes
partition before copy.
Refs: DE-1234"
Pre-commit hook (gitleaks) запустится:
gitleaks........................................................Passed
Зелёный. Если поймал что-то — fix перед re-commit.
Multi-line commit message: первая строка — короткое summary (50 char), пустая строка, body — детали. Refs: DE-1234 — ticket reference. Это standard conventional commit + Jira-link.
Тест
Создаём tests/test_user_events.py:
"""Tests for user_events_dag."""
from __future__ import annotations
import pytest
from airflow.models.dagbag import DagBag
def test_dag_imports_without_error():
"""DAG file should be importable — smoke test."""
dag_bag = DagBag(include_examples=False)
assert "user_events_ingestion" in dag_bag.dag_ids
assert dag_bag.dags["user_events_ingestion"] is not None
def test_dag_has_expected_tasks():
"""DAG should have two tasks: delete + copy."""
dag_bag = DagBag(include_examples=False)
dag = dag_bag.dags["user_events_ingestion"]
task_ids = sorted(t.task_id for t in dag.tasks)
assert task_ids == ["copy_into_snowflake", "delete_existing_partition"]
def test_dag_schedule_is_daily_04():
"""Schedule must be 04:00 daily UTC."""
dag_bag = DagBag(include_examples=False)
dag = dag_bag.dags["user_events_ingestion"]
assert dag.schedule_interval == "0 4 * * *"
$ uv run pytest tests/test_user_events.py -v
============== test session starts ==============
tests/test_user_events.py::test_dag_imports_without_error PASSED
tests/test_user_events.py::test_dag_has_expected_tasks PASSED
tests/test_user_events.py::test_dag_schedule_is_daily_04 PASSED
================ 3 passed in 1.23s ================
Зелёный.
Commit #2: тест
$ git add tests/test_user_events.py
$ git commit -m "test(user_events_dag): add smoke + schedule + tasks tests
Refs: DE-1234"
README обновление
Документация — часть acceptance criteria.
$ vim dags/README.md
Добавь секцию:
## user_events_ingestion
**Purpose**: Daily ingestion of user behavior events from S3 to Snowflake.
**Schedule**: Daily at 04:00 UTC (`0 4 * * *`).
**Source**: `s3://company-events-bucket/user_events/year={Y}/month={M}/day={D}/*.parquet`
**Target**: `ANALYTICS.RAW.USER_EVENTS`
**Owner**: [email protected]
**Idempotency**: Re-runs DELETE current partition before COPY INTO. Safe to retry.
**Ticket**: DE-1234
$ git add dags/README.md
$ git commit -m "docs(dags): document user_events_ingestion DAG
Refs: DE-1234"
Локальная проверка
Перед push — повтори всё что в CI:
$ uv run ruff check .
All checks passed!
$ uv run ruff format --check .
6 files would be left unchanged.
$ uv run mypy dags/ tests/
Success: no issues found in 12 source files.
$ uv run pytest -v
======== 47 passed in 8.42s ========
Все зелёные. Готов к push.
Шаг 4: Push с —set-upstream
$ git push --set-upstream origin feat/de-1234-user-events-dag
Enumerating objects: 12, done.
Counting objects: 100% (12/12), done.
Delta compression using up to 8 threads
Compressing objects: 100% (8/8), done.
Writing objects: 100% (8/8), 2.3 KiB | 2.3 MiB/s, done.
Total 8 (delta 4), reused 0 (delta 0), pack-reused 0
remote: Resolving deltas: 100% (4/4), completed with 4 local objects.
remote:
remote: Create a pull request for 'feat/de-1234-user-events-dag' on GitHub by visiting:
remote: https://github.com/acme-corp/analytics-dags/pull/new/feat/de-1234-user-events-dag
remote:
To github.com:acme-corp/analytics-dags.git
* [new branch] feat/de-1234-user-events-dag -> feat/de-1234-user-events-dag
branch 'feat/de-1234-user-events-dag' set up to track 'origin/feat/de-1234-user-events-dag'.
--set-upstream (или -u) — связывает локальную ветку с remote. После этого git push без аргументов работает.
GitHub в output даёт ссылку на «Create PR». Можно через UI кликнуть, или через CLI — следующий шаг.
Шаг 5: Открыть PR через gh
GitHub CLI:
$ gh pr create \
--base main \
--title "feat(dags): add user_events ingestion DAG (DE-1234)" \
--body "$(cat <<'EOF'
## Summary
Daily ingestion DAG for user behavior events from S3 to Snowflake \`ANALYTICS.RAW.USER_EVENTS\`.
Ticket: [DE-1234](https://acme.atlassian.net/browse/DE-1234)
## Changes
- New DAG \`dags/user_events_dag.py\`: daily schedule 04:00 UTC.
- Idempotent: \`DELETE\` of partition before \`COPY INTO\`.
- 3 tests added in \`tests/test_user_events.py\`.
- Documented in \`dags/README.md\`.
## Testing
\`\`\`
uv run pytest tests/test_user_events.py -v
\`\`\`
All 47 tests passing locally. CI should be green.
## Acceptance criteria
- [x] CI passes (ruff, mypy, pytest, gitleaks)
- [ ] Review approval from @alice
- [ ] DAG visible in Airflow UI after deploy
## Reviewer notes
- Schedule chosen as 04:00 UTC = post-main-ETL window. If conflicts with other DAGs, willing to adjust.
- Using \`S3ToSnowflakeOperator\` for portability (avoids custom S3 download + COPY).
EOF
)" \
--reviewer alice,@acme-corp/data-engineering \
--label "type:feat,team:data-engineering"
Output:
https://github.com/acme-corp/analytics-dags/pull/789
PR создан. Auto-assigned reviewers: alice + team data-engineering (через CODEOWNERS modul 18 урок 04).
Альтернатива — через UI
Если предпочитаешь UI:
$ gh pr create --web
# открывает браузер на pull/new
Заполнить title, body, reviewers ручками. Тот же результат.
Шаг 6: Подождать CI зелёный
После открытия PR — GitHub Actions запускает CI:
$ gh pr checks
Some checks pending
ci/lint pending
ci/type-check pending
ci/test pending
ci/secret-scan pending
# через минуту
$ gh pr checks
All checks passing
ci/lint pass 36s
ci/type-check pass 1m12s
ci/test pass 2m05s
ci/secret-scan pass 18s
5 минут — все зелёные. Если красное — fix перед review (нет смысла грузить reviewer-а на сломанный код).
Если CI fail
$ gh pr checks
Some checks were not successful
ci/lint fail 25s https://github.com/...
...
$ gh run view --log <run-id>
# смотришь логи
# Видишь, например, ruff:
dags/user_events_dag.py:42:80: E501 Line too long (102 > 100)
# Fix
$ sed -i 's/somethingtoolong/break_line/' dags/user_events_dag.py
$ uv run ruff check dags/user_events_dag.py
All checks passed!
$ git add dags/user_events_dag.py
$ git commit -m "fix(user_events_dag): wrap long line per ruff"
$ git push
# CI запустится автоматически на новый push
После fix — wait зелёного.
Best practices recap
| Стадия | Best practice |
|---|---|
| Clone | Сразу uv sync + pre-commit install |
| Осмотр | 15 минут на README + 1 похожий DAG + CI workflow |
| Branch name | feat/<ticket>-<slug> |
| Commits | Conventional: feat:, test:, docs: + ticket ref |
| Размер commit | Small, focused (один логический change) |
| Push | --set-upstream первый раз |
| PR title | feat(scope): summary (TICKET) |
| PR body | Summary + Changes + Testing + Checklist + Reviewer notes |
| Reviewers | Через CODEOWNERS + explicit --reviewer |
| CI | Зелёный перед прошением review |
Killer takeaway
Первый день junior DE: clone репо, 15-минутный sweep (README + похожий DAG + CI + CODEOWNERS) — это самое важное для onboarding. git switch -c feat/<ticket>-<slug> — стандартное имя. Conventional commits с ticket ref — стиль команды. Локально проверь всё что в CI (ruff/mypy/pytest) перед push. git push --set-upstream origin <branch> первый раз. gh pr create — быстрый PR с правильно сформатированным body. Wait зелёного CI перед request review. Pre-commit hooks (gitleaks) — твоя страховка от обидной утечки.