Orchestration & Workflow Design
Зачем оркестратор
Оркестратор — мозг data platform. Он координирует: что запускать, когда, в каком порядке, что делать при сбое.
Без оркестратора:
30 cron jobs, неизвестный порядок, нет retry,
при сбое — ручное расследование, нет lineage
С оркестратором:
DAG определяет зависимости, автоматический retry,
alerting, backfill, monitoring, lineage
Airflow vs Dagster vs Prefect
Airflow 3.0: Asset-Centric Paradigm Shift (2025)
Airflow 3.0 (релиз 22 апреля 2025) — первый major релиз с 2020 года. Это не minor bump — это смена ментальной модели: от «когда запустить task» к «когда asset должен быть свежим».
Ключевые изменения
AIP-75 Asset-Centric Syntax:
Airflow 2.x: Datasets (file-like, external pointer)
Airflow 3.0: Assets (first-class, @asset decorator)
@asset(schedule="@daily")
def silver_orders(bronze_orders: Asset) -> pd.DataFrame:
return bronze_orders.read().dropna()
→ DAG генерируется автоматически из asset graph
→ Lineage и зависимости — побочный эффект кода
Asset Watchers (event-driven):
trigger = AssetWatcher(
source="aws_sqs",
queue="s3-new-files",
)
→ DAG стартует на event, а не по cron
→ Прощай sensor pooling antipattern
DAG Versioning (AIP-65, AIP-66):
Каждый run привязан к версии DAG-а на момент старта
Меняешь код во время run-а — текущий run закончится на старой версии
UI показывает task structure, code, logs ровно той версии, что выполнялась
→ Решает «почему вчерашний run выглядит не так, как DAG сейчас»
Edge Worker:
Удалённое исполнение tasks за NAT, в on-prem ДЦ, в edge-локациях
Worker подключается к Airflow по HTTPS (не наоборот)
→ Hybrid-cloud и data residency без VPC peering
Task Execution API:
Multi-language: tasks могут быть не Python (Go, Rust, Java)
Decoupled scheduler от worker (security boundary)
Сравнение asset-aware engines
| Aspect | Airflow 3.0 | Dagster | Prefect 3.0 |
|---|---|---|---|
| Asset abstraction | Assets (AIP-75) | Software-Defined Assets | Artifacts (events) |
| Pioneer | Догнал в 2025 | Шёл первым с 2020 | Hybrid: flow-first + artifacts |
| Declarative scheduling | @asset(schedule=...) + Watchers | AutoMaterializePolicy, freshness rules | Deployment + event triggers |
| Lineage | Asset graph + OpenLineage | Native asset graph | Manual via artifacts |
| Versioning | DAG Versioning (built-in) | Через code locations | Через deployments |
| Best when | Existing Airflow infra + хочется asset-first | Greenfield, data mesh, asset-first | Python-heavy, ML, event-driven |
«Airflow устарел» — миф 2024 года. До 3.0 Dagster действительно опережал по asset-первичности. С 3.0 разница в paradigm закрыта: Airflow догнал по asset-centric, Dagster всё ещё лучше по developer experience и testing. Выбор теперь зависит от existing infrastructure и команды, не от парадигмы.
Migration path Airflow 2 → 3. Datasets → Assets — обратносовместимо: старый Dataset API работает. Asset Watchers заменяют sensor-poll antipattern. DAG Versioning включается опционально. Edge Worker — opt-in для hybrid setups. AWS MWAA, Astronomer, GCP Composer уже поддерживают Airflow 3 в 2025.
DAG Design Patterns
1. Fan-out / Fan-in
# Airflow: parallel processing → merge
extract >> [transform_a, transform_b, transform_c] >> merge >> load
# Use case: process multiple source tables in parallel,
# then merge results into final table
2. Sensor Pattern
# Wait for external event before processing
file_sensor = S3KeySensor(
bucket='raw-data',
bucket_key='orders/{{ ds }}/*.parquet',
timeout=3600, # 1 hour max wait
poke_interval=60
)
file_sensor >> process_orders >> load_warehouse
3. Dynamic DAGs
# Airflow 2.x TaskFlow API
@dag(schedule='@daily')
def dynamic_pipeline():
tables = get_source_tables() # returns ['orders', 'users', 'products']
@task
def process_table(table_name):
spark.read.table(table_name).write.mode('overwrite')...
process_table.expand(table_name=tables)
# Creates N parallel tasks at runtime
4. Backfill Pattern
# Idempotent tasks = safe backfill
@task
def load_daily_partition(ds):
# ds = execution date (2024-01-15)
# OVERWRITE partition → idempotent
spark.sql(f"""
INSERT OVERWRITE TABLE silver.orders
PARTITION (date='{ds}')
SELECT * FROM bronze.orders WHERE date='{ds}'
""")
# Backfill: airflow dags backfill -s 2024-01-01 -e 2024-01-14
Testing DAGs
# Unit test: validate DAG structure
def test_dag_structure():
dag = load_dag('daily_pipeline')
assert dag.task_count == 5
assert 'extract' in dag.task_ids
assert dag.get_task('load').upstream_task_ids == {'transform'}
# Integration test: run task with test data
def test_transform_task():
result = transform_orders.execute(
context={'ds': '2024-01-01'},
test_data=sample_orders_df
)
assert result.count() > 0
assert 'amount' in result.columns
Dagster vs Airflow 3.0 для новых проектов. Раньше выбор был “task-centric vs asset-centric”. С Airflow 3.0 (AIP-75) парадигма у обоих asset-aware. Решающие факторы теперь: (1) developer experience и testing — Dagster всё ещё впереди; (2) existing infra и compliance — Airflow с его экосистемой провайдеров; (3) team familiarity. Greenfield + asset mesh — Dagster. Greenfield + большая команда + готовые провайдеры — Airflow 3.0. Existing Airflow 2.x — мигрировать на 3.0, не переписывать на Dagster.
Anti-pattern: God DAG. Один DAG на 200 tasks = unmanageable. Разбивайте: один DAG per domain (orders_pipeline, users_pipeline), межDAG dependencies через sensors или datasets.