Datasets → Assets: что нас ждёт в Airflow 3.x
Курс таргетирован на Airflow 2.10/2.11 LTS, который останется поддерживаемым ещё долго (security fixes до 2027+). Но направление эволюции data-aware scheduling в Airflow ясно: Datasets переименованы в Assets и получают серьёзное расширение функциональности в 3.x. Этот урок — обзор того, что приходит, и migration path для уже работающих 2.x pipelines.
Понимать это полезно даже если ваша инсталляция 2.x: подходы из 3.x (особенно @asset decorator) показывают, куда идёт ментальная модель data engineering, и можно адаптировать 2.x код для проще-миграции.
Event-driven pipelines — фундамент AIP-82 Watchers
Почему rename: Dataset → Asset
В сообществе Airflow слово «Dataset» вызывало неоднозначность — конкурирующие платформы (Dagster, Prefect) используют термин Asset для аналогичной концепции, что стало де-факто стандартом в data orchestration space. AIP-74 (2024) формализовал rename:
| Airflow 2.x | Airflow 3.x | Изменение |
|---|---|---|
Dataset | Asset | Только переименование класса |
outlets=[ds] | outlets=[asset] | API идентичный |
schedule=[ds] | schedule=[asset] | API идентичный |
DatasetAlias | AssetAlias | Переименование |
dataset_event table | asset_event table | DB migration |
from airflow import Dataset | from airflow.sdk import Asset | Импорт + Task SDK |
Это NOT semantic change — концепция та же. Это strict rename для consistency с industry vocabulary.
Airflow 3.0 включил deprecation shim: старые импорты from airflow import Dataset продолжают работать с DeprecationWarning. Это сделано для smooth migration. Но в 3.2+ deprecation shim удалят.
Главное нововведение: @asset decorator
AIP-75 (2024) добавил asset-first DAG model:
# Airflow 3.x синтаксис
from airflow.sdk import asset
from datetime import datetime
@asset(
schedule="@daily"
start_date=datetime(2026, 1, 1),
)
def daily_orders():
"""Эта функция — и task, и DAG, и asset одновременно."""
return process_orders()
Что происходит под капотом:
@assetсоздаёт single-task DAG с одним asset outlet- Return value автоматически эмитится как asset event
- Дальнейшие assets могут зависеть от этого:
@asset(schedule=[daily_orders])
def enriched_orders(daily_orders_data):
# daily_orders_data автоматически injected как XCom value
return enrich(daily_orders_data)
@asset(schedule=[enriched_orders])
def revenue_report(enriched_orders_data):
return compute_revenue(enriched_orders_data)
Это asset-first mindset (как в Dagster): мысль «я строю асседы, между которыми lineage» вместо «я строю DAG-и, которые делают что-то».
Сравните с 2.x equivalent:
# Airflow 2.x — verbose
daily_orders_ds = Dataset("orders/daily")
@dag(schedule="@daily")
def daily_orders_dag():
@task(outlets=[daily_orders_ds])
def process(): return process_orders()
process()
@dag(schedule=[daily_orders_ds])
def enriched_orders_dag():
@task(outlets=[enriched_orders_ds])
def enrich(**context):
events = context["triggering_dataset_events"][daily_orders_ds]
data = read_dataset(events[0].dataset.uri)
return enrich(data)
enrich()
В 3.x — десятки строк boilerplate схлопываются в декларативный @asset.
AIP-82: Event-driven scheduling
Самое радикальное изменение: Assets могут быть triggered внешними events, не только airflow tasks. С 2.4 Dataset был чисто internal pub/sub. В 3.x появляется механизм external event sources:
# Airflow 3.x
from airflow.sdk import Asset
from airflow.providers.amazon.aws.assets import S3CreateObjectWatcher
orders_uploads = Asset(
"s3://uploads/orders/"
watchers=[
S3CreateObjectWatcher(
bucket="uploads"
prefix="orders/"
aws_conn_id="aws_default",
)
]
)
@asset(schedule=[orders_uploads])
def process_uploaded_orders():
# Запускается когда новый file appears в S3
pass
Watchers могут быть:
- S3CreateObjectWatcher — slušaет S3 EventBridge
- SQSWatcher — message в SQS queue
- KafkaWatcher — message в Kafka topic
- CustomWatcher — your own implementation
Это event-driven Airflow — фундаментально новый use case. Раньше real-time triggers были невозможны без external systems (Lambda → REST API → trigger DAG). В 3.x — native.
Архитектурно это работает через Triggerer: watchers — это deferrable triggers, которые крутятся в asyncio loop и эмитят dataset events при срабатывании. Один Airflow instance может слушать тысячи external sources.
Asset Partitions (Airflow 3.2)
Ещё более существенное расширение: partition-aware lineage. Сейчас в 2.x consumer триггерится при любом event:
# 2.x — atomic dataset, partition-blind
@dag(schedule=[Dataset("s3://lake/orders/")])
def consumer(): ...
В 3.2 partition становится first-class:
# 3.2 (preview)
from airflow.sdk import Asset, AssetPartition
orders_asset = Asset(
"s3://lake/orders/"
partitions=[
AssetPartition(name="date", values=daily_date_range()),
]
)
@asset(schedule=[orders_asset.partition("date=2026-05-12")])
def daily_specific_consumer():
# Запустится только когда partition date=2026-05-12 обновлён
pass
Это позволяет:
- Backfill propagation: при backfill partition в bronze автоматически перепустить downstream только для соответствующих partitions
- Partial reprocessing: обновили партицию 2026-05-10, downstream обработает только её
- Granular lineage: UI показывает «которые partitions свежие, которые stale»
В 2.x это симулируется через DatasetAlias (см. урок 03) — но без native partition semantics. 3.2 делает это идиоматично.
Migration path: 2.x → 3.x
Если уже есть production 2.x с Datasets, миграция в 3.x идёт по pulledшая стандартному upgrade path. Конкретно для datasets:
Stage 1: Python compatibility (на 2.x)
Перепишите импорты так, чтобы они работали в обеих версиях:
# Compat layer — работает в 2.x и 3.x
try:
from airflow.sdk import Asset as Dataset # 3.x style
except ImportError:
from airflow import Dataset # 2.x
# Везде используйте Dataset — это позволит switch later
Аналогично для DatasetAlias → AssetAlias.
Stage 2: Database migration
При upgrade 2.x → 3.x Airflow выполняет airflow db migrate, который:
- Переименует tables:
dataset→asset,dataset_event→asset_event, etc. - Сохранит все existing events (no data loss)
- Установит views с old names на 6-month deprecation period для backwards-compat SQL queries
-- После migration старые queries работают:
SELECT * FROM dataset_event; -- → внутри VIEW над asset_event
Stage 3: Code migration
После upgrade переключите imports на 3.x style:
from airflow.sdk import Asset, AssetAlias # вместо Dataset, DatasetAlias
Stage 4: Adopt @asset (optional)
Существующие DAG-и с @dag + @task(outlets=[ds]) продолжают работать. Но новые pipelines можно писать в @asset style для cleaner code.
Эта миграция не обязательная — оба стиля coexist в 3.x.
Что НЕ меняется
Важно понимать, что фундаментальные концепции остаются:
- URI-based identity — Asset идентифицируется через URI, как Dataset
- AND/OR semantics —
AssetAll/AssetAnyидентичны Dataset аналогам - Queue mechanism —
asset_dag_run_queueработает identично - Emission on SUCCESS — task должна успешно завершиться для emit event
- No physical data validation — Airflow по-прежнему не читает реальные данные
Mental model остаётся: «логические объекты с URIs, события эмитятся при успешном завершении task-producer-а».
Сравнительная таблица: 2.x vs 3.x assets
| Capability | Airflow 2.x | Airflow 3.x |
|---|---|---|
| Naming | Dataset | Asset (rename) |
| Module | from airflow import Dataset | from airflow.sdk import Asset |
| DAG-style | @dag + @task(outlets=[ds]) | @asset decorator (новое) |
| Dynamic URIs | DatasetAlias | AssetAlias (идентично) |
| External events | Нет (только internal) | AssetWatcher (S3/SQS/Kafka) |
| Partitions | Симулируются через alias | Native AssetPartition (3.2) |
| Lineage UI | Cross-DAG dataset view | Полноценный asset graph + partition state |
| AND/OR | DatasetAll/Any | AssetAll/Any |
| Hybrid time+data | DatasetOrTimeSchedule | AssetOrTimeSchedule |
| Lineage standards | OpenLineage integration | OpenLineage + Asset metadata enrichment |
| Function injection | XCom через context | Auto-injection через function args |
Когда migrate, а когда подождать
Migrate на 3.x в 2026, если:
- Нужна asset partition granularity (heavy backfill scenarios)
- Хотите event-driven triggers без external code (SQS, Kafka watchers)
- Большая команда, выигрыш от
@assetдекларативности
Останьтесь на 2.10/2.11 LTS, если:
- Production stability важнее новых features
- Используется много custom plugins/operators (3.x breaking changes в plugin API)
- Migration cost > ожидаемая ценность от новых features
- Не используете Datasets вообще (тогда 3.x rename — пустое колесо)
Airflow 2.11 LTS будет поддерживаться security patches минимум до конца 2027. Это даёт 1-2 года для запланированной миграции.
Production gotchas (3.x preview)
Эти problems встречались в early 3.x deployments. Если будете upgrade’ить — учтите:
-
@assetфункция должна быть top-level. Не nested внутри@dagbody. Это другая абстракция, не TaskFlow. -
Watchers требуют отдельный triggerer process. Если включили
AssetWatcher, triggerer должен быть deployed — иначе watchers «зависают». На 2.x triggerer был необязателен для Datasets, в 3.x с event-driven assets — обязателен. -
Backwards-compat views deprecated после 3.2. Не оставляйте production SQL queries на старых table names — мигрируйте.
-
Asset Partitions требуют partition strategy declarative. Нельзя dynamically discover partitions — нужно описать range в DAG file. Для truly-dynamic — используйте
AssetAliasподход. -
OpenLineage event format changed для assets vs datasets. Если у вас downstream lineage consumer (Marquez, OpenLineage UI), нужно update схемы.
-
AssetWatchersecurity implications. External event sources — потенциальная attack surface. SQSWatcher с poorly-secured queue → arbitrary DAG triggering. Audit watcher configs. -
DB migration во время upgrade блокирует instance. Для large
dataset_eventtables (миллионы rows) migration занимает 10-30 минут downtime. Planируйте maintenance window.
Минимальный 2.x код, готовый к 3.x
Если хочется maximum compatibility, пишите 2.x код так:
# 1. Используйте compat imports
try:
from airflow.sdk import Asset as Dataset
from airflow.sdk import AssetAlias as DatasetAlias
except ImportError:
from airflow import Dataset
from airflow.datasets import DatasetAlias
# 2. Используйте explicit DatasetAll/Any вместо bare list
from airflow.datasets import DatasetAll
@dag(schedule=DatasetAll(ds_a, ds_b)) # не [ds_a, ds_b]
def consumer(): ...
# 3. Не используйте deprecated patterns (TriggerDagRunOperator для cross-DAG)
# 4. Document URIs в config, не hardcode
DATASET_URIS = {
"bronze_orders": "s3://lake/bronze/orders/",
"silver_orders": "s3://lake/silver/orders/",
}
# 5. Wrap creation в helper, чтобы migration был single file change
def make_dataset(name: str) -> Dataset:
return Dataset(DATASET_URIS[name])
При миграции достаточно поменять make_dataset implementation и compat imports — основной DAG код не трогается.