Learning Platform
Глоссарий Troubleshooting
Урок 22.02 · 35 мин
Средний
clonefeature-branchimplementationconventional-commitspushgh-pr-create

Clone, branch, implement: первые три шага capstone

Ticket DE-1234 открыт. У тебя 4-6 часов. Поехали — реализуем шаги 1-5 из overview.

Шаг 1: Clone и осмотр репо (15 минут)

Clone

Tech lead Алиса вчера дала тебе доступ. URL репо — [email protected]:acme-corp/analytics-dags.git. SSH-key уже в GitHub.

$ cd ~/projects
$ git clone [email protected]:acme-corp/analytics-dags.git
Cloning into 'analytics-dags'...
remote: Enumerating objects: 2347, done.
remote: Counting objects: 100% (2347/2347), done.
remote: Compressing objects: 100% (1234/1234), done.
remote: Total 2347 (delta 1123), reused 2200 (delta 1000), pack-reused 0
Receiving objects: 100% (2347/2347), 4.5 MiB | 12.3 MiB/s, done.
Resolving deltas: 100% (1123/1123), done.

$ cd analytics-dags

origin уже настроен на ssh URL:

$ git remote -v
origin  [email protected]:acme-corp/analytics-dags.git (fetch)
origin  [email protected]:acme-corp/analytics-dags.git (push)

Осмотр: 15-минутный sweep

Это самая важная часть onboarding. Не пиши код, сначала понимай.

1. Прочитай README.md:

$ cat README.md

Ищи: что за проект, как запускать, кто работает, contributing guidelines.

2. Структура корня:

$ ls -la
.git
.github/              # CI workflows, CODEOWNERS
.gitignore
.gitleaks.toml
.pre-commit-config.yaml
README.md
dags/                 # Airflow DAGs
plugins/              # custom operators
tests/                # pytest
pyproject.toml        # dependencies
uv.lock

3. Существующие DAG-и:

$ ls dags/
README.md
analytics_main_etl_dag.py
campaign_data_dag.py
fact_orders_dag.py
shared/                          # shared helpers
    __init__.py
    s3_utils.py
    snowflake_loaders.py
    secret_helpers.py

Открой один DAG, изучай convention — как пишут импорты, какой docstring, как используют hooks:

$ cat dags/fact_orders_dag.py

Найди похожий по теме DAG. У нас — fact_orders_dag.py тоже грузит из S3 в Snowflake. Это будет шаблон для твоей реализации.

4. CI workflows:

$ ls .github/workflows/
ci.yml
deploy.yml

$ cat .github/workflows/ci.yml

Понимай что прогонит CI — какие checks нужны зелёными.

5. CODEOWNERS:

$ cat .github/CODEOWNERS
* @acme-corp/tech-leads
/dags/ @acme-corp/data-engineering

Кто будет твой reviewer — tech-leads + data-engineering. В капстоуне — твой reviewer Алиса.

6. Recent commits:

$ git log --oneline --no-merges -20
abc1234 feat(dags): add campaign_data_dag for marketing attribution
def5678 fix(snowflake_loaders): handle empty parquet edge case
ghi9012 chore: bump dbt-snowflake to 1.9.0
jkl3456 docs: update README for onboarding
...

Учим style commit messages — все используют feat, fix, chore, docs prefix. Это Conventional Commits (модуль 5). Применяй ту же конвенцию в своих commits.

7. Существующие тесты:

$ ls tests/
conftest.py
test_fact_orders.py
test_campaign_data.py
test_s3_utils.py

Каждый DAG имеет тест. У нас будет test_user_events.py.

8. pyproject.toml:

$ cat pyproject.toml | head -30
[project]
name = "analytics-dags"
version = "0.1.0"
requires-python = ">=3.13"
dependencies = [
    "apache-airflow[postgres,amazon,snowflake]==2.10.0",
    "pandas==2.2.0",
    ...
]

[dependency-groups]
dev = [
    "pytest==8.0.0",
    "pytest-mock==3.12.0",
    "mypy==1.13.0",
    "ruff==0.7.0",
]

Знаешь dependencies — можно начать.

Setup local env

$ uv sync
Resolved 89 packages...
Installed 89 packages in 4.2s

