Learning Platform
Глоссарий Troubleshooting
Урок 07.03 · 26 мин
Продвинутый
XComxcom_pullXComArgTaskFlowmultiple_outputs

xcom_pull semantics — task_ids, key, include_prior_dates

xcom_pull — это API, через который Airflow tasks читают данные predecessor-ов. Несмотря на простой signature, у метода 7+ параметров с тонкими semantics, и неправильное использование — частая причина раздутого DB load и subtle bugs.

В этом уроке мы препарируем все формы xcom_pull, разберём XComArg resolution в TaskFlow, и поймём, когда include_prior_dates спасает, а когда стреляет в ногу.


Signature

В Airflow 2.10/2.11 метод TaskInstance.xcom_pull имеет signature:

def xcom_pull(
    self,
    task_ids: str | Iterable[str] | None = None,
    dag_id: str | None = None,
    key: str = "return_value",
    include_prior_dates: bool = False,
    session: Session = NEW_SESSION,
    *,
    map_indexes: int | Iterable[int] | None = None,
    default: Any = None,
    run_id: str | None = None,
) -> Any:

Семь параметров, каждый с своим поведением. Разбираем по очереди.


Форма 1: pull по task_ids (одна task)

def consumer(**context):
    ti = context["ti"]
    value = ti.xcom_pull(task_ids="producer")  # default key='return_value'

Что Airflow делает:

SELECT value FROM xcom
WHERE dag_id = 'my_dag'
  AND run_id = '2026-05-12T00:00:00'
  AND task_id = 'producer'
  AND key = 'return_value'
  AND map_index = -1            -- non-mapped task
LIMIT 1;

Возвращает один decoded Python object (или None если ничего нет).


Форма 2: pull по списку task_ids

values = ti.xcom_pull(task_ids=["task_a", "task_b", "task_c"])
# values = [value_a, value_b, value_c]   список в том же порядке

SQL:

SELECT task_id, value FROM xcom
WHERE run_id = '...'
  AND task_id IN ('task_a', 'task_b', 'task_c')
  AND key = 'return_value';

Результат — list в порядке task_ids, не в порядке INSERT. Если task_id не найден → None в соответствующей позиции.


Форма 3: pull по custom key

ti.xcom_push(key="status", value="ok")
ti.xcom_push(key="count", value=42)

# Downstream
status = ti.xcom_pull(task_ids="producer", key="status")  # "ok"
count = ti.xcom_pull(task_ids="producer", key="count")    # 42

Один task → несколько XCom строк с разными keys. Pull нужный по name.


Форма 4: pull без task_ids (anti-pattern)

all_xcom = ti.xcom_pull(key="return_value")  # без task_ids

Что Airflow делает: pull все XCom с этим key от всех upstream tasks (т.е. tasks с edges → current task).

SELECT task_id, value FROM xcom
WHERE run_id = '...'
  AND task_id IN (<upstream task_ids>)
  AND key = 'return_value';

Это expensive если upstream большой. Также порядок не гарантирован — Airflow сортирует по task_id alphabetically (зависит от версии).

WARNING

Избегайте xcom_pull() без task_ids. Это вызывает SELECT по всем upstream tasks, что (1) грузит DB, (2) hydrates все blobs в memory worker-а, (3) делает поведение зависимым от структуры DAG (добавление upstream task ломает downstream). Всегда указывайте конкретные task_ids.


Форма 5: include_prior_dates

value = ti.xcom_pull(
    task_ids="producer"
    include_prior_dates=True,
)

Без флага — pull только из текущего DagRun (run_id = current). С include_prior_dates=True — pull из самого свежего DagRun, где XCom existed.

SQL effectively:

SELECT value FROM xcom
WHERE dag_id = 'my_dag'
  AND task_id = 'producer'
  AND key = 'return_value'
  AND timestamp <= '<current_dagrun_execution_date>'
ORDER BY timestamp DESC
LIMIT 1;

Use case: idempotent backfill, где task должен использовать last known value, даже если в текущем run upstream не запускался.

Antipattern: использование как кэш — это нарушает Airflow idempotency model. DAG runs не должны зависеть друг от друга (кроме Dataset/Trigger deps).


