Learning Platform
Глоссарий Troubleshooting
Урок 09.05 · 25 мин
Продвинутый
Cross-DAGCascadeFan-outTriggerDagRunOperatorETL Patterns

Cross-DAG cascade — producer → multiple consumers

Реальный data warehouse состоит не из изолированных DAG-ов, а из cascade-структуры: bronze ingestion → silver transformations → gold marts → ML feature store → BI refresh.

Архитектура DWH: слои и Medallion Раньше связь между уровнями строилась через TriggerDagRunOperator (tight coupling) или ExternalTaskSensor (polling overhead). Datasets дают чистую alternative: publish-subscribe через типизированные URIs.

Этот урок — практические patterns для построения многоуровневых cascade-ов с Datasets, anti-patterns и сравнение с legacy approaches.


Anti-pattern: TriggerDagRunOperator

До Datasets единственный способ цепочки DAG-ов был explicit trigger:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

@dag(schedule="@hourly")
def bronze_ingestion():
    extract = PythonOperator(...)

    trigger_silver = TriggerDagRunOperator(
        task_id="trigger_silver"
        trigger_dag_id="silver_transformations"
        wait_for_completion=False,
    )

    trigger_bi_refresh = TriggerDagRunOperator(
        task_id="trigger_bi"
        trigger_dag_id="bi_refresh"
        wait_for_completion=False,
    )

    extract >> [trigger_silver, trigger_bi_refresh]

Проблемы:

  1. Tight coupling: bronze DAG должен знать имена всех downstream DAG-ов. Добавили новый consumer — нужно изменить bronze. Это нарушает open-closed principle.
  2. Hard to discover lineage: чтобы понять «кто использует bronze data?» — нужно grep-ать кодовую базу на trigger_dag_id="bronze...". UI ничего не показывает.
  3. Failure semantics запутаны: если trigger task failed (не downstream DagRun, а сам trigger), что считать failure?
  4. Bypass max_active_runs_per_dag: trigger игнорирует concurrency limits downstream — может создать 100 runs за минуту.
  5. wait_for_completion=True блокирует worker slot (как poke sensor) на часы.

С Datasets такая логика инвертируется: bronze ничего не знает про consumers, consumers subscribe независимо.


Pattern 1: Fan-out (один producer → много consumers)

Базовый и самый частый pattern:

from airflow import Dataset
from airflow.decorators import dag, task

# Single source of truth
orders_bronze = Dataset("s3://lake/bronze/orders/")

# === BRONZE PRODUCER ===
@dag(schedule="@hourly", start_date=datetime(2026, 1, 1))
def orders_bronze_ingest():
    @task(outlets=[orders_bronze])
    def extract_from_postgres():
        pass
    extract_from_postgres()

# === SILVER CONSUMER (transformations) ===
@dag(schedule=[orders_bronze])
def orders_silver():
    @task
    def deduplicate(): pass
    @task
    def enrich_with_customers(): pass
    deduplicate() >> enrich_with_customers()

# === BI REFRESH CONSUMER ===
@dag(schedule=[orders_bronze])
def orders_bi_refresh():
    @task
    def rebuild_dashboard_cache(): pass
    rebuild_dashboard_cache()

# === ML FEATURE STORE CONSUMER ===
@dag(schedule=[orders_bronze])
def orders_ml_features():
    @task
    def compute_features(): pass
    compute_features()

Каждый consumer независим. Добавление нового consumer = создание нового DAG file. Bronze DAG не меняется.

Fan-out cascade через Datasets
orders_bronze_ingestProducer DAG. Schedule: @hourly. Task с outlets=[orders_bronze]. Эмитит dataset_event на каждый успешный run.
dataset_event INSERT
Dataset: orders_bronzeОдин dataset URI. Подписаны 3 consumer DAG-а (через dag_schedule_dataset_reference). При каждом event scheduler делает fan-out INSERT в dataset_dag_run_queue для всех 3.
3 queue rows created
orders_silverConsumer 1: transformations. Запускается ~5-10s после bronze SUCCESS. Эмитит свой dataset (silver/orders) для downstream gold layer.
orders_bi_refreshConsumer 2: BI dashboard rebuild. Независимый от silver — runs в параллель.
orders_ml_featuresConsumer 3: ML feature compute. Также параллельно. Эмитит feature dataset для ML training pipeline.

