Learning Platform
Глоссарий Troubleshooting
Урок 09.06 · 22 мин
Продвинутый
AssetsAIP-74AIP-75AIP-82MigrationAirflow 3.x

Datasets → Assets: что нас ждёт в Airflow 3.x

Курс таргетирован на Airflow 2.10/2.11 LTS, который останется поддерживаемым ещё долго (security fixes до 2027+). Но направление эволюции data-aware scheduling в Airflow ясно: Datasets переименованы в Assets и получают серьёзное расширение функциональности в 3.x. Этот урок — обзор того, что приходит, и migration path для уже работающих 2.x pipelines.

Понимать это полезно даже если ваша инсталляция 2.x: подходы из 3.x (особенно @asset decorator) показывают, куда идёт ментальная модель data engineering, и можно адаптировать 2.x код для проще-миграции.


Event-driven pipelines — фундамент AIP-82 Watchers

Почему rename: Dataset → Asset

В сообществе Airflow слово «Dataset» вызывало неоднозначность — конкурирующие платформы (Dagster, Prefect) используют термин Asset для аналогичной концепции, что стало де-факто стандартом в data orchestration space. AIP-74 (2024) формализовал rename:

Airflow 2.xAirflow 3.xИзменение
DatasetAssetТолько переименование класса
outlets=[ds]outlets=[asset]API идентичный
schedule=[ds]schedule=[asset]API идентичный
DatasetAliasAssetAliasПереименование
dataset_event tableasset_event tableDB migration
from airflow import Datasetfrom airflow.sdk import AssetИмпорт + Task SDK

Это NOT semantic change — концепция та же. Это strict rename для consistency с industry vocabulary.

NOTE

Airflow 3.0 включил deprecation shim: старые импорты from airflow import Dataset продолжают работать с DeprecationWarning. Это сделано для smooth migration. Но в 3.2+ deprecation shim удалят.


Главное нововведение: @asset decorator

AIP-75 (2024) добавил asset-first DAG model:

# Airflow 3.x синтаксис
from airflow.sdk import asset
from datetime import datetime

@asset(
    schedule="@daily"
    start_date=datetime(2026, 1, 1),
)
def daily_orders():
    """Эта функция — и task, и DAG, и asset одновременно."""
    return process_orders()

Что происходит под капотом:

  • @asset создаёт single-task DAG с одним asset outlet
  • Return value автоматически эмитится как asset event
  • Дальнейшие assets могут зависеть от этого:
@asset(schedule=[daily_orders])
def enriched_orders(daily_orders_data):
    # daily_orders_data автоматически injected как XCom value
    return enrich(daily_orders_data)

@asset(schedule=[enriched_orders])
def revenue_report(enriched_orders_data):
    return compute_revenue(enriched_orders_data)

Это asset-first mindset (как в Dagster): мысль «я строю асседы, между которыми lineage» вместо «я строю DAG-и, которые делают что-то».

Сравните с 2.x equivalent:

# Airflow 2.x — verbose
daily_orders_ds = Dataset("orders/daily")

@dag(schedule="@daily")
def daily_orders_dag():
    @task(outlets=[daily_orders_ds])
    def process(): return process_orders()
    process()

@dag(schedule=[daily_orders_ds])
def enriched_orders_dag():
    @task(outlets=[enriched_orders_ds])
    def enrich(**context):
        events = context["triggering_dataset_events"][daily_orders_ds]
        data = read_dataset(events[0].dataset.uri)
        return enrich(data)
    enrich()

В 3.x — десятки строк boilerplate схлопываются в декларативный @asset.


AIP-82: Event-driven scheduling

Самое радикальное изменение: Assets могут быть triggered внешними events, не только airflow tasks. С 2.4 Dataset был чисто internal pub/sub. В 3.x появляется механизм external event sources:

# Airflow 3.x
from airflow.sdk import Asset
from airflow.providers.amazon.aws.assets import S3CreateObjectWatcher

orders_uploads = Asset(
    "s3://uploads/orders/"
    watchers=[
        S3CreateObjectWatcher(
            bucket="uploads"
            prefix="orders/"
            aws_conn_id="aws_default",
        )
    ]
)

@asset(schedule=[orders_uploads])
def process_uploaded_orders():
    # Запускается когда новый file appears в S3
    pass

Watchers могут быть:

  • S3CreateObjectWatcher — slušaет S3 EventBridge
  • SQSWatcher — message в SQS queue
  • KafkaWatcher — message в Kafka topic
  • CustomWatcher — your own implementation

