Cross-DAG cascade — producer → multiple consumers
Реальный data warehouse состоит не из изолированных DAG-ов, а из cascade-структуры: bronze ingestion → silver transformations → gold marts → ML feature store → BI refresh.
Архитектура DWH: слои и Medallion Раньше связь между уровнями строилась через TriggerDagRunOperator (tight coupling) или ExternalTaskSensor (polling overhead). Datasets дают чистую alternative: publish-subscribe через типизированные URIs.
Этот урок — практические patterns для построения многоуровневых cascade-ов с Datasets, anti-patterns и сравнение с legacy approaches.
Anti-pattern: TriggerDagRunOperator
До Datasets единственный способ цепочки DAG-ов был explicit trigger:
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
@dag(schedule="@hourly")
def bronze_ingestion():
extract = PythonOperator(...)
trigger_silver = TriggerDagRunOperator(
task_id="trigger_silver"
trigger_dag_id="silver_transformations"
wait_for_completion=False,
)
trigger_bi_refresh = TriggerDagRunOperator(
task_id="trigger_bi"
trigger_dag_id="bi_refresh"
wait_for_completion=False,
)
extract >> [trigger_silver, trigger_bi_refresh]
Проблемы:
- Tight coupling: bronze DAG должен знать имена всех downstream DAG-ов. Добавили новый consumer — нужно изменить bronze. Это нарушает open-closed principle.
- Hard to discover lineage: чтобы понять «кто использует bronze data?» — нужно grep-ать кодовую базу на
trigger_dag_id="bronze...". UI ничего не показывает. - Failure semantics запутаны: если trigger task failed (не downstream DagRun, а сам trigger), что считать failure?
- Bypass max_active_runs_per_dag: trigger игнорирует concurrency limits downstream — может создать 100 runs за минуту.
wait_for_completion=Trueблокирует worker slot (как poke sensor) на часы.
С Datasets такая логика инвертируется: bronze ничего не знает про consumers, consumers subscribe независимо.
Pattern 1: Fan-out (один producer → много consumers)
Базовый и самый частый pattern:
from airflow import Dataset
from airflow.decorators import dag, task
# Single source of truth
orders_bronze = Dataset("s3://lake/bronze/orders/")
# === BRONZE PRODUCER ===
@dag(schedule="@hourly", start_date=datetime(2026, 1, 1))
def orders_bronze_ingest():
@task(outlets=[orders_bronze])
def extract_from_postgres():
pass
extract_from_postgres()
# === SILVER CONSUMER (transformations) ===
@dag(schedule=[orders_bronze])
def orders_silver():
@task
def deduplicate(): pass
@task
def enrich_with_customers(): pass
deduplicate() >> enrich_with_customers()
# === BI REFRESH CONSUMER ===
@dag(schedule=[orders_bronze])
def orders_bi_refresh():
@task
def rebuild_dashboard_cache(): pass
rebuild_dashboard_cache()
# === ML FEATURE STORE CONSUMER ===
@dag(schedule=[orders_bronze])
def orders_ml_features():
@task
def compute_features(): pass
compute_features()
Каждый consumer независим. Добавление нового consumer = создание нового DAG file. Bronze DAG не меняется.
Pattern 2: Multi-level cascade (medallion architecture)
Bronze → Silver → Gold pipeline, классика lakehouse:
bronze_orders = Dataset("s3://lake/bronze/orders/")
bronze_customers = Dataset("s3://lake/bronze/customers/")
silver_orders = Dataset("s3://lake/silver/orders_enriched/")
silver_customers = Dataset("s3://lake/silver/customers_clean/")
gold_revenue = Dataset("s3://lake/gold/daily_revenue/")
# Level 1: bronze (raw ingestion)
@dag(schedule="@hourly")
def bronze_orders_dag():
@task(outlets=[bronze_orders])
def extract_orders(): pass
extract_orders()
@dag(schedule="@hourly")
def bronze_customers_dag():
@task(outlets=[bronze_customers])
def extract_customers(): pass
extract_customers()
# Level 2: silver (cleaning + enrichment)
@dag(schedule=[bronze_orders, bronze_customers])
def silver_orders_dag():
@task(outlets=[silver_orders])
def enrich(): pass # JOIN orders × customers
enrich()
@dag(schedule=[bronze_customers])
def silver_customers_dag():
@task(outlets=[silver_customers])
def clean(): pass
clean()
# Level 3: gold (business marts)
@dag(schedule=[silver_orders])
def gold_revenue_dag():
@task(outlets=[gold_revenue])
def compute_revenue(): pass
compute_revenue()
# Level 4: leaf consumers
@dag(schedule=[gold_revenue])
def bi_dashboard_refresh(): ...
@dag(schedule=[gold_revenue])
def executive_email_report(): ...
Структура:
- Bronze запускается по cron (@hourly).
- Silver_orders ждёт ОБА bronze_orders + bronze_customers (AND).
- Silver_customers ждёт только bronze_customers.
- Gold ждёт silver_orders.
- Leaf consumers (BI, email) ждут gold.
Один cron tick в bronze inicia cascade через 4 уровня — без TriggerDagRunOperator, без ExternalTaskSensor.
Pattern 3: Conditional triggering — DatasetAll/DatasetAny
В 2.9+ можно явно сказать «OR/AND»:
from airflow.datasets import DatasetAll, DatasetAny
@dag(schedule=DatasetAny(ds_a, ds_b, ds_c))
def any_source_updated():
# Запустится когда ЛЮБОЙ из ds_a/b/c обновлён
pass
@dag(schedule=DatasetAll(ds_a, ds_b, ds_c))
def all_sources_updated():
# Запустится когда ВСЕ обновлены — default для list
pass
# Можно комбинировать:
@dag(schedule=(ds_orders & ds_customers) | (ds_invoices & ds_payments))
def complex_logic():
# Либо (orders + customers), либо (invoices + payments)
pass
Это полезно для:
- Fault tolerance: «запусти если ХОТЬ ОДИН source доехал»
- Multi-region: «запусти консолидацию если все 3 региона готовы»
- Alternative pipelines: «либо primary path, либо fallback»
DatasetAny semantics: первое попавшееся event triggers run. Остальные events продолжают накапливать в queue, но scheduler удаляет их при создании DagRun (queue cleanup для DAG-а полный, не selective). Если хотите «запустить на каждый source отдельный run» — это не та семантика, нужны независимые DAG-и.
Pattern 4: DatasetOrTimeSchedule — hybrid
В 2.9+ доступно «либо по cron, либо когда dataset обновлён»:
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
@dag(schedule=DatasetOrTimeSchedule(
timetable=CronTriggerTimetable("0 6 * * *", timezone="UTC"),
datasets=[orders_silver],
))
def daily_report_or_dataset():
# Запустится в 06:00 по cron, или раньше — если silver обновился
pass
Use case: SLA-driven reports. Дашборд должен обновиться к 06:00 (cron fallback), но если silver layer закончил раньше — рано тоже хорошо. Lower-bound гарантия через time, upper-bound efficiency через dataset.
Anti-pattern: Excessive Dataset granularity
Соблазн — превратить каждую partition в отдельный Dataset:
# ANTI-PATTERN: separate dataset per day
ds_orders_2026_05_01 = Dataset("s3://lake/orders/dt=2026-05-01/")
ds_orders_2026_05_02 = Dataset("s3://lake/orders/dt=2026-05-02/")
# ... 365 datasets per year
Не делайте этого. dataset table растёт неограниченно, scheduler делает гигантские queries на JOIN. Используйте:
- Один Dataset на logical entity (
s3://lake/orders/) с metadata вextraдля partition info - DatasetAlias если URI зависит от runtime (см. урок 03)
Anti-pattern: Producer publishing dataset before write complete
Лёгкая ошибка — эмитить event ДО того, как данные действительно записались:
# BAD
@task(outlets=[ds])
def bad_writer():
initiate_async_export() # запускает long-running export
# task завершается → event эмитится → consumer запускается
# → но данные ещё не записаны!
Эмиссия dataset event происходит при SUCCESS state task-а. Если task логически закончил, но реальные данные ещё пишутся async — consumer прочитает stale/partial data. Решение:
# GOOD
@task(outlets=[ds])
def good_writer():
job_id = initiate_async_export()
wait_for_export_completion(job_id) # blocking until data ready
# task завершается только когда данные действительно есть
Или используйте deferrable sensor (модуль 09) для wait части — не блокировать worker.
Сравнение: legacy vs Datasets
| Аспект | TriggerDagRunOperator | ExternalTaskSensor | Datasets (2.4+) |
|---|---|---|---|
| Coupling | Tight (producer знает consumers) | Tight (consumer знает producer) | Decoupled (через URI) |
| Latency | Sub-second | 30-300s polling | 5-15s scheduler tick |
| Worker overhead | Trigger task занят момент | Polling worker slot | Zero (queue в DB) |
| Lineage UI | Не визуализируется | Не визуализируется | Native graph view |
| Scaling | Linear (один trigger per consumer) | Quadratic (sensor per dep) | Constant (DB indexed) |
| Conditional logic | Manual through Python | Per-sensor logic | Native DatasetAll/Any |
| Hybrid time+data | TriggerDagRunOperator + cron — awkward | Sensor + cron — fragile | DatasetOrTimeSchedule native |
| Observability | UI не знает | UI не знает | UI cross-DAG graph |
| Failure semantics | Trigger task vs downstream | Sensor states | Clean: emit on success only |
Datasets — strict superset для cross-DAG dependency. TriggerDagRunOperator остаётся valid для:
- Manual triggering из UI/CLI
- One-time triggers не привязанные к данным
- Programmatic triggering с custom logic
Real-world ETL cascade — финансовый отчёт
Финтех use case: дневной финансовый отчёт зависит от 4 источников.
ds_transactions = Dataset("postgres://prod/transactions_daily/")
ds_fx_rates = Dataset("https://api.ecb.eu/fx-rates/")
ds_reconciled = Dataset("s3://finance/reconciled/")
ds_revenue_report = Dataset("s3://finance/reports/daily-revenue/")
ds_audit_log = Dataset("s3://finance/audit/")
# Stage 1: parallel ingestion
@dag(schedule="0 1 * * *") # 01:00 UTC daily
def fetch_transactions():
@task(outlets=[ds_transactions])
def fetch(): pass
fetch()
@dag(schedule="0 1 * * *")
def fetch_fx_rates():
@task(outlets=[ds_fx_rates])
def fetch(): pass
fetch()
# Stage 2: reconciliation (waits for BOTH)
@dag(schedule=[ds_transactions, ds_fx_rates])
def reconcile_transactions():
@task(outlets=[ds_reconciled])
def reconcile():
# Apply FX, dedup, validate
pass
reconcile()
# Stage 3: revenue + audit (parallel, both depend on reconciled)
@dag(schedule=[ds_reconciled])
def daily_revenue_report():
@task(outlets=[ds_revenue_report])
def compute(): pass
compute()
@dag(schedule=[ds_reconciled])
def audit_log_generation():
@task(outlets=[ds_audit_log])
def audit(): pass
audit()
# Stage 4: notifications (waits for BOTH leaf reports)
@dag(schedule=[ds_revenue_report, ds_audit_log])
def email_finance_team():
@task
def send_email(): pass
send_email()
Сценарии failure:
- Если
fetch_fx_ratesупал —reconcile_transactionsНЕ запустится (AND-условие). Каскад замёрз — но transparency in UI: видно, что queue имеет толькоds_transactions, не хватаетds_fx_rates. Operator может manually retry fx_rates или backfill. - Если
reconcile_transactionsskipped — events не эмитятся, downstream замёрз. Лучше: explicit fail + retry, не skip.
Production gotchas
-
Cascade depth limit: на практике 4-5 уровней дают latency 30-60s от bronze trigger до leaf consumer. Это нормально для batch ETL, но не для real-time. Не строй чрезмерно глубокие cascade.
-
Cycles НЕ детектируются. Можно случайно сделать
ds_a → DAG-X → ds_b → DAG-Y → ds_a. Airflow выполняет cycle, что приводит к infinite loop. Manual review топологии или statisticheski через DB query.-- Найти cycles (упрощённо) WITH RECURSIVE cascade AS ( SELECT sdr.dag_id AS source, tdr.dag_id AS target FROM dag_schedule_dataset_reference sdr JOIN task_outlet_dataset_reference tdr ON sdr.dataset_id = tdr.dataset_id UNION SELECT c.source, tdr.dag_id FROM cascade c JOIN dataset_event de ON ... ) SELECT * FROM cascade WHERE source = target; -
Race condition между уровнями: если cascade слишком быстрый, второй ingestion на bronze может перекрыть первый до того, как silver запустился. Используйте
max_active_runs=1на critical stages. -
Backfill cascade complications.
airflow dags backfillНЕ распространяется через datasets — нужно backfill каждый DAG отдельно или через CLI с--reset-dagruns. Это часто упускают. -
Dataset events НЕ синхронизированы между Airflow instances (multi-tenant deployment). Если у вас 2 Airflow installations, нужен external bridge (event bus или manual TriggerDagRunOperator через REST API).
-
DAG в paused state не consume events — queue копит. После unpause runs запускаются by-one согласно max_active_runs. Это может вызвать «volcano effect» — длинная серия runs.
-
Cascade observability: используйте OpenLineage (модуль 14) для visualization cross-DAG dependencies. Native Airflow UI показывает только graph внутри DAG, но не cascade-граф между DAG-ами автоматически — там есть dataset events view, но требует navigation.