$ source .venv/bin/activate  # or use uv run
$ pre-commit install
pre-commit installed at .git/hooks/pre-commit

Pre-commit поставил gitleaks (модуль 18) — теперь все commits будут проверяться.


Шаг 2: Создай feature branch (1 минута)

Conventions для имени branch — посмотрел в README или из открытых PR. Стандарт:

feat/<ticket-id>-<short-slug>
fix/<ticket-id>-<short-slug>
docs/<short-slug>
hotfix/<short-slug>

Для нашего тикета DE-1234:

$ git switch -c feat/de-1234-user-events-dag
Switched to a new branch 'feat/de-1234-user-events-dag'

$ git branch --show-current
feat/de-1234-user-events-dag

git switch -c (модуль 5) — новая команда для create + checkout.

TIP

Имя ветки — это первое впечатление PR. feat/de-1234-user-events-dag понятно: feature, related ticket, what for. fix-bug, wip, test — anti-patterns: некрасиво в git log, не grep-аются.


Шаг 3: Реализация — пиши код (2-4 часа)

Это 80% твоего времени capstone. Не Git, а Python + Airflow.

Скелет DAG

Создаём dags/user_events_dag.py. Используем fact_orders_dag.py как шаблон:

"""User events ingestion DAG.

Loads user behavior events from S3 (Parquet, partitioned by date) into
Snowflake table ANALYTICS.RAW.USER_EVENTS.

Schedule: daily at 04:00 UTC (after main ETL).
Idempotent: re-runs delete existing partition before re-insert.

Ticket: DE-1234.
"""
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator

from dags.shared.snowflake_loaders import build_copy_into_sql

DAG_ID = "user_events_ingestion"
S3_BUCKET = "company-events-bucket"
S3_KEY_PREFIX = "user_events/year={ds_y}/month={ds_m}/day={ds_d}/"
SNOWFLAKE_TABLE = "ANALYTICS.RAW.USER_EVENTS"
SNOWFLAKE_STAGE = "@ANALYTICS.RAW.S3_STAGE"


