Pools и Concurrency — обзор модуля
Concurrency в Airflow — это пять уровней ограничений, которые работают одновременно. Понимание их взаимодействия критично для предотвращения over-allocation внешних ресурсов (например, не положить Snowflake warehouse).
Уроки модуля
| # | Урок | Что внутри |
|---|---|---|
| 01 | Обзор модуля | Текущий урок |
| 02 | 5 уровней concurrency | Parallelism, max_active_tasks, dag_concurrency, task_concurrency, pool slots |
| 03 | Pool slots — gate в critical section | Row-level lock, why pool changes идемпотентны serialized |
| 04 | Priority Weight Strategies | downstream / upstream / absolute, как scheduler сортирует |
| 05 | Slot contention scenarios | Common deadlock patterns, mitigations |
Пять уровней concurrency
| # | Уровень | Конфиг | Default |
|---|---|---|---|
| 1 | Cluster-wide | [core] parallelism | 32 |
| 2 | Per DAG | dag_concurrency (default), max_active_tasks_per_dag | 16 |
| 3 | Per DAG run | max_active_runs_per_dag | 16 |
| 4 | Per task | task_concurrency (legacy), max_active_tis_per_dag | unlimited |
| 5 | Pool slots | slots in slot_pool table | per pool |
Любой из этих ограничивает execution. Самый ограничивающий выигрывает.
Priority Weight
@task(priority_weight=10)
def important_task(): ...
@task(priority_weight=1)
def normal_task(): ...
weight_rule:
downstream(default) — priority = own + sum of downstreamupstream— priority = own + sum of upstreamabsolute— only own value
Pool как critical section gate
Pool — это не просто счётчик. Это row в таблице slot_pool, на которую scheduler берёт row-level lock в critical section. Это значит:
- Все pool decisions atomically serialized
- Multi-scheduler конкурирует за pool через NOWAIT
- Нельзя over-allocate — даже под high concurrency
Killer pattern: Snowflake warehouse protection
@task(pool="snowflake_xs", pool_slots=2)
def heavy_query():
pass
В UI создаём pool snowflake_xs с 10 slots = 5 одновременных queries. Это предотвращает положить warehouse даже при 100 готовых tasks.
Killer takeaway
Если в production tasks стоят в queued state и не запускаются — первым делом смотреть pool occupancy:
SELECT pool, slots, used_slots, queued_slots, scheduled_slots
FROM slot_pool;
Если used_slots = slots — pool exhausted. Не tuning executor concurrency, не больше workers — это pool.