Learning Platform
Глоссарий Troubleshooting
Урок 12.06 · 30 мин
Продвинутый
Slot ContentionDiagnosisSQLMitigationStuck Tasks

Slot contention scenarios — диагностика и mitigation

Production Airflow рано или поздно столкнётся с одним из четырёх классических «застрял» сценариев: pool exhausted, high-priority висит сзади, self-pool deadlock или queued state forever. Этот урок — справочник для on-call: SQL queries для diagnosis, root cause analysis, mitigation patterns. Все примеры на 2.10/2.11.


Map of common contention scenarios

Decision flow для застрявших tasks
Tasks застрялиOn-call alert: TI не запускается долго (>5 min). Первый шаг — какой state? scheduled vs queued — это разные проблемы и разные diagnosis paths.
check state
state='scheduled'Scheduler не передал TI executor. Проблема в одном из 5 уровней concurrency или в самом scheduler. Diagnosis: SELECT FROM slot_pool, проверить pool exhaustion, проверить cluster parallelism.
state='queued'Scheduler enqueue сделал — executor не подхватил. Проблема в worker / broker / K8s. Diagnosis: celery inspect, kubectl get pods, проверить connectivity Redis.
root causes
Pool exhaustedused_slots = slots. Возможные: stuck running TI, zombie workers, sustained heavy load, внешний resource throttled.
Self-deadlockmax_active_tasks=1 + sync sensor занимает slot. Sensor ждёт внешний event, который требует другой task запуститься — циклическая блокировка.
High-priority queuedweight_rule=downstream корни выигрывают. Critical task в середине графа не получает приоритет.
Worker / broker issueCelery worker не subscribed на queue, Redis down, K8s pod creation failure (quota / image pull).

Сценарий 1: Pool exhausted — used_slots = slots

Симптом: tasks в pool остаются scheduled несколько минут / часов. UI /pools показывает used_slots = slots.

Diagnosis SQL:

-- Какие pools затоплены?
SELECT
    sp.pool,
    sp.slots,
    SUM(CASE WHEN ti.state = 'running' THEN ti.pool_slots ELSE 0 END) AS running,
    SUM(CASE WHEN ti.state = 'queued' THEN ti.pool_slots ELSE 0 END) AS queued,
    SUM(CASE WHEN ti.state = 'scheduled' THEN ti.pool_slots ELSE 0 END) AS scheduled,
    sp.slots - SUM(CASE WHEN ti.state IN ('running', 'queued') THEN ti.pool_slots ELSE 0 END) AS open_slots
FROM slot_pool sp
LEFT JOIN task_instance ti ON ti.pool = sp.pool
GROUP BY sp.pool, sp.slots
HAVING sp.slots - SUM(CASE WHEN ti.state IN ('running', 'queued') THEN ti.pool_slots ELSE 0 END) <= 0;

Какие конкретно TI занимают?:

-- Top 20 LONG-RUNNING TI в забитом pool
SELECT
    ti.dag_id,
    ti.task_id,
    ti.run_id,
    ti.try_number,
    ti.state,
    ti.start_date,
    NOW() - ti.start_date AS duration,
    ti.pool,
    ti.pool_slots,
    ti.hostname,
    ti.queue
FROM task_instance ti
WHERE ti.pool = 'warehouse'  -- забитый pool
  AND ti.state IN ('running', 'queued')
ORDER BY ti.start_date ASC
LIMIT 20;

Root causes:

CauseIdentifying signalMitigation
Долго работающие задачи stuck в runningduration > expected, hostname показывает active workerKill через CLI: airflow tasks clear ..., поднять execution_timeout
Worker zombie (running в DB, нет process)hostname pointing на dead workerairflow scheduler сам adopt-нет через scheduler_zombie_task_threshold, или manual cleanup
Underestimated normal loadAll tasks healthy, просто их многоПоднять slots, либо разделить pool по query classes
Внешний resource throttledTasks running medianно дольше обычногоDiscover external (Snowflake, RDS) congestion

Mitigation commands:

# Поднять slots без редеплоя
airflow pools set warehouse 20 "Bumped from 10 to 20 — on-call action 2026-05-12"

# Очистить stuck TI (force scheduled → scheduler перезапустит)
airflow tasks clear my_dag --task-regex "heavy_query" --no-confirm

# Kill running TI (state → failed)
airflow tasks failed my_dag heavy_query 2026-05-12T00:00:00

Сценарий 2: High-priority TI queued, low-priority running

Симптом: вы добавили priority_weight=1000 на critical task, но она по-прежнему ждёт в scheduled за низкоприоритетными.

Diagnosis SQL:

-- Сортировка по effective priority в pool
SELECT
    ti.dag_id,
    ti.task_id,
    ti.priority_weight AS effective_priority,
    ti.state,
    ti.pool,
    ti.queued_dttm
FROM task_instance ti
WHERE ti.pool = 'warehouse'
  AND ti.state IN ('running', 'queued', 'scheduled')
ORDER BY ti.priority_weight DESC, ti.queued_dttm ASC
LIMIT 30;

