Learning Platform
Глоссарий Troubleshooting
Урок 08.05 · 28 мин
Продвинутый
Dynamic MappingScalingPerformanceBatch Processingmax_active_tis_per_dag

Scaling pitfalls — когда mapping вредит больше чем помогает

Dynamic Task Mapping мощный инструмент, но в production легко превращается в anti-pattern. Команды видят .expand(item=huge_list) и радуются “wow, parallelism из коробки”, не понимая что они только что создали 5000 mapped TI которые ломают scheduler.

В этом уроке — конкретные scaling impacts, как измерять, какие limits включить, и когда batch processing внутри одного task в 100× эффективнее чем 100 mapped tasks.


Что происходит при 1000 mapped TI

Возьмём .expand() с 1000 elements:

ResourceImpact
task_instance rows1000 INSERT при expand_mapped_task
xcom rows1000 (если каждая return value)
Log files1000 separate log files per run
Scheduler tick durationPhase 2 processes 1000 TI — секунды
Critical sectionLonger (более TI candidates для enqueue)
UI renderGrid view 1000 cells — slow scroll, browser hang
DB indexesBloat в task_instance, xcom indexes
Executor pressure1000 jobs to enqueue (если parallel allowed)
Worker startup1000 process forks или Celery tasks
Memory aggregationDownstream pulls 1000 XCom — может OOM

Если у вас 10 таких DAG × daily run × 1000 mapped TI = 10000 TI/day. Scheduler critical section длится дольше, replication lag растёт, UI становится unusable.


Real production incident

Команда написала DAG, который daily process S3 prefix:

@task
def list_files() -> list:
    s3 = S3Hook(...)
    return s3.list_keys(bucket="raw", prefix="2026-05")

@task
def process_file(key: str):
    # download, transform, upload
    pass

process_file.expand(key=list_files())

Первые недели — 20-30 files/day. Работало нормально. После migration upstream system — 5000 files/day.

**Last:

  • Scheduler tick длительность 30s+ (норма ~5s)
  • Critical section блокирует другие DAGs
  • UI grid view 5000 cells — chrome hang
  • DB size grows fast — task_instance table +2GB/week

Fix: batch processing внутри одного task — обработка 5000 файлов в одном Python task с pandas.

@task
def process_all_files():
    s3 = S3Hook(...)
    keys = s3.list_keys(bucket="raw", prefix="2026-05")
    for key in keys:
        # обработка
        ...

Один TI вместо 5000. Scheduler load — нулевой. UI быстрый. Простой код.


Когда mapping vs batch — decision framework

Mapping vs Batch Processing — decision tree
N items для обработкиСтартовая точка. Сколько items? 10? 100? 10000? Это не единственный фактор, но важный.
N > 100?
N маленькое (< 50)Под 50 mapped TI — OK, не bottleneck. Можно mapping.
N большое (100-1000)Серая зона. Зависит от других вопросов.
N огромное (>1000)Strong сигнал что mapping anti-pattern. Сначала рассмотрите batch.
нужен isolated retry per item?
Да — retry criticalЕсли items fail independently и retry expensive (re-do whole batch). Каждая mapped TI имеет own retry count.
Нет — failure ok all-or-nothingЕсли failure одного item OK дропнуть batch или handle internally — batch.
нужны independent logs?
Да — separate per itemКаждая mapped TI имеет own log file — debug easier для individual failures. Можно clear/retry один map_index.
Нет — combined log OKSingle log файл с loop iterations — debugging нормально для homogeneous processing.
item processing heavy (>10s)?
Да — heavy itemsЕсли каждый item 10s+, mapping parallelism настоящий win — 10 mapped TI × 10s vs 100s sequential.
Нет — items быстрые (ms)Если items быстрые — overhead Airflow (queue, schedule, exec) больше чем processing. Batch намного эффективнее.

Quantitative comparison

500 items, каждый 5s processing:

Mapping approach

process.expand(item=get_items())   # 500 mapped TI
  • Scheduling overhead: 500 × ~0.1s = 50s scheduler time
  • Worker startup: 500 × ~0.5s = 250s (Celery) или 500 × ~0.1s = 50s (Kubernetes)
  • Actual processing: 500 × 5s = 2500s total work
  • Parallelism: depends на pool/workers — say 16 workers → wall time ~160s + overhead
  • DB load: 500 INSERT task_instance + 500 INSERT xcom + 500 UPDATE state-changes × 5 (scheduled→queued→running→success)
  • Total wall time: ~3 minutes (parallel)
  • DB rows: 2500+ (500 TI + 500 XCom + 1500 state transitions)

Batch approach

