Learning Platform
Глоссарий Troubleshooting
Урок 08.04 · 32 мин
Продвинутый
Dynamic MappingSchedulerexpand_mapped_taskRuntimemax_map_length

Runtime mechanics — как scheduler expands mapped task

Это самый глубокий урок модуля. До этого мы рассматривали .expand() с client perspective: “пишу .expand, получаю N mapped TI”. Но под капотом scheduler выполняет нетривиальный dance: reads upstream XCom, calculates length, creates N task_instance rows with right map_index, applies upstream dependencies, handles edge cases.

Понимание этого механизма критично для:

  • Debugging stuck mapped tasks
  • Optimizing scheduler performance при high mapping count
  • Понимания почему max_map_length имеет именно такое значение
  • Кастомизации MappedOperator subclasses (advanced use case)

List comprehensions и генераторы — фундамент expand

DAG processing phases recap

Сначала вспомним scheduler main loop (Module 04):

PhaseЧто делает
Phase 1 — DAG processorParse .py файлов, serialize в DB
Phase 2 — SchedulerСоздание DagRun, scheduling TI, expand mapped tasks
Phase 3 — Critical sectionAcquire lock, enqueue scheduled → queued
Phase 4 — HousekeepingOrphan cleanup, log cleanup

Expand mapped task происходит в Phase 2, между schedule TI и critical section.


Что хранится после parse

При parse Airflow видит:

process.expand(filename=get_files())

Создаётся MappedOperator — placeholder для mapped task. В serialized_dag table:

{
  "task_id": "process",
  "_is_mapped": true,
  "task_type": "_PythonDecoratedOperator",
  "expand_input": {
    "type": "dict-of-lists",
    "value": {
      "filename": {"__type__": "XComArg", "task_id": "get_files", "key": "return_value"}
    }
  },
  "partial_kwargs": {},
  ...
}

Length не известна при parse. Это XComArg reference на upstream task.

В task_instance table при создании DagRun:

SELECT task_id, map_index, state FROM task_instance
WHERE run_id = '<run_id>';
task_idmap_indexstate
get_files-1scheduled
process-1scheduled

process с map_index=-1 — это unmapped placeholder. На этом этапе ещё нет N mapped TI.


Phase 2 scheduler loop

После того как get_files success, scheduler в следующем tick делает:

expand_mapped_task в Phase 2
scheduler tick (5s)Phase 2 main loop iteration. Scheduler перебирает DagRun-ы которые eligible для scheduling.
find ready mapped tasks
scan task_instance WHERE _is_mapped=true AND state IN (scheduled, None)Scheduler ищет mapped TI placeholder (map_index=-1) у которых upstream завершён. Это candidates для expansion.
check upstream completeДля каждого candidate — проверяет что upstream (производитель XCom для expand_input) завершился success. Если ещё не — skip, попробуем на следующем tick.
да
resolve expand_inputReads XCom от upstream task — value, который должен быть iterable. SELECT FROM xcom WHERE task_id='get_files' AND key='return_value'.
length check vs max_map_lengthСчитает length iterable. Если > max_map_length (default 1024) — AirflowException, task переходит в failed. Иначе — продолжает.
MappedOperator.expand_mapped_task(session)Core механизм: создаёт N task_instance rows с map_index 0..N-1. Удаляет placeholder с map_index=-1 (или меняет на map_index=0 для эффективности). Каждая новая TI inherits dependencies, retry config, и т.д.
INSERT INTO task_instance × N
N TI scheduledПосле expand — N TI с state=scheduled, map_index 0..N-1. Они идут в Phase 3 (critical section) для enqueue в executor.

Implementation: упрощённый псевдокод

В коде airflow/models/mappedoperator.py:

class MappedOperator(BaseOperator):
    """Operator that hasn't been "expanded" yet."""

    def expand_mapped_task(self, run_id: str, *, session: Session) -> tuple[Sequence[TaskInstance], int]:
        """
        Expand mapped task в N task_instance rows.

        Returns: (new TIs created, total length)
        """
        # 1. Resolve expand_input — read upstream XCom
        expand_input = self.expand_input.resolve(run_id=run_id, session=session)
        # expand_input — dict: {arg_name: iterable_value}

        # 2. Calculate total length
        total_length = self._get_unmap_length(expand_input)
        if total_length > conf.getint("core", "max_map_length"):
            raise AirflowException(
                f"Number of mapped tasks ({total_length}) exceeds max_map_length"
            )

        # 3. Get/create TI rows
        dag_run = session.query(DagRun).filter_by(run_id=run_id, dag_id=self.dag_id).one()
        existing_tis = session.query(TaskInstance).filter(
            TaskInstance.dag_id == self.dag_id,
            TaskInstance.run_id == run_id,
            TaskInstance.task_id == self.task_id,
        ).all()

        # Если есть placeholder map_index=-1 — удаляем
        for ti in existing_tis:
            if ti.map_index == -1:
                session.delete(ti)

        # 4. Insert N new TI с map_index 0..N-1
        new_tis = []
        for i in range(total_length):
            ti = TaskInstance(
                task=self,
                run_id=run_id,
                map_index=i,
                state=State.SCHEDULED,
            )
            session.add(ti)
            new_tis.append(ti)

        session.flush()
        return new_tis, total_length