with DAG(
    dag_id=DAG_ID,
    description="Load user events from S3 to Snowflake daily.",
    schedule="0 4 * * *",  # 04:00 UTC
    start_date=datetime(2026, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=["analytics", "user-events", "marketing"],
    default_args={
        "owner": "data-engineering",
        "retries": 2,
        "retry_delay": 300,  # 5 minutes
    },
) as dag:

    delete_existing = SnowflakeOperator(
        task_id="delete_existing_partition",
        snowflake_conn_id="snowflake_default",
        sql=(
            f"DELETE FROM {SNOWFLAKE_TABLE} "
            "WHERE event_date = TO_DATE('{{ ds }}', 'YYYY-MM-DD');"
        ),
    )

    load_to_snowflake = S3ToSnowflakeOperator(
        task_id="copy_into_snowflake",
        s3_keys=[
            S3_KEY_PREFIX.format(
                ds_y="{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}",
                ds_m="{{ macros.ds_format(ds, '%Y-%m-%d', '%m') }}",
                ds_d="{{ macros.ds_format(ds, '%Y-%m-%d', '%d') }}",
            )
        ],
        table=SNOWFLAKE_TABLE,
        schema="RAW",
        stage=SNOWFLAKE_STAGE,
        file_format="(TYPE = PARQUET)",
        snowflake_conn_id="snowflake_default",
        aws_conn_id="aws_default",
    )

    delete_existing >> load_to_snowflake

Замечания:

  • Idempotent: DELETE партиции перед загрузкой. Re-run = same result.
  • Connections: snowflake_default и aws_default — на сервере resolve-ятся через secrets backend (модуль 18 урок 04).
  • Conventional: type hints, docstring, no hardcoded secrets, retries.

Commit #1: первый commit DAG

$ git add dags/user_events_dag.py
$ git status
On branch feat/de-1234-user-events-dag
Changes to be committed:
        new file:   dags/user_events_dag.py

$ git commit -m "feat(dags): add user_events ingestion DAG

Daily load of S3 user_events parquet partitions to
Snowflake ANALYTICS.RAW.USER_EVENTS. Idempotent: pre-deletes
partition before copy.

Refs: DE-1234"

Pre-commit hook (gitleaks) запустится:

gitleaks........................................................Passed

Зелёный. Если поймал что-то — fix перед re-commit.

TIP

Multi-line commit message: первая строка — короткое summary (50 char), пустая строка, body — детали. Refs: DE-1234 — ticket reference. Это standard conventional commit + Jira-link.

Тест

Создаём tests/test_user_events.py:

"""Tests for user_events_dag."""
from __future__ import annotations

import pytest
from airflow.models.dagbag import DagBag


def test_dag_imports_without_error():
    """DAG file should be importable — smoke test."""
    dag_bag = DagBag(include_examples=False)
    assert "user_events_ingestion" in dag_bag.dag_ids
    assert dag_bag.dags["user_events_ingestion"] is not None


def test_dag_has_expected_tasks():
    """DAG should have two tasks: delete + copy."""
    dag_bag = DagBag(include_examples=False)
    dag = dag_bag.dags["user_events_ingestion"]
    task_ids = sorted(t.task_id for t in dag.tasks)
    assert task_ids == ["copy_into_snowflake", "delete_existing_partition"]


def test_dag_schedule_is_daily_04():
    """Schedule must be 04:00 daily UTC."""
    dag_bag = DagBag(include_examples=False)
    dag = dag_bag.dags["user_events_ingestion"]
    assert dag.schedule_interval == "0 4 * * *"
$ uv run pytest tests/test_user_events.py -v
============== test session starts ==============
tests/test_user_events.py::test_dag_imports_without_error PASSED
tests/test_user_events.py::test_dag_has_expected_tasks PASSED
tests/test_user_events.py::test_dag_schedule_is_daily_04 PASSED
================ 3 passed in 1.23s ================

Зелёный.

Commit #2: тест

$ git add tests/test_user_events.py
$ git commit -m "test(user_events_dag): add smoke + schedule + tasks tests

Refs: DE-1234"

README обновление

Документация — часть acceptance criteria.

$ vim dags/README.md

Добавь секцию:

## user_events_ingestion

**Purpose**: Daily ingestion of user behavior events from S3 to Snowflake.

**Schedule**: Daily at 04:00 UTC (`0 4 * * *`).

**Source**: `s3://company-events-bucket/user_events/year={Y}/month={M}/day={D}/*.parquet`

**Target**: `ANALYTICS.RAW.USER_EVENTS`

**Owner**: [email protected]

**Idempotency**: Re-runs DELETE current partition before COPY INTO. Safe to retry.

**Ticket**: DE-1234
$ git add dags/README.md
$ git commit -m "docs(dags): document user_events_ingestion DAG

Refs: DE-1234"

Локальная проверка

Перед push — повтори всё что в CI:

$ uv run ruff check .
All checks passed!

$ uv run ruff format --check .
6 files would be left unchanged.

$ uv run mypy dags/ tests/
Success: no issues found in 12 source files.

$ uv run pytest -v
======== 47 passed in 8.42s ========

Все зелёные. Готов к push.


Шаг 4: Push с —set-upstream

$ git push --set-upstream origin feat/de-1234-user-events-dag
Enumerating objects: 12, done.
Counting objects: 100% (12/12), done.
Delta compression using up to 8 threads
Compressing objects: 100% (8/8), done.
Writing objects: 100% (8/8), 2.3 KiB | 2.3 MiB/s, done.
Total 8 (delta 4), reused 0 (delta 0), pack-reused 0
remote: Resolving deltas: 100% (4/4), completed with 4 local objects.
remote:
remote: Create a pull request for 'feat/de-1234-user-events-dag' on GitHub by visiting:
remote:      https://github.com/acme-corp/analytics-dags/pull/new/feat/de-1234-user-events-dag
remote:
To github.com:acme-corp/analytics-dags.git
 * [new branch]      feat/de-1234-user-events-dag -> feat/de-1234-user-events-dag
branch 'feat/de-1234-user-events-dag' set up to track 'origin/feat/de-1234-user-events-dag'.

--set-upstream (или -u) — связывает локальную ветку с remote. После этого git push без аргументов работает.

GitHub в output даёт ссылку на «Create PR». Можно через UI кликнуть, или через CLI — следующий шаг.


Шаг 5: Открыть PR через gh

GitHub CLI:

$ gh pr create \
    --base main \
    --title "feat(dags): add user_events ingestion DAG (DE-1234)" \
    --body "$(cat <<'EOF'
## Summary

Daily ingestion DAG for user behavior events from S3 to Snowflake \`ANALYTICS.RAW.USER_EVENTS\`.

Ticket: [DE-1234](https://acme.atlassian.net/browse/DE-1234)

## Changes

- New DAG \`dags/user_events_dag.py\`: daily schedule 04:00 UTC.
- Idempotent: \`DELETE\` of partition before \`COPY INTO\`.
- 3 tests added in \`tests/test_user_events.py\`.
- Documented in \`dags/README.md\`.

## Testing

\`\`\`
uv run pytest tests/test_user_events.py -v
\`\`\`

All 47 tests passing locally. CI should be green.

## Acceptance criteria

- [x] CI passes (ruff, mypy, pytest, gitleaks)
- [ ] Review approval from @alice
- [ ] DAG visible in Airflow UI after deploy

## Reviewer notes

- Schedule chosen as 04:00 UTC = post-main-ETL window. If conflicts with other DAGs, willing to adjust.
- Using \`S3ToSnowflakeOperator\` for portability (avoids custom S3 download + COPY).
EOF
)" \
    --reviewer alice,@acme-corp/data-engineering \
    --label "type:feat,team:data-engineering"

Output:

https://github.com/acme-corp/analytics-dags/pull/789

PR создан. Auto-assigned reviewers: alice + team data-engineering (через CODEOWNERS modul 18 урок 04).

Альтернатива — через UI

Если предпочитаешь UI:

$ gh pr create --web
# открывает браузер на pull/new

Заполнить title, body, reviewers ручками. Тот же результат.


Шаг 6: Подождать CI зелёный

После открытия PR — GitHub Actions запускает CI:

$ gh pr checks
Some checks pending
  ci/lint           pending
  ci/type-check     pending
  ci/test           pending
  ci/secret-scan    pending

# через минуту
$ gh pr checks
All checks passing
  ci/lint           pass    36s
  ci/type-check     pass    1m12s
  ci/test           pass    2m05s
  ci/secret-scan    pass    18s

5 минут — все зелёные. Если красное — fix перед review (нет смысла грузить reviewer-а на сломанный код).

Если CI fail

$ gh pr checks
Some checks were not successful
  ci/lint            fail    25s    https://github.com/...
  ...

$ gh run view --log <run-id>
# смотришь логи

# Видишь, например, ruff:
dags/user_events_dag.py:42:80: E501 Line too long (102 > 100)

# Fix
$ sed -i 's/somethingtoolong/break_line/' dags/user_events_dag.py
$ uv run ruff check dags/user_events_dag.py
All checks passed!

$ git add dags/user_events_dag.py
$ git commit -m "fix(user_events_dag): wrap long line per ruff"
$ git push
# CI запустится автоматически на новый push

После fix — wait зелёного.


Best practices recap

СтадияBest practice
CloneСразу uv sync + pre-commit install
Осмотр15 минут на README + 1 похожий DAG + CI workflow
Branch namefeat/<ticket>-<slug>
CommitsConventional: feat:, test:, docs: + ticket ref
Размер commitSmall, focused (один логический change)
Push--set-upstream первый раз
PR titlefeat(scope): summary (TICKET)
PR bodySummary + Changes + Testing + Checklist + Reviewer notes
ReviewersЧерез CODEOWNERS + explicit --reviewer
CIЗелёный перед прошением review

Killer takeaway

Первый день junior DE: clone репо, 15-минутный sweep (README + похожий DAG + CI + CODEOWNERS) — это самое важное для onboarding. git switch -c feat/<ticket>-<slug> — стандартное имя. Conventional commits с ticket ref — стиль команды. Локально проверь всё что в CI (ruff/mypy/pytest) перед push. git push --set-upstream origin <branch> первый раз. gh pr create — быстрый PR с правильно сформатированным body. Wait зелёного CI перед request review. Pre-commit hooks (gitleaks) — твоя страховка от обидной утечки.

dbt на практике: feature branch workflow и первый PR
Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. JIRA ticket DE-1234 'Add user_events ingestion DAG'. Какое имя feature ветки следовать командной convention?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4