Pattern 2: Multi-level cascade (medallion architecture)

Bronze → Silver → Gold pipeline, классика lakehouse:

bronze_orders = Dataset("s3://lake/bronze/orders/")
bronze_customers = Dataset("s3://lake/bronze/customers/")

silver_orders = Dataset("s3://lake/silver/orders_enriched/")
silver_customers = Dataset("s3://lake/silver/customers_clean/")

gold_revenue = Dataset("s3://lake/gold/daily_revenue/")

# Level 1: bronze (raw ingestion)
@dag(schedule="@hourly")
def bronze_orders_dag():
    @task(outlets=[bronze_orders])
    def extract_orders(): pass
    extract_orders()

@dag(schedule="@hourly")
def bronze_customers_dag():
    @task(outlets=[bronze_customers])
    def extract_customers(): pass
    extract_customers()

# Level 2: silver (cleaning + enrichment)
@dag(schedule=[bronze_orders, bronze_customers])
def silver_orders_dag():
    @task(outlets=[silver_orders])
    def enrich(): pass  # JOIN orders × customers
    enrich()

@dag(schedule=[bronze_customers])
def silver_customers_dag():
    @task(outlets=[silver_customers])
    def clean(): pass
    clean()

# Level 3: gold (business marts)
@dag(schedule=[silver_orders])
def gold_revenue_dag():
    @task(outlets=[gold_revenue])
    def compute_revenue(): pass
    compute_revenue()

# Level 4: leaf consumers
@dag(schedule=[gold_revenue])
def bi_dashboard_refresh(): ...

@dag(schedule=[gold_revenue])
def executive_email_report(): ...

Структура:

  • Bronze запускается по cron (@hourly).
  • Silver_orders ждёт ОБА bronze_orders + bronze_customers (AND).
  • Silver_customers ждёт только bronze_customers.
  • Gold ждёт silver_orders.
  • Leaf consumers (BI, email) ждут gold.

Один cron tick в bronze inicia cascade через 4 уровня — без TriggerDagRunOperator, без ExternalTaskSensor.


Pattern 3: Conditional triggering — DatasetAll/DatasetAny

В 2.9+ можно явно сказать «OR/AND»:

from airflow.datasets import DatasetAll, DatasetAny

@dag(schedule=DatasetAny(ds_a, ds_b, ds_c))
def any_source_updated():
    # Запустится когда ЛЮБОЙ из ds_a/b/c обновлён
    pass

@dag(schedule=DatasetAll(ds_a, ds_b, ds_c))
def all_sources_updated():
    # Запустится когда ВСЕ обновлены — default для list
    pass

# Можно комбинировать:
@dag(schedule=(ds_orders & ds_customers) | (ds_invoices & ds_payments))
def complex_logic():
    # Либо (orders + customers), либо (invoices + payments)
    pass

Это полезно для:

  • Fault tolerance: «запусти если ХОТЬ ОДИН source доехал»
  • Multi-region: «запусти консолидацию если все 3 региона готовы»
  • Alternative pipelines: «либо primary path, либо fallback»
NOTE

DatasetAny semantics: первое попавшееся event triggers run. Остальные events продолжают накапливать в queue, но scheduler удаляет их при создании DagRun (queue cleanup для DAG-а полный, не selective). Если хотите «запустить на каждый source отдельный run» — это не та семантика, нужны независимые DAG-и.


Pattern 4: DatasetOrTimeSchedule — hybrid

В 2.9+ доступно «либо по cron, либо когда dataset обновлён»:

from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

@dag(schedule=DatasetOrTimeSchedule(
    timetable=CronTriggerTimetable("0 6 * * *", timezone="UTC"),
    datasets=[orders_silver],
))
def daily_report_or_dataset():
    # Запустится в 06:00 по cron, или раньше — если silver обновился
    pass

