Learning Platform
Глоссарий Troubleshooting
Урок 12.03 · 30 мин
Продвинутый
Poolslot_poolCritical Sectiondefault_poolpool_slots

Pools — slot_pool model, default_pool, critical section gate

Pool — это не просто «счётчик» или абстракция в config. Это физическая строка в таблице slot_pool в metadata DB, на которую scheduler берёт row-level lock в critical section. Из-за этого pools — самый «защищённый» уровень concurrency control: невозможно over-allocate даже при multi-scheduler HA с миллионом scheduled tasks.

Этот урок препарирует data model pool, разбирает четыре счётчика slots / used_slots / queued_slots / scheduled_slots, объясняет роль default_pool (128 slots), показывает CRUD через UI/CLI/REST API и финально — почему pool_slots > 1 это правильный механизм для resource-heavy tasks.


Таблица slot_pool

Pool — это row. Схема в 2.10/2.11:

CREATE TABLE slot_pool (
    id           SERIAL PRIMARY KEY,
    pool         VARCHAR(256) NOT NULL UNIQUE,
    slots        INTEGER,           -- capacity
    description  TEXT,
    include_deferred BOOLEAN NOT NULL DEFAULT FALSE  -- 2.7+
);

Что важно:

  • pool — уникальное имя
  • slots — capacity, integer (может быть отрицательным — это значит pool “disabled”)
  • include_deferred (2.7+) — учитывать ли deferred TI при подсчёте queued_slots
  • Нет колонок used_slots, queued_slots — они вычисляются динамически из task_instance через JOIN

Это критично: scheduler НЕ держит счётчики в slot_pool. Он каждый tick считает их через aggregate query, и lock берёт на саму строку pool — для serialization decisions.


Четыре счётчика, видимые в UI

В Airflow UI на /pools для каждого pool показываются:

СчётчикЧто показываетИсточник
slotsCapacity (configured)slot_pool.slots
used_slotsTI в state runningaggregate task_instance WHERE state=‘running’
queued_slotsTI в state queued (+ deferred если include_deferred=True)aggregate task_instance WHERE state IN (‘queued’, ‘deferred’)
scheduled_slotsTI в state scheduledaggregate task_instance WHERE state=‘scheduled’
open_slotsslots - used_slots - queued_slotscomputed

Пример query (упрощённо):

SELECT
    sp.pool,
    sp.slots,
    COALESCE(SUM(CASE WHEN ti.state = 'running' THEN ti.pool_slots ELSE 0 END), 0) AS used_slots,
    COALESCE(SUM(CASE WHEN ti.state IN ('queued', 'deferred') THEN ti.pool_slots ELSE 0 END), 0) AS queued_slots,
    COALESCE(SUM(CASE WHEN ti.state = 'scheduled' THEN ti.pool_slots ELSE 0 END), 0) AS scheduled_slots,
    sp.slots - COALESCE(SUM(CASE WHEN ti.state IN ('running', 'queued') THEN ti.pool_slots ELSE 0 END), 0) AS open_slots
FROM slot_pool sp
LEFT JOIN task_instance ti ON ti.pool = sp.pool
GROUP BY sp.pool, sp.slots
ORDER BY sp.pool;

Заметьте: каждый TI занимает не 1, а ti.pool_slots — это поле на TI, default 1, но можно задать @task(pool_slots=4) для тяжёлой задачи.


OLTP — транзакции и row-level locks

Pool в critical section — почему это самый защищённый gate

В модуле 04 мы разбирали critical section scheduler. Напомним ключевую цитату:

“The critical section is secured by acquiring a row-level write lock on every row of the Pool table, which is roughly equivalent to SELECT * FROM slot_pool FOR UPDATE NOWAIT.”

Это значит:

Pool decision внутри critical section
Scheduler tick startКаждые scheduler_heartbeat_sec (5s default). Scheduler видит scheduled TI и хочет переводить их в queued — но прежде чем UPDATE, нужно убедиться что не over-allocate pool.
SELECT ... FOR UPDATE NOWAIT
Lock all rows of slot_poolPostgreSQL: ROW EXCLUSIVE lock на каждую строку slot_pool. Если другой scheduler уже тут — наш получает lock_not_available и пропускает tick. NOWAIT critical: нет blocking, нет deadlock.
внутри section
Compute pool usageSELECT SUM(pool_slots) per pool FROM task_instance WHERE state IN ('running', 'queued'). Это snapshot — внутри section мы держим exclusive view на slot_pool.
Pick TI fitting in open_slotsДля каждого pool: сколько слотов свободно? Берём scheduled TI priority order, проверяем pool_slots ≤ open_slots, помечаем как queued. На каждой итерации обновляем local copy open_slots.
COMMIT
Lock released, TI now in queuedТранзакция commit-ится. ROW EXCLUSIVE locks автоматически освобождаются. TI в state=queued — теперь executor может их забрать.

