Datasets — обзор модуля
Data-aware scheduling — революция в DAG dependency model, появившаяся в Airflow 2.4 (AIP-48). Вместо «запусти каждый день в 02:00» можно сказать «запусти когда dataset orders_table обновлён». Этот модуль покрывает evolution Datasets в 2.x и обзорно — переименование в Assets в 3.x.
Уроки модуля
| # | Урок | Что внутри |
|---|---|---|
| 01 | Обзор модуля | Текущий урок |
| 02 | Datasets fundamentals (2.4+) | Dataset URI, outlets, schedule=[dataset] |
| 03 | DatasetAlias (2.9+) | Dynamic dataset references, dataset-driven events |
| 04 | Internal mechanics | dataset_event, dataset_dag_run_queue tables |
| 05 | Cross-DAG dataset cascade | Producer → multiple consumers |
| 06 | Что нас ждёт в 3.x (обзор) | Assets rename (AIP-74/75), Asset Partitions (3.2), Event-driven AIP-82 |
Ключевые концепты в 2.x
from airflow import Dataset
from airflow.decorators import dag, task
my_dataset = Dataset("s3://bucket/data.parquet")
@dag(schedule=[my_dataset])
def consumer(): ...
@dag(schedule="@daily")
def producer():
@task(outlets=[my_dataset])
def write(): ...
DatasetAlias (2.9+)
Динамические dataset references — для случаев, когда нельзя статически определить dataset URI:
from airflow.datasets import DatasetAlias
my_alias = DatasetAlias("my-alias")
@task(outlets=[my_alias])
def dynamic_writer(context):
# На runtime определяем какой именно dataset обновили
context["outlet_events"][my_alias].add(Dataset(f"s3://bucket/{path}"))
Внутренние таблицы
dataset— registered datasetsdataset_event— timeline всех обновленийdataset_dag_run_queue— queue для dataset-triggered runsdag_schedule_dataset_reference— какие DAGs подписаны
Killer takeaway
Data-aware scheduling решает фундаментальную проблему cron: DAG-A запускается в 02:00, DAG-B зависит от A, запускается в 02:30. Если A задерживается до 02:35 — B бежит на старых данных. С Datasets B запускается только когда A эмитит dataset event, независимо от времени.
Что нас ждёт в Airflow 3.x
Datasets переименованы в Assets (AIP-74/75). Новый декоратор @asset объединяет DAG из одного task в asset:
# 3.x:
from airflow.sdk import asset
@asset(schedule="@daily")
def orders_asset(context):
return df # автоматически outlet
Также появляются:
- Asset Partitions (3.2) — downstream подписывается на конкретные partitions
- Event-driven scheduling (AIP-82) — через message bus (SQS, Kafka)
В 2.x это уже работает, но в синтаксисе Datasets. Финальный модуль покажет миграцию.
Связи
- Модуль 04 (Scheduler) — как scheduler читает
dataset_dag_run_queue - Модуль 14 (OpenLineage) — Datasets интегрируются с lineage automatically
- Модуль 18 (Upgrade Path) — миграция Datasets → Assets