XCom в Dynamic Task Mapping
Когда task expanded через .expand(...), рождается N mapped TI с разными map_index. Каждый из них может push свой XCom, и downstream task может pull один, несколько, или все. Это сравнительно subtle область — schema xcom table с колонкой map_index, специфический xcom_pull behaviour, и XComArg методы (zip, map, индексация).
Урок предполагает, что вы знакомы с базовым .expand() (см. Module 07). Здесь — глубокий dive в data layer.
map_index в схеме xcom
Вспомним schema xcom table:
CREATE TABLE xcom (
dag_run_id INTEGER NOT NULL,
task_id VARCHAR(250) NOT NULL,
map_index INTEGER NOT NULL DEFAULT -1, -- ← здесь
key VARCHAR(512) NOT NULL,
...
value BYTEA,
PRIMARY KEY (dag_run_id, task_id, map_index, key)
);
map_index:
-1— обычный (non-mapped) task. Один экземпляр per DagRun.0..N-1— mapped task. N экземпляров.
Pro tip: PK включает map_index, что позволяет hold N independent XCom rows для одного task_id.
Базовый пример
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule=None, start_date=datetime(2026, 1, 1), catchup=False)
def mapping_xcom_demo():
@task
def get_items() -> list[int]:
return [10, 20, 30]
@task
def double(x: int) -> int:
return x * 2
@task
def sum_all(values: list[int]):
print(f"sum = {sum(values)}")
doubled = double.expand(x=get_items()) # 3 mapped TI
sum_all(doubled)
Что произойдёт после run:
SELECT task_id, map_index, key, value
FROM xcom
WHERE run_id = '<run_id>'
ORDER BY task_id, map_index;
Результат:
| task_id | map_index | key | value |
|---|---|---|---|
get_items | -1 | return_value | [10, 20, 30] |
double | 0 | return_value | 20 |
double | 1 | return_value | 40 |
double | 2 | return_value | 60 |
sum_all | -1 | return_value | null |
Три отдельные XCom строки для double task. Каждая со своим map_index.
xcom_pull с map_indexes
API имеет специальный параметр map_indexes:
# В downstream callable:
ti = context["ti"]
# Pull от конкретного mapped TI
value_0 = ti.xcom_pull(task_ids="double", map_indexes=0) # 20
value_1 = ti.xcom_pull(task_ids="double", map_indexes=1) # 40
# Pull от нескольких
subset = ti.xcom_pull(task_ids="double", map_indexes=[0, 2]) # [20, 60]
# Pull всех — без map_indexes
all_values = ti.xcom_pull(task_ids="double") # [20, 40, 60]
Когда map_indexes не указан и task mapped, Airflow возвращает list агрегированных values в порядке map_index.
SQL под капотом:
-- xcom_pull(task_ids='double') без map_indexes
SELECT map_index, value FROM xcom
WHERE run_id = '...'
AND task_id = 'double'
AND key = 'return_value'
ORDER BY map_index;
ORM hydrates все rows → возвращает list.
TaskFlow: автоматическая aggregation
В TaskFlow это работает автоматически:
@task
def sum_all(values: list[int]):
print(f"sum = {sum(values)}")
sum_all(doubled) # ← Airflow знает: doubled — XComArg от mapped task, нужна aggregation
На execute Airflow:
- Видит
doubled = double.expand(...)→MapXComArg - Downstream
sum_all(doubled)— pass MapXComArg как argument - На execute
sum_all, MapXComArg.resolve() → делаетxcom_pull(task_ids='double')безmap_indexes - Получает list, передаёт в callable
XComArg операции для mapped data
.zip(a, b) — parallel pairs
@task
def get_users() -> list[str]:
return ["alice", "bob", "carol"]
@task
def get_scores() -> list[int]:
return [90, 85, 92]
@task
def report(name: str, score: int):
print(f"{name}: {score}")
# zip пары:
users = get_users()
scores = get_scores()
report.expand_kwargs(
XComArg.zip(users, scores).map(lambda x: {"name": x[0], "score": x[1]})
)
Что происходит:
XComArg.zip(users, scores)— это lazy reference на zip pair[("alice", 90), ("bob", 85), ("carol", 92)].map(lambda x: ...)— transformation per element (применяется на runtime)report.expand_kwargs(...)— 3 mapped TI, каждая с kwargs
.map(fn) — transform каждый
@task
def double(x):
return x * 2
items = double.expand(x=[1, 2, 3])
squared = items.map(lambda x: x * x) # Lazy — не запускается immediately
@task
def use(v):
print(v)
use.expand(v=squared)
.map() — это lazy transformation на XComArg level. Применяется при resolve, не создаёт новый task. Полезно для simple inline transforms без extra task overhead.
.map() lambda должна быть top-level или picklable. Closure variables не работают. Если transformation сложный — лучше использовать обычный @task декоратор.
Lazy iteration: XComArg semantics
items = get_items() # XComArg, не list
mapped = process.expand(item=items) # MapXComArg
Когда MapXComArg resolved:
- На scheduler в Phase 2: scheduler делает SELECT на upstream XCom (
get_items), считает length списка, создаёт N TI с разнымиmap_index - На worker для каждого mapped TI: resolve конкретный element по
map_index - На downstream (если используется как list): xcom_pull агрегирует все mapped XCom в list
Это lazy — Airflow не материализует list в memory scheduler-а unnecessarily. Только подсчитывает length, создаёт rows.
Visualization
Edge cases
Edge 1: некоторые mapped TI failed
double map_idx=0: SUCCESS, XCom=20
double map_idx=1: FAILED, NO XCom
double map_idx=2: SUCCESS, XCom=60
Что sum_all получает в xcom_pull(task_ids='double')?
По default — NotPreviouslySkippedDep не позволит sum_all запуститься (т.к. upstream имеет failed TI). Если trigger_rule="all_done":
@task(trigger_rule="all_done")
def sum_all(values): ...
То получит [20, 60] — только успешные. Missing — просто отсутствуют в list.
С default= или явным fetch с map_indexes=1 получите None для failed TI.
Edge 2: empty upstream
@task
def get_items() -> list:
return [] # пустой
double.expand(x=get_items())
Scheduler видит upstream length=0 → создаёт 0 mapped TI → double task остаётся в state success с 0 sub-instances. Downstream sum_all([]) получает empty list — может или не может handle (в зависимости от callable logic).
Edge 3: max_map_length exceeded
double.expand(x=list(range(2000))) # default max_map_length=1024
Scheduler error: AirflowException: Number of mapped tasks exceeds max_map_length. Increase в config:
[core]
max_map_length = 4096
Но careful — 4000 mapped TI создают огромный scheduler load (см. Module 07 lesson 05).
Edge 4: stress на xcom_pull когда mapped huge
process.expand(item=range(1000)) # 1000 mapped TI
@task
def aggregate(items: list):
return sum(items)
aggregate(process_outputs) # ← xcom_pull забирает 1000 rows
SQL: SELECT FROM xcom WHERE task_id='process' возвращает 1000 rows. Worker hydrates все в memory. Если каждый XCom 10KB → 10MB в memory aggregator-а.
Mitigation: для huge mapping → streaming aggregation:
@task
def aggregate_streaming(**context):
from sqlalchemy import select
from airflow.models.xcom import XCom
session = context["session"]
stmt = select(XCom.value).where(
XCom.dag_id == context["dag"].dag_id,
XCom.run_id == context["run_id"],
XCom.task_id == "process",
).order_by(XCom.map_index)
total = 0
for (raw,) in session.execute(stmt):
# decode и aggregate streaming
v = json.loads(raw)
total += v
return total
Mapped tasks с multiple_outputs
@task(multiple_outputs=True)
def get_user_stats(user_id: int) -> dict:
return {"emails": [...], "logins": 42}
stats = get_user_stats.expand(user_id=[1, 2, 3])
# stats — MapXComArg
# Access keys:
emails = stats["emails"] # MapXComArg для key='emails' across all 3 TI
logins = stats["logins"] # для logins
# В downstream:
@task
def process(all_emails: list, all_logins: list):
pass
process(emails, logins) # получит list[3] emails, list[3] logins
В xcom table — 6 rows (3 mapped TI × 2 keys):
| task_id | map_index | key |
|---|---|---|
| get_user_stats | 0 | emails |
| get_user_stats | 0 | logins |
| get_user_stats | 1 | emails |
| get_user_stats | 1 | logins |
| get_user_stats | 2 | emails |
| get_user_stats | 2 | logins |
Production gotchas
Gotcha 1: XCom Custom Backend и mapped tasks
В Custom XCom Backend (урок 04) — каждая mapped TI пишет свой S3 file. Если ваш backend не учитывает map_index в S3 path, все mapped TI write в один key → overwriting.
# ❌ BAD
def _build_s3_key(dag_id, task_id, run_id):
return f"xcom/{dag_id}/{run_id}/{task_id}" # ← no map_index!
# ✅ GOOD
def _build_s3_key(dag_id, task_id, run_id, map_index):
return f"xcom/{dag_id}/{run_id}/{task_id}/map_{map_index or -1}"
Gotcha 2: ordered guarantee
@task
def get_files() -> list:
return os.listdir("/data") # ← order не гарантирован!
process.expand(file=get_files())
os.listdir order — undefined в Python. Downstream aggregation получит values в порядке map_index, но map_index соответствует input order. Если input random — output тоже random.
Fix: sort upstream если нужен order: return sorted(os.listdir("/data")).
Gotcha 3: aggregation memory blow
# 1000 mapped TI × 100KB XCom each = 100MB в memory aggregator
process.expand(file=files_1000)
@task
def aggregate(results: list): # ← memory blow
...
Fix: либо batch processing (process N в одном task), либо streaming aggregation через manual SQL.
Gotcha 4: cross-mapping XCom не работает
double.expand(x=[1,2,3]) # 3 mapped TI
@task
def use_other(x):
# Хотим взять value от другого map_index текущего task
# ⚠️ Это anti-pattern в Airflow — не работает напрямую
pass
use_other.expand(x=double_outputs)
Mapped task use_other map_idx=0 не может тривиально получить XCom use_other map_idx=1 — они выполняются независимо, нет ordering guarantee. Если нужна cross-element logic — aggregate сначала в non-mapped task, потом обрабатывайте.
Gotcha 5: UI rendering 1000+ mapped TI
UI пытается rendering grid view для всех mapped TI. 1000 TI = slow render, browser hang. Используйте max_active_tis_per_dag чтобы limit concurrent, или batch processing.