@task
def process_all():
    items = get_items()
    for item in items:
        process(item)
  • Scheduling overhead: 1 task = negligible
  • Worker startup: 1 × 0.5s
  • Processing: 500 × 5s = 2500s sequential = ~42 minutes
  • DB rows: 2 (1 TI + 1 XCom)

Hybrid — chunks

def chunked(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i+n]

@task
def process_chunk(chunk: list):
    for item in chunk:
        process(item)

process_chunk.expand(chunk=list(chunked(get_items(), 25)))   # 20 mapped TI
  • 20 mapped TI × 25 items × 5s = 2500s total work
  • 16 workers parallel → wall time ~165s (similar to full mapping)
  • DB rows: 40 + state transitions
  • Much less scheduler load

Sweet spot: chunk size 10-50 items per mapped TI. Получаете parallelism + low overhead.


max_active_tis_per_dag — limit concurrency

Default mapped tasks могут run все одновременно (if pool/workers allow). Это часто backfire — DB connections overload, downstream service throttle.

@task(max_active_tis_per_dag=10)
def process_file(key: str):
    pass

process_file.expand(key=get_keys())   # 500 mapped TI, но не более 10 concurrent

max_active_tis_per_dag — per-task limit. Scheduler в Phase 3 не enqueue more than N concurrent for this task. Остальные stuck в state=scheduled до slot free.

Use cases:

  • API rate limits (process 5 requests/sec)
  • DB connection limits
  • External service throttling

vs pool

pool — globally shared concurrency limit (cross-DAG):

@task(pool="api_calls", pool_slots=1)
def call_api():
    pass

call_api.expand(req=requests)

api_calls pool size 10 → max 10 concurrent calls across all DAG-ов. max_active_tis_per_dag — только для этой mapped task.

Combine: pool для cross-DAG limit, max_active_tis_per_dag для intra-DAG fine-tune.


Critical section impact

Recap (Module 04): scheduler critical section — serialized lock на slot_pool, в течение которой Phase 3 enqueues TI scheduled→queued.

1000 mapped TI all at once:

critical section:
  pools = SELECT FROM slot_pool FOR UPDATE NOWAIT
  for each of 1000 mapped TI:
    check pool, concurrency, max_active_tis_per_dag
    UPDATE task_instance SET state='queued'
    UPDATE slot_pool SET used_slots = ...
  commit  ← lock released

Если каждое решение для TI занимает 1ms — 1000 × 1ms = 1s critical section. Это держит lock 1s, blocking другие schedulers (если HA). На production multi-scheduler — это ощутимо.

Mitigation:

  • [scheduler] max_tis_per_query = 16 (default) — критическая секция processes batch of 16 TI, потом releases lock. Меньше batch — короче lock, but slower throughput.
  • max_active_tis_per_dag — меньше eligible TI per tick.

UI rendering pitfall

UI grid view рендерит cell per TI per map_index. 1000 mapped TI = 1000 cells.

Chrome/Firefox handling 1000 cells:

  • Initial render ~5-15s
  • Scroll lag
  • Click responsiveness — bad

10k mapped TI — UI crashes browser.

Newer Airflow (2.7+) добавил pagination для mapped task views, но всё equal — UI not designed для huge mappings.


Logging volume

Каждая mapped TI имеет own log:

logs/dag_id=my_dag/run_id=2026-05-12/task_id=process/map_index=0/attempt=1.log
logs/dag_id=my_dag/run_id=2026-05-12/task_id=process/map_index=1/attempt=1.log
...

1000 mapped TI × 100 KB log each = 100 MB per run. Если remote logging (S3) — 1000 S3 PUT per run.

S3 LIST для UI — slow. Glob через log_filename_template — explodes.

Mitigation:

  • Remote logging с appropriate prefix structure
  • Log rotation policies
  • Conscious logging level (warning vs debug)

DB indexes bloat

task_instance indexes:

  • (dag_id, task_id, run_id, map_index) — unique
  • (state) для scheduling
  • (dag_id, execution_date) для historical queries

1000 mapped TI × 24 runs/day × 100 DAGs = 2.4M new rows/day. Через год — миллиарды.

Mitigation:

  • airflow db clean — periodic cleanup старых runs
  • Partitioning task_instance by execution_date (manual setup)
  • Archival pipeline

Decision matrix

ScenarioApproach
Process 1000 fast items (<1s each)Batch — overhead dominates
Process 100 slow items (>30s each)Mapping — parallelism wins
Need per-item retryMapping (only way)
Process unbounded N (could be 10k)Batch with internal parallelism (multiprocessing)
Process 500 items with rate limitMapping + max_active_tis_per_dag=10
Process external service shards (5-10)Mapping (clean semantics)
Process daily DB snapshots (huge data, fewest items)Mapping

