xcom_pull semantics — task_ids, key, include_prior_dates
xcom_pull — это API, через который Airflow tasks читают данные predecessor-ов. Несмотря на простой signature, у метода 7+ параметров с тонкими semantics, и неправильное использование — частая причина раздутого DB load и subtle bugs.
В этом уроке мы препарируем все формы xcom_pull, разберём XComArg resolution в TaskFlow, и поймём, когда include_prior_dates спасает, а когда стреляет в ногу.
Signature
В Airflow 2.10/2.11 метод TaskInstance.xcom_pull имеет signature:
def xcom_pull(
self,
task_ids: str | Iterable[str] | None = None,
dag_id: str | None = None,
key: str = "return_value",
include_prior_dates: bool = False,
session: Session = NEW_SESSION,
*,
map_indexes: int | Iterable[int] | None = None,
default: Any = None,
run_id: str | None = None,
) -> Any:
Семь параметров, каждый с своим поведением. Разбираем по очереди.
Форма 1: pull по task_ids (одна task)
def consumer(**context):
ti = context["ti"]
value = ti.xcom_pull(task_ids="producer") # default key='return_value'
Что Airflow делает:
SELECT value FROM xcom
WHERE dag_id = 'my_dag'
AND run_id = '2026-05-12T00:00:00'
AND task_id = 'producer'
AND key = 'return_value'
AND map_index = -1 -- non-mapped task
LIMIT 1;
Возвращает один decoded Python object (или None если ничего нет).
Форма 2: pull по списку task_ids
values = ti.xcom_pull(task_ids=["task_a", "task_b", "task_c"])
# values = [value_a, value_b, value_c] список в том же порядке
SQL:
SELECT task_id, value FROM xcom
WHERE run_id = '...'
AND task_id IN ('task_a', 'task_b', 'task_c')
AND key = 'return_value';
Результат — list в порядке task_ids, не в порядке INSERT. Если task_id не найден → None в соответствующей позиции.
Форма 3: pull по custom key
ti.xcom_push(key="status", value="ok")
ti.xcom_push(key="count", value=42)
# Downstream
status = ti.xcom_pull(task_ids="producer", key="status") # "ok"
count = ti.xcom_pull(task_ids="producer", key="count") # 42
Один task → несколько XCom строк с разными keys. Pull нужный по name.
Форма 4: pull без task_ids (anti-pattern)
all_xcom = ti.xcom_pull(key="return_value") # без task_ids
Что Airflow делает: pull все XCom с этим key от всех upstream tasks (т.е. tasks с edges → current task).
SELECT task_id, value FROM xcom
WHERE run_id = '...'
AND task_id IN (<upstream task_ids>)
AND key = 'return_value';
Это expensive если upstream большой. Также порядок не гарантирован — Airflow сортирует по task_id alphabetically (зависит от версии).
Избегайте xcom_pull() без task_ids. Это вызывает SELECT по всем upstream tasks, что (1) грузит DB, (2) hydrates все blobs в memory worker-а, (3) делает поведение зависимым от структуры DAG (добавление upstream task ломает downstream). Всегда указывайте конкретные task_ids.
Форма 5: include_prior_dates
value = ti.xcom_pull(
task_ids="producer"
include_prior_dates=True,
)
Без флага — pull только из текущего DagRun (run_id = current). С include_prior_dates=True — pull из самого свежего DagRun, где XCom existed.
SQL effectively:
SELECT value FROM xcom
WHERE dag_id = 'my_dag'
AND task_id = 'producer'
AND key = 'return_value'
AND timestamp <= '<current_dagrun_execution_date>'
ORDER BY timestamp DESC
LIMIT 1;
Use case: idempotent backfill, где task должен использовать last known value, даже если в текущем run upstream не запускался.
Antipattern: использование как кэш — это нарушает Airflow idempotency model. DAG runs не должны зависеть друг от друга (кроме Dataset/Trigger deps).
Форма 6: map_indexes для mapped tasks
# Pull от конкретного mapped TI
value = ti.xcom_pull(task_ids="mapped_task", map_indexes=3)
# Pull от нескольких mapped TI
values = ti.xcom_pull(task_ids="mapped_task", map_indexes=[0, 1, 2])
# Pull агрегированно — все mapped TI
all_values = ti.xcom_pull(task_ids="mapped_task") # без map_indexes
Деталях — в уроке 06 (XCom в Mapped tasks).
TaskFlow: XComArg resolution
В TaskFlow вы редко вызываете xcom_pull напрямую — это делается через XComArg:
from airflow.decorators import dag, task
@dag(...)
def my_dag():
@task
def producer() -> dict:
return {"items": [1, 2, 3]}
@task
def consumer(data: dict):
print(data["items"])
consumer(producer()) # ← скрытый xcom_pull
Когда вы пишете producer() в body DAG, возвращается не dict, а XComArg — placeholder, который on runtime resolves в actual value.
XComArg внутри
# airflow/models/xcom_arg.py (упрощённо)
class PlainXComArg(XComArg):
def __init__(self, operator, key="return_value"):
self.operator = operator
self.key = key
def resolve(self, context, session=NEW_SESSION):
# Делает xcom_pull под капотом
return context["ti"].xcom_pull(
task_ids=self.operator.task_id,
key=self.key,
session=session,
)
Когда _PythonDecoratedOperator вызывает callable:
def execute(self, context):
# Resolve каждый argument
resolved_args = []
for arg in self.op_args:
if isinstance(arg, XComArg):
resolved_args.append(arg.resolve(context))
else:
resolved_args.append(arg)
return self.python_callable(*resolved_args, **resolved_kwargs)
Это значит: по одному xcom_pull на каждый XComArg argument. 5 arguments → 5 DB queries.
multiple_outputs unpacking
@task(multiple_outputs=True)
def split() -> dict:
return {"users": [...], "orders": [...]}
@task
def use_users(users: list): pass
@task
def use_orders(orders: list): pass
parts = split()
use_users(parts["users"]) # ← XComArg subscript
use_orders(parts["orders"])
Что происходит на parse:
splitвозвращаетdictс keys["users", "orders"]- С
multiple_outputs=TrueAirflow создаёт два отдельных XCom keys при execute:usersиorders(не одинreturn_value) parts["users"]возвращаетPlainXComArg(operator=split, key="users")- На runtime —
xcom_pull(task_ids="split", key="users")
TypedDict — auto multiple_outputs
from typing import TypedDict
class SplitResult(TypedDict):
users: list
orders: list
@task # ← без multiple_outputs!
def split() -> SplitResult:
return SplitResult(users=[...], orders=[...])
Airflow на parse inspects return annotation. Если TypedDict — auto-set multiple_outputs=True. Удобно для type-safe code.
Comparison: формы xcom_pull
TaskFlow vs Classic: XCom API differences
| Aspect | Classic | TaskFlow |
|---|---|---|
| Push | ti.xcom_push(key, value) или return | return или Context.ti.xcom_push |
| Pull | ti.xcom_pull(task_ids, key) explicit | Function argument через XComArg |
| Multiple keys | xcom_push(key=...) × N | multiple_outputs=True + subscript |
| Type safety | Manual | Через type hints |
| Default key | return_value | return_value |
| When pulled | На execute() Python code | На execute(), framework делает |
Production gotchas
Gotcha 1: N XComArg = N queries
@task
def aggregate(a, b, c, d, e): # 5 XComArg parameters
pass
aggregate(task_a(), task_b(), task_c(), task_d(), task_e())
На execute — 5 отдельных xcom_pull (5 round-trips к DB). Если можно — batch в один upstream task, который возвращает dict с 5 keys.
Gotcha 2: include_prior_dates сюрпризы
ti.xcom_pull(task_ids="x", include_prior_dates=True)
Если в текущем DagRun x не запускался — pull из last successful. Это может быть сильно устаревший value (вчера, неделю назад). При backfill — ваше “current” использует stale data.
Gotcha 3: default возвращается тихо
value = ti.xcom_pull(task_ids="x", default="fallback")
Если x не push-нул или not exists — возвращается "fallback" без warning. Производственный bug: upstream task сменил key, downstream получает fallback silently.
Gotcha 4: XComArg vs raw value confusion
@task
def f():
return 42
@dag(...)
def my_dag():
x = f() # ← XComArg, не int!
y = x + 1 # ❌ TypeError: cannot add XComArg + int
XComArg supports __getitem__ (для multiple_outputs) и .map() / .zip() для Mapping, но не arithmetic. Любая трансформация должна быть внутри downstream @task.
Gotcha 5: session leak в long xcom_pull
def my_func(**context):
ti = context["ti"]
for i in range(1000):
v = ti.xcom_pull(task_ids=f"task_{i}") # 1000 DB sessions?
Каждый pull без явной session берёт NEW_SESSION (создаёт connection из pool). 1000 calls — 1000 round-trips. Если нужно много — pass single session:
from airflow.utils.session import create_session
with create_session() as session:
for i in range(1000):
v = ti.xcom_pull(task_ids=f"task_{i}", session=session)
Один connection, batched queries.
Hands-on: profiling XCom queries
В Postgres логе с log_min_duration_statement = 100ms смотрите медленные XCom:
-- Топ tasks по XCom volume
SELECT
dag_id,
task_id,
COUNT(*) AS push_count,
AVG(octet_length(value)) AS avg_size,
MAX(octet_length(value)) AS max_size,
SUM(octet_length(value)) AS total_size
FROM xcom
GROUP BY dag_id, task_id
ORDER BY total_size DESC
LIMIT 20;
Если видите task с avg_size > 100KB — это кандидат на (1) Custom Backend, или (2) рефакторинг (передавать reference вместо данных).