Возможные причины:

  1. weight_rule=downstream (default) перебивает absolute — корни графа имеют high effective priority. Ваш priority_weight=1000 на middle task всё равно меньше чем корень с большим хвостом.

    • Fix: переключить weight_rule="absolute" на critical task или на весь DAG через default_args.
  2. High-priority уже queued, но executor не освобождает slot — pool занят running task с long execution.

    • Fix: пока running не закончит, новая queued не начнётся. Можно:
      • Сократить execution_timeout на blocker
      • Использовать pool_slots=2 на critical task — но only если pool пустой (paradox)
  3. Priority применяется только при scheduled → queued transition. Если TI уже queued, она stays in queue order — executor забирает их в pickup order.

    • Fix: для Celery executor — pickup order по priority_weight (используется airflow_priority_queue), но Kubernetes executor игнорирует priority после queue.
  4. Effective priority precomputed at DAG serialization. Изменили priority — нужен parse DAG. До re-parse — старое значение.

    • Fix: airflow dags reserialize <dag_id> (2.7+).

Сценарий 3: Self-pool deadlock (concurrency=1)

Симптом: DAG с max_active_tasks=1 (или pool slots=1 single-task usage) — DAG зависает на середине.

Классический пример:

@dag(max_active_tasks=1)  # ← только 1 task running одновременно
def deadlocked_dag():
    @task
    def setup_resource(): ...

    @task
    def use_resource(resource):
        # Long-running, держит ту единственную concurrency
        process(resource)

    @task
    def cleanup_resource(): ...

    r = setup_resource()
    use_resource(r) >> cleanup_resource()

Тут логически нормально — sequential. Но если use_resource нуждается в отдельной task запущенной параллельно (sub-DAG-like wait), deadlock.

Реальный пример deadlock:

@dag(max_active_tasks=1)
def deadlock_dag():
    @task
    def trigger_external():
        # Trigger external job
        ...

    @task.sensor
    def wait_for_external():
        # Sensor polling
        ...

    @task
    def consume(): ...

    trigger_external() >> wait_for_external() >> consume()

wait_for_external через PokeSensor занимает единственный concurrency slot. Если external job сам требует новой Airflow task запуститься (например, через webhook trigger) — она не запустится, sensor таймаутится → DAG fail.

Mitigation:

  1. Использовать mode='reschedule' или async sensors через Triggerer — TI уходит в deferred, slot освобождается.
  2. Поднять max_active_tasks до 2+.
  3. Использовать pool с slots=1 instead of max_active_tasks=1 — pool deferred-friendly с include_deferred=False.

Сценарий 4: TI в queued state forever

Симптом: TI в queued несколько минут / часов, не переходит в running. Это значит scheduler уже всё проверил, передал executor, но executor не подхватывает.

Diagnosis SQL:

SELECT
    ti.dag_id,
    ti.task_id,
    ti.queue,
    ti.queued_dttm,
    NOW() - ti.queued_dttm AS in_queue,
    ti.executor_config
FROM task_instance ti
WHERE ti.state = 'queued'
  AND ti.queued_dttm < NOW() - INTERVAL '5 minutes'
ORDER BY ti.queued_dttm ASC;

Возможные причины:

ПричинаDiagnosticFix
CeleryExecutor — worker не подключён к нужному queuecelery inspect active_queues показывает несоответствиеWorker --queues my_queue flag, или ti.queue mismatch
Redis broker недоступенWorker logs ConnectionErrorRestart Redis, check network
KubernetesExecutor — pod creation failurePod в pending state, kubectl describe показывает quota / image pull errorsFix K8s resources, image
Executor reach limit parallelism[celery] worker_concurrency reachedПоднять worker concurrency, добавить workers
Stuck task adopt_or_reset_orphaned_tasksScheduler heartbeat для TI пропадал — был ли scheduler restart?Scheduler reset orphan на startup, TI должна перейти scheduled

Specifically Celery:

# Inspect Celery
celery -A airflow.providers.celery.executors.celery_executor inspect active

# Worker queues
celery -A airflow.providers.celery.executors.celery_executor inspect active_queues

# Force reset
airflow tasks clear my_dag --no-confirm  # → scheduled, scheduler перезапустит

Сценарий 5: Scheduler не входит в critical section

Симптом: scheduler logs показывают Other scheduler in critical section, skipping tick много раз. Tasks не enqueue-ятся.

Diagnosis — посмотреть locks в Postgres:

SELECT
    pa.pid,
    pa.application_name,
    pa.state,
    pl.mode,
    pl.granted,
    pa.query_start,
    NOW() - pa.query_start AS duration,
    SUBSTRING(pa.query, 1, 200) AS query
FROM pg_locks pl
JOIN pg_stat_activity pa ON pl.pid = pa.pid
WHERE pl.relation = 'slot_pool'::regclass
ORDER BY pa.query_start;