Этот метод вызывается на каждый scheduler tick для каждого ready mapped task.


Resolve: что worker видит на execute

После expand — worker запускает mapped TI. В execute():

class _PythonDecoratedOperator:
    def execute(self, context):
        ti = context["ti"]
        map_index = ti.map_index   # например, 1

        # Resolve каждый kwarg expand_input
        resolved_kwargs = {}
        for kwarg_name, kwarg_source in self.expand_input.items():
            if isinstance(kwarg_source, XComArg):
                # SELECT FROM xcom WHERE task_id=upstream AND key=...
                upstream_value = kwarg_source.resolve(context)
                # upstream_value — это full list/iterable

                # Pick element по map_index
                resolved_kwargs[kwarg_name] = upstream_value[map_index]
            elif isinstance(kwarg_source, list):
                # Static list
                resolved_kwargs[kwarg_name] = kwarg_source[map_index]
            else:
                raise TypeError(f"Cannot resolve {kwarg_source}")

        # Merge с partial_kwargs (constants)
        resolved_kwargs.update(self.partial_kwargs)

        # Call user function
        result = self.python_callable(**resolved_kwargs)

        # Push XCom с map_index
        ti.xcom_push(key="return_value", value=result)
        # → INSERT INTO xcom (task_id, map_index, key, value)
        #   VALUES ('process', 1, 'return_value', result_bytes)

Key insight: каждая mapped TI делает свой xcom_pull upstream и picks element по map_index. Это N round-trips для N mapped TI — но scheduler уже знает total_length, так что нет coordination.


Cartesian product internals

При нескольких expand args — itertools.product:

def _get_unmap_length(self, expand_input):
    if isinstance(expand_input, dict):
        # expand(a=[1,2], b=[x,y])
        lengths = [len(v) for v in expand_input.values()]
        return reduce(operator.mul, lengths, 1)   # cartesian
    ...

При resolve worker:

# expand(env=['dev','prod'], region=['us','eu','ap'])
# map_index=4 → ?

from itertools import product
combinations = list(product(['dev','prod'], ['us','eu','ap']))
# [('dev','us'), ('dev','eu'), ('dev','ap'), ('prod','us'), ('prod','eu'), ('prod','ap')]
env, region = combinations[4]   # ('prod', 'eu')

Internally implemented через index unraveling — не storing полный product в memory.


Dependencies — upstream/downstream

После expand:

get_files (1 TI, map_idx=-1)

    ├─ process[0] ─┐
    ├─ process[1] ─┤
    └─ process[2] ─┘

              summarize (1 TI, map_idx=-1, depends on ALL process mapped TI)

Каждая mapped TI имеет dependency на upstream get_files. Downstream summarize — на все mapped TI process (через task_instance_dependency table или dynamic resolve).

trigger_rule для downstream — обычно all_success (default). Если одна из mapped TI failed → summarize не запускается.

С trigger_rule="all_done":

@task(trigger_rule="all_done")
def summarize(results):
    pass

summarize запустится даже если некоторые mapped failed — получит partial list.


Batch-обработка — параллельные vs последовательные паттерны

max_map_length — почему именно 1024

Default max_map_length = 1024 выбран как baseline для:

ResourceImpact
DB rows1024 task_instance + 1024 xcom + 1024 log records per run
Scheduler memoryHolds list of TI в memory во время Phase 2
Critical section durationUPDATE state=queued × 1024 → дольше lock
UI renderGrid view 1024 cells — slow but tolerable
Logging volume1024 separate log files per run

При 1024 mapped TI per run × 100 DAG-ов × 24 runs/day = 2.5M task_instance per day. Это уже значимая нагрузка.

Если очень нужно больше:

[core]
max_map_length = 4096

Но understand trade-offs:

  • Scheduler tick длительность увеличивается
  • UI становится slow для DAG с >2k mapped TI
  • DB size grows

Alternative — batch processing (см. урок 05).


Edge case: dynamic re-expansion (clear + retry)

Что если вы делаете airflow tasks clear на mapped task?

airflow tasks clear -t process my_dag --execution-date 2026-05-12

Airflow:

  1. Удаляет N mapped TI rows
  2. Создаёт placeholder с map_index=-1 (если upstream still cleared)
  3. На следующем scheduler tick — re-expand: scheduler читает upstream XCom (которое stil there) и создаёт новые N mapped TI

Если upstream length изменился (например, upstream был cleared и re-ran → возвращает другой list) — re-expansion с новой N. Это может surprise: после clear N mapped TI = 100, после re-run N = 95.


Mapping с empty input

@task
def get_items() -> list:
    return []   # empty

process.expand(item=get_items())

В expand_mapped_task:

