Learning Platform
Глоссарий Troubleshooting
Урок 08.02 · 26 мин
Продвинутый
Dynamic MappingAIP-42expandRuntimemap_index

expand() основы — синтаксис и runtime mechanics

Dynamic Task Mapping (AIP-42, Airflow 2.3+) — это runtime expansion одной task в N параллельных копий на основе upstream result. На surface — простой .expand(arg=iterable). Под капотом — нетривиальный scheduler dance: parse vs runtime, как создаются TI rows, как работает cartesian product при нескольких args.

List/dict/set comprehensions, generator expressions

Этот урок закладывает foundation, на котором строятся .expand_kwargs, .zip, .partial (урок 03) и runtime mechanics (урок 04).


Синтаксис

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule=None, start_date=datetime(2026, 1, 1), catchup=False)
def expand_demo():
    @task
    def get_files() -> list[str]:
        return ["a.csv", "b.csv", "c.csv"]

    @task
    def process(filename: str):
        print(f"Processing {filename}")

    process.expand(filename=get_files())   # ← 3 mapped TI

expand_demo()

Что произошло:

  • process — обычный TaskFlow task с одним аргументом
  • .expand(filename=get_files()) — вместо запуска один раз, scheduler создаст N экземпляров, по одному на каждый element upstream list
  • Каждый экземпляр получит свой filename и свой map_index (0, 1, 2)

После run UI покажет: один get_files (success), три process под одним node (success — success — success).


Что значит “expand”

.expand(arg=iterable) означает:

  • Аргумент arg будет проксирован на element-by-element basis
  • Создастся столько copies task, сколько elements в iterable
  • Каждая copy — independent TaskInstance с уникальным map_index

Это runtime expansion — scheduler сам решает количество TI на основе upstream data. В отличие от:

  • Static parallelism: вы пишете 3 копии process_a, process_b, process_c руками
  • For loop в DAG body: создаст N tasks с N task_id (collision если попытка одинаковых имён)
  • SubDAG: deprecated, requires nested DagRun

Dynamic Task Mapping — это elegant ответ Airflow на “у меня переменное число items на каждом run”.


Parse vs Runtime

Это критическое разделение. Airflow обрабатывает DAG в двух фазах:

Parse vs Runtime для expand
DAG processor — parseПри parse Airflow читает .py файл, выполняет top-level code (DAG body). Видит process.expand(filename=get_files()) — создаёт MappedOperator placeholder. Length неизвестна (зависит от upstream который ещё не запущен).
serialize в DB
serialized_dagDAG сохраняется с информацией о MappedOperator (что нужно expand). Точная length не известна. В UI MappedOperator показывается как один node с placeholder count.
DagRun created — scheduler tick
scheduler runs get_filesPhase 1-2 scheduler: создаёт TI для get_files, executor runs её. После success — return value в xcom (например [a.csv, b.csv, c.csv]).
get_files complete
scheduler expand_mapped_taskPhase 2 scheduler loop: reads XCom от get_files, видит len=3, вызывает task.expand_mapped_task(session) → создаёт 3 TI rows для process с map_index 0,1,2. Каждая получает свой filename.
3 process TI runningWorker запускает каждый mapped TI. При execute() context['ti'].map_index используется для resolve конкретного element upstream XCom. Например map_index=1 → filename='b.csv'.

Ключевое: при parse Airflow не знает количество N. Это determined только на runtime, когда upstream завершилось.


Что хранится в DB при expand

После get_files complete, scheduler выполняет expand_mapped_task. В task_instance table появляются:

SELECT task_id, map_index, state FROM task_instance
WHERE run_id = '<run_id>'
ORDER BY task_id, map_index;
task_idmap_indexstate
get_files-1success
process0scheduled
process1scheduled
process2scheduled

map_index=-1 — обычный TI. map_index=0..N-1 — mapped TI. Все имеют тот же task_id.

После запуска mapped TI обновляются:

  • state: scheduled → queued → running → success/failed
  • start_date, end_date, duration — per map_index

Сколько TI можно создать — max_map_length

Airflow ограничивает количество mapped TI чтобы не убить scheduler:

# airflow.cfg
[core]
max_map_length = 1024   # default

Если upstream возвращает 1500 элементов:

# Scheduler error при expand_mapped_task:
AirflowException: Number of mapped tasks (1500) exceeds max_map_length (1024)

