Learning Platform
Глоссарий Troubleshooting
Урок 07.06 · 24 мин
Продвинутый
XComDynamic Mappingmap_indexXComArgzip

XCom в Dynamic Task Mapping

Когда task expanded через .expand(...), рождается N mapped TI с разными map_index. Каждый из них может push свой XCom, и downstream task может pull один, несколько, или все. Это сравнительно subtle область — schema xcom table с колонкой map_index, специфический xcom_pull behaviour, и XComArg методы (zip, map, индексация).

Урок предполагает, что вы знакомы с базовым .expand() (см. Module 07). Здесь — глубокий dive в data layer.


map_index в схеме xcom

Вспомним schema xcom table:

CREATE TABLE xcom (
    dag_run_id  INTEGER NOT NULL,
    task_id     VARCHAR(250) NOT NULL,
    map_index   INTEGER NOT NULL DEFAULT -1,    -- ← здесь
    key         VARCHAR(512) NOT NULL,
    ...
    value       BYTEA,
    PRIMARY KEY (dag_run_id, task_id, map_index, key)
);

map_index:

  • -1 — обычный (non-mapped) task. Один экземпляр per DagRun.
  • 0..N-1 — mapped task. N экземпляров.

Pro tip: PK включает map_index, что позволяет hold N independent XCom rows для одного task_id.


Базовый пример

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule=None, start_date=datetime(2026, 1, 1), catchup=False)
def mapping_xcom_demo():
    @task
    def get_items() -> list[int]:
        return [10, 20, 30]

    @task
    def double(x: int) -> int:
        return x * 2

    @task
    def sum_all(values: list[int]):
        print(f"sum = {sum(values)}")

    doubled = double.expand(x=get_items())   # 3 mapped TI
    sum_all(doubled)

Что произойдёт после run:

SELECT task_id, map_index, key, value
FROM xcom
WHERE run_id = '<run_id>'
ORDER BY task_id, map_index;

Результат:

task_idmap_indexkeyvalue
get_items-1return_value[10, 20, 30]
double0return_value20
double1return_value40
double2return_value60
sum_all-1return_valuenull

Три отдельные XCom строки для double task. Каждая со своим map_index.


xcom_pull с map_indexes

API имеет специальный параметр map_indexes:

# В downstream callable:
ti = context["ti"]

# Pull от конкретного mapped TI
value_0 = ti.xcom_pull(task_ids="double", map_indexes=0)   # 20
value_1 = ti.xcom_pull(task_ids="double", map_indexes=1)   # 40

# Pull от нескольких
subset = ti.xcom_pull(task_ids="double", map_indexes=[0, 2])   # [20, 60]

# Pull всех — без map_indexes
all_values = ti.xcom_pull(task_ids="double")   # [20, 40, 60]

Когда map_indexes не указан и task mapped, Airflow возвращает list агрегированных values в порядке map_index.

SQL под капотом:

-- xcom_pull(task_ids='double')  без map_indexes
SELECT map_index, value FROM xcom
WHERE run_id = '...'
  AND task_id = 'double'
  AND key = 'return_value'
ORDER BY map_index;

ORM hydrates все rows → возвращает list.


TaskFlow: автоматическая aggregation

В TaskFlow это работает автоматически:

@task
def sum_all(values: list[int]):
    print(f"sum = {sum(values)}")

sum_all(doubled)   # ← Airflow знает: doubled — XComArg от mapped task, нужна aggregation

На execute Airflow:

  1. Видит doubled = double.expand(...)MapXComArg
  2. Downstream sum_all(doubled) — pass MapXComArg как argument
  3. На execute sum_all, MapXComArg.resolve() → делает xcom_pull(task_ids='double') без map_indexes
  4. Получает list, передаёт в callable

XComArg операции для mapped data

.zip(a, b) — parallel pairs

@task
def get_users() -> list[str]:
    return ["alice", "bob", "carol"]

@task
def get_scores() -> list[int]:
    return [90, 85, 92]

@task
def report(name: str, score: int):
    print(f"{name}: {score}")

# zip пары:
users = get_users()
scores = get_scores()
report.expand_kwargs(
    XComArg.zip(users, scores).map(lambda x: {"name": x[0], "score": x[1]})
)

Что происходит:

  • XComArg.zip(users, scores) — это lazy reference на zip pair [("alice", 90), ("bob", 85), ("carol", 92)]
  • .map(lambda x: ...) — transformation per element (применяется на runtime)
  • report.expand_kwargs(...) — 3 mapped TI, каждая с kwargs

