Learning Platform
Глоссарий Troubleshooting
Урок 09.03 · 26 мин
Продвинутый
DatasetAliasDynamic DatasetsAIP-48Runtime URIDataset Events

DatasetAlias — dynamic dataset references (Airflow 2.9+)

Стандартный Dataset из 2.4 имеет фундаментальное ограничение: URI должен быть статическим, известен на parse time. Это работает для большинства cases — s3://warehouse/orders/, postgres://prod-db/customers. Но реальный data engineering полон ситуаций, где URI зависит от runtime context:

  • Партиции, имена которых вычисляются из данных (s3://lake/year=2026/month=05/day=12/)
  • Tenant-specific paths (multi-tenant SaaS: каждый клиент в своём bucket)
  • Conditional outputs: producer пишет в один из 5 destinations в зависимости от business logic
  • Сompacted partition naming, где scheduler не может предугадать URI

DatasetAlias (Airflow 2.9, апрель 2024) решает эту проблему. Это placeholder dataset, который на parse time не знает свой URI; URI эмитится в runtime через outlet_events.


Стратегии именования топиков — параллель с DatasetAlias

Базовый синтаксис

from airflow import Dataset
from airflow.datasets import DatasetAlias
from airflow.decorators import dag, task

# На parse time: только alias имя
orders_alias = DatasetAlias("daily-orders-partition")

@dag(schedule="@daily")
def producer():
    @task(outlets=[orders_alias])
    def write_partition(**context):
        execution_date = context["ds"]
        partition_uri = f"s3://lake/orders/dt={execution_date}/"

        # Реальная запись данных
        df.to_parquet(partition_uri)

        # Эмитим конкретный dataset в runtime
        context["outlet_events"][orders_alias].add(
            Dataset(partition_uri)
        )

    write_partition()

producer()

Ключевые моменты:

  1. DatasetAlias("daily-orders-partition") — имя alias уникально внутри instance, не URI.
  2. outlets=[orders_alias] объявляется на parse time как обычный outlet, но это declaration of intent, не concrete dataset.
  3. context["outlet_events"][alias].add(Dataset(uri)) — runtime эмиссия. Можно добавить несколько Dataset-ов под одним alias.
  4. Если в runtime НЕ вызвать .add() — не будет эмитировано ни одного event, и dataset_event row не создастся.

Lifecycle DatasetAlias

DatasetAlias — parse vs runtime
DAG parse (T=parse)DAG processor видит outlets=[DatasetAlias('daily-orders-partition')]. Записывает alias в dataset_alias table. НИКАКОЙ конкретный Dataset не регистрируется. dataset_alias_dataset_reference пуст.
DagRun starts
Task execution (T=runtime)Worker запускает task. Через context['outlet_events'][alias].add(Dataset(uri)) накапливает list of concrete datasets. Этот list хранится в memory worker-а до commit.
task SUCCESS
Atomic commitПри successful completion: для каждого Dataset из accumulated list — INSERT INTO dataset (если URI ранее не виден), INSERT INTO dataset_alias_dataset_reference (alias_id, dataset_id), INSERT INTO dataset_event (стандартный event). Всё в одной транзакции.
scheduler next tick
Consumer DAG resolutionЕсли consumer DAG имеет schedule=[orders_alias], scheduler смотрит ВСЕ concrete datasets, привязанные к этому alias (через dataset_alias_dataset_reference). Если ЛЮБОЙ из них имеет new event — alias считается triggered. Это OR semantics между concrete datasets под одним alias.

Реальный use case: tenant-aware ETL

Multi-tenant SaaS, где каждый клиент пишет данные в свой S3 prefix:

from airflow import Dataset
from airflow.datasets import DatasetAlias
from airflow.decorators import dag, task

tenants_alias = DatasetAlias("tenant-orders-processed")

@dag(
    dag_id="tenant_etl_dispatcher"
    schedule="@hourly",
)
def tenant_dispatcher():
    @task
    def list_active_tenants() -> list[str]:
        # Запрос в metadata: какие tenants активны
        return ["tenant_a", "tenant_b", "tenant_c"]

    @task(outlets=[tenants_alias])
    def process_tenant(tenant_id: str, **context):
        # Реальный ETL для этого tenant
        df = extract_orders(tenant_id)
        output_uri = f"s3://warehouse/tenant={tenant_id}/dt={context['ds']}/orders.parquet"
        df.write_parquet(output_uri)

        # Регистрируем конкретный dataset
        context["outlet_events"][tenants_alias].add(Dataset(output_uri))

    tenants = list_active_tenants()
    process_tenant.expand(tenant_id=tenants)

tenant_dispatcher()

Consumer:

@dag(schedule=[tenants_alias])
def aggregate_all_tenants():
    @task
    def aggregate(**context):
        # context["triggering_dataset_events"][tenants_alias] —
        # list of Dataset events, которые triggered этот run
        events = context["triggering_dataset_events"][tenants_alias]
        for event in events:
            print(f"Tenant data updated: {event.dataset.uri}")

    aggregate()

Результат: каждый tenant эмитит свой Dataset под общим alias. Когда любой tenant обновляет data — consumer triggered. Consumer видит конкретные URIs через events.

NOTE

DatasetAlias resolution имеет OR semantics между concrete datasets. Если хотите AND (все tenants должны обновиться), это не та абстракция — нужно явно перечислить datasets через DatasetAll(ds_a, ds_b, ds_c) или logic в consumer task.


Адаптация: первый запуск DAG

Тонкость, которая ломает интуицию: на первый запуск consumer alias resolution может не сработать.

Сценарий:

  1. T=0: parse DAG, alias orders_alias registered, концретных datasets нет.
  2. T=0: consumer DAG имеет schedule=[orders_alias]. Subscription записан, но в dataset_alias_dataset_reference пусто.
  3. T=10: producer запускается, эмитит Dataset("s3://..."). Создаётся row в dataset_alias_dataset_reference (alias → dataset).
  4. T=10: создаётся dataset_event для concrete Dataset. Scheduler пополняет dataset_dag_run_queue для тех consumers, которые subscribed на конкретный URI.
  5. Проблема: consumer subscribed на alias, а не на concrete URI. Scheduler должен дополнительно проверить alias resolution.

В Airflow 2.9.x была известная гипертонкость: scheduler делает alias expansion в caused-by-dataset query, JOIN-ит dataset_alias_dataset_reference. Это работает, но first-run latency выше (~10-15s scheduler tick).

В 2.10+ alias resolution оптимизирован — расширение idempotent и indexed.


Multiple datasets per alias

Один task может эмитить несколько datasets под одним alias:

@task(outlets=[tenants_alias])
def process_batch(**context):
    for tenant in ["tenant_a", "tenant_b", "tenant_c"]:
        uri = f"s3://warehouse/tenant={tenant}/orders.parquet"
        process(tenant, uri)
        context["outlet_events"][tenants_alias].add(Dataset(uri))

После commit:

  • 3 INSERT в dataset (если URIs новые)
  • 3 rows в dataset_alias_dataset_reference
  • 3 INSERT в dataset_event
  • Для consumer, subscribed на tenants_alias — 3 entries в dataset_dag_run_queue (но AND-OR логика DAG schedule определяет, сколько DagRuns создастся)

Если consumer имеет schedule=[tenants_alias] (без других datasets) — 3 events соберутся в queue, scheduler создаст один DagRun на следующем tick (одного event достаточно для satisfaction условия), очистит queue.


Mix: regular Dataset + DatasetAlias

Можно комбинировать в одном task:

static_ds = Dataset("s3://warehouse/orders/latest/")
dynamic_alias = DatasetAlias("orders-partition-alias")

@task(outlets=[static_ds, dynamic_alias])
def write_orders(**context):
    # Update "latest" pointer (статический outlet — автоэмит)
    update_latest_pointer()

    # Эмитируем конкретный partition (через alias)
    partition_uri = f"s3://warehouse/orders/dt={context['ds']}/"
    write_partition(partition_uri)
    context["outlet_events"][dynamic_alias].add(Dataset(partition_uri))

Static dataset эмитится автоматически (это стандартное поведение outlets). Alias требует явного .add() — если забыли, alias event не возникнет.


Inspection через CLI и SQL

# Все aliases
airflow datasets list --include-aliases

# Конкретные datasets под alias
airflow datasets show-alias --name "daily-orders-partition"

Прямой SQL для debugging:

-- Какие концретные datasets привязаны к alias
SELECT
    da.name AS alias_name,
    d.uri AS concrete_dataset_uri,
    dadr.dataset_alias_id,
    dadr.dataset_id
FROM dataset_alias da
JOIN dataset_alias_dataset_reference dadr
    ON dadr.alias_id = da.id
JOIN dataset d
    ON d.id = dadr.dataset_id
WHERE da.name = 'daily-orders-partition'
ORDER BY d.uri;

-- Latest events через alias
SELECT
    da.name AS alias_name,
    d.uri,
    de.timestamp,
    de.extra
FROM dataset_event de
JOIN dataset d ON de.dataset_id = d.id
JOIN dataset_alias_dataset_reference dadr ON dadr.dataset_id = d.id
JOIN dataset_alias da ON da.id = dadr.alias_id
WHERE da.name = 'daily-orders-partition'
ORDER BY de.timestamp DESC
LIMIT 50;

Production gotchas

  1. Alias имя должно быть уникальным на instance level. Если два DAG-а используют DatasetAlias("partitions") — это один и тот же alias, события сольются. Префиксуйте именами DAG-ов или teams.

  2. Resolution latency. Между первым outlet_events.add() и первым successful trigger consumer-а может пройти 2 scheduler tick-а (один для регистрации alias→dataset, второй для queue processing). Не используйте DatasetAlias там, где нужна sub-10s latency.

  3. Failed task НЕ эмитит alias events. Точно как с regular datasets, но это особенно болезненно для idempotency: если retry succeeded, эмиссия произойдёт только на successful attempt.

  4. outlet_events — это OutletEventAccessors, не plain dict. .add(Dataset(...)) правильный API. Не пытайтесь делать context["outlet_events"][alias] = [...] — не сработает.

  5. Alias resolution NOT working для legacy consumer. Если consumer написан под Airflow 2.4-2.8 с schedule=[concrete_ds], а producer (2.9+) эмитит через alias — нужно убедиться, что concrete URI совпадает. Subscription через alias не пересекается с subscription через concrete dataset напрямую.

  6. Не путать с DatasetAny/DatasetAll. Это conditional expressions для статических datasets. DatasetAlias — для runtime URI dynamism. Они решают разные задачи.

  7. extra metadata в alias-эмитированных events работает идентично regular datasets:

    from airflow.datasets.metadata import Metadata
    context["outlet_events"][alias].add(
        Dataset(uri),
        extra={"rows": 1000, "size_mb": 23.5}
    )

Когда использовать DatasetAlias vs обычный Dataset

СценарийПравильный выбор
Static, well-known URI (s3://warehouse/orders/)Regular Dataset
URI зависит от execution_dateDatasetAlias
Multi-tenant с динамическими pathsDatasetAlias
Conditional output (один из нескольких known URIs)Static Dataset для каждого + branching producer
Несколько partitions в одном runDatasetAlias с многими .add()
Wildcards-like подписка («любой dataset под этим префиксом»)DatasetAlias со shared name
WARNING

DatasetAlias — мощный механизм, но легко ломает observability. Если в production видите неожиданные DagRuns на consumer DAG, debugging через dataset_event table усложняется: нужно JOIN-ить через alias chain. Документируйте семантику alias именования и держите консистентность.


Проверка знанийKnowledge check
DAG-producer объявляет outlets=[DatasetAlias('orders-alias')], но в task forget вызвать context['outlet_events'][alias].add(Dataset(...)). Task завершается успешно. Что произойдёт с подписанным consumer-ом, имеющим schedule=[orders-alias]?
ОтветAnswer
Consumer НЕ запустится. Объявление alias в outlets — это intent declaration, но без runtime emission через outlet_events.add() никакого dataset_event row не создастся. Это типичный bug: producer успешный, в логах clean, но downstream pipeline не triggered. Debugging: проверить dataset_event table (пусто для этого alias) и dataset_alias_dataset_reference (тоже не пополнилось). Защита: добавить assertion в task или unit-test, проверяющий что после run в alias registered хотя бы один concrete dataset. В 3.x (Asset API) появится более строгая модель — @asset функция автоматически эмитит свой return как event, без забытых вызовов.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Главная проблема, которую решает DatasetAlias (2.9+)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6