Dynamic Task Mapping — обзор модуля
Dynamic Task Mapping (AIP-42, 2.3+) — runtime expansion task в N параллельных копий на основе upstream result. Это pandas.apply для Airflow. Концептуально просто, но имеет важные scaling implications.
Уроки модуля
| # | Урок | Что внутри |
|---|---|---|
| 01 | Обзор модуля | Текущий урок |
| 02 | expand() — основы | process.expand(filename=get_files()), cartesian product |
| 03 | expand_kwargs / zip / partial | List of dicts, parallel zip, partial + dynamic |
| 04 | Mapped tasks runtime | Как scheduler runtime expands, что хранится в БД |
| 05 | Scaling pitfalls | 1000 mapped tasks = scheduler load, max_map_length |
| 06 | Real-world patterns | S3 prefix processing, DB shards, multi-region (см. lesson 06) |
Ключевые концепты
@task
def get_files() -> list[str]:
return ["a.csv", "b.csv", "c.csv"]
@task
def process(filename: str):
pass
process.expand(filename=get_files()) # 3 mapped TI
Варианты:
.expand(arg=iterable)— cartesian product если несколько args.expand_kwargs([{...}, {...}])— list of dicts.zip(a, b)— parallel zip.partial(const=...).expand(...)— constant + dynamic
Scaling pitfalls
Лимит: max_map_length = 1024 по умолчанию. Можно поднять, но осторожно:
- 1000 mapped tasks = 1000 строк в
task_instance+ 1000 xcom records - Scheduler load → critical section longer
- UI становится медленным
Mitigation:
max_active_tis_per_dagограничивает одновременно running- Batch processing — внутри одного task обработать N items, не N task
Killer takeaway
Dynamic Task Mapping — не silver bullet для parallelism. Если можете batch внутри одного task (например, обработать 100 файлов в одном Python task), это в 100 раз эффективнее, чем 100 mapped tasks. Используйте mapping когда нужна изолированная retry каждого item.