.map(fn) — transform каждый

@task
def double(x):
    return x * 2

items = double.expand(x=[1, 2, 3])
squared = items.map(lambda x: x * x)   # Lazy — не запускается immediately

@task
def use(v):
    print(v)

use.expand(v=squared)

.map() — это lazy transformation на XComArg level. Применяется при resolve, не создаёт новый task. Полезно для simple inline transforms без extra task overhead.

WARNING

.map() lambda должна быть top-level или picklable. Closure variables не работают. Если transformation сложный — лучше использовать обычный @task декоратор.


Lazy iteration: XComArg semantics

items = get_items()        # XComArg, не list
mapped = process.expand(item=items)   # MapXComArg

Когда MapXComArg resolved:

  • На scheduler в Phase 2: scheduler делает SELECT на upstream XCom (get_items), считает length списка, создаёт N TI с разными map_index
  • На worker для каждого mapped TI: resolve конкретный element по map_index
  • На downstream (если используется как list): xcom_pull агрегирует все mapped XCom в list

Это lazy — Airflow не материализует list в memory scheduler-а unnecessarily. Только подсчитывает length, создаёт rows.


Visualization

XCom flow в mapped task
upstream get_items()Non-mapped task возвращает list. Один row в xcom: map_index=-1, key=return_value, value=[10,20,30].
scheduler reads, len=3
scheduler.expand_mapped_taskВ Phase 2 scheduler loop: reads upstream XCom, считает length, создаёт 3 TI с map_index 0,1,2. Apply upstream deps (каждая mapped TI имеет dep на upstream get_items).
double map_idx=0Каждый worker запускает свой mapped TI. resolve_mapped_kwargs → берёт element 0 из upstream XCom (=10). Callable получает x=10, return 20.
double map_idx=1x=20, return 40.
double map_idx=2x=30, return 60.
3 INSERT into xcom
3 xcom rows: map_index 0,1,2После завершения каждого mapped TI — INSERT в xcom с разными map_index. PK (dag_run_id, task_id, map_index, key) гарантирует unique.
downstream sum_all consumer
sum_all SELECT WHERE map_indexsum_all это non-mapped (просто consumer). При resolve XComArg от MapXComArg делает SELECT с ORDER BY map_index. Получает [20,40,60].

Edge cases

Edge 1: некоторые mapped TI failed

double map_idx=0: SUCCESS, XCom=20
double map_idx=1: FAILED, NO XCom
double map_idx=2: SUCCESS, XCom=60

Что sum_all получает в xcom_pull(task_ids='double')?

По default — NotPreviouslySkippedDep не позволит sum_all запуститься (т.к. upstream имеет failed TI). Если trigger_rule="all_done":

@task(trigger_rule="all_done")
def sum_all(values): ...

То получит [20, 60] — только успешные. Missing — просто отсутствуют в list.

С default= или явным fetch с map_indexes=1 получите None для failed TI.

Edge 2: empty upstream

@task
def get_items() -> list:
    return []   # пустой

double.expand(x=get_items())

Scheduler видит upstream length=0 → создаёт 0 mapped TI → double task остаётся в state success с 0 sub-instances. Downstream sum_all([]) получает empty list — может или не может handle (в зависимости от callable logic).

Edge 3: max_map_length exceeded

double.expand(x=list(range(2000)))   # default max_map_length=1024

Scheduler error: AirflowException: Number of mapped tasks exceeds max_map_length. Increase в config:

[core]
max_map_length = 4096

Но careful — 4000 mapped TI создают огромный scheduler load (см. Module 07 lesson 05).

Edge 4: stress на xcom_pull когда mapped huge

process.expand(item=range(1000))   # 1000 mapped TI

@task
def aggregate(items: list):
    return sum(items)

aggregate(process_outputs)   # ← xcom_pull забирает 1000 rows

SQL: SELECT FROM xcom WHERE task_id='process' возвращает 1000 rows. Worker hydrates все в memory. Если каждый XCom 10KB → 10MB в memory aggregator-а.

Mitigation: для huge mapping → streaming aggregation:

@task
def aggregate_streaming(**context):
    from sqlalchemy import select
    from airflow.models.xcom import XCom

    session = context["session"]
    stmt = select(XCom.value).where(
        XCom.dag_id == context["dag"].dag_id,
        XCom.run_id == context["run_id"],
        XCom.task_id == "process",
    ).order_by(XCom.map_index)

    total = 0
    for (raw,) in session.execute(stmt):
        # decode и aggregate streaming
        v = json.loads(raw)
        total += v
    return total

