Slot contention scenarios — диагностика и mitigation
Production Airflow рано или поздно столкнётся с одним из четырёх классических «застрял» сценариев: pool exhausted, high-priority висит сзади, self-pool deadlock или queued state forever. Этот урок — справочник для on-call: SQL queries для diagnosis, root cause analysis, mitigation patterns. Все примеры на 2.10/2.11.
Map of common contention scenarios
Сценарий 1: Pool exhausted — used_slots = slots
Симптом: tasks в pool остаются scheduled несколько минут / часов. UI /pools показывает used_slots = slots.
Diagnosis SQL:
-- Какие pools затоплены?
SELECT
sp.pool,
sp.slots,
SUM(CASE WHEN ti.state = 'running' THEN ti.pool_slots ELSE 0 END) AS running,
SUM(CASE WHEN ti.state = 'queued' THEN ti.pool_slots ELSE 0 END) AS queued,
SUM(CASE WHEN ti.state = 'scheduled' THEN ti.pool_slots ELSE 0 END) AS scheduled,
sp.slots - SUM(CASE WHEN ti.state IN ('running', 'queued') THEN ti.pool_slots ELSE 0 END) AS open_slots
FROM slot_pool sp
LEFT JOIN task_instance ti ON ti.pool = sp.pool
GROUP BY sp.pool, sp.slots
HAVING sp.slots - SUM(CASE WHEN ti.state IN ('running', 'queued') THEN ti.pool_slots ELSE 0 END) <= 0;
Какие конкретно TI занимают?:
-- Top 20 LONG-RUNNING TI в забитом pool
SELECT
ti.dag_id,
ti.task_id,
ti.run_id,
ti.try_number,
ti.state,
ti.start_date,
NOW() - ti.start_date AS duration,
ti.pool,
ti.pool_slots,
ti.hostname,
ti.queue
FROM task_instance ti
WHERE ti.pool = 'warehouse' -- забитый pool
AND ti.state IN ('running', 'queued')
ORDER BY ti.start_date ASC
LIMIT 20;
Root causes:
| Cause | Identifying signal | Mitigation |
|---|---|---|
| Долго работающие задачи stuck в running | duration > expected, hostname показывает active worker | Kill через CLI: airflow tasks clear ..., поднять execution_timeout |
| Worker zombie (running в DB, нет process) | hostname pointing на dead worker | airflow scheduler сам adopt-нет через scheduler_zombie_task_threshold, или manual cleanup |
| Underestimated normal load | All tasks healthy, просто их много | Поднять slots, либо разделить pool по query classes |
| Внешний resource throttled | Tasks running medianно дольше обычного | Discover external (Snowflake, RDS) congestion |
Mitigation commands:
# Поднять slots без редеплоя
airflow pools set warehouse 20 "Bumped from 10 to 20 — on-call action 2026-05-12"
# Очистить stuck TI (force scheduled → scheduler перезапустит)
airflow tasks clear my_dag --task-regex "heavy_query" --no-confirm
# Kill running TI (state → failed)
airflow tasks failed my_dag heavy_query 2026-05-12T00:00:00
Сценарий 2: High-priority TI queued, low-priority running
Симптом: вы добавили priority_weight=1000 на critical task, но она по-прежнему ждёт в scheduled за низкоприоритетными.
Diagnosis SQL:
-- Сортировка по effective priority в pool
SELECT
ti.dag_id,
ti.task_id,
ti.priority_weight AS effective_priority,
ti.state,
ti.pool,
ti.queued_dttm
FROM task_instance ti
WHERE ti.pool = 'warehouse'
AND ti.state IN ('running', 'queued', 'scheduled')
ORDER BY ti.priority_weight DESC, ti.queued_dttm ASC
LIMIT 30;
Возможные причины:
-
weight_rule=downstream(default) перебивает absolute — корни графа имеют high effective priority. Ваш priority_weight=1000 на middle task всё равно меньше чем корень с большим хвостом.- Fix: переключить
weight_rule="absolute"на critical task или на весь DAG черезdefault_args.
- Fix: переключить
-
High-priority уже queued, но executor не освобождает slot — pool занят running task с long execution.
- Fix: пока running не закончит, новая queued не начнётся. Можно:
- Сократить
execution_timeoutна blocker - Использовать
pool_slots=2на critical task — но only если pool пустой (paradox)
- Сократить
- Fix: пока running не закончит, новая queued не начнётся. Можно:
-
Priority применяется только при scheduled → queued transition. Если TI уже queued, она stays in queue order — executor забирает их в pickup order.
- Fix: для Celery executor — pickup order по
priority_weight(используетсяairflow_priority_queue), но Kubernetes executor игнорирует priority после queue.
- Fix: для Celery executor — pickup order по
-
Effective priority precomputed at DAG serialization. Изменили priority — нужен parse DAG. До re-parse — старое значение.
- Fix:
airflow dags reserialize <dag_id>(2.7+).
- Fix:
Сценарий 3: Self-pool deadlock (concurrency=1)
Симптом: DAG с max_active_tasks=1 (или pool slots=1 single-task usage) — DAG зависает на середине.
Классический пример:
@dag(max_active_tasks=1) # ← только 1 task running одновременно
def deadlocked_dag():
@task
def setup_resource(): ...
@task
def use_resource(resource):
# Long-running, держит ту единственную concurrency
process(resource)
@task
def cleanup_resource(): ...
r = setup_resource()
use_resource(r) >> cleanup_resource()
Тут логически нормально — sequential. Но если use_resource нуждается в отдельной task запущенной параллельно (sub-DAG-like wait), deadlock.
Реальный пример deadlock:
@dag(max_active_tasks=1)
def deadlock_dag():
@task
def trigger_external():
# Trigger external job
...
@task.sensor
def wait_for_external():
# Sensor polling
...
@task
def consume(): ...
trigger_external() >> wait_for_external() >> consume()
wait_for_external через PokeSensor занимает единственный concurrency slot. Если external job сам требует новой Airflow task запуститься (например, через webhook trigger) — она не запустится, sensor таймаутится → DAG fail.
Mitigation:
- Использовать
mode='reschedule'или async sensors через Triggerer — TI уходит вdeferred, slot освобождается. - Поднять
max_active_tasksдо 2+. - Использовать
poolс slots=1 instead ofmax_active_tasks=1— pool deferred-friendly сinclude_deferred=False.
Сценарий 4: TI в queued state forever
Симптом: TI в queued несколько минут / часов, не переходит в running. Это значит scheduler уже всё проверил, передал executor, но executor не подхватывает.
Diagnosis SQL:
SELECT
ti.dag_id,
ti.task_id,
ti.queue,
ti.queued_dttm,
NOW() - ti.queued_dttm AS in_queue,
ti.executor_config
FROM task_instance ti
WHERE ti.state = 'queued'
AND ti.queued_dttm < NOW() - INTERVAL '5 minutes'
ORDER BY ti.queued_dttm ASC;
Возможные причины:
| Причина | Diagnostic | Fix |
|---|---|---|
| CeleryExecutor — worker не подключён к нужному queue | celery inspect active_queues показывает несоответствие | Worker --queues my_queue flag, или ti.queue mismatch |
| Redis broker недоступен | Worker logs ConnectionError | Restart Redis, check network |
| KubernetesExecutor — pod creation failure | Pod в pending state, kubectl describe показывает quota / image pull errors | Fix K8s resources, image |
Executor reach limit parallelism | [celery] worker_concurrency reached | Поднять worker concurrency, добавить workers |
Stuck task adopt_or_reset_orphaned_tasks | Scheduler heartbeat для TI пропадал — был ли scheduler restart? | Scheduler reset orphan на startup, TI должна перейти scheduled |
Specifically Celery:
# Inspect Celery
celery -A airflow.providers.celery.executors.celery_executor inspect active
# Worker queues
celery -A airflow.providers.celery.executors.celery_executor inspect active_queues
# Force reset
airflow tasks clear my_dag --no-confirm # → scheduled, scheduler перезапустит
Сценарий 5: Scheduler не входит в critical section
Симптом: scheduler logs показывают Other scheduler in critical section, skipping tick много раз. Tasks не enqueue-ятся.
Diagnosis — посмотреть locks в Postgres:
SELECT
pa.pid,
pa.application_name,
pa.state,
pl.mode,
pl.granted,
pa.query_start,
NOW() - pa.query_start AS duration,
SUBSTRING(pa.query, 1, 200) AS query
FROM pg_locks pl
JOIN pg_stat_activity pa ON pl.pid = pa.pid
WHERE pl.relation = 'slot_pool'::regclass
ORDER BY pa.query_start;
Если один scheduler держит lock >5 секунд — это уже подозрительно. Возможные причины:
- Slow query внутри critical section — например, slot_pool count over millions of TI без индекса.
- Fix: indexes на
task_instance(pool, state)обязательны.
- Fix: indexes на
- Slow DB — PG нагружен, query plan suboptimal.
- Fix: VACUUM ANALYZE, рассмотреть partitioning task_instance.
- User-initiated transaction держит lock на slot_pool (например, manual UPDATE).
- Fix: kill blocking PID (
SELECT pg_cancel_backend(pid);).
- Fix: kill blocking PID (
Сценарий 6: include_deferred mismatch
Если pool настроен include_deferred=False (default), а task переходит в deferred (через async sensor), slot освобождается.
Несколько deferred sensors одного pool — могут все одновременно. Это side effect, который иногда обходит intended concurrency limit.
# Pool 'api_stripe' slots=5, include_deferred=False (default)
@task.sensor(pool="api_stripe", mode="reschedule") # async через triggerer
async def wait_stripe_webhook(): ...
# 100 параллельных sensors могут быть deferred одновременно — pool slot не занят
Fix: pool с include_deferred=True:
airflow pools set api_stripe 5 "Stripe API" --include-deferred
Тогда deferred тоже считается → limit реально 5 параллельных, включая ожидающих.
Diagnostic dashboard SQL
Один query для quick overview во время инцидента:
-- Pool diagnostics dashboard
WITH pool_stats AS (
SELECT
sp.pool,
sp.slots,
SUM(CASE WHEN ti.state = 'running' THEN ti.pool_slots ELSE 0 END) AS running,
SUM(CASE WHEN ti.state = 'queued' THEN ti.pool_slots ELSE 0 END) AS queued,
SUM(CASE WHEN ti.state = 'scheduled' THEN ti.pool_slots ELSE 0 END) AS scheduled,
SUM(CASE WHEN ti.state = 'deferred' THEN ti.pool_slots ELSE 0 END) AS deferred
FROM slot_pool sp
LEFT JOIN task_instance ti ON ti.pool = sp.pool
GROUP BY sp.pool, sp.slots
)
SELECT
pool,
slots,
running,
queued,
scheduled,
deferred,
slots - running - queued AS open,
CASE
WHEN running + queued >= slots THEN 'EXHAUSTED'
WHEN running + queued >= slots * 0.8 THEN 'WARN'
WHEN scheduled > slots THEN 'BACKLOG'
ELSE 'OK'
END AS status
FROM pool_stats
ORDER BY status, pool;
Запоминаемый — добавьте в Grafana / Metabase как always-on dashboard.
Mitigation playbook — quick reference
| Симптом | First check | First action |
|---|---|---|
Tasks в scheduled long | Pool exhausted? | SELECT pool, used_slots FROM slot_pool; → bump slots или kill stuck running |
Tasks в queued long | Executor health | Check Celery workers / K8s pods, check connectivity Redis |
| High-priority не первая | weight_rule effective? | Check effective priority via SQL, consider absolute |
| DAG stops mid-run | Self-pool deadlock | Раздать max_active_tasks, async sensors |
| Lock contention в PG | Stuck transaction | pg_locks query, possibly kill blocking session |
| Scheduler skipping ticks | Lock duration | Indexes, VACUUM, possibly fewer schedulers |