Пять уровней concurrency
В Airflow 2.10/2.11 одновременно работают пять независимых ограничителей на то, сколько task instances может находиться в состоянии running или queued. Все пять проверяются на каждом scheduling tick, и выигрывает (т.е. ограничивает) самый строгий из них. Понимание этой иерархии — обязательное условие для production tuning: если tasks застревают в scheduled, диагноз почти всегда сводится к «какой из пяти лимитов сработал».
В Airflow 3.x некоторые имена изменились (например, dag_concurrency deprecated в пользу max_active_tasks_per_dag), но в 2.10/2.11 LTS оба ещё рабочие — старое имя как alias.
Карта пяти уровней
Все пять — AND-ограничения: TI запустится только если все пять разрешают её. Самый узкий — bottleneck.
Level 1: [core] parallelism — cluster-wide
Это глобальный потолок на весь Airflow deployment. Параметр в airflow.cfg:
[core]
parallelism = 32
Что считается:
- Сколько task instances могут быть в
runningилиqueuedодновременно — на весь кластер. - Это count не по workers, не по executors — а по записям в metadata DB.
Где проверяется в scheduler loop (псевдокод):
# scheduler_job_runner.py, упрощённо
def _executable_task_instances_to_queued(self, max_tis: int, session):
open_slots = self.executor.slots_available
parallelism = conf.getint("core", "parallelism")
running_count = session.query(TaskInstance).filter(
TaskInstance.state.in_([State.RUNNING, State.QUEUED])
).count()
available = min(max_tis, parallelism - running_count, open_slots)
if available <= 0:
return []
# ... выбираем available штук TI с учётом всех остальных лимитов
Когда крутить:
- Default 32 мало для production — типичные значения 128–512 для medium, 1000+ для large.
- Не ставьте больше, чем суммарно
worker_concurrency * число_workers(CeleryExecutor) — иначе TI queued, но negde executed.
[core] parallelism — это потолок на состояние TI, а не на физический throughput. Если у вас 4 Celery worker × 16 concurrency = 64 worker slots, ставить parallelism = 1000 бессмысленно: 936 TI будут просто стоять в queued, забивая Celery очередь и съедая Redis memory. Согласуйте parallelism ≈ sum(worker_concurrency) для CeleryExecutor.
Level 2: max_active_tasks_per_dag — per DAG
Сколько одновременно running TI в пределах одного DAG (суммарно по всем active DagRuns этого DAG).
Конфигурация — два уровня:
# airflow.cfg — global default
[core]
max_active_tasks_per_dag = 16
# alias dag_concurrency = 16 — то же самое в 2.x
# DAG-level override
@dag(
schedule="@hourly"
max_active_tasks=32, # 2.x новое имя; concurrency=32 — старое, всё ещё работает
)
def my_dag():
...
Когда поднимать:
- DAG имеет много independent parallel branches (например, fan-out на 50 mapped tasks)
- Сам DAG лёгкий по resources, но широкий
Когда понижать:
- DAG делает много writes в один backend (например, Postgres replica) — лучше limit до 4-8
- Backfill — иначе 1 DagRun съест весь cluster parallelism
Level 3: max_active_runs_per_dag — сколько DagRun одновременно
Не путать с уровнем 2. Этот ограничивает сколько DagRun одного DAG могут быть running одновременно.
@dag(
schedule="@hourly"
max_active_runs=1, # ← sequential: следующий час не стартует, пока текущий не закончил
)
def sequential_etl():
...
Default — [core] max_active_runs_per_dag = 16. Это default достаточно высокий, чтобы catchup за неделю backfill-ил параллельно.
Когда max_active_runs=1 — почти всегда правильно для:
- Tasks с side effects (writes в одну таблицу)
- ETL с накопительным state (insert-on-conflict)
- Long-running pipelines где параллельные runs съели бы внешний resource
Когда оставлять default 16:
- Backfill за длинный период — параллельные runs ускорят
- DAG идемпотентный, independent per data interval
max_active_runs — самый частый «забытый» ограничитель. Симптом: вы стартуете backfill за 30 дней, но видите только 16 active DagRun одновременно, остальные queued. Это потому что 16 — default. Поднимите до 30 или временно через airflow dags backfill --reset-dagruns.
Level 4: max_active_tis_per_dag — per-task limit
Самый специфичный из всех. Ограничивает сколько TI одной конкретной task могут быть running одновременно — across DagRuns одного DAG.
@task(max_active_tis_per_dag=1)
def write_to_snowflake_orders():
# ← Только одна копия этой task running, даже если 5 DagRuns active
...
Старое имя task_concurrency всё ещё работает в 2.x как alias. В 2.10 рекомендуется новое max_active_tis_per_dag.
Use cases:
- Tasks с non-idempotent writes (single row INSERT на UNIQUE key)
- Tasks использующие external mutex (file lock, advisory lock в PG)
- Sensor-like tasks, которые “polling” одну external систему — не нужно 5 параллельных pollers
Обратите внимание: это per-DAG, не глобально. Если та же function запущена в 3 разных DAG — у каждого свой счётчик. Для cross-DAG mutex используйте pool с slots=1.
Level 5: Pool slots
Самый мощный и самый правильный уровень для protection of shared external resources (Snowflake warehouse, RDS, GPU pool).
@task(pool="snowflake_etl", pool_slots=2)
def heavy_etl_query():
...
Pool создаётся в UI или через CLI/API. Подробно — следующий урок. Здесь важно понять место в иерархии:
- Pool — это shared quota across DAGs. Один pool
snowflake_etlс 10 slots ограничит сумму всех@task(pool="snowflake_etl")всего deployment до 10 одновременных. - Pool decisions делаются внутри critical section scheduler (row-level lock на
slot_pool). Это даёт atomicity — невозможно over-allocate даже под multi-scheduler HA.
Чем pool отличается от levels 1-4:
- Levels 1-4 — конфиг (cfg + DAG/task kwargs)
- Pool — runtime resource в DB, можно менять без редеплоя через UI/CLI/API
Decision flow: какой ограничивает первым
На каждом scheduling tick scheduler фильтрует scheduled TI через все пять уровней. Псевдокод:
def find_runnable_tis(session):
# 1. Cluster cap
cluster_open = parallelism - running_count_global(session)
if cluster_open <= 0:
return [] # ← Bottleneck Level 1
candidates = scheduled_tis_ordered_by_priority(session)
runnable = []
per_dag_count = defaultdict(int)
per_task_count = defaultdict(int)
pool_used = current_pool_usage(session)
for ti in candidates:
dag = ti.dag
# 2. DAG tasks cap
if per_dag_count[dag.dag_id] >= dag.max_active_tasks:
continue # ← Level 2
# 3. DagRun cap уже отфильтрован при создании DagRun
# 4. Per-task cap
task_key = (dag.dag_id, ti.task_id)
if per_task_count[task_key] >= ti.task.max_active_tis_per_dag:
continue # ← Level 4
# 5. Pool
pool = ti.pool
if pool_used[pool] + ti.pool_slots > pool_capacity[pool]:
continue # ← Level 5
runnable.append(ti)
per_dag_count[dag.dag_id] += 1
per_task_count[task_key] += 1
pool_used[pool] += ti.pool_slots
if len(runnable) >= cluster_open:
break # ← снова Level 1
return runnable
Главные моменты:
- Level 1 проверяется первым — если cluster забит, остальные не имеют значения.
- Level 3 уже отработал раньше — DagRun не создан, если runs cap превышен. На этом этапе мы работаем со scheduled TI существующих DagRun.
- Level 5 в самом конце — это финальный gate, и он происходит внутри critical section с row-level lock на
slot_pool.
Реальный пример: где какой уровень
Сценарий: DAG daily_orders_etl с 30 mapped tasks process_partition, fan-out по 30 partition. Кластер с parallelism=128, default max_active_tasks_per_dag=16, pool warehouse со 8 slots.
| Уровень | Лимит | Эффект |
|---|---|---|
| 1. Cluster (128) | Много свободно | Не bottleneck |
| 2. DAG tasks (16) | Только 16 из 30 одновременно | ← Bottleneck: остальные 14 в queued |
| 3. DAG runs (16 default) | 1 active DagRun | Не bottleneck |
| 4. Per-task (unlimited) | Не задан | Не bottleneck |
| 5. Pool warehouse (8) | Будет bottleneck когда выйдет из level 2 | После level 2 — следующий gate |
Если хотим запускать все 30 параллельно — поднять max_active_tasks=30. Но тогда выйдет следующий ограничитель — pool slots = 8. То есть финальный bottleneck — pool, и поднимать max_active_tasks нет смысла, пока pool не расширен.
SQL для diagnosis
Проверить current usage пяти уровней:
-- Level 1: cluster
SELECT COUNT(*) AS running_total
FROM task_instance
WHERE state IN ('running', 'queued');
-- Level 2: per DAG
SELECT dag_id, COUNT(*) AS running
FROM task_instance
WHERE state IN ('running', 'queued')
GROUP BY dag_id
ORDER BY running DESC
LIMIT 10;
-- Level 3: active DagRuns per DAG
SELECT dag_id, COUNT(*) AS active_runs
FROM dag_run
WHERE state = 'running'
GROUP BY dag_id
ORDER BY active_runs DESC;
-- Level 4: per task in DAG
SELECT dag_id, task_id, COUNT(*) AS running
FROM task_instance
WHERE state IN ('running', 'queued')
GROUP BY dag_id, task_id
HAVING COUNT(*) > 1
ORDER BY running DESC;
-- Level 5: pool slots
SELECT pool, slots, used_slots, queued_slots, scheduled_slots
FROM slot_pool;
Эти пять запросов — стандартный «kit» для on-call инженера, когда DAG не запускается.
Production gotchas
1. Default max_active_runs_per_dag=16 кусает на backfill. Сделали backfill за 30 дней — ожидаете 30 параллельных runs, видите 16. Это default.
2. [core] parallelism=32 не подходит для production. Default из dev-окружения. Поднять до min(sum(worker_concurrency), разумное число в 200–500).
3. Pool slots = 128 в default_pool — не означает «128 одновременно». Если task без явного pool, она использует default_pool. Если default_pool забит — все no-pool tasks заблокированы. Best practice: per-resource pools, default — резерв.
4. max_active_tis_per_dag=1 не работает кросс-DAG. Если одна и та же write_orders функция дублирована в двух DAG — счётчик per-DAG. Для cross-DAG mutex — pool с slots=1.
5. Mapped tasks (Dynamic Task Mapping) учитываются как отдельные TI каждая. Один DAG с 1000 mapped tasks — это 1000 TI. Если у DAG max_active_tasks=16 и pool unlimited — будет 16 одновременно, остальные queued. Это часто counter-intuitive.