Task переходит в failed state. Можно увеличить:

[core]
max_map_length = 4096

Но careful — каждая mapped TI это row в task_instance + потенциально XCom row. 4000 mapped TI значительно нагружают scheduler critical section и UI rendering.

См. урок 05 для подробного обсуждения scaling pitfalls.


Source — что можно передать в expand

.expand(arg=...) принимает:

1. XComArg (от upstream task)

process.expand(filename=get_files())   # XComArg

Самый частый case. Scheduler reads XCom upstream, считает length.

2. Static list/dict

process.expand(filename=["a.csv", "b.csv", "c.csv"])   # static list

Length известна сразу на parse — scheduler создаёт TI immediately после DagRun start (без ожидания upstream).

3. XComArg от mapped task

double_results = double.expand(x=[1, 2, 3])   # 3 mapped TI returns
square.expand(x=double_results)               # ← chain: 3 mapped square TI

Chain mapped tasks — каждый mapped TI consumes corresponding upstream mapped TI.

4. airflow.models.param.Param (DAG params)

@dag(params={"files": ["a.csv", "b.csv"]})
def my_dag():
    @task
    def process(filename): pass
    process.expand(filename="{{ params.files }}")

Static при DAG trigger, dynamic при manual run override.


Cartesian product — несколько args

Если в expand несколько keyword args — получаете cartesian product:

@task
def process(env: str, region: str):
    print(f"Processing {env}/{region}")

process.expand(
    env=["dev", "prod"],
    region=["us", "eu", "ap"],
)

Результат: 2 × 3 = 6 mapped TI:

map_indexenvregion
0devus
1deveu
2devap
3produs
4prodeu
5prodap

Order — по python itertools.product (left-most argument changes slowest).

WARNING

Cartesian product быстро становится огромным: expand(a=[10 items], b=[10 items], c=[10 items]) = 1000 mapped TI. Превышает default max_map_length=1024 уже на 4-5 axes по 6 элементов. Если нужно lockstep pairs (a[0]+b[0], a[1]+b[1]…) — используйте .zip() (урок 03), не .expand().


Visualization: cartesian vs zip

expand() — cartesian product vs zip
expand(a=[1,2], b=[x,y])Cartesian product: 2×2=4 mapped TI. map_index 0=(1,x), 1=(1,y), 2=(2,x), 3=(2,y). Используется когда нужны все комбинации (env×region, model×dataset)
expand_kwargs([{a:1,b:x},{a:2,b:y}])Explicit list of dicts. Length = 2 (а не 4). Pairs не cross-multiplied. Используется когда у вас есть готовые комбинации (например, конфиги из DB).
zip(a, b).map(...) → expand_kwargsParallel pairs: a[0]+b[0], a[1]+b[1]. Length = min(len(a), len(b)). Используется когда a и b — parallel lists same length (users + scores).

Урок 03 раскроет каждое подробно.


Empty upstream — что произойдёт

@task
def get_files() -> list:
    return []   # пустой list

process.expand(filename=get_files())

Scheduler видит upstream length=0:

  • process task переходит в state success без mapped TI sub-instances
  • Downstream tasks выполняются normally (если их trigger_rule позволяет)
  • UI показывает process с 0/0 mapped TI

Это OK behavior — представляет “ничего обрабатывать”. Если хотите fail на empty:

@task
def get_files() -> list:
    files = list_s3_files()
    if not files:
        raise ValueError("No files to process")
    return files

Mapped TI lifecycle

Каждая mapped TI имеет полный TaskInstance lifecycle:

none → scheduled → queued → running → success/failed/up_for_retry/...

Это значит:

  • Каждая mapped TI может independently retry (свои retries count)
  • Каждая может быть manually cleared в UI
  • Каждая логирует в task_instance.log (отдельные log files per map_index)

Это сильное преимущество vs batch processing — изолированный retry за конкретный item.


Code: end-to-end пример

from airflow.decorators import dag, task
from datetime import datetime
import os

