expand() основы — синтаксис и runtime mechanics
Dynamic Task Mapping (AIP-42, Airflow 2.3+) — это runtime expansion одной task в N параллельных копий на основе upstream result. На surface — простой .expand(arg=iterable). Под капотом — нетривиальный scheduler dance: parse vs runtime, как создаются TI rows, как работает cartesian product при нескольких args.
Этот урок закладывает foundation, на котором строятся .expand_kwargs, .zip, .partial (урок 03) и runtime mechanics (урок 04).
Синтаксис
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule=None, start_date=datetime(2026, 1, 1), catchup=False)
def expand_demo():
@task
def get_files() -> list[str]:
return ["a.csv", "b.csv", "c.csv"]
@task
def process(filename: str):
print(f"Processing {filename}")
process.expand(filename=get_files()) # ← 3 mapped TI
expand_demo()
Что произошло:
process— обычный TaskFlow task с одним аргументом.expand(filename=get_files())— вместо запуска один раз, scheduler создаст N экземпляров, по одному на каждый element upstream list- Каждый экземпляр получит свой
filenameи свойmap_index(0, 1, 2)
После run UI покажет: один get_files (success), три process под одним node (success — success — success).
Что значит “expand”
.expand(arg=iterable) означает:
- Аргумент
argбудет проксирован на element-by-element basis - Создастся столько copies task, сколько elements в iterable
- Каждая copy — independent TaskInstance с уникальным
map_index
Это runtime expansion — scheduler сам решает количество TI на основе upstream data. В отличие от:
- Static parallelism: вы пишете 3 копии
process_a,process_b,process_cруками - For loop в DAG body: создаст N tasks с N task_id (collision если попытка одинаковых имён)
- SubDAG: deprecated, requires nested DagRun
Dynamic Task Mapping — это elegant ответ Airflow на “у меня переменное число items на каждом run”.
Parse vs Runtime
Это критическое разделение. Airflow обрабатывает DAG в двух фазах:
Ключевое: при parse Airflow не знает количество N. Это determined только на runtime, когда upstream завершилось.
Что хранится в DB при expand
После get_files complete, scheduler выполняет expand_mapped_task. В task_instance table появляются:
SELECT task_id, map_index, state FROM task_instance
WHERE run_id = '<run_id>'
ORDER BY task_id, map_index;
| task_id | map_index | state |
|---|---|---|
get_files | -1 | success |
process | 0 | scheduled |
process | 1 | scheduled |
process | 2 | scheduled |
map_index=-1 — обычный TI. map_index=0..N-1 — mapped TI. Все имеют тот же task_id.
После запуска mapped TI обновляются:
- state: scheduled → queued → running → success/failed
- start_date, end_date, duration — per map_index
Сколько TI можно создать — max_map_length
Airflow ограничивает количество mapped TI чтобы не убить scheduler:
# airflow.cfg
[core]
max_map_length = 1024 # default
Если upstream возвращает 1500 элементов:
# Scheduler error при expand_mapped_task:
AirflowException: Number of mapped tasks (1500) exceeds max_map_length (1024)
Task переходит в failed state. Можно увеличить:
[core]
max_map_length = 4096
Но careful — каждая mapped TI это row в task_instance + потенциально XCom row. 4000 mapped TI значительно нагружают scheduler critical section и UI rendering.
См. урок 05 для подробного обсуждения scaling pitfalls.
Source — что можно передать в expand
.expand(arg=...) принимает:
1. XComArg (от upstream task)
process.expand(filename=get_files()) # XComArg
Самый частый case. Scheduler reads XCom upstream, считает length.
2. Static list/dict
process.expand(filename=["a.csv", "b.csv", "c.csv"]) # static list
Length известна сразу на parse — scheduler создаёт TI immediately после DagRun start (без ожидания upstream).
3. XComArg от mapped task
double_results = double.expand(x=[1, 2, 3]) # 3 mapped TI returns
square.expand(x=double_results) # ← chain: 3 mapped square TI
Chain mapped tasks — каждый mapped TI consumes corresponding upstream mapped TI.
4. airflow.models.param.Param (DAG params)
@dag(params={"files": ["a.csv", "b.csv"]})
def my_dag():
@task
def process(filename): pass
process.expand(filename="{{ params.files }}")
Static при DAG trigger, dynamic при manual run override.
Cartesian product — несколько args
Если в expand несколько keyword args — получаете cartesian product:
@task
def process(env: str, region: str):
print(f"Processing {env}/{region}")
process.expand(
env=["dev", "prod"],
region=["us", "eu", "ap"],
)
Результат: 2 × 3 = 6 mapped TI:
| map_index | env | region |
|---|---|---|
| 0 | dev | us |
| 1 | dev | eu |
| 2 | dev | ap |
| 3 | prod | us |
| 4 | prod | eu |
| 5 | prod | ap |
Order — по python itertools.product (left-most argument changes slowest).
Cartesian product быстро становится огромным: expand(a=[10 items], b=[10 items], c=[10 items]) = 1000 mapped TI. Превышает default max_map_length=1024 уже на 4-5 axes по 6 элементов. Если нужно lockstep pairs (a[0]+b[0], a[1]+b[1]…) — используйте .zip() (урок 03), не .expand().
Visualization: cartesian vs zip
Урок 03 раскроет каждое подробно.
Empty upstream — что произойдёт
@task
def get_files() -> list:
return [] # пустой list
process.expand(filename=get_files())
Scheduler видит upstream length=0:
processtask переходит в statesuccessбез mapped TI sub-instances- Downstream tasks выполняются normally (если их trigger_rule позволяет)
- UI показывает
processс 0/0 mapped TI
Это OK behavior — представляет “ничего обрабатывать”. Если хотите fail на empty:
@task
def get_files() -> list:
files = list_s3_files()
if not files:
raise ValueError("No files to process")
return files
Mapped TI lifecycle
Каждая mapped TI имеет полный TaskInstance lifecycle:
none → scheduled → queued → running → success/failed/up_for_retry/...
Это значит:
- Каждая mapped TI может independently retry (свои
retriescount) - Каждая может быть manually cleared в UI
- Каждая логирует в
task_instance.log(отдельные log files per map_index)
Это сильное преимущество vs batch processing — изолированный retry за конкретный item.
Code: end-to-end пример
from airflow.decorators import dag, task
from datetime import datetime
import os
@dag(
schedule="@daily"
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["dynamic-mapping"],
)
def s3_file_processor():
@task
def list_s3_files(bucket: str = "raw-data") -> list[dict]:
"""Возвращает список файлов с metadata."""
return [
{"key": f"orders_2026_{day:02d}.csv", "size": 1024 * day}
for day in range(1, 6) # ← 5 файлов
]
@task
def process_file(file_info: dict) -> dict:
"""Обрабатывает один файл. Получает один dict из списка."""
print(f"Processing {file_info['key']} ({file_info['size']} bytes)")
return {"key": file_info["key"], "rows_loaded": file_info["size"] // 100}
@task
def summarize(results: list[dict]):
"""Суммирует все результаты (aggregation)."""
total = sum(r["rows_loaded"] for r in results)
print(f"Total rows loaded: {total}")
files = list_s3_files()
processed = process_file.expand(file_info=files) # ← 5 mapped TI
summarize(processed)
s3_file_processor()
После run:
list_s3_files— 1 TI, return list of 5 dictsprocess_file— 5 mapped TI (map_index 0..4), каждая обрабатывает свой dictsummarize— 1 TI, получает list[5] aggregated через XCom (как в Module 06 lesson 06)
Naming mapped TI в UI и логах
В 2.x mapped TI отображаются как:
process_file
├── [0] orders_2026_01.csv ← logical name (по map_index)
├── [1] orders_2026_02.csv
├── [2] orders_2026_03.csv
├── [3] orders_2026_04.csv
└── [4] orders_2026_05.csv
UI показывает map_index в брекетах. Логи доступны per map_index: {dag_id}/{task_id}/map_index={N}/{run_id}.log.
В Airflow 2.9+ появилась возможность кастомизировать render через map_index_template (если знаете input — можно показать smart label).
Production gotchas
Gotcha 1: order не гарантирован
@task
def list_files() -> list:
return os.listdir("/data") # ← Python order не гарантирован
map_index 0 может оказаться любым файлом. Downstream aggregation в порядке map_index, но это не порядок alphabetical/timestamp. Если порядок важен — sort upstream: return sorted(os.listdir("/data")).
Gotcha 2: huge cartesian произведение
process.expand(env=envs, region=regions, model=models) # 5×5×10 = 250
Если каждый element list — несколько → быстро превышает max_map_length. Используйте explicit pairs через .expand_kwargs (урок 03).
Gotcha 3: side effects в expand callable
@task
def get_items() -> list:
items = []
for x in range(100):
send_metric("get_items_called", x) # ← side effect
items.append(x)
return items
Если upstream task имеет side effects (метрики, запись в DB) — они происходят на одном execution (как обычно). Но mapped tasks не “видят” side effects — они получают только return value через XCom.
Gotcha 4: Type annotation должна быть Iterable
@task
def get_items() -> int: # ❌ int — не iterable
return 42
process.expand(x=get_items()) # AirflowException на expand
Upstream task должен возвращать list, tuple, dict (для kwargs) или другой iterable. Иначе scheduler не может посчитать length.
Gotcha 5: dict в expand → keys not values
@task
def get_dict() -> dict:
return {"a": 1, "b": 2, "c": 3}
process.expand(x=get_dict()) # iterates по keys! ['a','b','c']
Если хотите values: process.expand(x=list(get_dict().values())). Если хотите pairs (key+value) — лучше expand_kwargs (урок 03).