Форма 6: map_indexes для mapped tasks

# Pull от конкретного mapped TI
value = ti.xcom_pull(task_ids="mapped_task", map_indexes=3)

# Pull от нескольких mapped TI
values = ti.xcom_pull(task_ids="mapped_task", map_indexes=[0, 1, 2])

# Pull агрегированно — все mapped TI
all_values = ti.xcom_pull(task_ids="mapped_task")  # без map_indexes

Деталях — в уроке 06 (XCom в Mapped tasks).


TaskFlow: XComArg resolution

В TaskFlow вы редко вызываете xcom_pull напрямую — это делается через XComArg:

from airflow.decorators import dag, task

@dag(...)
def my_dag():
    @task
    def producer() -> dict:
        return {"items": [1, 2, 3]}

    @task
    def consumer(data: dict):
        print(data["items"])

    consumer(producer())   # ← скрытый xcom_pull

Когда вы пишете producer() в body DAG, возвращается не dict, а XComArg — placeholder, который on runtime resolves в actual value.

XComArg внутри

# airflow/models/xcom_arg.py (упрощённо)
class PlainXComArg(XComArg):
    def __init__(self, operator, key="return_value"):
        self.operator = operator
        self.key = key

    def resolve(self, context, session=NEW_SESSION):
        # Делает xcom_pull под капотом
        return context["ti"].xcom_pull(
            task_ids=self.operator.task_id,
            key=self.key,
            session=session,
        )

Когда _PythonDecoratedOperator вызывает callable:

def execute(self, context):
    # Resolve каждый argument
    resolved_args = []
    for arg in self.op_args:
        if isinstance(arg, XComArg):
            resolved_args.append(arg.resolve(context))
        else:
            resolved_args.append(arg)

    return self.python_callable(*resolved_args, **resolved_kwargs)

Это значит: по одному xcom_pull на каждый XComArg argument. 5 arguments → 5 DB queries.


multiple_outputs unpacking

@task(multiple_outputs=True)
def split() -> dict:
    return {"users": [...], "orders": [...]}

@task
def use_users(users: list): pass

@task
def use_orders(orders: list): pass

parts = split()
use_users(parts["users"])    # ← XComArg subscript
use_orders(parts["orders"])

Что происходит на parse:

  1. split возвращает dict с keys ["users", "orders"]
  2. С multiple_outputs=True Airflow создаёт два отдельных XCom keys при execute: users и orders (не один return_value)
  3. parts["users"] возвращает PlainXComArg(operator=split, key="users")
  4. На runtime — xcom_pull(task_ids="split", key="users")
TypedDict — структурная типизация dict-as-record

TypedDict — auto multiple_outputs

from typing import TypedDict

class SplitResult(TypedDict):
    users: list
    orders: list

@task   # ← без multiple_outputs!
def split() -> SplitResult:
    return SplitResult(users=[...], orders=[...])

Airflow на parse inspects return annotation. Если TypedDict — auto-set multiple_outputs=True. Удобно для type-safe code.


Comparison: формы xcom_pull

Формы xcom_pull и их SQL
xcom_pull(task_ids='X')Single task, default key='return_value'. SELECT LIMIT 1. Один Python object возвращается. Самый частый case в Classic operators.
xcom_pull(task_ids=['X','Y'])List task_ids → возвращается list в том же порядке. None в позициях не найденных. SELECT с IN clause.
xcom_pull(key='custom')Push с key='custom', pull с тем же key. Один task может иметь много XCom строк с разными keys. SELECT WHERE key='custom'.
xcom_pull(include_prior_dates=True)Pull из последнего DagRun где XCom existed, не только из текущего. Use case: idempotent backfill с stable values. Anti-pattern если используется как cache.
xcom_pull(map_indexes=[0,1,2])Только для mapped tasks (Dynamic Task Mapping). Pull от конкретных map_index. Без map_indexes — pull все mapped TI агрегированно как list.
xcom_pull() — anti-patternБез task_ids — pull от всех upstream tasks с key='return_value'. Expensive, не предсказуемо при изменениях DAG. Избегайте.

TaskFlow vs Classic: XCom API differences

