Learning Platform
Глоссарий Troubleshooting
Урок 12.02 · 30 мин
Продвинутый
ConcurrencyParallelismPoolsmax_active_tasksmax_active_runs

Пять уровней concurrency

В Airflow 2.10/2.11 одновременно работают пять независимых ограничителей на то, сколько task instances может находиться в состоянии running или queued. Все пять проверяются на каждом scheduling tick, и выигрывает (т.е. ограничивает) самый строгий из них. Понимание этой иерархии — обязательное условие для production tuning: если tasks застревают в scheduled, диагноз почти всегда сводится к «какой из пяти лимитов сработал».

В Airflow 3.x некоторые имена изменились (например, dag_concurrency deprecated в пользу max_active_tasks_per_dag), но в 2.10/2.11 LTS оба ещё рабочие — старое имя как alias.


Карта пяти уровней

Пять уровней concurrency — от cluster к task
Level 1: Cluster-wide[core] parallelism — потолок ВСЕХ одновременно running + queued TI на весь Airflow deployment. Default 32. Не зависит от количества DAG, workers, executors. Один глобальный счётчик в metadata DB.
cluster cap →
Level 2: Per DAG (tasks)[core] max_active_tasks_per_dag (бывший dag_concurrency, alias всё ещё работает в 2.x). Default 16. Максимум одновременно running TI ВНУТРИ одного DAG (по всем active DagRuns суммарно). Можно override per DAG через @dag(max_active_tasks=N).
per-DAG cap →
Level 3: Per DAG (runs)[core] max_active_runs_per_dag. Default 16. Максимум одновременно active DagRun одного DAG. Если 16 уже running, новый scheduled DagRun ждёт. Override через @dag(max_active_runs=N) — обычно ставят 1 для sequential ETL.
per-task cap →
Level 4: Per task@task(max_active_tis_per_dag=N) — старое имя task_concurrency, оба работают в 2.x. Default unlimited. Максимум одновременных running TI одной конкретной task ACROSS active DagRuns. Полезно для idempotency-критичных tasks (например, write в single Snowflake table).
pool gate →
Level 5: Pool slots@task(pool='name', pool_slots=N). Slot ограничение разделяемое между DAG. Pool = шарируемый resource quota. Critical section scheduler берёт row-level lock на slot_pool table — это самый защищённый layer от over-allocation.

Все пять — AND-ограничения: TI запустится только если все пять разрешают её. Самый узкий — bottleneck.


Level 1: [core] parallelism — cluster-wide

Это глобальный потолок на весь Airflow deployment. Параметр в airflow.cfg:

[core]
parallelism = 32

Что считается:

  • Сколько task instances могут быть в running или queued одновременно — на весь кластер.
  • Это count не по workers, не по executors — а по записям в metadata DB.

Где проверяется в scheduler loop (псевдокод):

# scheduler_job_runner.py, упрощённо
def _executable_task_instances_to_queued(self, max_tis: int, session):
    open_slots = self.executor.slots_available
    parallelism = conf.getint("core", "parallelism")
    running_count = session.query(TaskInstance).filter(
        TaskInstance.state.in_([State.RUNNING, State.QUEUED])
    ).count()
    available = min(max_tis, parallelism - running_count, open_slots)
    if available <= 0:
        return []
    # ... выбираем available штук TI с учётом всех остальных лимитов

Когда крутить:

  • Default 32 мало для production — типичные значения 128–512 для medium, 1000+ для large.
  • Не ставьте больше, чем суммарно worker_concurrency * число_workers (CeleryExecutor) — иначе TI queued, но negde executed.
WARNING

[core] parallelism — это потолок на состояние TI, а не на физический throughput. Если у вас 4 Celery worker × 16 concurrency = 64 worker slots, ставить parallelism = 1000 бессмысленно: 936 TI будут просто стоять в queued, забивая Celery очередь и съедая Redis memory. Согласуйте parallelism ≈ sum(worker_concurrency) для CeleryExecutor.


Level 2: max_active_tasks_per_dag — per DAG

Сколько одновременно running TI в пределах одного DAG (суммарно по всем active DagRuns этого DAG).

Конфигурация — два уровня:

# airflow.cfg — global default
[core]
max_active_tasks_per_dag = 16
# alias dag_concurrency = 16 — то же самое в 2.x
# DAG-level override
@dag(
    schedule="@hourly"
    max_active_tasks=32,  # 2.x новое имя; concurrency=32 — старое, всё ещё работает
)
def my_dag():
    ...

Когда поднимать:

  • DAG имеет много independent parallel branches (например, fan-out на 50 mapped tasks)
  • Сам DAG лёгкий по resources, но широкий