Если один scheduler держит lock >5 секунд — это уже подозрительно. Возможные причины:

  1. Slow query внутри critical section — например, slot_pool count over millions of TI без индекса.
    • Fix: indexes на task_instance(pool, state) обязательны.
  2. Slow DB — PG нагружен, query plan suboptimal.
    • Fix: VACUUM ANALYZE, рассмотреть partitioning task_instance.
  3. User-initiated transaction держит lock на slot_pool (например, manual UPDATE).
    • Fix: kill blocking PID (SELECT pg_cancel_backend(pid);).

Сценарий 6: include_deferred mismatch

Если pool настроен include_deferred=False (default), а task переходит в deferred (через async sensor), slot освобождается.

Несколько deferred sensors одного pool — могут все одновременно. Это side effect, который иногда обходит intended concurrency limit.

# Pool 'api_stripe' slots=5, include_deferred=False (default)

@task.sensor(pool="api_stripe", mode="reschedule")  # async через triggerer
async def wait_stripe_webhook(): ...

# 100 параллельных sensors могут быть deferred одновременно — pool slot не занят

Fix: pool с include_deferred=True:

airflow pools set api_stripe 5 "Stripe API" --include-deferred

Тогда deferred тоже считается → limit реально 5 параллельных, включая ожидающих.


Diagnostic dashboard SQL

Один query для quick overview во время инцидента:

-- Pool diagnostics dashboard
WITH pool_stats AS (
    SELECT
        sp.pool,
        sp.slots,
        SUM(CASE WHEN ti.state = 'running' THEN ti.pool_slots ELSE 0 END) AS running,
        SUM(CASE WHEN ti.state = 'queued' THEN ti.pool_slots ELSE 0 END) AS queued,
        SUM(CASE WHEN ti.state = 'scheduled' THEN ti.pool_slots ELSE 0 END) AS scheduled,
        SUM(CASE WHEN ti.state = 'deferred' THEN ti.pool_slots ELSE 0 END) AS deferred
    FROM slot_pool sp
    LEFT JOIN task_instance ti ON ti.pool = sp.pool
    GROUP BY sp.pool, sp.slots
)
SELECT
    pool,
    slots,
    running,
    queued,
    scheduled,
    deferred,
    slots - running - queued AS open,
    CASE
        WHEN running + queued >= slots THEN 'EXHAUSTED'
        WHEN running + queued >= slots * 0.8 THEN 'WARN'
        WHEN scheduled > slots THEN 'BACKLOG'
        ELSE 'OK'
    END AS status
FROM pool_stats
ORDER BY status, pool;

Запоминаемый — добавьте в Grafana / Metabase как always-on dashboard.


Mitigation playbook — quick reference

СимптомFirst checkFirst action
Tasks в scheduled longPool exhausted?SELECT pool, used_slots FROM slot_pool; → bump slots или kill stuck running
Tasks в queued longExecutor healthCheck Celery workers / K8s pods, check connectivity Redis
High-priority не перваяweight_rule effective?Check effective priority via SQL, consider absolute
DAG stops mid-runSelf-pool deadlockРаздать max_active_tasks, async sensors
Lock contention в PGStuck transactionpg_locks query, possibly kill blocking session
Scheduler skipping ticksLock durationIndexes, VACUUM, possibly fewer schedulers

Проверка знанийKnowledge check
Tasks из DAG 'orders_etl' в pool 'snowflake_l_etl' (slots=4) застряли в scheduled state на 30 минут. Pool показывает used_slots=4. Дай 5-шаговый diagnostic flow и решение для каждого root cause.
ОтветAnswer
Шаг 1 — SELECT FROM slot_pool WHERE pool='snowflake_l_etl': подтверждаем used=4, slots=4. Шаг 2 — `SELECT dag_id, task_id, state, start_date, NOW()-start_date AS duration FROM task_instance WHERE pool='snowflake_l_etl' AND state IN ('running', 'queued')` — увидим какие 4 TI занимают. Шаг 3 — определяем root cause по duration: (a) если start_date > expected execution_time × 3, скорее всего stuck/zombie — проверить hostname workers активен (`celery inspect active` или `kubectl get pods`), при zombie — clear TI или ждать `scheduler_zombie_task_threshold`; (b) если все 4 healthy running но долго — внешняя Snowflake congestion, проверить `query_history` в Snowflake `SHOW QUERIES`, возможно queries блокируются на warehouse capacity или ждут lock на таблицу; (c) если 4 разные DAG / heterogeneous tasks — может pool правильный, просто sustained load. Шаг 4 — выбор mitigation: при stuck — `airflow tasks clear orders_etl --task-regex 'heavy_etl'`; при healthy busy — поднять slots `airflow pools set snowflake_l_etl 6`, временно до investigation; при внешней Snowflake — нужно scale warehouse или reduce upstream throughput. Шаг 5 — verify — повторно SELECT FROM slot_pool, наблюдаем `open > 0`, scheduled TI начали переходить в queued/running. Если нет — проверять scheduler logs (`Other scheduler in critical section`?) и executor health. Дополнительный preventive — добавить alert `pool utilization > 90% for 10 min` чтобы предупреждать ДО полной exhaustion.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Pool exhausted (used_slots = slots), tasks застряли в scheduled. Первый шаг diagnosis?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6