Learning Platform
Глоссарий Troubleshooting
Урок 09.02 · 24 мин
Продвинутый
DatasetsAIP-48Data-aware schedulingOutletsProducer/Consumer

Dataset fundamentals — data-aware scheduling в Airflow 2.4+

До Airflow 2.4 связь между DAG-ами строилась только через время: «producer DAG запускается в 02:00, consumer DAG — в 02:30, надеемся, что producer успел». Этот подход хрупкий: ETL зависит от внешних API, network latency, retries — producer может опоздать на 20 минут, consumer уже побежал, ELT пайплайн вытянул вчерашние данные.

AIP-48 (Airflow 2.4, сентябрь 2022) решил эту проблему фундаментально. Появился Dataset — declarative объект, представляющий логический набор данных с уникальным URI. DAG-ы могут эмитить dataset events (через outlets) и подписываться на них (через schedule=[dataset]). Когда все upstream datasets обновлены — scheduler автоматически запускает downstream DAG.

Это сдвиг парадигмы: с time-based на data-aware scheduling.


Anatomy of a Dataset

Dataset в 2.x — это тонкий wrapper над URI:

from airflow import Dataset

orders_dataset = Dataset("s3://warehouse/orders/year=2026/month=05/")
customers_dataset = Dataset("postgres://prod-db/customers")