Use case: SLA-driven reports. Дашборд должен обновиться к 06:00 (cron fallback), но если silver layer закончил раньше — рано тоже хорошо. Lower-bound гарантия через time, upper-bound efficiency через dataset.


Anti-pattern: Excessive Dataset granularity

Соблазн — превратить каждую partition в отдельный Dataset:

# ANTI-PATTERN: separate dataset per day
ds_orders_2026_05_01 = Dataset("s3://lake/orders/dt=2026-05-01/")
ds_orders_2026_05_02 = Dataset("s3://lake/orders/dt=2026-05-02/")
# ... 365 datasets per year

Не делайте этого. dataset table растёт неограниченно, scheduler делает гигантские queries на JOIN. Используйте:

  • Один Dataset на logical entity (s3://lake/orders/) с metadata в extra для partition info
  • DatasetAlias если URI зависит от runtime (см. урок 03)

Anti-pattern: Producer publishing dataset before write complete

Лёгкая ошибка — эмитить event ДО того, как данные действительно записались:

# BAD
@task(outlets=[ds])
def bad_writer():
    initiate_async_export()  # запускает long-running export
    # task завершается → event эмитится → consumer запускается
    # → но данные ещё не записаны!

Эмиссия dataset event происходит при SUCCESS state task-а. Если task логически закончил, но реальные данные ещё пишутся async — consumer прочитает stale/partial data. Решение:

# GOOD
@task(outlets=[ds])
def good_writer():
    job_id = initiate_async_export()
    wait_for_export_completion(job_id)  # blocking until data ready
    # task завершается только когда данные действительно есть

Или используйте deferrable sensor (модуль 09) для wait части — не блокировать worker.


Сравнение: legacy vs Datasets

АспектTriggerDagRunOperatorExternalTaskSensorDatasets (2.4+)
CouplingTight (producer знает consumers)Tight (consumer знает producer)Decoupled (через URI)
LatencySub-second30-300s polling5-15s scheduler tick
Worker overheadTrigger task занят моментPolling worker slotZero (queue в DB)
Lineage UIНе визуализируетсяНе визуализируетсяNative graph view
ScalingLinear (один trigger per consumer)Quadratic (sensor per dep)Constant (DB indexed)
Conditional logicManual through PythonPer-sensor logicNative DatasetAll/Any
Hybrid time+dataTriggerDagRunOperator + cron — awkwardSensor + cron — fragileDatasetOrTimeSchedule native
ObservabilityUI не знаетUI не знаетUI cross-DAG graph
Failure semanticsTrigger task vs downstreamSensor statesClean: emit on success only

Datasets — strict superset для cross-DAG dependency. TriggerDagRunOperator остаётся valid для:

  • Manual triggering из UI/CLI
  • One-time triggers не привязанные к данным
  • Programmatic triggering с custom logic

Real-world ETL cascade — финансовый отчёт

Финтех use case: дневной финансовый отчёт зависит от 4 источников.

ds_transactions = Dataset("postgres://prod/transactions_daily/")
ds_fx_rates = Dataset("https://api.ecb.eu/fx-rates/")
ds_reconciled = Dataset("s3://finance/reconciled/")
ds_revenue_report = Dataset("s3://finance/reports/daily-revenue/")
ds_audit_log = Dataset("s3://finance/audit/")

# Stage 1: parallel ingestion
@dag(schedule="0 1 * * *")  # 01:00 UTC daily
def fetch_transactions():
    @task(outlets=[ds_transactions])
    def fetch(): pass
    fetch()

@dag(schedule="0 1 * * *")
def fetch_fx_rates():
    @task(outlets=[ds_fx_rates])
    def fetch(): pass
    fetch()

# Stage 2: reconciliation (waits for BOTH)
@dag(schedule=[ds_transactions, ds_fx_rates])
def reconcile_transactions():
    @task(outlets=[ds_reconciled])
    def reconcile():
        # Apply FX, dedup, validate
        pass
    reconcile()

# Stage 3: revenue + audit (parallel, both depend on reconciled)
@dag(schedule=[ds_reconciled])
def daily_revenue_report():
    @task(outlets=[ds_revenue_report])
    def compute(): pass
    compute()

@dag(schedule=[ds_reconciled])
def audit_log_generation():
    @task(outlets=[ds_audit_log])
    def audit(): pass
    audit()

# Stage 4: notifications (waits for BOTH leaf reports)
@dag(schedule=[ds_revenue_report, ds_audit_log])
def email_finance_team():
    @task
    def send_email(): pass
    send_email()

Сценарии failure:

  • Если fetch_fx_rates упал — reconcile_transactions НЕ запустится (AND-условие). Каскад замёрз — но transparency in UI: видно, что queue имеет только ds_transactions, не хватает ds_fx_rates. Operator может manually retry fx_rates или backfill.
  • Если reconcile_transactions skipped — events не эмитятся, downstream замёрз. Лучше: explicit fail + retry, не skip.

Production gotchas

  1. Cascade depth limit: на практике 4-5 уровней дают latency 30-60s от bronze trigger до leaf consumer. Это нормально для batch ETL, но не для real-time. Не строй чрезмерно глубокие cascade.

  2. Cycles НЕ детектируются. Можно случайно сделать ds_a → DAG-X → ds_b → DAG-Y → ds_a. Airflow выполняет cycle, что приводит к infinite loop. Manual review топологии или statisticheski через DB query.

    -- Найти cycles (упрощённо)
    WITH RECURSIVE cascade AS (
        SELECT
            sdr.dag_id AS source,
            tdr.dag_id AS target
        FROM dag_schedule_dataset_reference sdr
        JOIN task_outlet_dataset_reference tdr
            ON sdr.dataset_id = tdr.dataset_id
        UNION
        SELECT c.source, tdr.dag_id
        FROM cascade c
        JOIN dataset_event de ON ...
    )
    SELECT * FROM cascade WHERE source = target;
  3. Race condition между уровнями: если cascade слишком быстрый, второй ingestion на bronze может перекрыть первый до того, как silver запустился. Используйте max_active_runs=1 на critical stages.

  4. Backfill cascade complications. airflow dags backfill НЕ распространяется через datasets — нужно backfill каждый DAG отдельно или через CLI с --reset-dagruns. Это часто упускают.

  5. Dataset events НЕ синхронизированы между Airflow instances (multi-tenant deployment). Если у вас 2 Airflow installations, нужен external bridge (event bus или manual TriggerDagRunOperator через REST API).

  6. DAG в paused state не consume events — queue копит. После unpause runs запускаются by-one согласно max_active_runs. Это может вызвать «volcano effect» — длинная серия runs.

  7. Cascade observability: используйте OpenLineage (модуль 14) для visualization cross-DAG dependencies. Native Airflow UI показывает только graph внутри DAG, но не cascade-граф между DAG-ами автоматически — там есть dataset events view, но требует navigation.


Проверка знанийKnowledge check
В medallion-каскаде Bronze → Silver → Gold, Silver зависит от 2 bronze datasets через schedule=[bronze_orders, bronze_customers]. Bronze_orders обновляется каждый час, bronze_customers — раз в день. Сколько раз в день Silver DAG запустится?
ОтветAnswer
Один раз в день. Хотя bronze_orders эмитит 24 events за сутки и каждый раз пополняет queue для Silver, второй обязательный dataset (bronze_customers) эмитится только 1 раз. AND-условие satisfied только когда оба datasets имеют queue rows — это происходит когда bronze_customers выпустит свой event (один раз). После создания Silver DagRun queue полностью очищается (DELETE FROM dataset_dag_run_queue WHERE target_dag_id='silver'), и следующее накопление начинается с нуля. Накопленные 23 bronze_orders events 'теряются' — не создают дополнительных runs. Это design decision: dataset-triggered runs не множатся пропорционально event count. Если нужен hourly Silver — измените schedule на schedule=DatasetOrTimeSchedule(timetable='@hourly', datasets=[bronze_orders, bronze_customers]) или используйте только bronze_orders как trigger и читайте customers as-of inside task.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Когда оправдан TriggerDagRunOperator вместо Datasets для cross-DAG dependency?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6