Когда понижать:

  • DAG делает много writes в один backend (например, Postgres replica) — лучше limit до 4-8
  • Backfill — иначе 1 DagRun съест весь cluster parallelism

Level 3: max_active_runs_per_dag — сколько DagRun одновременно

Не путать с уровнем 2. Этот ограничивает сколько DagRun одного DAG могут быть running одновременно.

@dag(
    schedule="@hourly"
    max_active_runs=1,  # ← sequential: следующий час не стартует, пока текущий не закончил
)
def sequential_etl():
    ...

Default — [core] max_active_runs_per_dag = 16. Это default достаточно высокий, чтобы catchup за неделю backfill-ил параллельно.

Когда max_active_runs=1 — почти всегда правильно для:

  • Tasks с side effects (writes в одну таблицу)
  • ETL с накопительным state (insert-on-conflict)
  • Long-running pipelines где параллельные runs съели бы внешний resource

Когда оставлять default 16:

  • Backfill за длинный период — параллельные runs ускорят
  • DAG идемпотентный, independent per data interval
NOTE

max_active_runs — самый частый «забытый» ограничитель. Симптом: вы стартуете backfill за 30 дней, но видите только 16 active DagRun одновременно, остальные queued. Это потому что 16 — default. Поднимите до 30 или временно через airflow dags backfill --reset-dagruns.


Level 4: max_active_tis_per_dag — per-task limit

Самый специфичный из всех. Ограничивает сколько TI одной конкретной task могут быть running одновременно — across DagRuns одного DAG.

@task(max_active_tis_per_dag=1)
def write_to_snowflake_orders():
    # ← Только одна копия этой task running, даже если 5 DagRuns active
    ...

Старое имя task_concurrency всё ещё работает в 2.x как alias. В 2.10 рекомендуется новое max_active_tis_per_dag.

Use cases:

  • Tasks с non-idempotent writes (single row INSERT на UNIQUE key)
  • Tasks использующие external mutex (file lock, advisory lock в PG)
  • Sensor-like tasks, которые “polling” одну external систему — не нужно 5 параллельных pollers

Обратите внимание: это per-DAG, не глобально. Если та же function запущена в 3 разных DAG — у каждого свой счётчик. Для cross-DAG mutex используйте pool с slots=1.


Level 5: Pool slots

Самый мощный и самый правильный уровень для protection of shared external resources (Snowflake warehouse, RDS, GPU pool).

@task(pool="snowflake_etl", pool_slots=2)
def heavy_etl_query():
    ...

Pool создаётся в UI или через CLI/API. Подробно — следующий урок. Здесь важно понять место в иерархии:

  • Pool — это shared quota across DAGs. Один pool snowflake_etl с 10 slots ограничит сумму всех @task(pool="snowflake_etl") всего deployment до 10 одновременных.
  • Pool decisions делаются внутри critical section scheduler (row-level lock на slot_pool). Это даёт atomicity — невозможно over-allocate даже под multi-scheduler HA.

Чем pool отличается от levels 1-4:

  • Levels 1-4 — конфиг (cfg + DAG/task kwargs)
  • Pool — runtime resource в DB, можно менять без редеплоя через UI/CLI/API

Decision flow: какой ограничивает первым

На каждом scheduling tick scheduler фильтрует scheduled TI через все пять уровней. Псевдокод:

def find_runnable_tis(session):
    # 1. Cluster cap
    cluster_open = parallelism - running_count_global(session)
    if cluster_open <= 0:
        return []  # ← Bottleneck Level 1

    candidates = scheduled_tis_ordered_by_priority(session)

    runnable = []
    per_dag_count = defaultdict(int)
    per_task_count = defaultdict(int)
    pool_used = current_pool_usage(session)

    for ti in candidates:
        dag = ti.dag
        # 2. DAG tasks cap
        if per_dag_count[dag.dag_id] >= dag.max_active_tasks:
            continue  # ← Level 2

        # 3. DagRun cap уже отфильтрован при создании DagRun
        # 4. Per-task cap
        task_key = (dag.dag_id, ti.task_id)
        if per_task_count[task_key] >= ti.task.max_active_tis_per_dag:
            continue  # ← Level 4

        # 5. Pool
        pool = ti.pool
        if pool_used[pool] + ti.pool_slots > pool_capacity[pool]:
            continue  # ← Level 5

        runnable.append(ti)
        per_dag_count[dag.dag_id] += 1
        per_task_count[task_key] += 1
        pool_used[pool] += ti.pool_slots

        if len(runnable) >= cluster_open:
            break  # ← снова Level 1

    return runnable