URI — это идентификатор, а не location. Airflow не валидирует URI, не читает данные по нему, не проверяет существование. URI — opaque string. Реальные правила:

  • URI должен быть уникальным внутри Airflow instance
  • Schema (s3://, postgres://, kafka://) — соглашение для людей
  • Максимум 3000 символов (поле uri в БД)
  • Можно использовать любой формат — my-team://orders/daily валидный URI
NOTE

В 2.x никакой связи с реальным хранилищем нет — Airflow не читает S3, не запрашивает Postgres. Dataset — это nominal identity, как label. В Airflow 3.x с AIP-82 появляются event-driven Assets, которые слушают реальные queues (SQS, Kafka), но это другая история.


Producer/consumer pattern — базовый шаблон

Минимальный пример dataset-driven pipeline:

from airflow import Dataset
from airflow.decorators import dag, task
from datetime import datetime

orders_ds = Dataset("s3://warehouse/orders/processed/")

# === PRODUCER DAG ===
@dag(
    dag_id="orders_etl_producer"
    schedule="@hourly"
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def orders_producer():
    @task(outlets=[orders_ds])
    def extract_and_load():
        # ETL logic
        pass

    extract_and_load()

orders_producer()


# === CONSUMER DAG ===
@dag(
    dag_id="orders_aggregator"
    schedule=[orders_ds],
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def orders_consumer():
    @task
    def aggregate_orders():
        pass

    aggregate_orders()

orders_consumer()

Несколько критических деталей:

  1. outlets=[orders_ds] — declarative declaration на DAG parse time. Scheduler знает заранее, что эта task эмитит данный dataset.
  2. schedule=[orders_ds] — НЕ строка-cron, а список Dataset объектов. DAG не имеет time schedule вообще.
  3. Dataset event эмитится только при успешном завершении task (state=success). Failed/skipped task НЕ эмитит event.
  4. AND-semantics by default: schedule=[ds1, ds2] означает «запусти когда ОБА обновлены».

Lifecycle: от outlet к downstream trigger

Dataset event flow — от producer к consumer
DAG parseDAG processor парсит DAG-файлы. Видит outlets=[orders_ds] и schedule=[orders_ds]. Записывает в dataset (если не существует), dag_schedule_dataset_reference (consumer subscription), task_outlet_dataset_reference (producer declaration). Происходит при каждом parse interval (default 30s).
DAG run триггерится time/manual
Producer task SUCCESSWorker завершает extract_and_load() с return → state=success. На этот момент TaskInstance.handle_failure_or_retry() видит outlets и вызывает _register_dataset_changes(). В транзакции: INSERT в dataset_event (timestamp + extra), INSERT в dataset_dag_run_queue для каждого consumer.
emit dataset_event
dataset_event rowОдин INSERT в dataset_event на каждый emit. Колонки: dataset_id, timestamp, source_task_id, source_dag_id, source_run_id, extra (JSONB). Это immutable log — никогда не UPDATE. Используется и для UI history, и для downstream triggering.
dataset_dag_run_queuePer-consumer queue: для каждого DAG, который подписан на dataset, добавляется row. Это **separate table**, не trigger. На каждый dataset_event scheduler пополняет queue. Когда queue для DAG-а покрывает все его schedule deps — scheduler создаёт DagRun и очищает queue.
scheduler tick (5s)
Scheduler scans queueКаждый tick scheduler делает SELECT FROM dataset_dag_run_queue JOIN dag_schedule_dataset_reference. Проверяет: для DAG D, чьи schedule = [d1, d2], есть ли rows для обоих datasets? Если AND условие satisfied → создаёт DagRun, очищает соответствующие queue rows.
Consumer DagRun createdDagRun с run_type='dataset_triggered' и triggered_by= datasets list. В UI отображается специальной иконкой. На этот момент queue очищена — следующий event начнёт накапливать новую.

Multiple datasets — AND semantics

Если consumer объявляет несколько datasets:

ds_orders = Dataset("s3://warehouse/orders/")
ds_customers = Dataset("s3://warehouse/customers/")

@dag(schedule=[ds_orders, ds_customers])
def joined_view():
    @task
    def join():
        pass
    join()

DAG запустится только когда оба datasets имеют события с момента последнего запуска consumer-а. Это AND, не OR.

В 2.9+ появилась поддержка conditional dataset expressions через operators:

from airflow.datasets import DatasetAll, DatasetAny

@dag(schedule=DatasetAny(ds_orders, ds_customers))  # OR
def any_dataset(): ...

@dag(schedule=DatasetAll(ds_orders, ds_customers))  # AND (default)
def all_datasets(): ...

# Можно комбинировать:
@dag(schedule=(ds_orders & ds_customers) | ds_special)
def complex_logic(): ...

Это компилируется в boolean expression tree, который scheduler проверяет на каждом tick.


Outlet с extra metadata

Producer может прикрепить metadata к dataset event:

from airflow.datasets.metadata import Metadata

@task(outlets=[orders_ds])
def extract():
    df = fetch_orders()
    return Metadata(orders_ds, extra={
        "row_count": len(df),
        "max_order_id": df["order_id"].max(),
        "schema_hash": compute_hash(df.columns),
    })

Эти metadata сохраняются в dataset_event.extra (JSONB) и доступны consumer-у через context["triggering_dataset_events"]:

@task
def aggregate(**context):
    for ds, events in context["triggering_dataset_events"].items():
        for event in events:
            row_count = event.extra["row_count"]
            print(f"Producer emitted {row_count} rows at {event.timestamp}")

Это позволяет реализовать idempotency check: consumer видит, какие именно partitions обновились, и обрабатывает только их.


Scheduler detection — что происходит каждый tick

Когда producer task завершается успешно, не происходит мгновенный pub/sub. Логика такая:

  1. Worker сериализует TI state и outlets в metadata DB (атомарная транзакция).
  2. Scheduler на следующем tick (default 5s) выполняет dataset processing phase.
  3. SQL приблизительно такой:
-- Псевдокод scheduler dataset processing
WITH consumer_satisfied AS (
    SELECT
        dag.dag_id,
        COUNT(DISTINCT ddrq.dataset_id) AS triggered_count,
        (SELECT COUNT(*) FROM dag_schedule_dataset_reference
         WHERE dag_id = dag.dag_id) AS required_count
    FROM dataset_dag_run_queue ddrq
    JOIN dag ON ddrq.target_dag_id = dag.dag_id
    GROUP BY dag.dag_id
    HAVING triggered_count >= required_count
)
SELECT * FROM consumer_satisfied;

Для каждого satisfied DAG-а scheduler создаёт DagRun(run_type=DatasetTriggeredTimetable) и очищает queue rows.

WARNING

Между producer SUCCESS и consumer queued проходит обычно 5-10 секунд (scheduler tick + DB transaction). Это не real-time. Если требуется sub-second latency между ETL stages — Datasets не ваш инструмент, смотрите на Kafka/Flink.


Сравнение: ExternalTaskSensor vs TriggerDagRunOperator vs Datasets

До Datasets было два legacy способа связать DAG-и:

ApproachProducer sideConsumer sideLatencyCost
ExternalTaskSensorНичегоPolling sensor в consumer30-300s polling intervalWorker slot занят
TriggerDagRunOperatorExplicit trigger в producerНичего0-5s (немедленно)Tight coupling: producer знает consumers
Datasets (2.4+)outlets=[ds]schedule=[ds]5-10s scheduler tickDecoupled, scalable

Datasets — strict superset функциональности для большинства cases. Coupling решается через URI (producer не знает consumers), latency приемлемая, нет polling overhead.


Production gotchas

  1. TaskFlow @task vs Classic Operator outlets синтаксис разный:

    # TaskFlow
    @task(outlets=[orders_ds])
    def my_task(): ...
    
    # Classic
    my_op = PythonOperator(
        task_id="my_task"
        python_callable=my_func,
        outlets=[orders_ds],  # outlets уровня operator, не task_id
    )
  2. Dataset URI immutable. Если изменить URI существующего dataset — это создаёт новый dataset в БД, старые subscriptions не мигрируют. Меняйте URI только при breaking-change refactoring.

  3. Skipped tasks НЕ эмитят events. Если task в producer DAG skipped (например, branching), consumer не запустится. Используйте trigger_rule="all_done" или splitter pattern.

  4. Dataset events не накапливаются для consumer. Если producer обновился 100 раз пока consumer запускался — это один dataset_event, который удовлетворил queue. После создания DagRun queue очищается, дальнейшие events добавляют новые queue rows.

  5. Failed consumer DagRun не “rolls back” queue. Если consumer запустился dataset-triggered и упал — queue уже очищена. Manual retry или backfill требуется.

  6. DAG без time schedule при schedule=[ds] — это feature, не bug. DAG никогда не запустится по расписанию, только по dataset events. Manual trigger тоже работает.

  7. Inspection через CLI:

    airflow datasets list
    airflow datasets list-events --uri "s3://warehouse/orders/processed/"
    airflow datasets details --uri "s3://warehouse/orders/processed/"

Real-world pattern: hourly ETL → daily aggregation

hourly_ds = Dataset("s3://lake/events/hourly/")

# Producer: 24 раза в день
@dag(schedule="@hourly", start_date=datetime(2026, 1, 1))
def hourly_extract():
    @task(outlets=[hourly_ds])
    def extract_hour():
        pass
    extract_hour()

# Consumer: запустится после КАЖДОГО обновления (24 раза в день)
@dag(schedule=[hourly_ds])
def downstream_after_hour():
    pass

# Custom timetable для "запусти один раз в день после
# 24 событий" — не из коробки, требуется работа с DatasetOrTimeSchedule
# или manual aggregation logic в consumer.

Это важный момент: один outlets emit = одно queue entry = один DagRun. Если хотите batch-консьюмер (один раз в день после нескольких hourly events) — реализуется через DatasetOrTimeSchedule (2.9+):

from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

@dag(schedule=DatasetOrTimeSchedule(
    timetable=CronTriggerTimetable("0 6 * * *", timezone="UTC"),
    datasets=[hourly_ds],
))
def daily_or_dataset_aggregator(): ...

DAG запустится либо в 06:00 по cron, либо если до этого пришёл dataset event — что наступит раньше.


Проверка знанийKnowledge check
Producer DAG обновляет dataset 'ds_orders' каждые 5 минут. Consumer DAG имеет schedule=[ds_orders] и среднее время выполнения 30 минут. Что происходит с накопленными events во время выполнения consumer?
ОтветAnswer
Каждый producer success добавляет row в dataset_dag_run_queue для consumer-а. Когда первый event приходит — scheduler сразу создаёт DagRun (если max_active_runs позволяет) и очищает queue. Пока consumer бежит 30 минут, накапливаются ещё ~6 queue rows. После завершения первого run scheduler видит pending rows и сразу создаёт следующий DagRun, очищая queue (один DagRun, не 6 отдельных — events не множат runs если max_active_runs=1). Если max_active_runs=1 (default), второй DagRun будет один за весь накопленный batch. Если хотите batch consumer с явным контролем — используйте DatasetOrTimeSchedule с cron или manually aggregate в task logic через context['triggering_dataset_events'].

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В Airflow 2.x что точно происходит при создании Dataset('s3://bucket/data.parquet')?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6