Это event-driven Airflow — фундаментально новый use case. Раньше real-time triggers были невозможны без external systems (Lambda → REST API → trigger DAG). В 3.x — native.

Архитектурно это работает через Triggerer: watchers — это deferrable triggers, которые крутятся в asyncio loop и эмитят dataset events при срабатывании. Один Airflow instance может слушать тысячи external sources.


Asset Partitions (Airflow 3.2)

Ещё более существенное расширение: partition-aware lineage. Сейчас в 2.x consumer триггерится при любом event:

# 2.x — atomic dataset, partition-blind
@dag(schedule=[Dataset("s3://lake/orders/")])
def consumer(): ...

В 3.2 partition становится first-class:

# 3.2 (preview)
from airflow.sdk import Asset, AssetPartition

orders_asset = Asset(
    "s3://lake/orders/"
    partitions=[
        AssetPartition(name="date", values=daily_date_range()),
    ]
)

@asset(schedule=[orders_asset.partition("date=2026-05-12")])
def daily_specific_consumer():
    # Запустится только когда partition date=2026-05-12 обновлён
    pass

Это позволяет:

  • Backfill propagation: при backfill partition в bronze автоматически перепустить downstream только для соответствующих partitions
  • Partial reprocessing: обновили партицию 2026-05-10, downstream обработает только её
  • Granular lineage: UI показывает «которые partitions свежие, которые stale»

В 2.x это симулируется через DatasetAlias (см. урок 03) — но без native partition semantics. 3.2 делает это идиоматично.


Migration path: 2.x → 3.x

Если уже есть production 2.x с Datasets, миграция в 3.x идёт по pulledшая стандартному upgrade path. Конкретно для datasets:

Stage 1: Python compatibility (на 2.x)

Перепишите импорты так, чтобы они работали в обеих версиях:

# Compat layer — работает в 2.x и 3.x
try:
    from airflow.sdk import Asset as Dataset  # 3.x style
except ImportError:
    from airflow import Dataset  # 2.x

# Везде используйте Dataset — это позволит switch later

Аналогично для DatasetAliasAssetAlias.

Stage 2: Database migration

При upgrade 2.x → 3.x Airflow выполняет airflow db migrate, который:

  • Переименует tables: datasetasset, dataset_eventasset_event, etc.
  • Сохранит все existing events (no data loss)
  • Установит views с old names на 6-month deprecation period для backwards-compat SQL queries
-- После migration старые queries работают:
SELECT * FROM dataset_event;  -- → внутри VIEW над asset_event

Stage 3: Code migration

После upgrade переключите imports на 3.x style:

from airflow.sdk import Asset, AssetAlias  # вместо Dataset, DatasetAlias

Stage 4: Adopt @asset (optional)

Существующие DAG-и с @dag + @task(outlets=[ds]) продолжают работать. Но новые pipelines можно писать в @asset style для cleaner code.

Эта миграция не обязательная — оба стиля coexist в 3.x.


Что НЕ меняется

Важно понимать, что фундаментальные концепции остаются:

  • URI-based identity — Asset идентифицируется через URI, как Dataset
  • AND/OR semanticsAssetAll/AssetAny идентичны Dataset аналогам
  • Queue mechanismasset_dag_run_queue работает identично
  • Emission on SUCCESS — task должна успешно завершиться для emit event
  • No physical data validation — Airflow по-прежнему не читает реальные данные

Mental model остаётся: «логические объекты с URIs, события эмитятся при успешном завершении task-producer-а».


Сравнительная таблица: 2.x vs 3.x assets

CapabilityAirflow 2.xAirflow 3.x
NamingDatasetAsset (rename)
Modulefrom airflow import Datasetfrom airflow.sdk import Asset
DAG-style@dag + @task(outlets=[ds])@asset decorator (новое)
Dynamic URIsDatasetAliasAssetAlias (идентично)
External eventsНет (только internal)AssetWatcher (S3/SQS/Kafka)
PartitionsСимулируются через aliasNative AssetPartition (3.2)
Lineage UICross-DAG dataset viewПолноценный asset graph + partition state
AND/ORDatasetAll/AnyAssetAll/Any
Hybrid time+dataDatasetOrTimeScheduleAssetOrTimeSchedule
Lineage standardsOpenLineage integrationOpenLineage + Asset metadata enrichment
Function injectionXCom через contextAuto-injection через function args

Когда migrate, а когда подождать

