TaskFlow API vs Classic
В Airflow 2.0 появился TaskFlow API — Pythonic декларативный синтаксис с decorators @dag и @task. Это серьёзный сдвиг в developer experience: typed inputs/outputs, automatic XCom, чище код. Но под капотом TaskFlow — это обёртка над classic Operator API. Этот урок препарирует обе модели и даёт framework для выбора.
Classic Operator API — старый стиль
До 2.0 единственный стиль был классический Operator-based:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract():
return "data extracted"
def transform(**context):
ti = context["ti"]
raw = ti.xcom_pull(task_ids="extract")
return f"transformed: {raw}"
def load(**context):
ti = context["ti"]
transformed = ti.xcom_pull(task_ids="transform")
print(f"Loading: {transformed}")
with DAG(
dag_id="classic_etl"
schedule="@daily"
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
extract_op = PythonOperator(
task_id="extract"
python_callable=extract,
)
transform_op = PythonOperator(
task_id="transform"
python_callable=transform,
)
load_op = PythonOperator(
task_id="load"
python_callable=load,
)
extract_op >> transform_op >> load_op
Что вы видите:
- Manual XCom:
xcom_pull(task_ids="...")для получения предыдущих результатов - Boilerplate
**contextдля access к context - Explicit dependency chain через
>>operator - task_id дублирует function name
TaskFlow API — современный стиль
То же самое через TaskFlow:
from airflow.decorators import dag, task
from datetime import datetime
@dag(
schedule="@daily"
start_date=datetime(2026, 1, 1),
catchup=False,
)
def taskflow_etl():
@task
def extract() -> str:
return "data extracted"
@task
def transform(raw: str) -> str:
return f"transformed: {raw}"
@task
def load(transformed: str):
print(f"Loading: {transformed}")
raw = extract()
transformed = transform(raw)
load(transformed)
taskflow_etl()
Различия:
- Implicit XCom:
transform(raw)— Airflow автоматически делает xcom_pull - Type hints для inputs/outputs
- Dependency chain implicit — Python function calls определяют граф
- No boilerplate — нет
**context,xcom_pull
Что внутри @task — _PythonDecoratedOperator
Когда вы пишете @task, Airflow создаёт обёртку — _PythonDecoratedOperator. Псевдокод:
class _PythonDecoratedOperator(PythonOperator):
"""Behind-the-scenes class created by @task decorator."""
def __init__(self, python_callable, op_args=None, op_kwargs=None, multiple_outputs=False, **kwargs):
# task_id derived from function name
task_id = kwargs.pop("task_id", python_callable.__name__)
super().__init__(
task_id=task_id,
python_callable=python_callable,
op_args=op_args,
op_kwargs=op_kwargs,
**kwargs,
)
self.multiple_outputs = multiple_outputs
def execute(self, context):
# Resolve XCom inputs
resolved_args = self._resolve_xcom_args(self.op_args, context)
resolved_kwargs = self._resolve_xcom_kwargs(self.op_kwargs, context)
# Call user function
result = self.python_callable(*resolved_args, **resolved_kwargs)
# Push to XCom
if self.multiple_outputs:
for k, v in result.items():
context["ti"].xcom_push(key=k, value=v)
else:
context["ti"].xcom_push(key="return_value", value=result)
return result
Ключевые моменты:
_PythonDecoratedOperatorнаследник PythonOperator — это просто advanced wrapper- XCom resolution автоматический — при вызове
transform(raw)гдеrawэто return отextract, Airflow знает что нужно сделать xcom_pull - Auto-push результата в XCom после execute
XCom semantics в TaskFlow
@task
def get_user_data() -> dict:
return {"name": "Alice", "age": 30}
@task
def process(data: dict):
print(data["name"])
process(get_user_data())
При parse Airflow видит:
get_user_datareturnsdict→ registered как outlet XComprocess(get_user_data())— argument это reference на upstream task- Создаётся dependency
get_user_data >> process - На runtime
processполучает resultget_user_dataчерез xcom_pull
multiple_outputs
Если return dict и хотите разделить XCom keys:
@task(multiple_outputs=True)
def split_data() -> dict:
return {"users": [...], "orders": [...]}
@task
def process_users(users: list):
pass
@task
def process_orders(orders: list):
pass
splits = split_data()
process_users(splits["users"]) # ← Doc value for typing; under hood — splits.users
process_orders(splits["orders"])
Каждый key становится отдельным XCom — users, orders, а не один return_value.
Также можно с type hint TypedDict:
from typing import TypedDict
class SplitOutput(TypedDict):
users: list
orders: list
@task
def split_data() -> SplitOutput:
return SplitOutput(users=[...], orders=[...])
Airflow распознаёт TypedDict и автоматически обрабатывает как multiple_outputs=True.
Hybrid — использование @task с классическими operators
Можно смешивать:
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
@dag(...)
def hybrid_dag():
@task
def get_date() -> str:
return "2026-05-13"
date = get_date()
create_table = PostgresOperator(
task_id="create_table"
sql="CREATE TABLE orders_{{ ti.xcom_pull(task_ids='get_date') }} (...)",
)
date >> create_table
@task для Python logic, classic operators для provider-specific actions.
@task variants
Кроме @task есть специализированные decorators:
| Decorator | Что делает |
|---|---|
@task | Простой Python callable |
@task.python | Alias для @task |
@task.virtualenv | Запуск в Python venv |
@task.external_python | Запуск в external Python interpreter |
@task.bash | Bash command |
@task.docker | В Docker container |
@task.kubernetes | В Kubernetes pod (через KPO) |
@task.branch | Branching (returns task_id) |
@task.short_circuit | Conditional skip |
@task.sensor (2.5+) | Sensor через @task синтаксис |
@task.virtualenv — изоляция Python deps
@task.virtualenv(
requirements=["pandas==2.1.0", "numpy==1.25"],
python_version="3.11"
system_site_packages=False,
)
def isolated_task():
import pandas as pd
return pd.__version__
Полезно если разные tasks требуют conflicting deps.
@task.branch — conditional logic
@task.branch
def choose_path(date: datetime) -> str:
if date.weekday() < 5:
return "weekday_processing" # task_id
else:
return "weekend_processing"
@task
def weekday_processing(): pass
@task
def weekend_processing(): pass
branch = choose_path(datetime.now())
branch >> [weekday_processing(), weekend_processing()]
Branch task возвращает task_id (или list of task_ids), и downstream все остальные tasks skipped через trigger_rule=ONE_SUCCESS.
Когда какой стиль
Используйте TaskFlow когда:
- Pure Python logic
- Простой data flow между tasks
- Хотите type hints для safety
- Команда любит modern Pythonic стиль
Используйте Classic Operators когда:
- Provider-specific tasks (PostgresOperator, S3CopyObjectOperator, SparkSubmitOperator)
- Нужно явно control все task attributes
- Tasks не имеют data flow (только dependency)
Mix их когда нужно — это normal pattern.
Function decorators: что внутри @decoratorМиграция Classic → TaskFlow
Пошагово:
Шаг 1: PythonOperator → @task
Было:
def my_func():
return "result"
PythonOperator(task_id="my_func", python_callable=my_func)
Стало:
@task
def my_func():
return "result"
my_func()
Шаг 2: xcom_pull → function arguments
Было:
def consumer(**context):
ti = context["ti"]
data = ti.xcom_pull(task_ids="producer")
Стало:
@task
def consumer(data):
# data автоматически
pass
consumer(producer())
Шаг 3: >> → function calls
Было:
a_op >> b_op >> c_op
Стало:
c(b(a()))
Или для tasks без data flow:
a_task = a()
b_task = b()
a_task >> b_task # Просто dependency без data
Common pitfalls TaskFlow
Pitfall 1: invocation outside @dag
@task
def standalone_task():
return "data"
# ❌ standalone_task() вне @dag — ничего не делает
@task work только внутри @dag function.
Pitfall 2: imperative loops
@dag(...)
def my_dag():
@task
def process(item): pass
# ❌ Создаст 100 tasks с одинаковым task_id — collision
for i in range(100):
process(i)
Для dynamic task counts — используйте .expand() (Dynamic Task Mapping, Module 07):
@dag(...)
def my_dag():
@task
def process(item): pass
process.expand(item=list(range(100))) # 100 mapped TI с разными map_index
Pitfall 3: assuming function semantics
@dag(...)
def my_dag():
@task
def get_data() -> int:
return 42
@task
def use_data(x: int) -> int:
return x * 2
# ❌ get_data() возвращает XComArg, не int
result = get_data()
print(result + 1) # TypeError: can't concat XComArg + int
# ✅ Передавайте дальше как task argument
final = use_data(get_data())
get_data() в DAG body возвращает XComArg — placeholder, который resolved на runtime. Нельзя делать Python operations над ним напрямую.
Производительность
TaskFlow добавляет minimal overhead над Classic:
- _PythonDecoratedOperator — тонкая обёртка над PythonOperator
- XCom resolution — несколько extra DB queries при execute()
- Type hint inspection — once at parse time
В benchmarks разница менее 5%. Choose стиль по readability, не по perf.