Dataset fundamentals — data-aware scheduling в Airflow 2.4+
До Airflow 2.4 связь между DAG-ами строилась только через время: «producer DAG запускается в 02:00, consumer DAG — в 02:30, надеемся, что producer успел». Этот подход хрупкий: ETL зависит от внешних API, network latency, retries — producer может опоздать на 20 минут, consumer уже побежал, ELT пайплайн вытянул вчерашние данные.
AIP-48 (Airflow 2.4, сентябрь 2022) решил эту проблему фундаментально. Появился Dataset — declarative объект, представляющий логический набор данных с уникальным URI. DAG-ы могут эмитить dataset events (через outlets) и подписываться на них (через schedule=[dataset]). Когда все upstream datasets обновлены — scheduler автоматически запускает downstream DAG.
Это сдвиг парадигмы: с time-based на data-aware scheduling.
Anatomy of a Dataset
Dataset в 2.x — это тонкий wrapper над URI:
from airflow import Dataset
orders_dataset = Dataset("s3://warehouse/orders/year=2026/month=05/")
customers_dataset = Dataset("postgres://prod-db/customers")
URI — это идентификатор, а не location. Airflow не валидирует URI, не читает данные по нему, не проверяет существование. URI — opaque string. Реальные правила:
- URI должен быть уникальным внутри Airflow instance
- Schema (
s3://,postgres://,kafka://) — соглашение для людей - Максимум 3000 символов (поле
uriв БД) - Можно использовать любой формат —
my-team://orders/dailyвалидный URI
В 2.x никакой связи с реальным хранилищем нет — Airflow не читает S3, не запрашивает Postgres. Dataset — это nominal identity, как label. В Airflow 3.x с AIP-82 появляются event-driven Assets, которые слушают реальные queues (SQS, Kafka), но это другая история.
Producer/consumer pattern — базовый шаблон
Минимальный пример dataset-driven pipeline:
from airflow import Dataset
from airflow.decorators import dag, task
from datetime import datetime
orders_ds = Dataset("s3://warehouse/orders/processed/")
# === PRODUCER DAG ===
@dag(
dag_id="orders_etl_producer"
schedule="@hourly"
start_date=datetime(2026, 1, 1),
catchup=False,
)
def orders_producer():
@task(outlets=[orders_ds])
def extract_and_load():
# ETL logic
pass
extract_and_load()
orders_producer()
# === CONSUMER DAG ===
@dag(
dag_id="orders_aggregator"
schedule=[orders_ds],
start_date=datetime(2026, 1, 1),
catchup=False,
)
def orders_consumer():
@task
def aggregate_orders():
pass
aggregate_orders()
orders_consumer()
Несколько критических деталей:
outlets=[orders_ds]— declarative declaration на DAG parse time. Scheduler знает заранее, что эта task эмитит данный dataset.schedule=[orders_ds]— НЕ строка-cron, а список Dataset объектов. DAG не имеет time schedule вообще.- Dataset event эмитится только при успешном завершении task (state=success). Failed/skipped task НЕ эмитит event.
- AND-semantics by default:
schedule=[ds1, ds2]означает «запусти когда ОБА обновлены».
Lifecycle: от outlet к downstream trigger
Multiple datasets — AND semantics
Если consumer объявляет несколько datasets:
ds_orders = Dataset("s3://warehouse/orders/")
ds_customers = Dataset("s3://warehouse/customers/")
@dag(schedule=[ds_orders, ds_customers])
def joined_view():
@task
def join():
pass
join()
DAG запустится только когда оба datasets имеют события с момента последнего запуска consumer-а. Это AND, не OR.
В 2.9+ появилась поддержка conditional dataset expressions через operators:
from airflow.datasets import DatasetAll, DatasetAny
@dag(schedule=DatasetAny(ds_orders, ds_customers)) # OR
def any_dataset(): ...
@dag(schedule=DatasetAll(ds_orders, ds_customers)) # AND (default)
def all_datasets(): ...
# Можно комбинировать:
@dag(schedule=(ds_orders & ds_customers) | ds_special)
def complex_logic(): ...
Это компилируется в boolean expression tree, который scheduler проверяет на каждом tick.
Outlet с extra metadata
Producer может прикрепить metadata к dataset event:
from airflow.datasets.metadata import Metadata
@task(outlets=[orders_ds])
def extract():
df = fetch_orders()
return Metadata(orders_ds, extra={
"row_count": len(df),
"max_order_id": df["order_id"].max(),
"schema_hash": compute_hash(df.columns),
})
Эти metadata сохраняются в dataset_event.extra (JSONB) и доступны consumer-у через context["triggering_dataset_events"]:
@task
def aggregate(**context):
for ds, events in context["triggering_dataset_events"].items():
for event in events:
row_count = event.extra["row_count"]
print(f"Producer emitted {row_count} rows at {event.timestamp}")
Это позволяет реализовать idempotency check: consumer видит, какие именно partitions обновились, и обрабатывает только их.
Scheduler detection — что происходит каждый tick
Когда producer task завершается успешно, не происходит мгновенный pub/sub. Логика такая:
- Worker сериализует TI state и outlets в metadata DB (атомарная транзакция).
- Scheduler на следующем tick (default 5s) выполняет dataset processing phase.
- SQL приблизительно такой:
-- Псевдокод scheduler dataset processing
WITH consumer_satisfied AS (
SELECT
dag.dag_id,
COUNT(DISTINCT ddrq.dataset_id) AS triggered_count,
(SELECT COUNT(*) FROM dag_schedule_dataset_reference
WHERE dag_id = dag.dag_id) AS required_count
FROM dataset_dag_run_queue ddrq
JOIN dag ON ddrq.target_dag_id = dag.dag_id
GROUP BY dag.dag_id
HAVING triggered_count >= required_count
)
SELECT * FROM consumer_satisfied;
Для каждого satisfied DAG-а scheduler создаёт DagRun(run_type=DatasetTriggeredTimetable) и очищает queue rows.
Между producer SUCCESS и consumer queued проходит обычно 5-10 секунд (scheduler tick + DB transaction). Это не real-time. Если требуется sub-second latency между ETL stages — Datasets не ваш инструмент, смотрите на Kafka/Flink.
Сравнение: ExternalTaskSensor vs TriggerDagRunOperator vs Datasets
До Datasets было два legacy способа связать DAG-и:
| Approach | Producer side | Consumer side | Latency | Cost |
|---|---|---|---|---|
| ExternalTaskSensor | Ничего | Polling sensor в consumer | 30-300s polling interval | Worker slot занят |
| TriggerDagRunOperator | Explicit trigger в producer | Ничего | 0-5s (немедленно) | Tight coupling: producer знает consumers |
| Datasets (2.4+) | outlets=[ds] | schedule=[ds] | 5-10s scheduler tick | Decoupled, scalable |
Datasets — strict superset функциональности для большинства cases. Coupling решается через URI (producer не знает consumers), latency приемлемая, нет polling overhead.
Production gotchas
-
TaskFlow
@taskvs Classic Operator outlets синтаксис разный:# TaskFlow @task(outlets=[orders_ds]) def my_task(): ... # Classic my_op = PythonOperator( task_id="my_task" python_callable=my_func, outlets=[orders_ds], # outlets уровня operator, не task_id ) -
Dataset URI immutable. Если изменить URI существующего dataset — это создаёт новый dataset в БД, старые subscriptions не мигрируют. Меняйте URI только при breaking-change refactoring.
-
Skipped tasks НЕ эмитят events. Если task в producer DAG skipped (например, branching), consumer не запустится. Используйте
trigger_rule="all_done"или splitter pattern. -
Dataset events не накапливаются для consumer. Если producer обновился 100 раз пока consumer запускался — это один dataset_event, который удовлетворил queue. После создания DagRun queue очищается, дальнейшие events добавляют новые queue rows.
-
Failed consumer DagRun не “rolls back” queue. Если consumer запустился dataset-triggered и упал — queue уже очищена. Manual retry или backfill требуется.
-
DAG без time schedule при
schedule=[ds]— это feature, не bug. DAG никогда не запустится по расписанию, только по dataset events. Manual trigger тоже работает. -
Inspection через CLI:
airflow datasets list airflow datasets list-events --uri "s3://warehouse/orders/processed/" airflow datasets details --uri "s3://warehouse/orders/processed/"
Real-world pattern: hourly ETL → daily aggregation
hourly_ds = Dataset("s3://lake/events/hourly/")
# Producer: 24 раза в день
@dag(schedule="@hourly", start_date=datetime(2026, 1, 1))
def hourly_extract():
@task(outlets=[hourly_ds])
def extract_hour():
pass
extract_hour()
# Consumer: запустится после КАЖДОГО обновления (24 раза в день)
@dag(schedule=[hourly_ds])
def downstream_after_hour():
pass
# Custom timetable для "запусти один раз в день после
# 24 событий" — не из коробки, требуется работа с DatasetOrTimeSchedule
# или manual aggregation logic в consumer.
Это важный момент: один outlets emit = одно queue entry = один DagRun. Если хотите batch-консьюмер (один раз в день после нескольких hourly events) — реализуется через DatasetOrTimeSchedule (2.9+):
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
@dag(schedule=DatasetOrTimeSchedule(
timetable=CronTriggerTimetable("0 6 * * *", timezone="UTC"),
datasets=[hourly_ds],
))
def daily_or_dataset_aggregator(): ...
DAG запустится либо в 06:00 по cron, либо если до этого пришёл dataset event — что наступит раньше.