Lesson 19.07 · 22 min
Advanced

Migration DAG code — imports, decorators, Datasets→Assets, logical_date

In the previous lesson we covered migration tooling. This lesson covers the concrete DAG code changes: what to rewrite in each DAG file for 3.x compatibility. Most changes are mechanical (renamed imports), but some require understanding the semantics (Datasets → Assets, SLA removal).

This lesson is a practical guide with before/after examples for each common pattern in a production codebase.

DAG code migration: import renames and API renames

2.x DAG file. A production DAG in Airflow 2.x is typically a TaskFlow-based file with imports from airflow.decorators, airflow.models, and airflow.Dataset. It contains execution_date references, schedule_interval, and possibly sla= parameters.

ruff AIR301 auto-fix

  • Imports rename (AIR301). Frequency: almost every DAG. Auto-fix: yes. from airflow.decorators import dag, task → from airflow.sdk import dag, task. from airflow.models import Variable, Connection → from airflow.sdk import Variable, Connection. A single ruff command fixes hundreds of files; the DAG body stays identical.
  • Dataset → Asset (AIR301, partial). Frequency: every DAG that uses the Datasets feature. Auto-fix: partial, basic cases only. from airflow import Dataset → from airflow.sdk import Asset; Dataset(...) → Asset(...). The lowercase asset in airflow.sdk is a separate decorator, not the renamed class. Multi-dataset triggers: schedule=[a, b] keeps its all-of meaning and can be written explicitly as schedule=AssetAll(a, b); AssetAny(a, b) triggers on any single update.

ruff AIR302 auto-fix

  • execution_date → logical_date (AIR302). Found in legacy DAGs. Auto-fix: yes, for Python code. The execution_date parameter and the next_/prev_execution_date variants are all renamed. The {{ ds }} alias still works. Edge case: {{ execution_date }} inside SQL string templates needs a manual grep, because ruff does not touch strings.
  • schedule_interval → schedule (AIR302). Found in legacy DAGs (schedule has been the recommended parameter since 2.4). Auto-fix: yes. The semantics are identical; only the parameter is renamed. @dag(schedule_interval='@daily') → @dag(schedule='@daily'). Cron strings are unchanged.

Manual rewrite required

  • SubDAG → TaskGroup. Rare in modern codebases (TaskGroup has been standard since 2.x). Auto-fix: no, manual rewrite. SubDagOperator is removed in 3.x. Tasks stay in the parent DAG: no separate DagRun, no deadlock from recursive slot consumption, a single executor.
  • SLA removal → Listener API. Production DAGs typically had SLAs for critical tasks. Auto-fix: no. The sla= parameter and sla_miss_callback are removed (AIP-89). Replacement: Listener API hooks (on_task_instance_success / on_task_instance_failed). This is often better: monitoring rules are centralized instead of embedded in DAG code.

3.x DAG file (clean). Total lines changed per typical DAG: ~6. ruff --fix handles most of them. Manual work: SubDAG (rare), SLA (medium), context['execution_date'] string lookups (edge case). Custom operators additionally need from airflow.sdk import BaseOperator (new path) and removal of @apply_defaults.


Change 1: TaskFlow API imports

Frequency: almost every DAG in a TaskFlow-based codebase.

Auto-fix: Yes — ruff AIR301.

# 2.x
from airflow.decorators import dag, task, task_group
from airflow.models import Variable, Connection

# 3.x
from airflow.sdk import dag, task, task_group, Variable, Connection

A real production file:

# Before (2.x)
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def my_dag():
    @task
    def fetch():
        return Variable.get("config_value")
    fetch()

my_dag()

# After (3.x)
from airflow.sdk import dag, task, Variable
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def my_dag():
    @task
    def fetch():
        return Variable.get("config_value")
    fetch()

my_dag()

The only change is in the import lines (two imports collapse into one). The DAG body is identical.
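Before running the auto-fix across a large repository, it can help to inventory which files still import from the old locations. The sketch below is one possible way to do that with the standard-library ast module; the function name find_legacy_imports and the LEGACY_MODULES set are illustrative assumptions, not part of Airflow or ruff, and the set only covers the two modules named in this change.

```python
import ast

# Module paths this lesson lists as moving to airflow.sdk (assumed, not exhaustive).
LEGACY_MODULES = {"airflow.decorators", "airflow.models"}


def find_legacy_imports(source: str) -> list:
    """Return sorted (line, module, name) tuples for `from <legacy module> import name`."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.module in LEGACY_MODULES:
            for alias in node.names:
                hits.append((node.lineno, node.module, alias.name))
    return sorted(hits)


SAMPLE = '''\
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime
'''

if __name__ == "__main__":
    for line, module, name in find_legacy_imports(SAMPLE):
        print(f"line {line}: {module}.{name}")
```

Because the source is only parsed, never executed, this runs on machines where Airflow is not installed.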


Change 2: Datasets → Assets

Frequency: every DAG that uses the Datasets feature.

Auto-fix: Partial — ruff handles basic cases.

# 2.x
from airflow import Dataset
from airflow.decorators import dag, task

orders_dataset = Dataset("s3://lake/orders/")
revenue_dataset = Dataset("s3://lake/revenue/")

# A list schedule runs the DAG once ALL listed datasets have been updated.
@dag(schedule=[orders_dataset, revenue_dataset], ...)
def consumer_dag():
    @task(outlets=[revenue_dataset])
    def compute_revenue():
        ...

# 3.x
from airflow.sdk import Asset, AssetAll, dag, task

orders_asset = Asset("s3://lake/orders/")
revenue_asset = Asset("s3://lake/revenue/")

# AssetAll spells out the list's all-of semantics;
# use AssetAny instead to run on any single update.
@dag(schedule=AssetAll(orders_asset, revenue_asset), ...)
def consumer_dag():
    @task(outlets=[revenue_asset])
    def compute_revenue():
        ...

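Note the changes:

  • Dataset → Asset (the class is renamed; the lowercase asset in airflow.sdk is a decorator for asset-producing functions, not a replacement for Dataset(...))
  • Multiple datasets: AssetAll(...) / AssetAny(...) make the trigger semantics explicit
  • The import path differs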
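The difference between the "all" and "any" trigger conditions is easy to get wrong during a mechanical rename. The following is a toy model only, written without Airflow: should_trigger, its mode argument, and the sample URIs are hypothetical and merely illustrate the two conditions, not how the scheduler is implemented.

```python
def should_trigger(updated: set, required: list, mode: str) -> bool:
    """Toy model: 'all' needs every required URI updated, 'any' needs at least one."""
    if mode == "all":
        return all(uri in updated for uri in required)
    if mode == "any":
        return any(uri in updated for uri in required)
    raise ValueError(f"unknown mode: {mode!r}")


REQUIRED = ["s3://lake/orders/", "s3://lake/revenue/"]
ONLY_ORDERS = {"s3://lake/orders/"}
BOTH = {"s3://lake/orders/", "s3://lake/revenue/"}

if __name__ == "__main__":
    print(should_trigger(ONLY_ORDERS, REQUIRED, "any"))
    print(should_trigger(ONLY_ORDERS, REQUIRED, "all"))
```

With only one of the two URIs updated, the "any" condition holds and the "all" condition does not, which is why swapping one for the other changes when a consumer DAG runs.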

For backward compatibility, the 3.x package apache-airflow-providers-standard is said to provide from airflow.sdk import Dataset as an alias for the new class. Verify this against the installed version before relying on it.


Change 3: execution_date → logical_date

Frequency: legacy DAGs.

Auto-fix: Yes — ruff AIR302.

# 2.x
@task
def my_task(execution_date, **context):
    ds = execution_date.strftime("%Y-%m-%d")
    print(f"Running for {execution_date}")
    sql = f"SELECT * FROM events WHERE date = '{ds}'"

# In Jinja templates:
@task
def templated_task(**context):
    sql = "SELECT * FROM events WHERE ts = '{{ execution_date }}'"

# 3.x
@task
def my_task(logical_date, **context):
    ds = logical_date.strftime("%Y-%m-%d")
    print(f"Running for {logical_date}")
    sql = f"SELECT * FROM events WHERE date = '{ds}'"

@task
def templated_task(**context):
    sql = "SELECT * FROM events WHERE ts = '{{ logical_date }}'"

Also:

  • next_execution_date → next_logical_date
  • prev_execution_date → prev_logical_date
  • prev_execution_date_success → prev_logical_date_success

{{ ds }} keeps working (it is an alias), so it does not need to change.
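String templates are the part the linter leaves alone, so the renames listed above have to be applied to them separately. Below is a minimal sketch of a regex-based rewrite, assuming the legacy name is the first identifier inside the {{ ... }} expression; rewrite_template and RENAMES are hypothetical helper names, and expressions that use the variable deeper inside (for example as a macro argument) are deliberately not touched and still need a manual review.

```python
import re

# Old template variable -> new name, as listed in this lesson.
RENAMES = {
    "execution_date": "logical_date",
    "next_execution_date": "next_logical_date",
    "prev_execution_date": "prev_logical_date",
    "prev_execution_date_success": "prev_logical_date_success",
}

# Match a legacy name only when it is the first identifier after "{{".
# Longer names come first so "prev_execution_date_success" wins over its prefix.
_PATTERN = re.compile(
    r"(\{\{\s*)(" + "|".join(sorted(RENAMES, key=len, reverse=True)) + r")\b"
)


def rewrite_template(text: str) -> str:
    """Replace legacy date variables at the start of Jinja expressions."""
    return _PATTERN.sub(lambda m: m.group(1) + RENAMES[m.group(2)], text)


if __name__ == "__main__":
    print(rewrite_template("SELECT * FROM events WHERE ts = '{{ execution_date }}'"))
```

Running the rewrite on a copy of the file and reviewing the diff is safer than editing in place, since SQL strings may contain the same words for unrelated reasons.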


Change 4: schedule_interval → schedule

Frequency: legacy DAGs (2.4+ recommended schedule).

Auto-fix: Yes — ruff AIR302.

# 2.x (still works, deprecated)
@dag(schedule_interval="@daily", ...)
def my_dag(): ...

@dag(schedule_interval="0 12 * * *", ...)
def cron_dag(): ...

# 3.x — must use `schedule`
@dag(schedule="@daily", ...)
def my_dag(): ...

@dag(schedule="0 12 * * *", ...)
def cron_dag(): ...

The semantics are identical; only the parameter name changed.
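Keyword arguments are the easy case; attribute writes such as dag.schedule_interval = "@daily" in setup code are easier to overlook. The sketch below shows one way to list both kinds of usage with the standard-library ast module. The function name find_schedule_interval and the sample source are illustrative assumptions, and the sample is only parsed, not executed.

```python
import ast


def find_schedule_interval(source: str) -> list:
    """Return sorted (line, kind) pairs for every use of `schedule_interval`."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Call):
            # Keyword form: DAG(..., schedule_interval=...) or @dag(schedule_interval=...)
            for kw in node.keywords:
                if kw.arg == "schedule_interval":
                    hits.append((kw.value.lineno, "keyword argument"))
        elif isinstance(node, ast.Attribute) and node.attr == "schedule_interval":
            # Attribute form: obj.schedule_interval read or written directly
            is_write = isinstance(node.ctx, ast.Store)
            hits.append((node.lineno, "attribute write" if is_write else "attribute read"))
    return sorted(hits)


SAMPLE = '''\
dag_obj = DAG("legacy", schedule_interval="@daily")
dag_obj.schedule_interval = "@hourly"
'''

if __name__ == "__main__":
    for line, kind in find_schedule_interval(SAMPLE):
        print(f"line {line}: {kind}")
```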


Change 5: SubDAG → TaskGroup (if still used)

Frequency: rare in modern codebases.

Auto-fix: NO — manual rewrite required.

# 2.x: SubDAG (deprecated, removed in 3.x)
from airflow import DAG
from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator

def create_subdag(parent_dag_name, child_dag_name, start_date):
    subdag = DAG(f"{parent_dag_name}.{child_dag_name}", start_date=start_date)
    task1 = BashOperator(task_id="task1", bash_command="echo 1", dag=subdag)
    task2 = BashOperator(task_id="task2", bash_command="echo 2", dag=subdag)
    task1 >> task2
    return subdag

@dag(...)
def parent_dag():
    # start_date is assumed to be defined elsewhere in the module
    subdag_task = SubDagOperator(
        task_id="my_subdag",
        subdag=create_subdag("parent_dag", "my_subdag", start_date),
    )

# 3.x: TaskGroup
from airflow.sdk import TaskGroup, dag
from airflow.providers.standard.operators.bash import BashOperator

@dag(...)
def parent_dag():
    # Tasks created inside the context manager join the group and the parent DAG
    with TaskGroup(group_id="my_subdag") as my_subdag:
        task1 = BashOperator(task_id="task1", bash_command="echo 1")
        task2 = BashOperator(task_id="task2", bash_command="echo 2")
        task1 >> task2

Advantages of TaskGroup over SubDAG:

  • Tasks stay in the parent DAG (no separate DagRun)
  • No deadlock from recursive slot consumption
  • Single executor, single pool
  • Already standard practice in 2.x

Change 6: SLA removal (if used)

Frequency: production DAGs typically had SLAs for critical tasks.

Auto-fix: NO. Manual rewrite using the Listener API.

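# 2.x: native SLA
from datetime import timedelta

from airflow.decorators import dag, task

# Defined before the DAG so the name exists when @dag(...) is evaluated.
def alert_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    send_alert(f"SLA missed: {slas}")

@task(sla=timedelta(hours=1))
def critical_task():
    ...

@dag(
    sla_miss_callback=alert_sla_miss,
    ...
)
def my_dag(): ...

# 3.x: SLA removed; replace it with the Listener API
@task  # no sla parameter
def critical_task():
    ...

# In plugins/listeners/sla_listener.py
# (the module must also be registered through a plugin's `listeners` list)
from airflow.listeners import hookimpl
from datetime import timedelta

CRITICAL_TASKS_SLA = {
    "my_dag.critical_task": timedelta(hours=1),
}

# Task-instance hooks are on_task_instance_running / _success / _failed;
# there is no on_task_instance_completed hook.
@hookimpl
def on_task_instance_success(previous_state, task_instance):
    key = f"{task_instance.dag_id}.{task_instance.task_id}"
    sla = CRITICAL_TASKS_SLA.get(key)
    if not sla:
        return

    # Assumes start_date and end_date are populated on the object passed to the hook.
    duration = task_instance.end_date - task_instance.start_date
    if duration > sla:
        send_alert(f"SLA missed: {key} took {duration}, SLA = {sla}")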

Alternative: external monitoring (Prometheus rules, Datadog monitors) on task duration metrics. This is often better: monitoring rules are centralized rather than embedded in DAG code.
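Whichever mechanism raises the alert, the duration comparison itself is plain datetime arithmetic and can be unit-tested without Airflow. The sketch below isolates that check; sla_overrun is a hypothetical helper name, and treating missing timestamps as "no breach" is an assumption made for the example, not Airflow behavior.

```python
from datetime import datetime, timedelta
from typing import Optional


def sla_overrun(
    start: Optional[datetime], end: Optional[datetime], sla: timedelta
) -> Optional[timedelta]:
    """Return how far the run exceeded `sla`, or None if it did not.

    Missing timestamps are treated as "nothing to report" (an assumption).
    """
    if start is None or end is None:
        return None
    overrun = (end - start) - sla
    return overrun if overrun > timedelta(0) else None


if __name__ == "__main__":
    started = datetime(2024, 1, 1, 0, 0)
    finished = datetime(2024, 1, 1, 1, 30)
    print(sla_overrun(started, finished, timedelta(hours=1)))
```

A listener or an external monitor can then call this function and only handle alert delivery itself, which keeps the rule testable in isolation.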


Change 7: Hooks через Task SDK

Frequency: moderate, wherever custom code accesses the metadata DB directly.

Auto-fix: mostly automatic, since providers have been updated for 3.x compatibility.

# 2.x: direct DB access (sometimes happens in custom code)
from airflow.models import Variable
from airflow.utils.db import create_session

@task
def task_with_db_access():
    with create_session() as session:
        # Direct SQLAlchemy session
        from airflow.models import DagRun
        dagruns = session.query(DagRun).filter(...).all()
# 3.x — Task SDK doesn't allow direct DB access
# Use REST API client instead
from airflow.sdk.api.client import Client

@task
def task_with_api_access():
    client = Client()
    dagruns = client.dag_runs.list(dag_id="my_dag", state="success")

Most providers (PostgresHook, S3Hook, etc.) handle this transparently, because provider code uses the Task SDK internally. Custom code with explicit create_session() calls requires a rewrite.


Change 8: BaseOperator subclass compatibility

Frequency: moderate — anyone with custom operators.

Auto-fix: mostly automatic, because the BaseOperator API is stable.

Custom operators that inherit from BaseOperator work almost as-is. Some changes:

# 2.x — common patterns
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class MyOperator(BaseOperator):
    @apply_defaults  # deprecated in 2.x, not needed
    def __init__(self, my_param, **kwargs):
        super().__init__(**kwargs)
        self.my_param = my_param

    def execute(self, context):
        # context['execution_date'] deprecated
        execution_date = context['execution_date']

# 3.x
from airflow.sdk import BaseOperator  # NEW import path

class MyOperator(BaseOperator):
    def __init__(self, my_param, **kwargs):
        super().__init__(**kwargs)
        self.my_param = my_param

    def execute(self, context):
        # context['logical_date']
        logical_date = context['logical_date']

Note:

  • from airflow.sdk import BaseOperator (new path)
  • @apply_defaults removed (deprecated since 2.x)
  • Context dict has logical_date (not execution_date)

Change 9: default_view removed

# 2.x
@dag(default_view="graph", ...)

# 3.x — default_view removed (UI handles by default)
@dag(...)  # remove default_view

Minor: the Airflow 3.x UI is more sophisticated, so default_view is obsolete.


Real-world migration example — full DAG

Before (2.x):

from airflow import Dataset
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime, timedelta

orders_dataset = Dataset("s3://lake/orders/")

# Defined before @dag(...) so the callback name resolves at decoration time.
def alert_sla(dag, task_list, blocking_task_list, slas, blocking_tis):
    print(f"SLA missed: {slas}")

@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    sla_miss_callback=alert_sla,
    tags=["etl"],
)
def orders_etl():

    @task(outlets=[orders_dataset], sla=timedelta(hours=1))
    def extract(execution_date):
        ds = execution_date.strftime("%Y-%m-%d")
        return f"SELECT * FROM orders WHERE ds = '{ds}'"

    @task
    def load(sql: str):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        api_key = Variable.get("api_key")
        PostgresHook("warehouse").run(sql)

    load(extract())

orders_etl()

After (3.x):

from airflow.sdk import Asset, dag, task, Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime

# Asset is the renamed Dataset class (lowercase `asset` is a separate decorator).
orders_asset = Asset("s3://lake/orders/")

@dag(
    schedule="@daily",                  # was schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False,
    # sla_miss_callback removed — replaced through Listener API
    tags=["etl"],
)
def orders_etl():

    @task(outlets=[orders_asset])       # no sla parameter
    def extract(logical_date):           # was execution_date
        ds = logical_date.strftime("%Y-%m-%d")
        return f"SELECT * FROM orders WHERE ds = '{ds}'"

    @task
    def load(sql: str):
        api_key = Variable.get("api_key")  # Same API, different import
        PostgresHook("warehouse").run(sql)

    load(extract())

orders_etl()

# SLA monitoring moved to plugins/listeners/sla_listener.py
# (separate file, hooks all DAGs uniformly)

Changes summary:

  1. Imports: airflow.decorators → airflow.sdk
  2. Imports: from airflow import Dataset → from airflow.sdk import Asset
  3. Dataset(...) → Asset(...)
  4. schedule_interval → schedule
  5. sla= parameter removed
  6. sla_miss_callback removed (handled in a listener)
  7. execution_date → logical_date

Total lines changed: ~6. Ruff --fix handles most.


Production gotchas

Variable.get has slight behavior changes. In 3.x the Task SDK fetches variables through the API, so each call is an HTTP request. Caching is more aggressive (default TTL 60s); for high-frequency calls, cache explicitly.

Connection.get is similar: it is API-based. For tasks that perform many connection lookups, cache the result in a local variable.
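One way to apply the explicit caching suggested above is a small time-to-live wrapper around whatever lookup function the task uses. The sketch below is generic Python and does not import Airflow; cached_getter, its fetch argument, and the injectable clock are assumptions introduced for illustration. Inside a task, fetch could be a function such as Variable.get.

```python
import time
from typing import Callable, Dict, Tuple


def cached_getter(
    fetch: Callable[[str], str],
    ttl_seconds: float,
    clock: Callable[[], float] = time.monotonic,
) -> Callable[[str], str]:
    """Wrap `fetch` so repeated lookups of a key within the TTL reuse the stored value."""
    cache: Dict[str, Tuple[float, str]] = {}

    def get(key: str) -> str:
        now = clock()
        entry = cache.get(key)
        if entry is not None and now - entry[0] < ttl_seconds:
            return entry[1]  # still fresh: no remote call
        value = fetch(key)  # missing or expired: call through and store
        cache[key] = (now, value)
        return value

    return get


if __name__ == "__main__":
    calls = []

    def fake_fetch(key: str) -> str:
        calls.append(key)
        return f"value-for-{key}"

    get_config = cached_getter(fake_fetch, ttl_seconds=60)
    get_config("config")
    get_config("config")
    print(len(calls))
```

The clock parameter exists so tests can advance time without sleeping. The cache lives only as long as the wrapper object, so it does not persist across task runs or processes.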

Custom XCom backends: the interface is stable. The custom S3XComBackend (module 06) works in 3.x without changes.

Provider operators are mostly stable. Verified providers have been updated for 3.x compatibility. PostgresOperator, S3Hook, and similar work identically.

Test coverage matters more in 3.x. The Task SDK boundary changes subtle behaviors. Run the full test suite after migrating each file.

The linter may miss edge cases. After running ruff, manually grep for execution_date, schedule_interval, and Dataset to verify.
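The manual grep can also be scripted so it runs in CI after every migration batch. Below is a minimal sketch using only the standard library; scan_text, scan_tree, and the LEGACY_TOKENS pattern are hypothetical names, and the pattern covers only the three tokens named above (plus prefixed variants such as prev_execution_date). It reports matches in comments and strings too, which is intentional here since templates live in strings.

```python
import re
from pathlib import Path

# Tokens to look for after the auto-fix; \w* also catches next_/prev_ variants.
LEGACY_TOKENS = re.compile(r"\b(\w*execution_date\w*|schedule_interval|Dataset)\b")


def scan_text(text: str) -> list:
    """Return (line_number, token) for every legacy token found in `text`."""
    return [
        (number, match.group(1))
        for number, line in enumerate(text.splitlines(), start=1)
        for match in LEGACY_TOKENS.finditer(line)
    ]


def scan_tree(root: str) -> dict:
    """Scan every .py file under `root`; return {path: hits} for files with hits."""
    report = {}
    for path in sorted(Path(root).rglob("*.py")):
        hits = scan_text(path.read_text(encoding="utf-8", errors="replace"))
        if hits:
            report[str(path)] = hits
    return report


if __name__ == "__main__":
    for file_name, hits in scan_tree("dags").items():
        for number, token in hits:
            print(f"{file_name}:{number}: {token}")
```

An empty report does not prove the migration is complete, but a non-empty one gives a concrete list of lines to review by hand.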

airflow tasks test syntax is stable. CLI commands are largely unchanged. Tests that pass in 2.x should pass in 3.x (if the migration is correct).

airflow.contrib is REMOVED. If any airflow.contrib imports remain, they are 2-3 years overdue for replacement. Migrate them to providers.


Knowledge check
A production team is migrating 200 DAGs to 3.x. After the ruff auto-fix, some integration tests fail even though unit tests pass. What could have gone wrong, and how would you troubleshoot it?
Answer
This is a common migration issue: subtle semantic changes that the linter does not catch. Likely causes:

  1. execution_date in string templates. ruff handles references in Python code, but '{{ execution_date }}' inside SQL strings remains. Templates are evaluated at runtime; in 3.x execution_date does not resolve, which yields an empty string or an error. Fix: run grep -rn '{{ execution_date }}' dags/ and manually replace the matches with {{ logical_date }}.
  2. Context dict keys differ. context['execution_date'] in operators is not updated by ruff, so lookups fail at runtime. Fix: search the codebase explicitly for context\['execution_date'\].
  3. Variable.get performance regressions. In 3.x Variable.get goes through the Task SDK, so each call is an HTTP API call. Tasks that make many calls (for example, Variable.get inside a loop) become slow and integration tests time out. Fix: cache values locally: cfg = Variable.get('config'); for item in items: use(cfg).
  4. Custom XCom backend imports. If you use S3XComBackend (module 06), the config string is not updated. The old import path plugins.xcom_backends.s3_xcom_backend should be reviewed, though in our case it is stable.
  5. Provider versions. Providers are updated separately. apache-airflow-providers-amazon 9.x may have breaking changes in operator APIs compared with 8.x. Check the provider release notes.
  6. schedule_interval in old @dag definitions. ruff fixes the obvious cases, but attribute mutation in setup code, such as dag.schedule_interval = '@daily', is not caught.
  7. Asset/Dataset triggering. DAG triggers depend on dataset URI matching. If a Dataset URI was renamed by mistake, downstream DAGs are not triggered. Verify that all asset URIs are consistent.

Troubleshooting workflow:

  1. Run the failing integration tests with verbose logging to find the exact error point.
  2. Run airflow tasks test on the failing DAG manually and verify what the context contains.
  3. Compare the context dict in 2.x and 3.x with print(context.keys()).
  4. Run the new and old versions in parallel in staging and compare logs.
  5. Check provider compatibility: pip install with explicit versions from the official 3.x requirements.

A test suite that catches this class of problem:

  1. An integration test for each DAG using airflow dags test, which catches runtime context issues.
  2. Template rendering tests with both {{ ds }} and {{ logical_date }}.
  3. Smoke tests with real Variable/Connection lookups.

Bigger lesson: the ruff auto-fix is necessary but not sufficient. Plan 2-3 weeks of staging with a full integration test suite after migration. A 200-DAG migration takes 1-2 weeks of code changes, 2-3 weeks of testing, and 1 week of production rollout. Do not rush.