@dag(
    schedule="@daily"
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["dynamic-mapping"],
)
def s3_file_processor():

    @task
    def list_s3_files(bucket: str = "raw-data") -> list[dict]:
        """Возвращает список файлов с metadata."""
        return [
            {"key": f"orders_2026_{day:02d}.csv", "size": 1024 * day}
            for day in range(1, 6)   # ← 5 файлов
        ]

    @task
    def process_file(file_info: dict) -> dict:
        """Обрабатывает один файл. Получает один dict из списка."""
        print(f"Processing {file_info['key']} ({file_info['size']} bytes)")
        return {"key": file_info["key"], "rows_loaded": file_info["size"] // 100}

    @task
    def summarize(results: list[dict]):
        """Суммирует все результаты (aggregation)."""
        total = sum(r["rows_loaded"] for r in results)
        print(f"Total rows loaded: {total}")

    files = list_s3_files()
    processed = process_file.expand(file_info=files)   # ← 5 mapped TI
    summarize(processed)

s3_file_processor()

После run:

  • list_s3_files — 1 TI, return list of 5 dicts
  • process_file — 5 mapped TI (map_index 0..4), каждая обрабатывает свой dict
  • summarize — 1 TI, получает list[5] aggregated через XCom (как в Module 06 lesson 06)

Naming mapped TI в UI и логах

В 2.x mapped TI отображаются как:

process_file
├── [0] orders_2026_01.csv     ← logical name (по map_index)
├── [1] orders_2026_02.csv
├── [2] orders_2026_03.csv
├── [3] orders_2026_04.csv
└── [4] orders_2026_05.csv

UI показывает map_index в брекетах. Логи доступны per map_index: {dag_id}/{task_id}/map_index={N}/{run_id}.log.

В Airflow 2.9+ появилась возможность кастомизировать render через map_index_template (если знаете input — можно показать smart label).


Production gotchas

Gotcha 1: order не гарантирован

@task
def list_files() -> list:
    return os.listdir("/data")   # ← Python order не гарантирован

map_index 0 может оказаться любым файлом. Downstream aggregation в порядке map_index, но это не порядок alphabetical/timestamp. Если порядок важен — sort upstream: return sorted(os.listdir("/data")).

Gotcha 2: huge cartesian произведение

process.expand(env=envs, region=regions, model=models)   # 5×5×10 = 250

Если каждый element list — несколько → быстро превышает max_map_length. Используйте explicit pairs через .expand_kwargs (урок 03).

Gotcha 3: side effects в expand callable

@task
def get_items() -> list:
    items = []
    for x in range(100):
        send_metric("get_items_called", x)   # ← side effect
        items.append(x)
    return items

Если upstream task имеет side effects (метрики, запись в DB) — они происходят на одном execution (как обычно). Но mapped tasks не “видят” side effects — они получают только return value через XCom.

Gotcha 4: Type annotation должна быть Iterable

@task
def get_items() -> int:   # ❌ int — не iterable
    return 42

process.expand(x=get_items())   # AirflowException на expand

Upstream task должен возвращать list, tuple, dict (для kwargs) или другой iterable. Иначе scheduler не может посчитать length.

Gotcha 5: dict в expand → keys not values

@task
def get_dict() -> dict:
    return {"a": 1, "b": 2, "c": 3}

process.expand(x=get_dict())   # iterates по keys! ['a','b','c']

Если хотите values: process.expand(x=list(get_dict().values())). Если хотите pairs (key+value) — лучше expand_kwargs (урок 03).


Проверка знанийKnowledge check
DAG `process.expand(env=['dev','prod'], region=['us','eu','ap'])`. Сколько mapped TI создаст scheduler, в каком порядке map_index, и при каком max_map_length эта структура сломается?
ОтветAnswer
Cartesian product: 2 × 3 = **6 mapped TI**. map_index по itertools.product (left-most меняется slowest): 0=(dev,us), 1=(dev,eu), 2=(dev,ap), 3=(prod,us), 4=(prod,eu), 5=(prod,ap). Default max_map_length=1024 — эта DAG безопасна. Сломается на (env=5×region=5×model=10×subset=5)=1250 — превысит. AirflowException на expand_mapped_task в Phase 2 scheduler loop. Mitigation: (1) `[core] max_map_length=4096` — но careful, scheduler load растёт; (2) лучше через `.expand_kwargs([{...}, {...}])` с explicit pairs только нужных комбинаций (не cross-multiplication); (3) batch processing — один task обрабатывает N items внутри (если изолированный retry per item не нужен). Pattern: cartesian product красив, но scales плохо — при 3+ axes уже подумайте о explicit list of dicts.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Когда определяется количество N для `process.expand(item=get_items())` где get_items returns list?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6