Design Patterns — обзор модуля
Production Airflow — это не про синтаксис @dag/@task, а про правильную архитектуру pipeline-ов. Этот модуль — collection patterns, которые отличают мaintainable production DAGs от спагетти-кода.
Уроки модуля
| # | Урок | Что внутри |
|---|---|---|
| 01 | Обзор модуля | Текущий урок |
| 02 | Idempotency | MERGE INTO, atomic writes, deterministic paths |
| 03 | DAG factory pattern | Генерация DAGs из конфига, pitfalls top-level imports |
| 04 | Reusable TaskGroups + Setup/Teardown (2.7+) | Wrapping ephemeral compute |
| 05 | Backfill-safe DAGs | Data interval respecting, no datetime.now() |
| 06 | OpenLineage-aware patterns | Designing for lineage emission |
| 07 | Error handling и retry strategies | Exponential backoff, dead letter queues |
| 08 | Approval workflows | Sensor-based approval (в 3.x — HITL operator) |
Top 5 patterns
1. Idempotency
Любой task должен можно перезапускать без побочных эффектов:
@task
def load_to_warehouse(date: str, path: str):
# MERGE INTO вместо INSERT
sql = f"""
MERGE INTO orders dest
USING (SELECT * FROM external WHERE dt = '{date}') src
ON dest.id = src.id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...;
"""
execute(sql)
2. DAG factory
def make_dag(config: dict) -> DAG:
@dag(dag_id=config['id'], schedule=config['schedule'])
def generated():
for step in config['steps']:
task_from_step(step)
return generated()
for cfg in load_configs():
globals()[cfg['id']] = make_dag(cfg)
⚠️ Pitfall: load_configs() выполняется на каждом DAG parse → если делает network call, замедляет parsing.
3. Backfill-safe DAGs
# ❌ ПЛОХО — datetime.now() сдвигается на каждом parse
@dag(start_date=datetime.now() - timedelta(days=7), schedule='@daily')
# ✅ ХОРОШО — hardcoded
@dag(start_date=datetime(2024, 1, 1), schedule='@daily', catchup=False)
4. Approval workflow (через sensor)
В 2.x для human approval используется sensor pattern:
@task.sensor(poke_interval=300, timeout=86400, mode="reschedule")
def wait_approval(**context) -> PokeReturnValue:
approval_status = check_approval_table(context["dag_run"].run_id)
if approval_status == "approved":
return PokeReturnValue(is_done=True)
return PokeReturnValue(is_done=False)
В Airflow 3.1+ появился dedicated HITL (Human-in-the-Loop) operator для этого паттерна. В 2.x используем sensor-based подход.
5. OpenLineage-aware
Использовать standard operators (PostgresOperator, S3CopyObjectOperator) везде где возможно — они эмитят OL events автоматически (с 2.6+ через apache-airflow-providers-openlineage). Custom Python operators требуют write extractors.
Killer takeaway
Один principle превосходит все остальные: каждый task должен быть атомарным и идемпотентным. Если вы можете убить task посередине, удалить его state и перезапустить — данные останутся консистентными — это правильно сделанный task.