Migrate на 3.x в 2026, если:

  • Нужна asset partition granularity (heavy backfill scenarios)
  • Хотите event-driven triggers без external code (SQS, Kafka watchers)
  • Большая команда, выигрыш от @asset декларативности

Останьтесь на 2.10/2.11 LTS, если:

  • Production stability важнее новых features
  • Используется много custom plugins/operators (3.x breaking changes в plugin API)
  • Migration cost > ожидаемая ценность от новых features
  • Не используете Datasets вообще (тогда 3.x rename — пустое колесо)

Airflow 2.11 LTS будет поддерживаться security patches минимум до конца 2027. Это даёт 1-2 года для запланированной миграции.


Production gotchas (3.x preview)

Эти problems встречались в early 3.x deployments. Если будете upgrade’ить — учтите:

  1. @asset функция должна быть top-level. Не nested внутри @dag body. Это другая абстракция, не TaskFlow.

  2. Watchers требуют отдельный triggerer process. Если включили AssetWatcher, triggerer должен быть deployed — иначе watchers «зависают». На 2.x triggerer был необязателен для Datasets, в 3.x с event-driven assets — обязателен.

  3. Backwards-compat views deprecated после 3.2. Не оставляйте production SQL queries на старых table names — мигрируйте.

  4. Asset Partitions требуют partition strategy declarative. Нельзя dynamically discover partitions — нужно описать range в DAG file. Для truly-dynamic — используйте AssetAlias подход.

  5. OpenLineage event format changed для assets vs datasets. Если у вас downstream lineage consumer (Marquez, OpenLineage UI), нужно update схемы.

  6. AssetWatcher security implications. External event sources — потенциальная attack surface. SQSWatcher с poorly-secured queue → arbitrary DAG triggering. Audit watcher configs.

  7. DB migration во время upgrade блокирует instance. Для large dataset_event tables (миллионы rows) migration занимает 10-30 минут downtime. Planируйте maintenance window.


Минимальный 2.x код, готовый к 3.x

Если хочется maximum compatibility, пишите 2.x код так:

# 1. Используйте compat imports
try:
    from airflow.sdk import Asset as Dataset
    from airflow.sdk import AssetAlias as DatasetAlias
except ImportError:
    from airflow import Dataset
    from airflow.datasets import DatasetAlias

# 2. Используйте explicit DatasetAll/Any вместо bare list
from airflow.datasets import DatasetAll

@dag(schedule=DatasetAll(ds_a, ds_b))  # не [ds_a, ds_b]
def consumer(): ...

# 3. Не используйте deprecated patterns (TriggerDagRunOperator для cross-DAG)
# 4. Document URIs в config, не hardcode
DATASET_URIS = {
    "bronze_orders": "s3://lake/bronze/orders/",
    "silver_orders": "s3://lake/silver/orders/",
}

# 5. Wrap creation в helper, чтобы migration был single file change
def make_dataset(name: str) -> Dataset:
    return Dataset(DATASET_URIS[name])

При миграции достаточно поменять make_dataset implementation и compat imports — основной DAG код не трогается.


Проверка знанийKnowledge check
Команда планирует upgrade с Airflow 2.10 на 3.x в 2027. Сейчас 50 production DAG-ов с Datasets, 10 из них используют DatasetAlias. Какие действия рекомендуется сделать на 2.x для уменьшения migration cost?
ОтветAnswer
(1) Standardize imports через compat layer (try/except для Asset/Dataset) — минимум boilerplate при switch. (2) Заменить bare list schedule=[ds_a, ds_b] на DatasetAll(ds_a, ds_b) — explicit semantics, easier mental model на 3.x. (3) Wrap Dataset() construction в factory function (DATASET_URIS config + make_dataset helper) — single point of change. (4) Документировать URI conventions сейчас — в 3.x будете править asset names, нужен registry. (5) Audit DatasetAlias usage: какие из них перерастут в Asset Partitions (3.2)? Какие останутся alias-style? Это поможет планировать post-migration refactoring. (6) Включить OpenLineage сейчас (модуль 14) — на 3.x lineage будет богаче (asset metadata), но fundamental wiring уже работать. (7) Удалить TriggerDagRunOperator anti-pattern если он есть параллельно с Datasets — двойная система ломает 3.x lineage UI. (8) Тренировать команду на @asset mindset — даже не используя его в 2.x, понимать философию помогает adapt позже.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что точно меняется при rename Dataset → Asset в Airflow 3.x?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6