Migrating DAG code: imports, decorators, Datasets → Assets, logical_date
The previous lesson covered migration tooling. This one covers the concrete DAG code changes: what to rewrite in each DAG file for 3.x compatibility. Most changes are mechanical (renamed imports), but some require semantic understanding (Datasets → Assets, SLA removal).
This lesson is a practical guide with before/after examples for each common pattern in a production codebase.
Change 1: TaskFlow API imports
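Frequency: almost every DAG in a TaskFlow-based codebase.
Auto-fix: yes, via ruff. The move to airflow.sdk is reported under AIR311 (suggested updates); AIR301 covers outright removals.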
# 2.x
from airflow.decorators import dag, task, task_group
from airflow.models import Variable, Connection
# 3.x
from airflow.sdk import dag, task, task_group, Variable, Connection
A real production file:
# Before (2.x)
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def my_dag():
    @task
    def fetch():
        return Variable.get("config_value")

    fetch()

my_dag()

# After (3.x)
from airflow.sdk import dag, task, Variable
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def my_dag():
    @task
    def fetch():
        return Variable.get("config_value")

    fetch()

my_dag()

Only the imports change (two import lines collapse into one). The body is identical.
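Before running an auto-fixer it can help to see how many files are affected. The following is a minimal sketch, using only the standard library, that lists imports which should move to airflow.sdk. The LEGACY_IMPORTS table and the find_legacy_imports helper are hypothetical names; the table only covers the imports shown in this section and would need extending for a real codebase.

```python
import ast

# Legacy module -> names that now live in airflow.sdk.
# Assumption: limited to the imports discussed in this section.
LEGACY_IMPORTS = {
    "airflow.decorators": {"dag", "task", "task_group"},
    "airflow.models": {"Variable", "Connection"},
}


def find_legacy_imports(source: str) -> list:
    """Return sorted (line, module, name) tuples for imports to move to airflow.sdk."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.module in LEGACY_IMPORTS:
            for alias in node.names:
                if alias.name in LEGACY_IMPORTS[node.module]:
                    hits.append((node.lineno, node.module, alias.name))
    return sorted(hits)


example = (
    "from airflow.decorators import dag, task\n"
    "from airflow.models import Variable, DagRun\n"
)
for line, module, name in find_legacy_imports(example):
    print(f"line {line}: {module}.{name} -> airflow.sdk.{name}")
```

DagRun is deliberately not reported: it is not one of the names that moved to airflow.sdk in the examples here.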
Change 2: Datasets → Assets
Frequency: every DAG that uses the Datasets feature.
Auto-fix: partial; ruff handles the basic cases.
# 2.x
from airflow import Dataset
from airflow.decorators import dag, task

orders_dataset = Dataset("s3://lake/orders/")
revenue_dataset = Dataset("s3://lake/revenue/")

# A list of datasets means "run once ALL of them have been updated"
@dag(schedule=[orders_dataset, revenue_dataset], ...)
def consumer_dag():
    @task(outlets=[revenue_dataset])
    def compute_revenue():
        ...

# 3.x
from airflow.sdk import Asset, AssetAll, dag, task

orders_asset = Asset("s3://lake/orders/")
revenue_asset = Asset("s3://lake/revenue/")

# AssetAll keeps the 2.x list semantics; AssetAny would fire on either update
@dag(schedule=AssetAll(orders_asset, revenue_asset), ...)
def consumer_dag():
    @task(outlets=[revenue_asset])
    def compute_revenue():
        ...

Changes to note:
- Dataset → Asset. The class stays capitalized; the lowercase asset in airflow.sdk is a separate decorator for defining an asset from a function, not a drop-in replacement for Dataset(...).
- Multiple datasets: use AssetAny(...) / AssetAll(...) for explicit semantics. A plain list behaves like AssetAll.
- The import path differs.
For backward compatibility, 3.x keeps a deprecated Dataset alias that resolves to Asset. Treat it as a temporary bridge and verify the exact import path against your installed version.
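The difference between "any" and "all" scheduling is easy to get wrong during a mechanical rename. The toy model below is only an illustration: AllOf and AnyOf are hypothetical stand-ins written for this example, not Airflow classes, and they model nothing beyond the trigger condition itself.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AllOf:
    """Hypothetical stand-in: satisfied once every URI has been updated."""
    uris: tuple

    def satisfied(self, updated: set) -> bool:
        return all(uri in updated for uri in self.uris)


@dataclass(frozen=True)
class AnyOf:
    """Hypothetical stand-in: satisfied as soon as one URI has been updated."""
    uris: tuple

    def satisfied(self, updated: set) -> bool:
        return any(uri in updated for uri in self.uris)


ORDERS = "s3://lake/orders/"
REVENUE = "s3://lake/revenue/"
only_orders_updated = {ORDERS}

print(AllOf((ORDERS, REVENUE)).satisfied(only_orders_updated))  # False
print(AnyOf((ORDERS, REVENUE)).satisfied(only_orders_updated))  # True
```

With only the orders data refreshed, the "all" condition keeps waiting while the "any" condition already fires, so swapping one for the other changes how often the consumer DAG runs.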
Change 3: execution_date → logical_date
Frequency: legacy DAGs.
Auto-fix: yes, ruff AIR301.
# 2.x
@task
def my_task(execution_date, **context):
    ds = execution_date.strftime("%Y-%m-%d")
    print(f"Running for {execution_date}")
    sql = f"SELECT * FROM events WHERE date = '{ds}'"

# In Jinja templates (illustrative only: Jinja is rendered in templated
# operator fields, not in plain strings inside a task body):
@task
def templated_task(**context):
    sql = "SELECT * FROM events WHERE ts = '{{ execution_date }}'"

# 3.x
@task
def my_task(logical_date, **context):
    ds = logical_date.strftime("%Y-%m-%d")
    print(f"Running for {logical_date}")
    sql = f"SELECT * FROM events WHERE date = '{ds}'"

@task
def templated_task(**context):
    sql = "SELECT * FROM events WHERE ts = '{{ logical_date }}'"

Related context variables were removed as well. They have no *_logical_date counterparts; use the data-interval variables instead:
- next_execution_date → data_interval_end
- prev_execution_date_success → prev_data_interval_start_success
- prev_execution_date → no one-to-one replacement; derive it from the data interval or look up the previous run
{{ ds }} still works, so it does not need to change.
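Template strings often live in SQL files or YAML configs that a Python linter never sees. The sketch below rewrites removed variables only inside {{ ... }} expressions. The RENAMES mapping is an assumption based on the replacements Airflow documents for its deprecated template variables, and migrate_template is a hypothetical helper name; prev_execution_date is left alone on purpose because it has no direct replacement.

```python
import re

# Old template variable -> 3.x replacement (assumed mapping, see lead-in).
RENAMES = {
    "execution_date": "logical_date",
    "next_execution_date": "data_interval_end",
    "prev_execution_date_success": "prev_data_interval_start_success",
}

# Jinja expressions: {{ ... }}
_EXPR = re.compile(r"\{\{.*?\}\}", re.DOTALL)
# Whole identifiers only, so execution_date does not match inside prev_execution_date.
_NAME = re.compile(r"\b(" + "|".join(sorted(RENAMES, key=len, reverse=True)) + r")\b")


def migrate_template(text: str) -> str:
    """Rename removed context variables inside Jinja expressions only."""
    def fix_expr(match):
        return _NAME.sub(lambda m: RENAMES[m.group(1)], match.group(0))

    return _EXPR.sub(fix_expr, text)


print(migrate_template("SELECT * FROM events WHERE ts = '{{ execution_date }}'"))
print(migrate_template("{{ next_execution_date | ds }}"))
```

Text outside the braces is never touched, so a column that happens to be called execution_date in the SQL itself survives the rewrite.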
Change 4: schedule_interval → schedule
Frequency: legacy DAGs (schedule has been the recommended parameter since 2.4).
Auto-fix: yes, ruff AIR301.
# 2.x (still works, deprecated)
@dag(schedule_interval="@daily", ...)
def my_dag(): ...

@dag(schedule_interval="0 12 * * *", ...)
def cron_dag(): ...

# 3.x: must use `schedule`
@dag(schedule="@daily", ...)
def my_dag(): ...

@dag(schedule="0 12 * * *", ...)
def cron_dag(): ...

The semantics are identical; only the parameter name changes.
Change 5: SubDAG → TaskGroup (if still used)
Frequency: rare in modern codebases.
Auto-fix: NO, a manual rewrite is required.
# 2.x: SubDAG (deprecated, removed in 3.x)
from airflow import DAG
from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator

def create_subdag(parent_dag_name, child_dag_name, start_date):
    subdag = DAG(f"{parent_dag_name}.{child_dag_name}", start_date=start_date)
    task1 = BashOperator(task_id="task1", bash_command="echo 1", dag=subdag)
    task2 = BashOperator(task_id="task2", bash_command="echo 2", dag=subdag)
    task1 >> task2
    return subdag

@dag(...)
def parent_dag():
    subdag_task = SubDagOperator(
        task_id="my_subdag",
        # start_date: the parent DAG's start_date
        subdag=create_subdag("parent_dag", "my_subdag", start_date),
    )

# 3.x: TaskGroup
from airflow.sdk import TaskGroup, dag
from airflow.providers.standard.operators.bash import BashOperator

@dag(...)
def parent_dag():
    with TaskGroup(group_id="my_subdag") as my_subdag:
        task1 = BashOperator(task_id="task1", bash_command="echo 1")
        task2 = BashOperator(task_id="task2", bash_command="echo 2")
        task1 >> task2

Advantages of TaskGroup over SubDAG:
- Tasks stay in the parent DAG (no separate DagRun)
- No deadlocks from recursive slot consumption
- Single executor, single pool
- Already standard practice in 2.x
Change 6: SLA removal (if used)
Frequency: production DAGs typically had SLAs on critical tasks.
Auto-fix: NO, a manual rewrite through the Listener API is required.
# 2.x: native SLA
from datetime import timedelta
from airflow.decorators import dag, task

# Defined before the DAG so the name exists when @dag(...) is evaluated
def alert_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    send_alert(f"SLA missed: {slas}")  # send_alert: your own alerting helper

@task(sla=timedelta(hours=1))
def critical_task():
    ...

@dag(
    sla_miss_callback=alert_sla_miss,
    ...
)
def my_dag(): ...

# 3.x: SLA removed; replace it with the Listener API
@task  # no sla parameter
def critical_task():
    ...

# In plugins/listeners/sla_listener.py
# (the module must also be registered through a plugin's `listeners` list)
from airflow.listeners import hookimpl
from datetime import timedelta

CRITICAL_TASKS_SLA = {
    "my_dag.critical_task": timedelta(hours=1),
}

@hookimpl
def on_task_instance_success(previous_state, task_instance):
    key = f"{task_instance.dag_id}.{task_instance.task_id}"
    sla = CRITICAL_TASKS_SLA.get(key)
    if not sla:
        return
    # Assumes start_date and end_date are both populated when the hook fires.
    # This checks task run time; the 2.x sla was measured from the DAG run's scheduled time.
    duration = task_instance.end_date - task_instance.start_date
    if duration > sla:
        send_alert(f"SLA missed: {key} took {duration}, SLA = {sla}")

Alternative: external monitoring (Prometheus rules, Datadog monitors) on task duration metrics. This is often the better option, because the monitoring rules are centralized rather than embedded in DAG code.
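Keeping the SLA decision in a plain function makes it testable without a running Airflow. The sketch below is one possible shape, not the lesson's listener itself: sla_breach is a hypothetical helper, and it treats a missing start or end timestamp as "no verdict" rather than raising.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

CRITICAL_TASKS_SLA = {"my_dag.critical_task": timedelta(hours=1)}


def sla_breach(dag_id, task_id, start, end, slas=CRITICAL_TASKS_SLA) -> Optional[timedelta]:
    """Return how far the task overran its SLA, or None if there is no breach."""
    sla = slas.get(f"{dag_id}.{task_id}")
    if sla is None or start is None or end is None:
        return None  # untracked task, or timestamps not available
    duration = end - start
    return duration - sla if duration > sla else None


start = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
print(sla_breach("my_dag", "critical_task", start, start + timedelta(minutes=90)))  # 0:30:00
print(sla_breach("my_dag", "critical_task", start, start + timedelta(minutes=30)))  # None
```

A listener hook can then call sla_breach with the task instance's identifiers and timestamps and send an alert only when the result is not None.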
Change 7: Hooks through the Task SDK
Frequency: moderate; anywhere that code accesses the metadata DB directly.
Auto-fix: mostly automatic; providers have been updated for 3.x compatibility.
# 2.x: direct DB access (sometimes happens in custom code)
from airflow.decorators import task
from airflow.models import DagRun
from airflow.utils.session import create_session

@task
def task_with_db_access():
    with create_session() as session:
        # Direct SQLAlchemy session
        dagruns = session.query(DagRun).filter(...).all()

# 3.x: the Task SDK does not allow direct DB access from tasks.
# Go through the REST API instead. Sketch only: AIRFLOW_API_URL and
# AIRFLOW_API_TOKEN are hypothetical environment variables, and the
# endpoint path assumes the stable /api/v2 REST API.
import os
import requests
from airflow.sdk import task

@task
def task_with_api_access():
    response = requests.get(
        f"{os.environ['AIRFLOW_API_URL']}/api/v2/dags/my_dag/dagRuns",
        params={"state": "success"},
        headers={"Authorization": f"Bearer {os.environ['AIRFLOW_API_TOKEN']}"},
        timeout=30,
    )
    response.raise_for_status()
    dagruns = response.json()["dag_runs"]

Most providers (PostgresHook, S3Hook, etc.) handle this transparently: provider code uses the Task SDK internally. Custom code with an explicit create_session() needs a rewrite.
Change 8: BaseOperator subclass compatibility
Frequency: moderate; anyone with custom operators.
Auto-fix: mostly automatic; the BaseOperator API is stable.
Custom operators that inherit from BaseOperator work almost as-is. A few changes:
# 2.x: common patterns
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class MyOperator(BaseOperator):
    @apply_defaults  # deprecated in 2.x, not needed
    def __init__(self, my_param, **kwargs):
        super().__init__(**kwargs)
        self.my_param = my_param

    def execute(self, context):
        # context['execution_date'] is deprecated
        execution_date = context['execution_date']

# 3.x
from airflow.sdk import BaseOperator  # NEW import path

class MyOperator(BaseOperator):
    def __init__(self, my_param, **kwargs):
        super().__init__(**kwargs)
        self.my_param = my_param

    def execute(self, context):
        # context['logical_date']
        logical_date = context['logical_date']

Note:
- from airflow.sdk import BaseOperator (new path)
- @apply_defaults is removed (deprecated since 2.x)
- The context dict has logical_date (not execution_date)
Change 9: default_view removed
# 2.x
@dag(default_view="graph", ...)

# 3.x: default_view removed (the UI handles this by default)
@dag(...)  # remove default_view

Minor: the Airflow 3.x UI is more sophisticated, which makes default_view obsolete.
Real-world migration example — full DAG
Before (2.x):
from airflow import Dataset
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime, timedelta

orders_dataset = Dataset("s3://lake/orders/")

# Defined before the DAG so the name exists when @dag(...) is evaluated
def alert_sla(dag, task_list, blocking_task_list, slas, blocking_tis):
    print(f"SLA missed: {slas}")

@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    sla_miss_callback=alert_sla,
    tags=["etl"],
)
def orders_etl():
    @task(outlets=[orders_dataset], sla=timedelta(hours=1))
    def extract(execution_date):
        ds = execution_date.strftime("%Y-%m-%d")
        return f"SELECT * FROM orders WHERE ds = '{ds}'"

    @task
    def load(sql: str):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        api_key = Variable.get("api_key")
        PostgresHook("warehouse").run(sql)

    load(extract())

orders_etl()
After (3.x):
from airflow.sdk import Asset, dag, task, Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime

orders_asset = Asset("s3://lake/orders/")

@dag(
    schedule="@daily",  # was schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False,
    # sla_miss_callback removed; replaced through the Listener API
    tags=["etl"],
)
def orders_etl():
    @task(outlets=[orders_asset])  # no sla parameter
    def extract(logical_date):  # was execution_date
        ds = logical_date.strftime("%Y-%m-%d")
        return f"SELECT * FROM orders WHERE ds = '{ds}'"

    @task
    def load(sql: str):
        api_key = Variable.get("api_key")  # same API, different import
        PostgresHook("warehouse").run(sql)

    load(extract())

orders_etl()

# SLA monitoring moved to plugins/listeners/sla_listener.py
# (separate file, hooks all DAGs uniformly)

Changes summary:
- Imports: airflow.decorators → airflow.sdk
- Imports: from airflow import Dataset → from airflow.sdk import Asset
- Dataset(...) → Asset(...)
- schedule_interval → schedule
- sla= parameter removed
- sla_miss_callback removed (handled in a listener)
- execution_date → logical_date
Total lines changed: ~6. ruff --fix handles most of them.
Production gotchas
Variable.get has slight behavior changes. In 3.x the Task SDK fetches values through the API, so every call is an HTTP request. Caching is more aggressive (default TTL 60s). For high-frequency calls, cache explicitly.
Connection.get is similar: it is API-based. For tasks that do many connection lookups, cache the result in a local variable.
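Explicit caching for repeated lookups can be as small as a dictionary with timestamps. The following is a minimal sketch, assuming a string-keyed fetch function such as Variable.get; TTLCache is a hypothetical helper, and it only caches within one task process. The fake fetch function and fake clock exist so the example runs without Airflow.

```python
import time
from typing import Callable, Dict, Tuple


class TTLCache:
    """Cache fetch(key) results for ttl seconds within the current process."""

    def __init__(self, fetch: Callable[[str], str], ttl: float,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch
        self._ttl = ttl
        self._clock = clock
        self._entries: Dict[str, Tuple[float, str]] = {}

    def get(self, key: str) -> str:
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and now - hit[0] < self._ttl:
            return hit[1]  # still fresh: no remote call
        value = self._fetch(key)
        self._entries[key] = (now, value)
        return value


# Stand-ins for the remote lookup and the clock, for demonstration only.
calls = []

def fake_fetch(key: str) -> str:
    calls.append(key)
    return f"value-of-{key}"

fake_now = [0.0]
cache = TTLCache(fake_fetch, ttl=60, clock=lambda: fake_now[0])

cache.get("api_key")
cache.get("api_key")
print(len(calls))  # 1: the second call was served from the cache

fake_now[0] = 61.0
cache.get("api_key")
print(len(calls))  # 2: the entry expired and was fetched again
```

Inside a task, the same class could wrap the real lookup, for example TTLCache(Variable.get, ttl=60), so a loop that reads the same key many times triggers one request per TTL window.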
Custom XCom backends: the interface is stable. The custom S3XComBackend (module 06) works in 3.x without changes.
Provider operators are mostly stable. Providers verified for 3.x compatibility (PostgresHook, S3Hook, etc.) work identically.
Test coverage matters more in 3.x. The Task SDK boundary changes subtle behaviors. Run the full test suite after migrating each file.
The linter can miss edge cases. After running ruff, manually grep for execution_date, schedule_interval, and Dataset to verify.
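The manual grep can be scripted so it runs in CI after each migrated batch. This is a minimal sketch using only the standard library; find_leftovers is a hypothetical helper, and the pattern covers just the three identifiers named above (plus prefixed or suffixed variants of execution_date). The temporary directory stands in for a real dags folder.

```python
import re
import tempfile
from pathlib import Path

# Identifiers that should not survive the migration.
LEFTOVERS = re.compile(r"(\w*execution_date\w*|schedule_interval|\bDataset\b)")


def find_leftovers(root) -> list:
    """Return (relative_path, line_number, identifier) for each leftover found."""
    hits = []
    for path in sorted(Path(root).rglob("*.py")):
        lines = path.read_text(encoding="utf-8").splitlines()
        for lineno, line in enumerate(lines, start=1):
            for match in LEFTOVERS.finditer(line):
                hits.append((str(path.relative_to(root)), lineno, match.group(1)))
    return hits


# Demonstration on a throwaway directory instead of a real dags folder.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "dag.py").write_text(
        "x = next_execution_date\n"
        "@dag(schedule_interval='@daily')\n"
        "from airflow.sdk import Asset\n",
        encoding="utf-8",
    )
    for rel_path, lineno, name in find_leftovers(tmp):
        print(f"{rel_path}:{lineno}: {name}")
```

An empty result does not prove the migration is complete; it only confirms that these specific names are gone from the Python files that were scanned.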
The airflow tasks test syntax is stable. CLI commands are largely unchanged. Tests that pass in 2.x should pass in 3.x (if the migration is correct).
airflow.contrib is REMOVED. Any remaining usage is 2-3 years overdue for replacement. Migrate to providers.