Learning Platform
Глоссарий Troubleshooting
Урок 09.01 · 12 мин
Продвинутый
DatasetsAIP-48DatasetAliasData-aware scheduling

Datasets — обзор модуля

Data-aware scheduling — революция в DAG dependency model, появившаяся в Airflow 2.4 (AIP-48). Вместо «запусти каждый день в 02:00» можно сказать «запусти когда dataset orders_table обновлён». Этот модуль покрывает evolution Datasets в 2.x и обзорно — переименование в Assets в 3.x.

Уроки модуля

#УрокЧто внутри
01Обзор модуляТекущий урок
02Datasets fundamentals (2.4+)Dataset URI, outlets, schedule=[dataset]
03DatasetAlias (2.9+)Dynamic dataset references, dataset-driven events
04Internal mechanicsdataset_event, dataset_dag_run_queue tables
05Cross-DAG dataset cascadeProducer → multiple consumers
06Что нас ждёт в 3.x (обзор)Assets rename (AIP-74/75), Asset Partitions (3.2), Event-driven AIP-82

Ключевые концепты в 2.x

from airflow import Dataset
from airflow.decorators import dag, task

my_dataset = Dataset("s3://bucket/data.parquet")

@dag(schedule=[my_dataset])
def consumer(): ...

@dag(schedule="@daily")
def producer():
    @task(outlets=[my_dataset])
    def write(): ...

DatasetAlias (2.9+)

Динамические dataset references — для случаев, когда нельзя статически определить dataset URI:

from airflow.datasets import DatasetAlias

my_alias = DatasetAlias("my-alias")

@task(outlets=[my_alias])
def dynamic_writer(context):
    # На runtime определяем какой именно dataset обновили
    context["outlet_events"][my_alias].add(Dataset(f"s3://bucket/{path}"))

Внутренние таблицы

  • dataset — registered datasets
  • dataset_event — timeline всех обновлений
  • dataset_dag_run_queue — queue для dataset-triggered runs
  • dag_schedule_dataset_reference — какие DAGs подписаны

Killer takeaway

Data-aware scheduling решает фундаментальную проблему cron: DAG-A запускается в 02:00, DAG-B зависит от A, запускается в 02:30. Если A задерживается до 02:35 — B бежит на старых данных. С Datasets B запускается только когда A эмитит dataset event, независимо от времени.

Что нас ждёт в Airflow 3.x

Datasets переименованы в Assets (AIP-74/75). Новый декоратор @asset объединяет DAG из одного task в asset:

# 3.x:
from airflow.sdk import asset

@asset(schedule="@daily")
def orders_asset(context):
    return df  # автоматически outlet

Также появляются:

  • Asset Partitions (3.2) — downstream подписывается на конкретные partitions
  • Event-driven scheduling (AIP-82) — через message bus (SQS, Kafka)

В 2.x это уже работает, но в синтаксисе Datasets. Финальный модуль покажет миграцию.

Связи

  • Модуль 04 (Scheduler) — как scheduler читает dataset_dag_run_queue
  • Модуль 14 (OpenLineage) — Datasets интегрируются с lineage automatically
  • Модуль 18 (Upgrade Path) — миграция Datasets → Assets

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6