total_length = 0
# Удаляет placeholder
# Не создаёт новых TI
# Mapped task переходит в state=success (с 0 sub-instances)

UI показывает process с 0/0 mapped TI. Downstream summarize([]) запускается с empty list.


State machine для mapped TI

Каждая mapped TI имеет full lifecycle, independent от других:

scheduled → queued → running → success/failed/up_for_retry/skipped

Что важно:

  • process[0] может failed → retry → success
  • process[1] параллельно — running
  • Independent state per map_index

Это огромное преимущество vs batch: isolated retry per item. Если 1 из 100 items упадёт — retry только эту 1 с её retry count, не все 100.


Cross-mapping limitation

Mapped task не может consume specific другой mapped task element:

# ❌ Не работает напрямую
double = process.expand(x=[1, 2, 3])

@task
def use_previous(x, prev):
    pass

# Хотим: use_previous[i] видит process[i-1] result
use_previous.expand(x=range(3), prev=???)

Mapped TI выполняются independently, нет ordering guarantee. Если нужна cross-element logic:

  • Aggregate в non-mapped task сначала
  • Or batch processing внутри одного task с sequential loop

Production gotchas

Gotcha 1: upstream не возвращает iterable

@task
def get_x() -> int:
    return 42

process.expand(x=get_x())   # AirflowException: not iterable

Type checker не помогает (т.к. runtime resolution). Fix: ensure return type list/tuple.

Gotcha 2: длительная expand_mapped_task замедляет scheduler

Phase 2 включает expand. Если у вас 100 DAG-ов с 1000 mapped TI каждая, scheduler tick тратит много time на expand_mapped_task. Mitigation: dedicated scheduler для compute-heavy mappings (через dag_processor_subdir partitioning).

Gotcha 3: deserialize upstream XCom heavy

@task
def get_files() -> list[dict]:
    return [{"key": "...", "data": "<big blob>"} for _ in range(500)]

process.expand_kwargs(get_files())

Каждая mapped TI делает xcom_pull(get_files) → pulls full 500-item list (10MB+). 500 mapped TI × 10MB pulls = 5GB network traffic. Use Custom Backend для offload или return лишь references.

Gotcha 4: changing upstream между runs

@task
def get_files() -> list:
    return list_s3_files()   # 100 today, 50 завтра

Каждый run — разный count mapped TI. Не предсказуемо. Если ваш DAG требует stable count — pin upstream via Param или scheduled snapshot.

Gotcha 5: expand + custom XCom backend race

При expand scheduler reads upstream XCom. Если Custom Backend имеет latency (S3 hit ~100ms), scheduler ждёт. На high-mapping DAG это ощутимый delay.


Hands-on: SQL для debugging stuck mapped

Если mapped task stuck в “scheduled”:

-- Все mapped TI для DAG
SELECT
    task_id,
    map_index,
    state,
    start_date,
    end_date,
    try_number,
    queued_dttm
FROM task_instance
WHERE dag_id = 'my_dag'
  AND run_id = '<run_id>'
  AND task_id IN (SELECT task_id FROM serialized_dag_task WHERE _is_mapped = true)
ORDER BY task_id, map_index;

Если все mapped TI в state=scheduled — scheduler не enqueues (pool full, max_active_tasks, etc).

Если placeholder map_index=-1 — expand_mapped_task ещё не выполнен (upstream not complete или scheduler busy).


Проверка знанийKnowledge check
Опиши end-to-end путь от parse до execute для `process.expand(file=get_files())` где get_files возвращает 5 элементов. Что хранится в DB на каждом этапе?
ОтветAnswer
**Parse**: DAG processor видит .expand() — создаёт MappedOperator placeholder. serialized_dag хранит JSON с _is_mapped=true и expand_input=XComArg(get_files, return_value). Length unknown. **DagRun created**: task_instance table получает 2 rows: get_files (map_index=-1, state=scheduled), process (map_index=-1, state=scheduled — placeholder). **Phase 2 scheduler — get_files runs**: scheduler в critical section enqueues get_files → worker executes → return [a,b,c,d,e] → INSERT xcom (task_id=get_files, map_index=-1, key=return_value, value=list). state=success. **Phase 2 — expand_mapped_task**: scheduler видит process placeholder, upstream done, читает xcom от get_files (SELECT), length=5, max_map_length OK (5<<1024), вызывает expand_mapped_task(session). Удаляет placeholder с map_index=-1, создаёт 5 task_instance rows с map_index 0..4, state=scheduled. **Phase 3 critical section**: enqueues 5 mapped TI scheduled→queued. **Workers execute**: каждый worker получает 1 mapped TI, в execute() делает xcom_pull(get_files) → получает full list → picks element по map_index → calls callable → result → INSERT xcom (task_id=process, map_index=N, return_value). После всех 5 success: 5 xcom rows для process с разными map_index, downstream task aggregates.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Опиши state DB сразу после DagRun creation, но ДО запуска upstream get_files() для DAG с `process.expand(file=get_files())`

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6