Pools — slot_pool model, default_pool, critical section gate
Pool — это не просто «счётчик» или абстракция в config. Это физическая строка в таблице slot_pool в metadata DB, на которую scheduler берёт row-level lock в critical section. Из-за этого pools — самый «защищённый» уровень concurrency control: невозможно over-allocate даже при multi-scheduler HA с миллионом scheduled tasks.
Этот урок препарирует data model pool, разбирает четыре счётчика slots / used_slots / queued_slots / scheduled_slots, объясняет роль default_pool (128 slots), показывает CRUD через UI/CLI/REST API и финально — почему pool_slots > 1 это правильный механизм для resource-heavy tasks.
Таблица slot_pool
Pool — это row. Схема в 2.10/2.11:
CREATE TABLE slot_pool (
id SERIAL PRIMARY KEY,
pool VARCHAR(256) NOT NULL UNIQUE,
slots INTEGER, -- capacity
description TEXT,
include_deferred BOOLEAN NOT NULL DEFAULT FALSE -- 2.7+
);
Что важно:
pool— уникальное имяslots— capacity, integer (может быть отрицательным — это значит pool “disabled”)include_deferred(2.7+) — учитывать ли deferred TI при подсчётеqueued_slots- Нет колонок
used_slots,queued_slots— они вычисляются динамически изtask_instanceчерез JOIN
Это критично: scheduler НЕ держит счётчики в slot_pool. Он каждый tick считает их через aggregate query, и lock берёт на саму строку pool — для serialization decisions.
Четыре счётчика, видимые в UI
В Airflow UI на /pools для каждого pool показываются:
| Счётчик | Что показывает | Источник |
|---|---|---|
slots | Capacity (configured) | slot_pool.slots |
used_slots | TI в state running | aggregate task_instance WHERE state=‘running’ |
queued_slots | TI в state queued (+ deferred если include_deferred=True) | aggregate task_instance WHERE state IN (‘queued’, ‘deferred’) |
scheduled_slots | TI в state scheduled | aggregate task_instance WHERE state=‘scheduled’ |
open_slots | slots - used_slots - queued_slots | computed |
Пример query (упрощённо):
SELECT
sp.pool,
sp.slots,
COALESCE(SUM(CASE WHEN ti.state = 'running' THEN ti.pool_slots ELSE 0 END), 0) AS used_slots,
COALESCE(SUM(CASE WHEN ti.state IN ('queued', 'deferred') THEN ti.pool_slots ELSE 0 END), 0) AS queued_slots,
COALESCE(SUM(CASE WHEN ti.state = 'scheduled' THEN ti.pool_slots ELSE 0 END), 0) AS scheduled_slots,
sp.slots - COALESCE(SUM(CASE WHEN ti.state IN ('running', 'queued') THEN ti.pool_slots ELSE 0 END), 0) AS open_slots
FROM slot_pool sp
LEFT JOIN task_instance ti ON ti.pool = sp.pool
GROUP BY sp.pool, sp.slots
ORDER BY sp.pool;
Заметьте: каждый TI занимает не 1, а ti.pool_slots — это поле на TI, default 1, но можно задать @task(pool_slots=4) для тяжёлой задачи.
OLTP — транзакции и row-level locks
Pool в critical section — почему это самый защищённый gate
В модуле 04 мы разбирали critical section scheduler. Напомним ключевую цитату:
“The critical section is secured by acquiring a row-level write lock on every row of the Pool table, which is roughly equivalent to
SELECT * FROM slot_pool FOR UPDATE NOWAIT.”
Это значит:
Что это даёт практически:
- Even под 4 scheduler-а HA, pool никогда не over-allocate. Только один scheduler одновременно делает decision.
- No race condition между recompute
used_slotsиUPDATE state='queued'— оба внутри одной транзакции, lock держится до commit. - Failure-safe: если scheduler упал внутри section до commit — Postgres откатит транзакцию, locks освободятся, TI остался в
scheduled. Никаких phantom queued.
Это то, почему pool — самый правильный уровень для protection of external resources.
default_pool — 128 slots, дефолт для всех tasks без явного pool
При инициализации Airflow создаёт system pool:
# airflow/models/pool.py — упрощённо
DEFAULT_POOL_NAME = "default_pool"
# airflow db init создаёт:
INSERT INTO slot_pool (pool, slots, description)
VALUES ('default_pool', 128, 'Default pool');
Что важно знать:
- Любой task без явного
pool=...используетdefault_pool. - Capacity
128достаточно для small/dev, но в production легко исчерпывается. default_poolнельзя удалить. Можно изменить slots, нельзя rename.- Если default_pool становится bottleneck, симптом: tasks в queue → scheduled state forever, UI показывает
used_slots = 128, ни один task в default_pool не запускается.
Production paranoia: никогда не оставляйте tasks без явного pool, если deployment medium+. Создайте per-resource pools:
pool="warehouse"для Snowflake/BQ queriespool="rds_writes"для writes в Postgrespool="api_calls"для rate-limited external APIsdefault_pool— резерв на тривиальные tasks (sensors, branches, simple Python)
CRUD pools
Через UI
/pools → + создать. Поля: name, slots, description, include_deferred (checkbox 2.7+).
Через CLI
# Create
airflow pools set warehouse 10 "Snowflake warehouse XS protection"
# List
airflow pools list
# pool slots description
# --------------- ------- ------------------------------------
# default_pool 128 Default pool
# warehouse 10 Snowflake warehouse XS protection
# Get one
airflow pools get warehouse
# Delete
airflow pools delete warehouse
# Export/Import (JSON)
airflow pools export pools.json
airflow pools import pools.json
Export/import — критичная feature для GitOps: pools как код, не как UI clicks.
Через REST API (2.0+)
# Create
curl -X POST http://airflow:8080/api/v1/pools \
-H "Content-Type: application/json" \
-u airflow:airflow \
-d '{"name": "warehouse", "slots": 10, "description": "..."}'
# List
curl -u airflow:airflow http://airflow:8080/api/v1/pools
# Patch
curl -X PATCH http://airflow:8080/api/v1/pools/warehouse \
-H "Content-Type: application/json" \
-u airflow:airflow \
-d '{"slots": 20}'
# Delete
curl -X DELETE -u airflow:airflow http://airflow:8080/api/v1/pools/warehouse
GitOps pattern
Pools храните в repo как JSON, при deploy импортируйте:
# .github/workflows/airflow-deploy.yml
- name: Apply pools
run: airflow pools import infra/pools.json
// infra/pools.json
{
"warehouse_xs": {"slots": 8, "description": "Snowflake XS — read queries"},
"warehouse_l": {"slots": 4, "description": "Snowflake L — heavy ETL"},
"rds_writes": {"slots": 6, "description": "Postgres writes"},
"snowflake_ddl": {"slots": 1, "description": "Mutex для DDL — single writer"}
}
pool_slots — не всегда 1
Default: @task(pool="warehouse") → pool_slots=1. То есть один TI занимает один slot.
Для resource-heavy tasks можно занять больше:
@task(pool="warehouse", pool_slots=4)
def heavy_etl():
# 16-CPU query, должна занимать 4 slot-а
...
Зачем:
- Если pool имеет 8 slots, обычная task = 1 slot → 8 одновременных
- Heavy ETL занимает 4 slot → только 2 одновременных таких + 0 обычных, или 1 heavy + 4 обычных
- Это weighted concurrency — pool описывает физический resource, а tasks говорят сколько они потребляют
Real example — Snowflake warehouse XS (8 credits) можно представить так:
# Pool warehouse_xs со slots=8
@task(pool="warehouse_xs", pool_slots=1)
def simple_query():
# Лёгкий SELECT — 1 credit
...
@task(pool="warehouse_xs", pool_slots=4)
def heavy_aggregation():
# CTE + window functions — 4 credit
...
@task(pool="warehouse_xs", pool_slots=8)
def warehouse_maintenance():
# VACUUM analog — занимает весь warehouse
...
Pool как economic model — это очень мощно. Можно описывать бюджет на ресурс и динамически приоритизировать (см. следующий урок про priority_weight).
Pool slot vs Sensor slot — include_deferred
С Airflow 2.7+ pools имеют флаг include_deferred. Без него:
- Sensor в
deferredstate (черезmode='reschedule'или async via triggerer) НЕ занимает pool slot - Это default — sensor через triggerer спит async без worker, и slot уходит другим
С include_deferred=True:
- Deferred TI считается как queued — занимает slot
- Полезно когда вы хотите чтобы pool ограничивал именно «количество запущенных параллельных операций» включая ожидающие
Когда include_deferred=True оправдано:
- Pool ограничивает количество lifecycle TI (не worker resource) — например, RDS connections
- Один TI = одно подключение, не важно running или deferred
Когда default (False) лучше:
- Pool ограничивает worker compute / external query slots
- Deferred sensors не потребляют worker — не должны занимать pool
Production gotchas
1. default_pool забит — все no-pool tasks заблокированы. Когда used_slots = 128, ни один task без явного pool не запустится. Симптом: новые DagRun creates, но TI вечно в scheduled. SQL SELECT pool FROM slot_pool WHERE pool='default_pool'; и aggregate.
2. pool field на task должен совпадать с именем в slot_pool exactly. Опечатка → TI в state=scheduled и pool_slots_unavailable reason. Сегодня же UI помечает такой DAG как broken (since 2.6).
3. CLI airflow pools set name N overwrites existing. Не warning, не confirm. Можно случайно понизить с 100 до 10 и положить production.
4. Multi-scheduler — pool не race condition, но throughput. Если у вас 4 scheduler-а и часто все они хотят в critical section — три из четырёх получают lock_not_available и skip tick. Это нормально, но если scheduler logs показывают много таких — рассмотрите уменьшение числа scheduler-ов или увеличение scheduler_heartbeat_sec.
5. Pool slots NOT propagated to executor / worker capacity. Pool — только в scheduler decision. Worker исполнит любую assigned TI. Если pool slots=100 но workers могут run только 32 одновременно — TI accumulate в queued без execution.