Что это даёт практически:

  • Even под 4 scheduler-а HA, pool никогда не over-allocate. Только один scheduler одновременно делает decision.
  • No race condition между recompute used_slots и UPDATE state='queued' — оба внутри одной транзакции, lock держится до commit.
  • Failure-safe: если scheduler упал внутри section до commit — Postgres откатит транзакцию, locks освободятся, TI остался в scheduled. Никаких phantom queued.

Это то, почему pool — самый правильный уровень для protection of external resources.


default_pool — 128 slots, дефолт для всех tasks без явного pool

При инициализации Airflow создаёт system pool:

# airflow/models/pool.py — упрощённо
DEFAULT_POOL_NAME = "default_pool"

# airflow db init создаёт:
INSERT INTO slot_pool (pool, slots, description)
VALUES ('default_pool', 128, 'Default pool');

Что важно знать:

  • Любой task без явного pool=... использует default_pool.
  • Capacity 128 достаточно для small/dev, но в production легко исчерпывается.
  • default_pool нельзя удалить. Можно изменить slots, нельзя rename.
  • Если default_pool становится bottleneck, симптом: tasks в queue → scheduled state forever, UI показывает used_slots = 128, ни один task в default_pool не запускается.

Production paranoia: никогда не оставляйте tasks без явного pool, если deployment medium+. Создайте per-resource pools:

  • pool="warehouse" для Snowflake/BQ queries
  • pool="rds_writes" для writes в Postgres
  • pool="api_calls" для rate-limited external APIs
  • default_pool — резерв на тривиальные tasks (sensors, branches, simple Python)

CRUD pools

Через UI

/pools+ создать. Поля: name, slots, description, include_deferred (checkbox 2.7+).

Через CLI

# Create
airflow pools set warehouse 10 "Snowflake warehouse XS protection"

# List
airflow pools list
# pool             slots    description
# ---------------  -------  ------------------------------------
# default_pool     128      Default pool
# warehouse        10       Snowflake warehouse XS protection

# Get one
airflow pools get warehouse

# Delete
airflow pools delete warehouse

# Export/Import (JSON)
airflow pools export pools.json
airflow pools import pools.json

Export/import — критичная feature для GitOps: pools как код, не как UI clicks.

Через REST API (2.0+)

# Create
curl -X POST http://airflow:8080/api/v1/pools \
    -H "Content-Type: application/json" \
    -u airflow:airflow \
    -d '{"name": "warehouse", "slots": 10, "description": "..."}'

# List
curl -u airflow:airflow http://airflow:8080/api/v1/pools

# Patch
curl -X PATCH http://airflow:8080/api/v1/pools/warehouse \
    -H "Content-Type: application/json" \
    -u airflow:airflow \
    -d '{"slots": 20}'

# Delete
curl -X DELETE -u airflow:airflow http://airflow:8080/api/v1/pools/warehouse

GitOps pattern

Pools храните в repo как JSON, при deploy импортируйте:

# .github/workflows/airflow-deploy.yml
- name: Apply pools
  run: airflow pools import infra/pools.json
// infra/pools.json
{
  "warehouse_xs": {"slots": 8, "description": "Snowflake XS — read queries"},
  "warehouse_l": {"slots": 4, "description": "Snowflake L — heavy ETL"},
  "rds_writes": {"slots": 6, "description": "Postgres writes"},
  "snowflake_ddl": {"slots": 1, "description": "Mutex для DDL — single writer"}
}

pool_slots — не всегда 1

Default: @task(pool="warehouse")pool_slots=1. То есть один TI занимает один slot.

Для resource-heavy tasks можно занять больше:

@task(pool="warehouse", pool_slots=4)
def heavy_etl():
    # 16-CPU query, должна занимать 4 slot-а
    ...