Mapped tasks с multiple_outputs

@task(multiple_outputs=True)
def get_user_stats(user_id: int) -> dict:
    return {"emails": [...], "logins": 42}

stats = get_user_stats.expand(user_id=[1, 2, 3])
# stats — MapXComArg

# Access keys:
emails = stats["emails"]   # MapXComArg для key='emails' across all 3 TI
logins = stats["logins"]   # для logins

# В downstream:
@task
def process(all_emails: list, all_logins: list):
    pass

process(emails, logins)   # получит list[3] emails, list[3] logins

В xcom table — 6 rows (3 mapped TI × 2 keys):

task_idmap_indexkey
get_user_stats0emails
get_user_stats0logins
get_user_stats1emails
get_user_stats1logins
get_user_stats2emails
get_user_stats2logins

Production gotchas

Gotcha 1: XCom Custom Backend и mapped tasks

В Custom XCom Backend (урок 04) — каждая mapped TI пишет свой S3 file. Если ваш backend не учитывает map_index в S3 path, все mapped TI write в один key → overwriting.

# ❌ BAD
def _build_s3_key(dag_id, task_id, run_id):
    return f"xcom/{dag_id}/{run_id}/{task_id}"   # ← no map_index!

# ✅ GOOD
def _build_s3_key(dag_id, task_id, run_id, map_index):
    return f"xcom/{dag_id}/{run_id}/{task_id}/map_{map_index or -1}"

Gotcha 2: ordered guarantee

@task
def get_files() -> list:
    return os.listdir("/data")   # ← order не гарантирован!

process.expand(file=get_files())

os.listdir order — undefined в Python. Downstream aggregation получит values в порядке map_index, но map_index соответствует input order. Если input random — output тоже random.

Fix: sort upstream если нужен order: return sorted(os.listdir("/data")).

Gotcha 3: aggregation memory blow

# 1000 mapped TI × 100KB XCom each = 100MB в memory aggregator
process.expand(file=files_1000)

@task
def aggregate(results: list):   # ← memory blow
    ...

Fix: либо batch processing (process N в одном task), либо streaming aggregation через manual SQL.

Gotcha 4: cross-mapping XCom не работает

double.expand(x=[1,2,3])   # 3 mapped TI

@task
def use_other(x):
    # Хотим взять value от другого map_index текущего task
    # ⚠️ Это anti-pattern в Airflow — не работает напрямую
    pass

use_other.expand(x=double_outputs)

Mapped task use_other map_idx=0 не может тривиально получить XCom use_other map_idx=1 — они выполняются независимо, нет ordering guarantee. Если нужна cross-element logic — aggregate сначала в non-mapped task, потом обрабатывайте.

Gotcha 5: UI rendering 1000+ mapped TI

UI пытается rendering grid view для всех mapped TI. 1000 TI = slow render, browser hang. Используйте max_active_tis_per_dag чтобы limit concurrent, или batch processing.


Проверка знанийKnowledge check
В DAG `process.expand(item=get_items())` после успешного run с 100 items — что лежит в xcom table? Какой SQL Airflow генерирует когда downstream task делает xcom_pull(task_ids='process')?
ОтветAnswer
В xcom table 101 row для этого run: 1 row для `get_items` (map_index=-1, key=return_value, value=[item_0,...,item_99]) + 100 rows для `process` (map_index 0..99, key=return_value, value=каждого процесса result). Primary key (dag_run_id, task_id, map_index, key) гарантирует уникальность. SQL для downstream `xcom_pull(task_ids='process')` без map_indexes: `SELECT map_index, value FROM xcom WHERE run_id = '<...>' AND task_id = 'process' AND key = 'return_value' ORDER BY map_index;` — возвращает 100 rows. Airflow ORM hydrates все в memory worker-а и собирает в Python list в порядке map_index. Если каждый XCom 10KB → 1MB в memory aggregator-а — на 1000+ mapped TI это становится bottleneck. Mitigation: streaming aggregation через manual SQLAlchemy select с iterator, или batch processing (один task обрабатывает N items вместо N mapped tasks). Custom XCom Backend обязан включать map_index в storage path иначе все mapped TI overwriting один S3 key.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. DAG `process.expand(item=get_items())` где get_items() возвращает list из 100 элементов. Сколько строк в xcom table после успешного run и что в каждой?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6