AspectClassicTaskFlow
Pushti.xcom_push(key, value) или returnreturn или Context.ti.xcom_push
Pullti.xcom_pull(task_ids, key) explicitFunction argument через XComArg
Multiple keysxcom_push(key=...) × Nmultiple_outputs=True + subscript
Type safetyManualЧерез type hints
Default keyreturn_valuereturn_value
When pulledНа execute() Python codeНа execute(), framework делает

Production gotchas

Gotcha 1: N XComArg = N queries

@task
def aggregate(a, b, c, d, e):  # 5 XComArg parameters
    pass

aggregate(task_a(), task_b(), task_c(), task_d(), task_e())

На execute — 5 отдельных xcom_pull (5 round-trips к DB). Если можно — batch в один upstream task, который возвращает dict с 5 keys.

Gotcha 2: include_prior_dates сюрпризы

ti.xcom_pull(task_ids="x", include_prior_dates=True)

Если в текущем DagRun x не запускался — pull из last successful. Это может быть сильно устаревший value (вчера, неделю назад). При backfill — ваше “current” использует stale data.

Gotcha 3: default возвращается тихо

value = ti.xcom_pull(task_ids="x", default="fallback")

Если x не push-нул или not exists — возвращается "fallback" без warning. Производственный bug: upstream task сменил key, downstream получает fallback silently.

Gotcha 4: XComArg vs raw value confusion

@task
def f():
    return 42

@dag(...)
def my_dag():
    x = f()  # ← XComArg, не int!
    y = x + 1  # ❌ TypeError: cannot add XComArg + int

XComArg supports __getitem__ (для multiple_outputs) и .map() / .zip() для Mapping, но не arithmetic. Любая трансформация должна быть внутри downstream @task.

Gotcha 5: session leak в long xcom_pull

def my_func(**context):
    ti = context["ti"]
    for i in range(1000):
        v = ti.xcom_pull(task_ids=f"task_{i}")  # 1000 DB sessions?

Каждый pull без явной session берёт NEW_SESSION (создаёт connection из pool). 1000 calls — 1000 round-trips. Если нужно много — pass single session:

from airflow.utils.session import create_session

with create_session() as session:
    for i in range(1000):
        v = ti.xcom_pull(task_ids=f"task_{i}", session=session)

Один connection, batched queries.


Hands-on: profiling XCom queries

В Postgres логе с log_min_duration_statement = 100ms смотрите медленные XCom:

-- Топ tasks по XCom volume
SELECT
    dag_id,
    task_id,
    COUNT(*) AS push_count,
    AVG(octet_length(value)) AS avg_size,
    MAX(octet_length(value)) AS max_size,
    SUM(octet_length(value)) AS total_size
FROM xcom
GROUP BY dag_id, task_id
ORDER BY total_size DESC
LIMIT 20;

Если видите task с avg_size > 100KB — это кандидат на (1) Custom Backend, или (2) рефакторинг (передавать reference вместо данных).


Проверка знанийKnowledge check
В TaskFlow `consumer(a(), b(), c())` вызывает сколько xcom_pull запросов на runtime? Как оптимизировать если нужны 5+ upstream?
ОтветAnswer
Три xcom_pull — по одному на каждый XComArg argument. Каждый XComArg.resolve() в execute() делает отдельный SELECT FROM xcom WHERE task_id=... — 3 round-trips к DB. Для 5+ upstream это становится bottleneck (5 × ~5ms = 25ms latency только на pulls). Оптимизация: (1) Aggregate в один upstream — если можно, объедините logic нескольких producer-ов в один task, return dict с несколькими keys — это 1 xcom_pull. (2) Pass single session: `with create_session() as s: ti.xcom_pull(..., session=s)` — батчит queries в одном connection. (3) Custom XCom Backend с in-process cache: если consumer часто читает same XCom — backend может cache по run_id/task_id key. (4) Multiple outputs from single task: `@task(multiple_outputs=True)` + subscript — это всё равно multiple keys, но один task, один upstream node. Главный паттерн: **fewer producer tasks, fatter XCom (in pre-48KB range)**.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. В TaskFlow `consumer(a(), b(), c())` сколько xcom_pull SQL запросов выполнится на runtime?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6