Learning Platform
Глоссарий Troubleshooting
Урок 18.01 · 12 мин
Продвинутый
Design PatternsIdempotencyFactoryBackfill

Design Patterns — обзор модуля

Production Airflow — это не про синтаксис @dag/@task, а про правильную архитектуру pipeline-ов. Этот модуль — collection patterns, которые отличают мaintainable production DAGs от спагетти-кода.

Уроки модуля

#УрокЧто внутри
01Обзор модуляТекущий урок
02IdempotencyMERGE INTO, atomic writes, deterministic paths
03DAG factory patternГенерация DAGs из конфига, pitfalls top-level imports
04Reusable TaskGroups + Setup/Teardown (2.7+)Wrapping ephemeral compute
05Backfill-safe DAGsData interval respecting, no datetime.now()
06OpenLineage-aware patternsDesigning for lineage emission
07Error handling и retry strategiesExponential backoff, dead letter queues
08Approval workflowsSensor-based approval (в 3.x — HITL operator)

Top 5 patterns

1. Idempotency

Любой task должен можно перезапускать без побочных эффектов:

@task
def load_to_warehouse(date: str, path: str):
    # MERGE INTO вместо INSERT
    sql = f"""
    MERGE INTO orders dest
    USING (SELECT * FROM external WHERE dt = '{date}') src
      ON dest.id = src.id
    WHEN MATCHED THEN UPDATE SET ...
    WHEN NOT MATCHED THEN INSERT ...;
    """
    execute(sql)

2. DAG factory

def make_dag(config: dict) -> DAG:
    @dag(dag_id=config['id'], schedule=config['schedule'])
    def generated():
        for step in config['steps']:
            task_from_step(step)
    return generated()

for cfg in load_configs():
    globals()[cfg['id']] = make_dag(cfg)

⚠️ Pitfall: load_configs() выполняется на каждом DAG parse → если делает network call, замедляет parsing.

3. Backfill-safe DAGs

# ❌ ПЛОХО — datetime.now() сдвигается на каждом parse
@dag(start_date=datetime.now() - timedelta(days=7), schedule='@daily')

# ✅ ХОРОШО — hardcoded
@dag(start_date=datetime(2024, 1, 1), schedule='@daily', catchup=False)

4. Approval workflow (через sensor)

В 2.x для human approval используется sensor pattern:

@task.sensor(poke_interval=300, timeout=86400, mode="reschedule")
def wait_approval(**context) -> PokeReturnValue:
    approval_status = check_approval_table(context["dag_run"].run_id)
    if approval_status == "approved":
        return PokeReturnValue(is_done=True)
    return PokeReturnValue(is_done=False)

В Airflow 3.1+ появился dedicated HITL (Human-in-the-Loop) operator для этого паттерна. В 2.x используем sensor-based подход.

5. OpenLineage-aware

Использовать standard operators (PostgresOperator, S3CopyObjectOperator) везде где возможно — они эмитят OL events автоматически (с 2.6+ через apache-airflow-providers-openlineage). Custom Python operators требуют write extractors.

Killer takeaway

Один principle превосходит все остальные: каждый task должен быть атомарным и идемпотентным. Если вы можете убить task посередине, удалить его state и перезапустить — данные останутся консистентными — это правильно сделанный task.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8