Главные моменты:

  1. Level 1 проверяется первым — если cluster забит, остальные не имеют значения.
  2. Level 3 уже отработал раньше — DagRun не создан, если runs cap превышен. На этом этапе мы работаем со scheduled TI существующих DagRun.
  3. Level 5 в самом конце — это финальный gate, и он происходит внутри critical section с row-level lock на slot_pool.

Реальный пример: где какой уровень

Сценарий: DAG daily_orders_etl с 30 mapped tasks process_partition, fan-out по 30 partition. Кластер с parallelism=128, default max_active_tasks_per_dag=16, pool warehouse со 8 slots.

УровеньЛимитЭффект
1. Cluster (128)Много свободноНе bottleneck
2. DAG tasks (16)Только 16 из 30 одновременноBottleneck: остальные 14 в queued
3. DAG runs (16 default)1 active DagRunНе bottleneck
4. Per-task (unlimited)Не заданНе bottleneck
5. Pool warehouse (8)Будет bottleneck когда выйдет из level 2После level 2 — следующий gate

Если хотим запускать все 30 параллельно — поднять max_active_tasks=30. Но тогда выйдет следующий ограничитель — pool slots = 8. То есть финальный bottleneck — pool, и поднимать max_active_tasks нет смысла, пока pool не расширен.


SQL для diagnosis

Проверить current usage пяти уровней:

-- Level 1: cluster
SELECT COUNT(*) AS running_total
FROM task_instance
WHERE state IN ('running', 'queued');

-- Level 2: per DAG
SELECT dag_id, COUNT(*) AS running
FROM task_instance
WHERE state IN ('running', 'queued')
GROUP BY dag_id
ORDER BY running DESC
LIMIT 10;

-- Level 3: active DagRuns per DAG
SELECT dag_id, COUNT(*) AS active_runs
FROM dag_run
WHERE state = 'running'
GROUP BY dag_id
ORDER BY active_runs DESC;

-- Level 4: per task in DAG
SELECT dag_id, task_id, COUNT(*) AS running
FROM task_instance
WHERE state IN ('running', 'queued')
GROUP BY dag_id, task_id
HAVING COUNT(*) > 1
ORDER BY running DESC;

-- Level 5: pool slots
SELECT pool, slots, used_slots, queued_slots, scheduled_slots
FROM slot_pool;

Эти пять запросов — стандартный «kit» для on-call инженера, когда DAG не запускается.


Production gotchas

1. Default max_active_runs_per_dag=16 кусает на backfill. Сделали backfill за 30 дней — ожидаете 30 параллельных runs, видите 16. Это default.

2. [core] parallelism=32 не подходит для production. Default из dev-окружения. Поднять до min(sum(worker_concurrency), разумное число в 200–500).

3. Pool slots = 128 в default_pool — не означает «128 одновременно». Если task без явного pool, она использует default_pool. Если default_pool забит — все no-pool tasks заблокированы. Best practice: per-resource pools, default — резерв.

4. max_active_tis_per_dag=1 не работает кросс-DAG. Если одна и та же write_orders функция дублирована в двух DAG — счётчик per-DAG. Для cross-DAG mutex — pool с slots=1.

5. Mapped tasks (Dynamic Task Mapping) учитываются как отдельные TI каждая. Один DAG с 1000 mapped tasks — это 1000 TI. Если у DAG max_active_tasks=16 и pool unlimited — будет 16 одновременно, остальные queued. Это часто counter-intuitive.


Проверка знанийKnowledge check
DAG имеет 30 mapped tasks, max_active_tasks_per_dag=16 (default), pool 'warehouse' со slots=8. Сколько mapped TI могут running одновременно?
ОтветAnswer
Восемь. Все пять уровней работают как AND: TI запустится только если все пять разрешают. Здесь cluster parallelism=128 не bottleneck, max_active_runs=16 default не bottleneck (1 DagRun), max_active_tis_per_dag не задан, max_active_tasks_per_dag=16 пропускает первые 16, но pool 'warehouse' имеет только 8 slots — это финальный gate. Восемь TI running, восемь queued (consumed pool slot не получен), четырнадцать (30 - 16) даже до pool check не доходят — заблокированы level 2. Если поднять max_active_tasks=30, всё равно будет 8 одновременных — pool bottleneck. Чтобы реально получить 30 параллельно, нужно: max_active_tasks≥30 AND pool slots≥30 AND cluster parallelism достаточный. Это типичный production debugging case: tasks застряли — последовательно проверять все 5 уровней через SQL queries к task_instance / dag_run / slot_pool.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Какой из пяти уровней concurrency проверяется первым и блокирует scheduling даже если все остальные свободны?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6