Learning Platform
Глоссарий Troubleshooting
Урок 03.03 · 22 мин
Продвинутый
TaskFlowPythonOperatorXComType Hints

TaskFlow API vs Classic

В Airflow 2.0 появился TaskFlow API — Pythonic декларативный синтаксис с decorators @dag и @task. Это серьёзный сдвиг в developer experience: typed inputs/outputs, automatic XCom, чище код. Но под капотом TaskFlow — это обёртка над classic Operator API. Этот урок препарирует обе модели и даёт framework для выбора.


Classic Operator API — старый стиль

До 2.0 единственный стиль был классический Operator-based:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract():
    return "data extracted"

def transform(**context):
    ti = context["ti"]
    raw = ti.xcom_pull(task_ids="extract")
    return f"transformed: {raw}"

def load(**context):
    ti = context["ti"]
    transformed = ti.xcom_pull(task_ids="transform")
    print(f"Loading: {transformed}")

with DAG(
    dag_id="classic_etl"
    schedule="@daily"
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    extract_op = PythonOperator(
        task_id="extract"
        python_callable=extract,
    )

    transform_op = PythonOperator(
        task_id="transform"
        python_callable=transform,
    )

    load_op = PythonOperator(
        task_id="load"
        python_callable=load,
    )

    extract_op >> transform_op >> load_op

Что вы видите:

  • Manual XCom: xcom_pull(task_ids="...") для получения предыдущих результатов
  • Boilerplate **context для access к context
  • Explicit dependency chain через >> operator
  • task_id дублирует function name

TaskFlow API — современный стиль

То же самое через TaskFlow:

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule="@daily"
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def taskflow_etl():
    @task
    def extract() -> str:
        return "data extracted"

    @task
    def transform(raw: str) -> str:
        return f"transformed: {raw}"

    @task
    def load(transformed: str):
        print(f"Loading: {transformed}")

    raw = extract()
    transformed = transform(raw)
    load(transformed)

taskflow_etl()

Различия:

  • Implicit XCom: transform(raw) — Airflow автоматически делает xcom_pull
  • Type hints для inputs/outputs
  • Dependency chain implicit — Python function calls определяют граф
  • No boilerplate — нет **context, xcom_pull

Что внутри @task — _PythonDecoratedOperator

Когда вы пишете @task, Airflow создаёт обёртку — _PythonDecoratedOperator. Псевдокод:

class _PythonDecoratedOperator(PythonOperator):
    """Behind-the-scenes class created by @task decorator."""

    def __init__(self, python_callable, op_args=None, op_kwargs=None, multiple_outputs=False, **kwargs):
        # task_id derived from function name
        task_id = kwargs.pop("task_id", python_callable.__name__)

        super().__init__(
            task_id=task_id,
            python_callable=python_callable,
            op_args=op_args,
            op_kwargs=op_kwargs,
            **kwargs,
        )
        self.multiple_outputs = multiple_outputs

    def execute(self, context):
        # Resolve XCom inputs
        resolved_args = self._resolve_xcom_args(self.op_args, context)
        resolved_kwargs = self._resolve_xcom_kwargs(self.op_kwargs, context)

        # Call user function
        result = self.python_callable(*resolved_args, **resolved_kwargs)

        # Push to XCom
        if self.multiple_outputs:
            for k, v in result.items():
                context["ti"].xcom_push(key=k, value=v)
        else:
            context["ti"].xcom_push(key="return_value", value=result)

        return result

Ключевые моменты:

  1. _PythonDecoratedOperator наследник PythonOperator — это просто advanced wrapper
  2. XCom resolution автоматический — при вызове transform(raw) где raw это return от extract, Airflow знает что нужно сделать xcom_pull
  3. Auto-push результата в XCom после execute
@task wrapper architecture — parse vs runtime
@task def transform(raw)Пользовательский Python function декорирован @task. Сам callable не выполняется при parse — только регистрируется как python_callable в _PythonDecoratedOperator instance.
parse time: decorator wraps
_PythonDecoratedOperator instanceSubclass PythonOperator. task_id derived от function name (transform). Holds python_callable reference, op_args/op_kwargs (могут быть XComArg). Сериализуется в serialized_dag как PythonOperator с extra metadata.
transform(raw) called in @dag body
XComArg(extract_task)raw на самом деле не string — это XComArg placeholder, который ссылается на task 'extract'. При parse Airflow строит dependency edge extract → transform, embedding XComArg в op_args нового task.
Dependency edge createdAirflow добавляет implicit dependency extract.set_downstream(transform). Это эквивалентно extract >> transform в classic стиле, но генерируется автоматически из function call chain.
runtime: execute(context)
_resolve_xcom_args()При actual execution worker процесса XComArg resolved: делает xcom_pull(task_ids='extract', key='return_value') из metadata DB. Получает реальное значение, передаёт в callable.
callable executed, returns value
xcom_push('return_value', result)После успешного execute результат функции автоматически push в XCom table с key='return_value'. Downstream tasks смогут его pull-нуть через тот же resolution механизм.

XCom semantics в TaskFlow

@task
def get_user_data() -> dict:
    return {"name": "Alice", "age": 30}

@task
def process(data: dict):
    print(data["name"])

process(get_user_data())

При parse Airflow видит:

  • get_user_data returns dict → registered как outlet XCom
  • process(get_user_data()) — argument это reference на upstream task
  • Создаётся dependency get_user_data >> process
  • На runtime process получает result get_user_data через xcom_pull

multiple_outputs

Если return dict и хотите разделить XCom keys:

@task(multiple_outputs=True)
def split_data() -> dict:
    return {"users": [...], "orders": [...]}

@task
def process_users(users: list):
    pass

@task
def process_orders(orders: list):
    pass

splits = split_data()
process_users(splits["users"])     # ← Doc value for typing; under hood — splits.users
process_orders(splits["orders"])

Каждый key становится отдельным XCom — users, orders, а не один return_value.

Также можно с type hint TypedDict:

from typing import TypedDict

class SplitOutput(TypedDict):
    users: list
    orders: list

@task
def split_data() -> SplitOutput:
    return SplitOutput(users=[...], orders=[...])

Airflow распознаёт TypedDict и автоматически обрабатывает как multiple_outputs=True.

TypedDict в Python: structured dict как record

Hybrid — использование @task с классическими operators

Можно смешивать:

from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator

@dag(...)
def hybrid_dag():
    @task
    def get_date() -> str:
        return "2026-05-13"

    date = get_date()

    create_table = PostgresOperator(
        task_id="create_table"
        sql="CREATE TABLE orders_{{ ti.xcom_pull(task_ids='get_date') }} (...)",
    )

    date >> create_table

@task для Python logic, classic operators для provider-specific actions.


@task variants

Кроме @task есть специализированные decorators:

DecoratorЧто делает
@taskПростой Python callable
@task.pythonAlias для @task
@task.virtualenvЗапуск в Python venv
@task.external_pythonЗапуск в external Python interpreter
@task.bashBash command
@task.dockerВ Docker container
@task.kubernetesВ Kubernetes pod (через KPO)
@task.branchBranching (returns task_id)
@task.short_circuitConditional skip
@task.sensor (2.5+)Sensor через @task синтаксис

@task.virtualenv — изоляция Python deps

@task.virtualenv(
    requirements=["pandas==2.1.0", "numpy==1.25"],
    python_version="3.11"
    system_site_packages=False,
)
def isolated_task():
    import pandas as pd
    return pd.__version__

Полезно если разные tasks требуют conflicting deps.

@task.branch — conditional logic

@task.branch
def choose_path(date: datetime) -> str:
    if date.weekday() < 5:
        return "weekday_processing"  # task_id
    else:
        return "weekend_processing"

@task
def weekday_processing(): pass

@task
def weekend_processing(): pass

branch = choose_path(datetime.now())
branch >> [weekday_processing(), weekend_processing()]

Branch task возвращает task_id (или list of task_ids), и downstream все остальные tasks skipped через trigger_rule=ONE_SUCCESS.


Когда какой стиль

Используйте TaskFlow когда:

  • Pure Python logic
  • Простой data flow между tasks
  • Хотите type hints для safety
  • Команда любит modern Pythonic стиль

Используйте Classic Operators когда:

  • Provider-specific tasks (PostgresOperator, S3CopyObjectOperator, SparkSubmitOperator)
  • Нужно явно control все task attributes
  • Tasks не имеют data flow (только dependency)

Mix их когда нужно — это normal pattern.

Function decorators: что внутри @decorator

Миграция Classic → TaskFlow

Пошагово:

Шаг 1: PythonOperator → @task

Было:

def my_func():
    return "result"

PythonOperator(task_id="my_func", python_callable=my_func)

Стало:

@task
def my_func():
    return "result"

my_func()

Шаг 2: xcom_pull → function arguments

Было:

def consumer(**context):
    ti = context["ti"]
    data = ti.xcom_pull(task_ids="producer")

Стало:

@task
def consumer(data):
    # data автоматически
    pass

consumer(producer())

Шаг 3: >> → function calls

Было:

a_op >> b_op >> c_op

Стало:

c(b(a()))

Или для tasks без data flow:

a_task = a()
b_task = b()
a_task >> b_task  # Просто dependency без data

Common pitfalls TaskFlow

Pitfall 1: invocation outside @dag

@task
def standalone_task():
    return "data"

# ❌ standalone_task() вне @dag — ничего не делает

@task work только внутри @dag function.

Pitfall 2: imperative loops

@dag(...)
def my_dag():
    @task
    def process(item): pass

    # ❌ Создаст 100 tasks с одинаковым task_id — collision
    for i in range(100):
        process(i)

Для dynamic task counts — используйте .expand() (Dynamic Task Mapping, Module 07):

@dag(...)
def my_dag():
    @task
    def process(item): pass

    process.expand(item=list(range(100)))  # 100 mapped TI с разными map_index

Pitfall 3: assuming function semantics

@dag(...)
def my_dag():
    @task
    def get_data() -> int:
        return 42

    @task
    def use_data(x: int) -> int:
        return x * 2

    # ❌ get_data() возвращает XComArg, не int
    result = get_data()
    print(result + 1)  # TypeError: can't concat XComArg + int

    # ✅ Передавайте дальше как task argument
    final = use_data(get_data())

get_data() в DAG body возвращает XComArg — placeholder, который resolved на runtime. Нельзя делать Python operations над ним напрямую.


Производительность

TaskFlow добавляет minimal overhead над Classic:

  • _PythonDecoratedOperator — тонкая обёртка над PythonOperator
  • XCom resolution — несколько extra DB queries при execute()
  • Type hint inspection — once at parse time

В benchmarks разница менее 5%. Choose стиль по readability, не по perf.


Проверка знанийKnowledge check
Когда TaskFlow @task — правильный выбор, а когда лучше Classic Operator? Дай 2-3 сценария для каждого.
ОтветAnswer
**TaskFlow когда**: (1) pure Python ETL logic с data flow между tasks — type hints + automatic XCom делают код cleaner; (2) команда новая для Airflow — TaskFlow более intuitive чем PythonOperator + xcom_pull; (3) tasks принимают/возвращают complex Python objects — render_template_as_native_obj + TaskFlow handles это elegantly. **Classic когда**: (1) provider-specific tasks (PostgresOperator, S3CopyObjectOperator, SparkSubmitOperator) — нет смысла оборачивать в @task; (2) tasks не имеют data flow, только dependencies — `a >> b >> c` чище чем function nesting; (3) нужно explicit control над всеми attributes (executor_config, trigger_rule, pool) — иногда легче через kwargs operator чем через task decorator parameters. **Mix их когда нужно** — это нормальный production pattern.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Что внутри @task decorator с точки зрения Airflow framework?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8