Pattern: chunked mapping

Best of both worlds для medium N:

@task
def get_items() -> list:
    return list(range(10000))   # 10k items

@task
def chunk_items(items: list, chunk_size: int = 100) -> list[list]:
    return [items[i:i+chunk_size] for i in range(0, len(items), chunk_size)]

@task
def process_chunk(chunk: list):
    for item in chunk:
        process(item)
    return {"count": len(chunk)}

@task
def summarize(results: list[dict]):
    total = sum(r["count"] for r in results)
    print(f"Total: {total}")

items = get_items()
chunks = chunk_items(items)              # 10000 / 100 = 100 chunks
results = process_chunk.expand(chunk=chunks)   # 100 mapped TI
summarize(results)

100 mapped TI (manageable), each processing 100 items batch. Parallelism + low Airflow overhead.


Production checklist для mapping DAG

Перед deploy DAG с .expand():

  • N estimate: какое typical/max N? Если > 500 — рассмотрите batch/chunk.
  • max_map_length: будет ли превышаться? Set explicit cap.
  • max_active_tis_per_dag: нужен ли concurrency limit (rate limit, DB load)?
  • Worker capacity: достаточно ли workers для parallelism?
  • Pool: cross-DAG resource sharing — нужен pool?
  • Per-item retry: важна ли isolated retry? Если нет — batch.
  • Log volume: 1000 mapped × 100KB = 100MB/run — OK для storage?
  • Downstream aggregation: pulls N XCom — memory OK?
  • Custom XCom Backend: large XCom → S3 backend нужен?
  • UI usage: будут ли users часто просматривать grid view?

Hands-on: измерять mapping impact

Postgres metrics:

-- Counts per DAG за неделю
SELECT
    dag_id,
    COUNT(*) AS ti_count,
    SUM(CASE WHEN map_index >= 0 THEN 1 ELSE 0 END) AS mapped_count,
    COUNT(DISTINCT run_id) AS run_count,
    SUM(CASE WHEN map_index >= 0 THEN 1 ELSE 0 END) * 1.0 / COUNT(DISTINCT run_id) AS avg_mapped_per_run
FROM task_instance
WHERE start_date > now() - interval '7 days'
GROUP BY dag_id
ORDER BY mapped_count DESC
LIMIT 10;

Top DAGs по mapped TI count — candidates для batch refactoring.

Scheduler tick duration:

-- через logs или Prometheus
SELECT
    avg(duration_ms) AS avg_tick_ms,
    max(duration_ms) AS max_tick_ms,
    percentile_cont(0.99) WITHIN GROUP (ORDER BY duration_ms) AS p99_tick_ms
FROM scheduler_metrics
WHERE ts > now() - interval '1 hour';

Если p99 > 30s — scheduler перегружен, likely candidate mapping DAG.


Проверка знанийKnowledge check
DAG обрабатывает 2000 files daily через `.expand()`. Symptoms: scheduler slow, UI медленный. Какие 3 fix-а и в каком приоритете применить?
ОтветAnswer
**Приоритет 1 — refactoring на chunked mapping**: вместо 2000 mapped TI делать chunks по 50-100 items: `process_chunk.expand(chunk=chunked(files, 100))` → 20 mapped TI. Это reduce scheduler load в 100×, UI быстрый, parallelism сохраняется (20 параллельных chunks через 16 workers = почти всё). Это **самый impactful** fix без потери isolated retry на chunk-level. **Приоритет 2 — max_active_tis_per_dag**: если chunked не возможно (например, нужна per-file isolated retry), добавить `@task(max_active_tis_per_dag=20)` — limits concurrent mapped TI до 20 даже при 2000 scheduled. Это решает scheduler/DB pressure (одна задача в pool слот), но wall time увеличивается (2000/20 batches). Use case: rate limits, DB connection limits, external service throttle. **Приоритет 3 — увеличить max_map_length и tune scheduler**: если по логике batch processing не подходит и нужны все 2000 isolated TI: (a) `[core] max_map_length=4096`, (b) `[scheduler] max_tis_per_query=8` (smaller batches — короче critical section), (c) дополнительный scheduler для HA, (d) tune Postgres (pgbouncer, partition task_instance, indexes), (e) `airflow db clean` aggressive policy для удаления old TI. Это band-aid — реальное решение всё равно refactoring. **Plus**: Custom XCom Backend если каждая mapped TI пишет large XCom (offload в S3); remote logging правильный prefix structure для S3 LIST efficiency; UI usage education (use list view, не grid, для huge mappings).

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. DAG ежедневно обрабатывает 2000 files через .expand(). Production symptoms: scheduler tick 30s+, UI медленная. Какой fix приоритет #1?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6