Scaling pitfalls: when mapping hurts more than it helps
Dynamic Task Mapping is a powerful tool, but in production it easily turns into an anti-pattern. Teams see .expand(item=huge_list) and celebrate "wow, parallelism out of the box", without realizing they have just created 5000 mapped task instances (TIs) that overwhelm the scheduler.
This lesson covers the concrete scaling impacts, how to measure them, which limits to enable, and when batch processing inside a single task can be 100× more efficient than 100 mapped tasks.
What happens with 1000 mapped TIs
Take an .expand() over 1000 elements:
| Resource | Impact |
|---|---|
| task_instance rows | 1000 INSERTs during expand_mapped_task |
| xcom rows | 1000 (if each TI returns a value) |
| Log files | 1000 separate log files per run |
| Scheduler tick duration | Phase 2 processes 1000 TIs, taking seconds |
| Critical section | Longer (more TI candidates to enqueue) |
| UI render | Grid view with 1000 cells: slow scrolling, browser hangs |
| DB indexes | Bloat in the task_instance and xcom indexes |
| Executor pressure | 1000 jobs to enqueue (if parallelism allows) |
| Worker startup | 1000 process forks or Celery tasks |
| Memory aggregation | Downstream task pulls 1000 XComs and may OOM |
With 10 such DAGs, each running daily with 1000 mapped TIs, you get 10 × 1000 = 10,000 TIs per day. The scheduler critical section takes longer, DB replication lag grows, and the UI becomes unusable.
Real production incident
A team wrote a DAG that processed an S3 prefix daily:
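```python
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@task
def list_files() -> list[str]:
    s3 = S3Hook(...)  # connection arguments elided
    return s3.list_keys(bucket_name="raw", prefix="2026-05")

@task
def process_file(key: str):
    # download, transform, upload
    pass

# inside a @dag-decorated function or a `with DAG(...)` block:
process_file.expand(key=list_files())
```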
For the first few weeks there were 20-30 files per day, and everything worked fine. After an upstream system migration, that jumped to 5000 files per day.
**Consequences:**
- Scheduler tick duration 30s+ (normally ~5s)
- The critical section blocked other DAGs
- The UI grid view with 5000 cells hung Chrome
- DB size grew fast: the task_instance table gained 2 GB per week
Fix: batch processing inside a single task, handling all 5000 files in one Python task with pandas.
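```python
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@task
def process_all_files():
    s3 = S3Hook(...)  # connection arguments elided
    keys = s3.list_keys(bucket_name="raw", prefix="2026-05")
    for key in keys:
        # processing: download, transform (e.g. with pandas), upload
        ...
```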
One TI instead of 5000. Scheduler load is negligible, the UI stays fast, and the code is simpler.
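The loop above is sequential. Moving everything into one task also gives up per-item retries, because a task retry reprocesses every key. The following is a minimal sketch of recovering some parallelism inside the single task with a thread pool, while recording failures per item. It assumes the per-file work is I/O-bound. `process_keys` and the `handle` callable are hypothetical names, and the S3 calls are replaced by a plain function so the sketch runs anywhere.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterable


def process_keys(
    keys: Iterable[str],
    handle: Callable[[str], None],
    max_workers: int = 8,
) -> dict[str, str]:
    """Run handle(key) for every key on a thread pool inside ONE task.

    Returns {key: error message} for the keys that failed, so the task can
    log them, retry them itself, or fail at the end.
    """
    failures: dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(handle, key): key for key in keys}
        for future in as_completed(futures):
            key = futures[future]
            try:
                future.result()
            except Exception as exc:  # record the failure and keep going
                failures[key] = repr(exc)
    return failures


if __name__ == "__main__":
    def fake_handle(key: str) -> None:
        # hypothetical stand-in for download / transform / upload
        if key.endswith("7"):
            raise ValueError(f"bad file {key}")

    keys = [f"2026-05/file_{i}" for i in range(20)]
    print(sorted(process_keys(keys, fake_handle, max_workers=4)))
```

Inside the real task, raise an exception at the end if the returned dict is non-empty, so Airflow still marks the run as failed.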
Mapping vs batch: a decision framework
Quantitative comparison
500 items, each taking 5s to process:
Mapping approach
```python
process.expand(item=get_items())  # 500 mapped TIs
```
- Scheduling overhead: 500 × ~0.1s = 50s of scheduler time
- Worker startup: 500 × ~0.5s = 250s with Celery; with KubernetesExecutor every TI starts its own pod, so per-TI startup is usually seconds rather than a fraction of a second
- Actual processing: 500 × 5s = 2500s of total work
- Parallelism: depends on pools/workers; with, say, 16 workers, wall time is ~2500s / 16 ≈ 160s plus overhead
- DB load: 500 task_instance INSERTs + 500 xcom INSERTs + 500 × 3 state-change UPDATEs (scheduled→queued→running→success)
- Total wall time: ~3 minutes (parallel)
- DB writes: 2500+ (500 TI + 500 XCom + 1500 state transitions)
Batch approach
```python
@task
def process_all():
    items = get_items()  # a plain function here, not a @task
    for item in items:
        process(item)
```
- Scheduling overhead: 1 task = negligible
- Worker startup: 1 × 0.5s
- Processing: 500 × 5s = 2500s sequential = ~42 minutes
- DB rows: 2 (1 TI + 1 XCom)
Hybrid — chunks
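```python
def chunked(lst, n):
    """Yield successive n-sized slices of lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

@task
def process_chunk(chunk: list):
    for item in chunk:
        process(item)

# get_items() must return a plain list here (evaluated at DAG-parse time);
# this does not work on a @task's output, see the chunked mapping pattern below
process_chunk.expand(chunk=list(chunked(get_items(), 25)))  # 20 mapped TIs
```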
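- 20 mapped TIs × 25 items × 5s = 2500s of total work; each chunk runs ~125s
- With 16 workers the 20 chunks run in two waves (16 + 4), so wall time is ~2 × 125s = 250s; a chunk size of 32 (16 chunks) brings it down to ~160s, close to full mapping
- DB rows: 40 (20 TI + 20 XCom) plus state transitions
- Much lower scheduler load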
Sweet spot: a chunk size of 10-50 items per mapped TI. You get parallelism with low overhead.
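The wall-time arithmetic above can be reproduced with a rough cost model. This is a sketch under simplifying assumptions:
- Every item takes the same time.
- Workers run whole TIs in waves.
- Per-TI startup is the illustrative 0.5s Celery figure from the comparison.
- Scheduler time is ignored.

`estimate` is a hypothetical helper, not an Airflow API. It makes the wave effect visible: 20 chunks on 16 workers need two waves, while 16 chunks fit in one.

```python
import math
from dataclasses import dataclass


@dataclass
class Estimate:
    mapped_tis: int
    wall_time_s: float
    total_work_s: float


def estimate(n_items: int, item_s: float, chunk_size: int, workers: int,
             startup_s: float = 0.5) -> Estimate:
    """Rough wall time when n_items are split into chunks of chunk_size.

    chunk_size=1 models full mapping; chunk_size=n_items models one batch task.
    Each wave is assumed to last as long as the largest chunk.
    """
    tis = math.ceil(n_items / chunk_size)
    ti_runtime = startup_s + chunk_size * item_s
    waves = math.ceil(tis / workers)
    return Estimate(tis, waves * ti_runtime, n_items * item_s)


if __name__ == "__main__":
    for label, size in [("mapping", 1), ("chunks of 25", 25),
                        ("chunks of 32", 32), ("batch", 500)]:
        e = estimate(500, 5.0, size, workers=16)
        print(f"{label:>13}: {e.mapped_tis:3d} TIs, ~{e.wall_time_s:.0f}s wall")
```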
max_active_tis_per_dag: limiting concurrency
By default, all mapped TIs of a task can run at the same time (if pools and workers allow). This often backfires: database connections get exhausted and downstream services start throttling.
```python
from airflow.decorators import task

@task(max_active_tis_per_dag=10)
def process_file(key: str):
    pass

process_file.expand(key=get_keys())  # 500 mapped TIs, but at most 10 concurrent
```
max_active_tis_per_dag is a per-task limit: in Phase 3 the scheduler will not queue more than N concurrent TIs of this task (counted across all active runs of the DAG). The rest stay in state=scheduled until a slot frees up.
Use cases:
- API rate limits (process 5 requests/sec)
- DB connection limits
- External service throttling
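A small stdlib simulation shows what the cap means in practice. A `threading.BoundedSemaphore` plays the role of the N slots that max_active_tis_per_dag grants. Fake "TIs" beyond the cap wait for a slot, much as real TIs stay in `scheduled`. This is an analogy, not how the scheduler is implemented, and `run_capped` is a hypothetical name.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_capped(n_tasks: int, cap: int, work_s: float = 0.01) -> int:
    """Run n_tasks fake TIs on plenty of threads, but at most `cap` at once.

    Returns the peak number of fake TIs observed running concurrently.
    """
    slots = threading.BoundedSemaphore(cap)
    lock = threading.Lock()
    running = 0
    peak = 0

    def fake_ti(_: int) -> None:
        nonlocal running, peak
        with slots:  # wait here until a slot frees up
            with lock:
                running += 1
                peak = max(peak, running)
            time.sleep(work_s)  # stand-in for the real work
            with lock:
                running -= 1

    # more threads than the cap, like many workers with free capacity
    with ThreadPoolExecutor(max_workers=cap * 4) as pool:
        list(pool.map(fake_ti, range(n_tasks)))
    return peak


if __name__ == "__main__":
    print(run_capped(n_tasks=50, cap=10))
```

Wall time then scales with ceil(N / cap). For example, 500 TIs of 5s each with a cap of 10 run in about 50 waves, roughly 250s, no matter how many workers are free.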
vs pool
pool — globally shared concurrency limit (cross-DAG):
```python
@task(pool="api_calls", pool_slots=1)
def call_api(req):
    pass

call_api.expand(req=requests)  # requests: a list of request payloads
```
With the api_calls pool sized at 10 slots and pool_slots=1 per TI, at most 10 calls run concurrently across all DAGs. max_active_tis_per_dag applies only to this one mapped task.
Combine them: use a pool for the cross-DAG limit and max_active_tis_per_dag for intra-DAG fine-tuning.
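When both limits apply, the tighter one wins. The sketch below gives a back-of-the-envelope upper bound on concurrency for one mapped task. It is the minimum of:
- the free pool slots divided by `pool_slots`
- `max_active_tis_per_dag`
- the free worker slots

`concurrency_ceiling` and its `pool_used_elsewhere` argument are hypothetical. Other limits such as `[core] parallelism` would add further terms.

```python
from typing import Optional


def concurrency_ceiling(pool_size: int, pool_slots: int,
                        max_active_tis_per_dag: Optional[int],
                        worker_slots: int,
                        pool_used_elsewhere: int = 0) -> int:
    """Upper bound on concurrently running TIs of one mapped task."""
    free_pool = max(pool_size - pool_used_elsewhere, 0)
    bounds = [free_pool // pool_slots, worker_slots]
    if max_active_tis_per_dag is not None:  # None = no per-task cap
        bounds.append(max_active_tis_per_dag)
    return min(bounds)


if __name__ == "__main__":
    # api_calls pool of 10 slots, 4 already taken by other DAGs, per-task cap 8
    print(concurrency_ceiling(10, 1, 8, worker_slots=32, pool_used_elsewhere=4))  # 6
```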
Critical section impact
Recap (Module 04): the scheduler critical section is a serialized lock on the slot_pool rows, held while Phase 3 moves TIs from scheduled to queued.
1000 mapped TIs becoming schedulable at once:
```
critical section:
    pools = SELECT ... FROM slot_pool FOR UPDATE NOWAIT   -- row locks on every pool
    for each of the 1000 mapped TIs:
        check open pool slots, concurrency, max_active_tis_per_dag
        UPDATE task_instance SET state='queued'
        -- open slots are derived from queued/running TI counts; slot_pool rows are locked, not updated
    commit   ← lock released
```
If each per-TI decision takes 1 ms, then 1000 × 1 ms = 1 s of critical section. The lock is held for that whole second, blocking other schedulers (in an HA setup). In a production multi-scheduler deployment this is noticeable.
Mitigation:
- `[scheduler] max_tis_per_query = 16` (default): the critical section processes a batch of at most 16 TIs and then releases the lock. A smaller batch means a shorter lock but lower throughput.
- `max_active_tis_per_dag`: fewer eligible TIs per tick.
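The trade-off can be put in numbers. This rough sketch assumes a constant per-TI decision cost inside the lock and that each critical section handles at most one batch of TIs. `critical_section_plan` is hypothetical, and `batch_size=None` stands for "everything in one critical section" in this sketch only. It returns how many critical sections are needed to queue all eligible TIs and the longest lock hold.

```python
import math
from typing import Optional


def critical_section_plan(eligible_tis: int, per_ti_ms: float,
                          batch_size: Optional[int]) -> tuple[int, float]:
    """Return (critical sections needed, longest lock hold in ms)."""
    if batch_size is None or batch_size >= eligible_tis:
        return 1, eligible_tis * per_ti_ms
    sections = math.ceil(eligible_tis / batch_size)
    return sections, batch_size * per_ti_ms


if __name__ == "__main__":
    for batch in (None, 512, 16):
        sections, hold = critical_section_plan(1000, 1.0, batch)
        print(f"batch={batch}: {sections} critical sections, longest lock ~{hold:.0f} ms")
```

With 1000 eligible TIs at 1 ms each, a batch of 16 turns one ~1 s lock hold into 63 holds of ~16 ms. The cost is that queuing everything takes 63 scheduler loops.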
UI rendering pitfall
The UI grid view renders one cell per TI per map_index, so 1000 mapped TIs means 1000 cells.
Rendering 1000 cells in Chrome/Firefox:
- Initial render ~5-15s
- Scroll lag
- Poor click responsiveness
With 10k mapped TIs the UI crashes the browser.
Newer Airflow versions (2.7+) added pagination to mapped task views, but even so the UI is not designed for huge mappings.
Logging volume
Each mapped TI has its own log file:
```
logs/dag_id=my_dag/run_id=2026-05-12/task_id=process/map_index=0/attempt=1.log
logs/dag_id=my_dag/run_id=2026-05-12/task_id=process/map_index=1/attempt=1.log
...
```
1000 mapped TIs × 100 KB of log each = 100 MB per run. With remote logging to S3, that is 1000 S3 PUTs per run.
S3 LIST requests made for the UI become slow, and any glob over the log_filename_template layout has to enumerate thousands of keys.
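The following sketch shows where those files land and how much they weigh. It assumes Airflow's default `log_filename_template` for 2.3+ (`dag_id=.../run_id=.../task_id=.../map_index=.../attempt=N.log`, with the `map_index=` segment only for mapped TIs). `log_path` and `log_volume_mb` are hypothetical helpers that mimic the layout in plain Python rather than rendering the template through Airflow.

```python
def log_path(dag_id: str, run_id: str, task_id: str,
             map_index: int, attempt: int = 1) -> str:
    """Mimic the default log layout; map_index=-1 means an unmapped TI."""
    parts = [f"dag_id={dag_id}", f"run_id={run_id}", f"task_id={task_id}"]
    if map_index >= 0:
        parts.append(f"map_index={map_index}")
    parts.append(f"attempt={attempt}.log")
    return "/".join(parts)


def log_volume_mb(mapped_tis: int, kb_per_log: float) -> float:
    """Per-run log volume in MB (1 MB = 1000 KB), one attempt per TI."""
    return mapped_tis * kb_per_log / 1000


if __name__ == "__main__":
    paths = [log_path("my_dag", "2026-05-12", "process", i) for i in range(1000)]
    print(paths[0])
    print(len(paths), "files,", log_volume_mb(1000, 100), "MB per run")
```

With S3 remote logging, each of those paths becomes a separate object, which is where the per-run PUT count comes from.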
Mitigation:
- Remote logging with an appropriate prefix structure
- Log rotation policies
- A deliberate logging level (warning vs debug)
DB index bloat
task_instance indexes:
- (dag_id, task_id, run_id, map_index): unique (the primary key)
- (state): for scheduling
- (dag_id, run_id): for per-run queries (since Airflow 2.2, task_instance stores run_id rather than execution_date)
1000 mapped TIs × 24 runs/day × 100 DAGs = 2.4M new rows per day. Over a year that is about 876M rows, close to a billion.
Mitigation:
- `airflow db clean`: periodic cleanup of old runs
- Partitioning task_instance by date (manual setup)
- Archival pipeline
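The sketch below sizes the problem and the cleanup together:
- `rows_per_day` reproduces the arithmetic above.
- `steady_state_rows` estimates the table size under a fixed retention window.
- `db_clean_command` builds, but does not run, an `airflow db clean` invocation.

The flags used (`--clean-before-timestamp`, `--tables`, `--yes`, `--dry-run`) exist in Airflow 2.3+, but check `airflow db clean --help` for your version. The 30-day retention and the table list are arbitrary examples, and all three helpers are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def rows_per_day(mapped_per_run: int, runs_per_day: int, dags: int) -> int:
    return mapped_per_run * runs_per_day * dags


def steady_state_rows(daily_rows: int, retention_days: int) -> int:
    """Approximate table size if everything older than the window is purged."""
    return daily_rows * retention_days


def db_clean_command(retention_days: int, tables: list[str], dry_run: bool = True,
                     now: Optional[datetime] = None) -> list[str]:
    now = now or datetime.now(timezone.utc)
    cutoff = (now - timedelta(days=retention_days)).isoformat()
    cmd = ["airflow", "db", "clean", "--clean-before-timestamp", cutoff,
           "--tables", ",".join(tables), "--yes"]
    if dry_run:
        cmd.append("--dry-run")
    return cmd


if __name__ == "__main__":
    daily = rows_per_day(1000, 24, 100)
    print(daily, daily * 365, steady_state_rows(daily, 30))
    print(" ".join(db_clean_command(30, ["task_instance", "xcom", "log"])))
```

Once the dry run looks right, a maintenance job can pass the list to `subprocess.run(cmd, check=True)` with `dry_run=False`.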
Decision matrix
| Scenario | Approach |
|---|---|
| Process 1000 fast items (<1s each) | Batch — overhead dominates |
| Process 100 slow items (>30s each) | Mapping — parallelism wins |
| Need per-item retry | Mapping (only way) |
| Process unbounded N (could be 10k) | Batch with internal parallelism (multiprocessing) |
| Process 500 items with rate limit | Mapping + max_active_tis_per_dag=10 |
| Process external service shards (5-10) | Mapping (clean semantics) |
| Process daily DB snapshots (huge data, few items) | Mapping |
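The matrix can be encoded as a rough heuristic, for example in a DAG review script. This is a sketch only. The thresholds (1 s per item, 500 items) come from the table and checklist in this lesson, the rule order is a judgment call, and `choose_approach` is a hypothetical name.

```python
def choose_approach(n_items: int, seconds_per_item: float,
                    needs_per_item_retry: bool = False,
                    rate_limited: bool = False,
                    n_is_bounded: bool = True) -> str:
    """Map the decision-matrix rows onto a single recommendation."""
    if needs_per_item_retry:
        return "mapping"  # the only way to retry items individually
    if not n_is_bounded:
        return "batch with internal parallelism"
    if rate_limited:
        return "mapping + max_active_tis_per_dag"
    if seconds_per_item < 1:
        return "batch"  # per-TI overhead dominates
    if n_items > 500:
        return "chunked mapping"
    return "mapping"  # few, slow items: parallelism wins


if __name__ == "__main__":
    print(choose_approach(1000, 0.5))              # many fast items
    print(choose_approach(100, 45))                # slow items
    print(choose_approach(500, 2, rate_limited=True))
```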
Pattern: chunked mapping
Best of both worlds for medium N:
```python
from airflow.decorators import task


def process(item):
    """Hypothetical placeholder for the real per-item work."""
    ...


@task
def get_items() -> list:
    return list(range(10000))  # 10k items


@task
def chunk_items(items: list, chunk_size: int = 100) -> list[list]:
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]


@task
def process_chunk(chunk: list):
    for item in chunk:
        process(item)
    return {"count": len(chunk)}


@task
def summarize(results: list[dict]):
    # results: lazily loaded return values of all mapped process_chunk TIs
    total = sum(r["count"] for r in results)
    print(f"Total: {total}")


# inside a @dag-decorated function or a `with DAG(...)` block:
items = get_items()
chunks = chunk_items(items)                   # 10000 / 100 = 100 chunks
results = process_chunk.expand(chunk=chunks)  # 100 mapped TIs
summarize(results)
```
100 mapped TIs (manageable), each processing a batch of 100 items: parallelism with low Airflow overhead.
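Instead of hard-coding chunk_size=100, the chunk size can be derived from N. The sketch below caps the number of mapped TIs (100 here, as in the pattern above) and never goes below a minimum chunk size. `pick_chunk_size`, `chunk` and their defaults are assumptions, not Airflow settings.

```python
import math


def pick_chunk_size(n_items: int, max_mapped_tis: int = 100, min_chunk: int = 10) -> int:
    """Smallest chunk size (>= min_chunk) that keeps mapped TIs <= max_mapped_tis."""
    if n_items <= 0:
        return min_chunk
    return max(min_chunk, math.ceil(n_items / max_mapped_tis))


def chunk(items: list, size: int) -> list[list]:
    return [items[i:i + size] for i in range(0, len(items), size)]


if __name__ == "__main__":
    for n in (30, 500, 10_000, 1_000_000):
        size = pick_chunk_size(n)
        print(n, "items ->", size, "per chunk,", len(chunk(list(range(n)), size)), "mapped TIs")
```

Calling it inside chunk_items keeps the TI count bounded as N grows, which is what protects the scheduler. The trade-off is that per-chunk runtime grows instead, so for very large N the batch option is worth revisiting.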
Production checklist for mapping DAGs
Before deploying a DAG that uses .expand():
- N estimate: what is the typical/max N? If > 500, consider batching or chunking.
- max_map_length: will it be exceeded? Set an explicit cap.
- max_active_tis_per_dag: is a concurrency limit needed (rate limits, DB load)?
- Worker capacity: are there enough workers for the intended parallelism?
- Pool: does cross-DAG resource sharing call for a pool?
- Per-item retry: does isolated retry matter? If not, use batch.
- Log volume: 1000 mapped × 100 KB = 100 MB per run; is that OK for storage?
- Downstream aggregation: a task that pulls N XComs; is memory sufficient?
- Custom XCom backend: are XComs large enough to need an S3 backend?
- UI usage: will users open the grid view often?
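Parts of this checklist can be automated as a pre-deploy review. This is a sketch with the following assumptions:
- The 500-item and 100 KB-per-log figures come from this lesson.
- 1024 is the default of Airflow's `[core] max_map_length`; above it the task pushing the list fails.
- The 100 MB log and 512 MB aggregation budgets are arbitrary examples.

`review_mapping` is a hypothetical function, not an Airflow feature.

```python
from typing import Optional


def review_mapping(max_n: int,
                   log_kb_per_ti: float = 100,
                   xcom_kb_per_ti: float = 1,
                   max_map_length: int = 1024,
                   max_active_tis_per_dag: Optional[int] = None,
                   needs_per_item_retry: bool = True,
                   log_budget_mb: float = 100,
                   aggregation_budget_mb: float = 512) -> list[str]:
    """Return warnings for a planned .expand(); an empty list means it looks OK."""
    warnings: list[str] = []
    if max_n > max_map_length:
        warnings.append(f"N={max_n} > max_map_length={max_map_length}: "
                        "the task pushing the list will fail")
    if max_n > 500:
        warnings.append("N > 500: consider batch or chunked mapping")
    if not needs_per_item_retry:
        warnings.append("per-item retry not needed: a batch task may be simpler")
    if max_active_tis_per_dag is None and max_n > 100:
        warnings.append("no max_active_tis_per_dag: all TIs may run at once")
    log_mb = max_n * log_kb_per_ti / 1000
    if log_mb >= log_budget_mb:
        warnings.append(f"~{log_mb:.0f} MB of logs per run")
    xcom_mb = max_n * xcom_kb_per_ti / 1000
    if xcom_mb >= aggregation_budget_mb:
        warnings.append(f"downstream pulls ~{xcom_mb:.0f} MB of XCom: "
                        "check memory or use a custom XCom backend")
    return warnings


if __name__ == "__main__":
    for w in review_mapping(max_n=5000):
        print("-", w)
```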
Hands-on: measuring mapping impact
Postgres metrics:
```sql
-- TI counts per DAG over the last week
SELECT
    dag_id,
    COUNT(*) AS ti_count,
    SUM(CASE WHEN map_index >= 0 THEN 1 ELSE 0 END) AS mapped_count,
    COUNT(DISTINCT run_id) AS run_count,
    SUM(CASE WHEN map_index >= 0 THEN 1 ELSE 0 END) * 1.0 / COUNT(DISTINCT run_id) AS avg_mapped_per_run
FROM task_instance
WHERE start_date > now() - interval '7 days'
GROUP BY dag_id
ORDER BY mapped_count DESC
LIMIT 10;
```
The top DAGs by mapped TI count are candidates for batch refactoring.
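The aggregation can be checked locally before pointing the query at production. This sketch runs the same aggregation against an in-memory SQLite table with synthetic rows. It relies on Airflow's convention that unmapped TIs have `map_index = -1`. The `start_date` filter and `LIMIT` are dropped because SQLite has no `now() - interval`, and the table holds only the columns the query needs. `top_mapped_dags` is a hypothetical helper.

```python
import sqlite3

QUERY = """
SELECT dag_id,
       COUNT(*) AS ti_count,
       SUM(CASE WHEN map_index >= 0 THEN 1 ELSE 0 END) AS mapped_count,
       COUNT(DISTINCT run_id) AS run_count,
       SUM(CASE WHEN map_index >= 0 THEN 1 ELSE 0 END) * 1.0
           / COUNT(DISTINCT run_id) AS avg_mapped_per_run
FROM task_instance
GROUP BY dag_id
ORDER BY mapped_count DESC
"""


def top_mapped_dags(rows: list[tuple[str, str, int]]) -> list[tuple]:
    """rows: (dag_id, run_id, map_index) tuples; returns the query result."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE task_instance (dag_id TEXT, run_id TEXT, map_index INTEGER)")
    conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?)", rows)
    result = conn.execute(QUERY).fetchall()
    conn.close()
    return result


if __name__ == "__main__":
    rows = []
    for run in ("r1", "r2"):
        rows.append(("s3_files", run, -1))                    # unmapped list_files
        rows += [("s3_files", run, i) for i in range(500)]    # mapped process_file
        rows.append(("small_dag", run, -1))
    for row in top_mapped_dags(rows):
        print(row)
```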
Scheduler tick duration:
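```sql
-- Assumes tick durations (from logs or Prometheus) are exported into a custom
-- scheduler_metrics table; Airflow does not create this table itself.
SELECT
    avg(duration_ms) AS avg_tick_ms,
    max(duration_ms) AS max_tick_ms,
    percentile_cont(0.99) WITHIN GROUP (ORDER BY duration_ms) AS p99_tick_ms
FROM scheduler_metrics
WHERE ts > now() - interval '1 hour';
```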
If p99 > 30s, the scheduler is overloaded, and a heavy mapping DAG is a likely culprit.
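Tick durations may be collected outside a database, for example scraped from logs. In that case the same p99 can be computed in Python. This sketch mirrors PostgreSQL's `percentile_cont`, which interpolates linearly between the two nearest ranks. The function names are hypothetical, and the 30 s threshold is the one from the text.

```python
import math


def percentile_cont(values: list[float], fraction: float) -> float:
    """Continuous percentile with linear interpolation, like Postgres percentile_cont."""
    if not values:
        raise ValueError("no values")
    ordered = sorted(values)
    pos = (len(ordered) - 1) * fraction
    lo, hi = math.floor(pos), math.ceil(pos)
    return ordered[lo] + (ordered[hi] - ordered[lo]) * (pos - lo)


def scheduler_overloaded(tick_ms: list[float], p99_limit_ms: float = 30_000) -> bool:
    return percentile_cont(tick_ms, 0.99) > p99_limit_ms


if __name__ == "__main__":
    ticks = [5_000] * 95 + [40_000] * 5  # mostly ~5 s ticks, a few very slow ones
    print(percentile_cont(ticks, 0.99), scheduler_overloaded(ticks))
```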