Runtime mechanics — как scheduler expands mapped task
Это самый глубокий урок модуля. До этого мы рассматривали .expand() с client perspective: “пишу .expand, получаю N mapped TI”. Но под капотом scheduler выполняет нетривиальный dance: reads upstream XCom, calculates length, creates N task_instance rows with right map_index, applies upstream dependencies, handles edge cases.
Понимание этого механизма критично для:
- Debugging stuck mapped tasks
- Optimizing scheduler performance при high mapping count
- Понимания почему
max_map_lengthимеет именно такое значение - Кастомизации MappedOperator subclasses (advanced use case)
List comprehensions и генераторы — фундамент expand
DAG processing phases recap
Сначала вспомним scheduler main loop (Module 04):
| Phase | Что делает |
|---|---|
| Phase 1 — DAG processor | Parse .py файлов, serialize в DB |
| Phase 2 — Scheduler | Создание DagRun, scheduling TI, expand mapped tasks |
| Phase 3 — Critical section | Acquire lock, enqueue scheduled → queued |
| Phase 4 — Housekeeping | Orphan cleanup, log cleanup |
Expand mapped task происходит в Phase 2, между schedule TI и critical section.
Что хранится после parse
При parse Airflow видит:
process.expand(filename=get_files())
Создаётся MappedOperator — placeholder для mapped task. В serialized_dag table:
{
"task_id": "process",
"_is_mapped": true,
"task_type": "_PythonDecoratedOperator",
"expand_input": {
"type": "dict-of-lists",
"value": {
"filename": {"__type__": "XComArg", "task_id": "get_files", "key": "return_value"}
}
},
"partial_kwargs": {},
...
}
Length не известна при parse. Это XComArg reference на upstream task.
В task_instance table при создании DagRun:
SELECT task_id, map_index, state FROM task_instance
WHERE run_id = '<run_id>';
| task_id | map_index | state |
|---|---|---|
get_files | -1 | scheduled |
process | -1 | scheduled |
process с map_index=-1 — это unmapped placeholder. На этом этапе ещё нет N mapped TI.
Phase 2 scheduler loop
После того как get_files success, scheduler в следующем tick делает:
Implementation: упрощённый псевдокод
В коде airflow/models/mappedoperator.py:
class MappedOperator(BaseOperator):
"""Operator that hasn't been "expanded" yet."""
def expand_mapped_task(self, run_id: str, *, session: Session) -> tuple[Sequence[TaskInstance], int]:
"""
Expand mapped task в N task_instance rows.
Returns: (new TIs created, total length)
"""
# 1. Resolve expand_input — read upstream XCom
expand_input = self.expand_input.resolve(run_id=run_id, session=session)
# expand_input — dict: {arg_name: iterable_value}
# 2. Calculate total length
total_length = self._get_unmap_length(expand_input)
if total_length > conf.getint("core", "max_map_length"):
raise AirflowException(
f"Number of mapped tasks ({total_length}) exceeds max_map_length"
)
# 3. Get/create TI rows
dag_run = session.query(DagRun).filter_by(run_id=run_id, dag_id=self.dag_id).one()
existing_tis = session.query(TaskInstance).filter(
TaskInstance.dag_id == self.dag_id,
TaskInstance.run_id == run_id,
TaskInstance.task_id == self.task_id,
).all()
# Если есть placeholder map_index=-1 — удаляем
for ti in existing_tis:
if ti.map_index == -1:
session.delete(ti)
# 4. Insert N new TI с map_index 0..N-1
new_tis = []
for i in range(total_length):
ti = TaskInstance(
task=self,
run_id=run_id,
map_index=i,
state=State.SCHEDULED,
)
session.add(ti)
new_tis.append(ti)
session.flush()
return new_tis, total_length
Этот метод вызывается на каждый scheduler tick для каждого ready mapped task.
Resolve: что worker видит на execute
После expand — worker запускает mapped TI. В execute():
class _PythonDecoratedOperator:
def execute(self, context):
ti = context["ti"]
map_index = ti.map_index # например, 1
# Resolve каждый kwarg expand_input
resolved_kwargs = {}
for kwarg_name, kwarg_source in self.expand_input.items():
if isinstance(kwarg_source, XComArg):
# SELECT FROM xcom WHERE task_id=upstream AND key=...
upstream_value = kwarg_source.resolve(context)
# upstream_value — это full list/iterable
# Pick element по map_index
resolved_kwargs[kwarg_name] = upstream_value[map_index]
elif isinstance(kwarg_source, list):
# Static list
resolved_kwargs[kwarg_name] = kwarg_source[map_index]
else:
raise TypeError(f"Cannot resolve {kwarg_source}")
# Merge с partial_kwargs (constants)
resolved_kwargs.update(self.partial_kwargs)
# Call user function
result = self.python_callable(**resolved_kwargs)
# Push XCom с map_index
ti.xcom_push(key="return_value", value=result)
# → INSERT INTO xcom (task_id, map_index, key, value)
# VALUES ('process', 1, 'return_value', result_bytes)
Key insight: каждая mapped TI делает свой xcom_pull upstream и picks element по map_index. Это N round-trips для N mapped TI — но scheduler уже знает total_length, так что нет coordination.
Cartesian product internals
При нескольких expand args — itertools.product:
def _get_unmap_length(self, expand_input):
if isinstance(expand_input, dict):
# expand(a=[1,2], b=[x,y])
lengths = [len(v) for v in expand_input.values()]
return reduce(operator.mul, lengths, 1) # cartesian
...
При resolve worker:
# expand(env=['dev','prod'], region=['us','eu','ap'])
# map_index=4 → ?
from itertools import product
combinations = list(product(['dev','prod'], ['us','eu','ap']))
# [('dev','us'), ('dev','eu'), ('dev','ap'), ('prod','us'), ('prod','eu'), ('prod','ap')]
env, region = combinations[4] # ('prod', 'eu')
Internally implemented через index unraveling — не storing полный product в memory.
Dependencies — upstream/downstream
После expand:
get_files (1 TI, map_idx=-1)
│
├─ process[0] ─┐
├─ process[1] ─┤
└─ process[2] ─┘
│
summarize (1 TI, map_idx=-1, depends on ALL process mapped TI)
Каждая mapped TI имеет dependency на upstream get_files. Downstream summarize — на все mapped TI process (через task_instance_dependency table или dynamic resolve).
trigger_rule для downstream — обычно all_success (default). Если одна из mapped TI failed → summarize не запускается.
С trigger_rule="all_done":
@task(trigger_rule="all_done")
def summarize(results):
pass
summarize запустится даже если некоторые mapped failed — получит partial list.
Batch-обработка — параллельные vs последовательные паттерны
max_map_length — почему именно 1024
Default max_map_length = 1024 выбран как baseline для:
| Resource | Impact |
|---|---|
| DB rows | 1024 task_instance + 1024 xcom + 1024 log records per run |
| Scheduler memory | Holds list of TI в memory во время Phase 2 |
| Critical section duration | UPDATE state=queued × 1024 → дольше lock |
| UI render | Grid view 1024 cells — slow but tolerable |
| Logging volume | 1024 separate log files per run |
При 1024 mapped TI per run × 100 DAG-ов × 24 runs/day = 2.5M task_instance per day. Это уже значимая нагрузка.
Если очень нужно больше:
[core]
max_map_length = 4096
Но understand trade-offs:
- Scheduler tick длительность увеличивается
- UI становится slow для DAG с >2k mapped TI
- DB size grows
Alternative — batch processing (см. урок 05).
Edge case: dynamic re-expansion (clear + retry)
Что если вы делаете airflow tasks clear на mapped task?
airflow tasks clear -t process my_dag --execution-date 2026-05-12
Airflow:
- Удаляет N mapped TI rows
- Создаёт placeholder с map_index=-1 (если upstream still cleared)
- На следующем scheduler tick — re-expand: scheduler читает upstream XCom (которое stil there) и создаёт новые N mapped TI
Если upstream length изменился (например, upstream был cleared и re-ran → возвращает другой list) — re-expansion с новой N. Это может surprise: после clear N mapped TI = 100, после re-run N = 95.
Mapping с empty input
@task
def get_items() -> list:
return [] # empty
process.expand(item=get_items())
В expand_mapped_task:
total_length = 0
# Удаляет placeholder
# Не создаёт новых TI
# Mapped task переходит в state=success (с 0 sub-instances)
UI показывает process с 0/0 mapped TI. Downstream summarize([]) запускается с empty list.
State machine для mapped TI
Каждая mapped TI имеет full lifecycle, independent от других:
scheduled → queued → running → success/failed/up_for_retry/skipped
Что важно:
process[0]может failed → retry → successprocess[1]параллельно — running- Independent state per map_index
Это огромное преимущество vs batch: isolated retry per item. Если 1 из 100 items упадёт — retry только эту 1 с её retry count, не все 100.
Cross-mapping limitation
Mapped task не может consume specific другой mapped task element:
# ❌ Не работает напрямую
double = process.expand(x=[1, 2, 3])
@task
def use_previous(x, prev):
pass
# Хотим: use_previous[i] видит process[i-1] result
use_previous.expand(x=range(3), prev=???)
Mapped TI выполняются independently, нет ordering guarantee. Если нужна cross-element logic:
- Aggregate в non-mapped task сначала
- Or batch processing внутри одного task с sequential loop
Production gotchas
Gotcha 1: upstream не возвращает iterable
@task
def get_x() -> int:
return 42
process.expand(x=get_x()) # AirflowException: not iterable
Type checker не помогает (т.к. runtime resolution). Fix: ensure return type list/tuple.
Gotcha 2: длительная expand_mapped_task замедляет scheduler
Phase 2 включает expand. Если у вас 100 DAG-ов с 1000 mapped TI каждая, scheduler tick тратит много time на expand_mapped_task. Mitigation: dedicated scheduler для compute-heavy mappings (через dag_processor_subdir partitioning).
Gotcha 3: deserialize upstream XCom heavy
@task
def get_files() -> list[dict]:
return [{"key": "...", "data": "<big blob>"} for _ in range(500)]
process.expand_kwargs(get_files())
Каждая mapped TI делает xcom_pull(get_files) → pulls full 500-item list (10MB+). 500 mapped TI × 10MB pulls = 5GB network traffic. Use Custom Backend для offload или return лишь references.
Gotcha 4: changing upstream между runs
@task
def get_files() -> list:
return list_s3_files() # 100 today, 50 завтра
Каждый run — разный count mapped TI. Не предсказуемо. Если ваш DAG требует stable count — pin upstream via Param или scheduled snapshot.
Gotcha 5: expand + custom XCom backend race
При expand scheduler reads upstream XCom. Если Custom Backend имеет latency (S3 hit ~100ms), scheduler ждёт. На high-mapping DAG это ощутимый delay.
Hands-on: SQL для debugging stuck mapped
Если mapped task stuck в “scheduled”:
-- Все mapped TI для DAG
SELECT
task_id,
map_index,
state,
start_date,
end_date,
try_number,
queued_dttm
FROM task_instance
WHERE dag_id = 'my_dag'
AND run_id = '<run_id>'
AND task_id IN (SELECT task_id FROM serialized_dag_task WHERE _is_mapped = true)
ORDER BY task_id, map_index;
Если все mapped TI в state=scheduled — scheduler не enqueues (pool full, max_active_tasks, etc).
Если placeholder map_index=-1 — expand_mapped_task ещё не выполнен (upstream not complete или scheduler busy).