Зачем:

  • Если pool имеет 8 slots, обычная task = 1 slot → 8 одновременных
  • Heavy ETL занимает 4 slot → только 2 одновременных таких + 0 обычных, или 1 heavy + 4 обычных
  • Это weighted concurrency — pool описывает физический resource, а tasks говорят сколько они потребляют

Real example — Snowflake warehouse XS (8 credits) можно представить так:

# Pool warehouse_xs со slots=8

@task(pool="warehouse_xs", pool_slots=1)
def simple_query():
    # Лёгкий SELECT — 1 credit
    ...

@task(pool="warehouse_xs", pool_slots=4)
def heavy_aggregation():
    # CTE + window functions — 4 credit
    ...

@task(pool="warehouse_xs", pool_slots=8)
def warehouse_maintenance():
    # VACUUM analog — занимает весь warehouse
    ...

Pool как economic model — это очень мощно. Можно описывать бюджет на ресурс и динамически приоритизировать (см. следующий урок про priority_weight).


Pool slot vs Sensor slot — include_deferred

С Airflow 2.7+ pools имеют флаг include_deferred. Без него:

  • Sensor в deferred state (через mode='reschedule' или async via triggerer) НЕ занимает pool slot
  • Это default — sensor через triggerer спит async без worker, и slot уходит другим

С include_deferred=True:

  • Deferred TI считается как queued — занимает slot
  • Полезно когда вы хотите чтобы pool ограничивал именно «количество запущенных параллельных операций» включая ожидающие

Когда include_deferred=True оправдано:

  • Pool ограничивает количество lifecycle TI (не worker resource) — например, RDS connections
  • Один TI = одно подключение, не важно running или deferred

Когда default (False) лучше:

  • Pool ограничивает worker compute / external query slots
  • Deferred sensors не потребляют worker — не должны занимать pool

Production gotchas

1. default_pool забит — все no-pool tasks заблокированы. Когда used_slots = 128, ни один task без явного pool не запустится. Симптом: новые DagRun creates, но TI вечно в scheduled. SQL SELECT pool FROM slot_pool WHERE pool='default_pool'; и aggregate.

2. pool field на task должен совпадать с именем в slot_pool exactly. Опечатка → TI в state=scheduled и pool_slots_unavailable reason. Сегодня же UI помечает такой DAG как broken (since 2.6).

3. CLI airflow pools set name N overwrites existing. Не warning, не confirm. Можно случайно понизить с 100 до 10 и положить production.

4. Multi-scheduler — pool не race condition, но throughput. Если у вас 4 scheduler-а и часто все они хотят в critical section — три из четырёх получают lock_not_available и skip tick. Это нормально, но если scheduler logs показывают много таких — рассмотрите уменьшение числа scheduler-ов или увеличение scheduler_heartbeat_sec.

5. Pool slots NOT propagated to executor / worker capacity. Pool — только в scheduler decision. Worker исполнит любую assigned TI. Если pool slots=100 но workers могут run только 32 одновременно — TI accumulate в queued без execution.


Проверка знанийKnowledge check
Чем pool как уровень concurrency principled отличается от `max_active_tasks_per_dag` и почему он считается самым 'правильным' для protection внешних resources?
ОтветAnswer
Три фундаментальных отличия: (1) Atomicity — pool decision происходит внутри critical section с row-level lock на slot_pool table (SELECT FOR UPDATE NOWAIT). Это гарантирует, что even под multi-scheduler HA невозможно over-allocate — два scheduler-а физически не могут одновременно решать «есть ли свободные slots». max_active_tasks_per_dag проверяется через простой query без lock, теоретически возможен race (хотя на практике редко). (2) Cross-DAG resource sharing — pool это shared quota across the entire deployment. Один pool 'warehouse' ограничивает SUM всех @task(pool='warehouse') во всех DAG. max_active_tasks_per_dag — только в пределах одного DAG. Если у вас три DAG все пишут в Snowflake, нужен общий gate — это pool. (3) Runtime mutability — pool это row в DB, менять можно UI/CLI/API без редеплоя кода. max_active_tasks — в DAG code, изменение требует commit + DAG re-parse. Поэтому для on-call: «положили warehouse — нужно сократить concurrency» — это секундная операция в pool, не часовой rollout. И weighted: pool_slots > 1 позволяет описывать heavy tasks как занимающие N slots — это economic model resource, не просто счётчик.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. В таблице slot_pool хранятся used_slots, queued_slots, scheduled_slots?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6