DatasetAlias — dynamic dataset references (Airflow 2.9+)
Стандартный Dataset из 2.4 имеет фундаментальное ограничение: URI должен быть статическим, известен на parse time. Это работает для большинства cases — s3://warehouse/orders/, postgres://prod-db/customers. Но реальный data engineering полон ситуаций, где URI зависит от runtime context:
- Партиции, имена которых вычисляются из данных (
s3://lake/year=2026/month=05/day=12/) - Tenant-specific paths (multi-tenant SaaS: каждый клиент в своём bucket)
- Conditional outputs: producer пишет в один из 5 destinations в зависимости от business logic
- Сompacted partition naming, где scheduler не может предугадать URI
DatasetAlias (Airflow 2.9, апрель 2024) решает эту проблему. Это placeholder dataset, который на parse time не знает свой URI; URI эмитится в runtime через outlet_events.
Стратегии именования топиков — параллель с DatasetAlias
Базовый синтаксис
from airflow import Dataset
from airflow.datasets import DatasetAlias
from airflow.decorators import dag, task
# На parse time: только alias имя
orders_alias = DatasetAlias("daily-orders-partition")
@dag(schedule="@daily")
def producer():
@task(outlets=[orders_alias])
def write_partition(**context):
execution_date = context["ds"]
partition_uri = f"s3://lake/orders/dt={execution_date}/"
# Реальная запись данных
df.to_parquet(partition_uri)
# Эмитим конкретный dataset в runtime
context["outlet_events"][orders_alias].add(
Dataset(partition_uri)
)
write_partition()
producer()
Ключевые моменты:
DatasetAlias("daily-orders-partition")— имя alias уникально внутри instance, не URI.outlets=[orders_alias]объявляется на parse time как обычный outlet, но это declaration of intent, не concrete dataset.context["outlet_events"][alias].add(Dataset(uri))— runtime эмиссия. Можно добавить несколько Dataset-ов под одним alias.- Если в runtime НЕ вызвать
.add()— не будет эмитировано ни одного event, и dataset_event row не создастся.
Lifecycle DatasetAlias
Реальный use case: tenant-aware ETL
Multi-tenant SaaS, где каждый клиент пишет данные в свой S3 prefix:
from airflow import Dataset
from airflow.datasets import DatasetAlias
from airflow.decorators import dag, task
tenants_alias = DatasetAlias("tenant-orders-processed")
@dag(
dag_id="tenant_etl_dispatcher"
schedule="@hourly",
)
def tenant_dispatcher():
@task
def list_active_tenants() -> list[str]:
# Запрос в metadata: какие tenants активны
return ["tenant_a", "tenant_b", "tenant_c"]
@task(outlets=[tenants_alias])
def process_tenant(tenant_id: str, **context):
# Реальный ETL для этого tenant
df = extract_orders(tenant_id)
output_uri = f"s3://warehouse/tenant={tenant_id}/dt={context['ds']}/orders.parquet"
df.write_parquet(output_uri)
# Регистрируем конкретный dataset
context["outlet_events"][tenants_alias].add(Dataset(output_uri))
tenants = list_active_tenants()
process_tenant.expand(tenant_id=tenants)
tenant_dispatcher()
Consumer:
@dag(schedule=[tenants_alias])
def aggregate_all_tenants():
@task
def aggregate(**context):
# context["triggering_dataset_events"][tenants_alias] —
# list of Dataset events, которые triggered этот run
events = context["triggering_dataset_events"][tenants_alias]
for event in events:
print(f"Tenant data updated: {event.dataset.uri}")
aggregate()
Результат: каждый tenant эмитит свой Dataset под общим alias. Когда любой tenant обновляет data — consumer triggered. Consumer видит конкретные URIs через events.
DatasetAlias resolution имеет OR semantics между concrete datasets. Если хотите AND (все tenants должны обновиться), это не та абстракция — нужно явно перечислить datasets через DatasetAll(ds_a, ds_b, ds_c) или logic в consumer task.
Адаптация: первый запуск DAG
Тонкость, которая ломает интуицию: на первый запуск consumer alias resolution может не сработать.
Сценарий:
- T=0: parse DAG, alias
orders_aliasregistered, концретных datasets нет. - T=0: consumer DAG имеет
schedule=[orders_alias]. Subscription записан, но вdataset_alias_dataset_referenceпусто. - T=10: producer запускается, эмитит
Dataset("s3://..."). Создаётся row вdataset_alias_dataset_reference(alias → dataset). - T=10: создаётся dataset_event для concrete Dataset. Scheduler пополняет
dataset_dag_run_queueдля тех consumers, которые subscribed на конкретный URI. - Проблема: consumer subscribed на alias, а не на concrete URI. Scheduler должен дополнительно проверить alias resolution.
В Airflow 2.9.x была известная гипертонкость: scheduler делает alias expansion в caused-by-dataset query, JOIN-ит dataset_alias_dataset_reference. Это работает, но first-run latency выше (~10-15s scheduler tick).
В 2.10+ alias resolution оптимизирован — расширение idempotent и indexed.
Multiple datasets per alias
Один task может эмитить несколько datasets под одним alias:
@task(outlets=[tenants_alias])
def process_batch(**context):
for tenant in ["tenant_a", "tenant_b", "tenant_c"]:
uri = f"s3://warehouse/tenant={tenant}/orders.parquet"
process(tenant, uri)
context["outlet_events"][tenants_alias].add(Dataset(uri))
После commit:
- 3 INSERT в
dataset(если URIs новые) - 3 rows в
dataset_alias_dataset_reference - 3 INSERT в
dataset_event - Для consumer, subscribed на
tenants_alias— 3 entries вdataset_dag_run_queue(но AND-OR логика DAG schedule определяет, сколько DagRuns создастся)
Если consumer имеет schedule=[tenants_alias] (без других datasets) — 3 events соберутся в queue, scheduler создаст один DagRun на следующем tick (одного event достаточно для satisfaction условия), очистит queue.
Mix: regular Dataset + DatasetAlias
Можно комбинировать в одном task:
static_ds = Dataset("s3://warehouse/orders/latest/")
dynamic_alias = DatasetAlias("orders-partition-alias")
@task(outlets=[static_ds, dynamic_alias])
def write_orders(**context):
# Update "latest" pointer (статический outlet — автоэмит)
update_latest_pointer()
# Эмитируем конкретный partition (через alias)
partition_uri = f"s3://warehouse/orders/dt={context['ds']}/"
write_partition(partition_uri)
context["outlet_events"][dynamic_alias].add(Dataset(partition_uri))
Static dataset эмитится автоматически (это стандартное поведение outlets). Alias требует явного .add() — если забыли, alias event не возникнет.
Inspection через CLI и SQL
# Все aliases
airflow datasets list --include-aliases
# Конкретные datasets под alias
airflow datasets show-alias --name "daily-orders-partition"
Прямой SQL для debugging:
-- Какие концретные datasets привязаны к alias
SELECT
da.name AS alias_name,
d.uri AS concrete_dataset_uri,
dadr.dataset_alias_id,
dadr.dataset_id
FROM dataset_alias da
JOIN dataset_alias_dataset_reference dadr
ON dadr.alias_id = da.id
JOIN dataset d
ON d.id = dadr.dataset_id
WHERE da.name = 'daily-orders-partition'
ORDER BY d.uri;
-- Latest events через alias
SELECT
da.name AS alias_name,
d.uri,
de.timestamp,
de.extra
FROM dataset_event de
JOIN dataset d ON de.dataset_id = d.id
JOIN dataset_alias_dataset_reference dadr ON dadr.dataset_id = d.id
JOIN dataset_alias da ON da.id = dadr.alias_id
WHERE da.name = 'daily-orders-partition'
ORDER BY de.timestamp DESC
LIMIT 50;
Production gotchas
-
Alias имя должно быть уникальным на instance level. Если два DAG-а используют
DatasetAlias("partitions")— это один и тот же alias, события сольются. Префиксуйте именами DAG-ов или teams. -
Resolution latency. Между первым
outlet_events.add()и первым successful trigger consumer-а может пройти 2 scheduler tick-а (один для регистрации alias→dataset, второй для queue processing). Не используйте DatasetAlias там, где нужна sub-10s latency. -
Failed task НЕ эмитит alias events. Точно как с regular datasets, но это особенно болезненно для idempotency: если retry succeeded, эмиссия произойдёт только на successful attempt.
-
outlet_events— этоOutletEventAccessors, не plain dict..add(Dataset(...))правильный API. Не пытайтесь делатьcontext["outlet_events"][alias] = [...]— не сработает. -
Alias resolution NOT working для legacy consumer. Если consumer написан под Airflow 2.4-2.8 с
schedule=[concrete_ds], а producer (2.9+) эмитит через alias — нужно убедиться, что concrete URI совпадает. Subscription через alias не пересекается с subscription через concrete dataset напрямую. -
Не путать с DatasetAny/DatasetAll. Это conditional expressions для статических datasets. DatasetAlias — для runtime URI dynamism. Они решают разные задачи.
-
extrametadata в alias-эмитированных events работает идентично regular datasets:from airflow.datasets.metadata import Metadata context["outlet_events"][alias].add( Dataset(uri), extra={"rows": 1000, "size_mb": 23.5} )
Когда использовать DatasetAlias vs обычный Dataset
| Сценарий | Правильный выбор |
|---|---|
Static, well-known URI (s3://warehouse/orders/) | Regular Dataset |
| URI зависит от execution_date | DatasetAlias |
| Multi-tenant с динамическими paths | DatasetAlias |
| Conditional output (один из нескольких known URIs) | Static Dataset для каждого + branching producer |
| Несколько partitions в одном run | DatasetAlias с многими .add() |
| Wildcards-like подписка («любой dataset под этим префиксом») | DatasetAlias со shared name |
DatasetAlias — мощный механизм, но легко ломает observability. Если в production видите неожиданные DagRuns на consumer DAG, debugging через dataset_event table усложняется: нужно JOIN-ить через alias chain. Документируйте семантику alias именования и держите консистентность.