Critical Section и HA через PostgreSQL row-level locks
Это самая уникальная и недооценённая часть Airflow. Большинство distributed систем для координации используют Raft (etcd, CockroachDB), Paxos (Cassandra), ZooKeeper (старая Kafka, HBase). Airflow выбрал radикально иной путь — координировать scheduler-ы через row-level locks в metadata database.
Это решение было сознательным: «operational simplicity» (см. wiki Airflow 2.0 scheduler design). Оператору Airflow не нужно поднимать отдельный consensus cluster — у него уже есть PostgreSQL, и он переиспользуется для distributed coordination.
OLTP: транзакции и точечные операцииИз официальной документации Airflow
“To maintain performance and throughput, there is one part of the scheduling loop that does a number of calculations in memory … only a single scheduler can be in this critical section at once, which is achieved using database row-level locks (specifically,
SELECT ... FOR UPDATE). This critical section is responsible for transitioning TaskInstances from a scheduled state to being enqueued to the executor, while simultaneously ensuring that various concurrency and pool limits are honored. The critical section is secured by acquiring a row-level write lock on every row of the Pool table, which is roughly equivalent toSELECT * FROM slot_pool FOR UPDATE NOWAIT.”
Этот абзац — ключ к пониманию Airflow HA. Препарируем его.
Зачем нужна critical section
Представим, что нет critical section. Два scheduler-а одновременно:
Scheduler 1: видит scheduled TI (priority=10), pool 'gpu' имеет 1 free slot
Scheduler 2: видит scheduled TI (priority=5), pool 'gpu' имеет 1 free slot
Scheduler 1: UPDATE ti SET state='queued' WHERE id=X
Scheduler 2: UPDATE ti SET state='queued' WHERE id=Y
→ Pool 'gpu' has 2 queued tasks, but only 1 slot → over-allocation
Без synchronization scheduler-ы могут одновременно занять один и тот же slot. Critical section — это mutex over pool decisions.
Implementation: SELECT … FOR UPDATE NOWAIT
В коде SchedulerJobRunner:
# Псевдокод
def _critical_section_enqueue_task_instances(self, session):
try:
# Acquire row-level lock на slot_pool table
pools = (
session.query(Pool)
.with_for_update(nowait=True, of=Pool) # ← NOWAIT
.all()
)
except OperationalError as e:
# PostgreSQL: lock_not_available
# Это нормально — другой scheduler уже в critical section
log.debug("Other scheduler in critical section, skipping tick")
return
# Внутри critical section — у нас exclusive lock
# ... делаем enqueue decisions ...
session.commit() # ← lock освобождается
Что делает NOWAIT:
- Без
NOWAIT: запрос блокируется (ждёт освобождения lock) - С
NOWAIT: получает ошибкуlock_not_availableсразу
Это критично: scheduler не должен блокироваться. Если второй scheduler не может в critical section, он просто пропускает tick и пробует на следующем (через 5 секунд). Никаких deadlocks, никаких длинных ожиданий.
Подробно: SELECT … FOR UPDATE
Что именно делает PostgreSQL при таком запросе:
BEGIN;
SELECT * FROM slot_pool FOR UPDATE NOWAIT;
-- Acquires ROW EXCLUSIVE lock на каждую строку slot_pool
-- Внутри транзакции — exclusive access
-- Другие транзакции с FOR UPDATE на эти rows ждут (или получают NOWAIT error)
UPDATE slot_pool SET used_slots = used_slots + 1 WHERE pool = 'gpu';
UPDATE task_instance SET state = 'queued' WHERE id = X;
COMMIT; -- locks released
Lock держится до COMMIT/ROLLBACK. PostgreSQL автоматически освобождает locks по завершении транзакции.
HA Scheduler — два сценария
Сценарий 1: Оба scheduler-а пытаются войти одновременно
Сценарий 2: Scheduler-1 умер, scheduler-2 adopt-ит его TI
Тонкости: MariaDB не работает
Этот механизм опирается на SKIP LOCKED и NOWAIT для row-level locks. PostgreSQL поддерживает оба с давних версий. MariaDB < 10.6 не имеет SKIP LOCKED / NOWAIT → multi-scheduler HA не работает.
Из официальных доков:
“It is highly recommended to use Postgres for production deployments. MySQL is supported but for Multi-Scheduler setups, MySQL 8.0+ is required because of
SKIP LOCKED/NOWAITsupport.”
Для production — PostgreSQL golden path.
Сколько scheduler-ов имеет смысл
Запустить 10 scheduler-ов вы можете, но толку не будет:
- Critical section serialized — только один scheduler одновременно делает enqueue.
- DAG parsing distributed — но это делает DAG Processor (отдельный процесс с собственным parallelism).
- Scheduling decisions (phase 2) — могут параллелиться, но обычно не bottleneck.
Sweet spot:
- 1 scheduler — dev / small (< 50 DAGs)
- 2 scheduler-а — small-medium (50-500 DAGs)
- 3-4 scheduler-а — medium-large (500-5k DAGs)
- 5+ scheduler-ов — diminishing returns, DB bottleneck
Если очевидно мало throughput — сначала смотреть на DB tuning (PgBouncer, indexes, VACUUM), а не добавлять ещё scheduler-ы.
Где deadlock невозможен (и где возможен)
SELECT ... FOR UPDATE NOWAIT гарантирует отсутствие deadlock между scheduler-ами:
- Один lock acquired → второй сразу получает error → no waiting cycle.
Но deadlock возможен между:
- Scheduler и user task, который сам делает UPDATE на task_instance (например, через airflow CLI
tasks set-state) - Scheduler и пользовательский query из stale connection
Если в Postgres логах видите deadlock detected — это указывает на user-initiated SQL queries, конфликтующие со scheduler.
Hands-on: смотрим locks live
В живой системе:
-- Активные locks на slot_pool
SELECT
pa.pid,
pa.application_name,
pa.state,
pl.mode,
pl.granted,
pa.query_start,
now() - pa.query_start AS duration,
pa.query
FROM pg_locks pl
JOIN pg_stat_activity pa ON pl.pid = pa.pid
WHERE pl.relation = 'slot_pool'::regclass
ORDER BY pa.query_start;
Если у вас два scheduler-а, во время race условия увидите:
- Один с
granted=true, mode='RowExclusiveLock'(внутри critical section) - Другой получит error и не появится в списке (NOWAIT не блокируется)
В следующем уроке — Lab: HA Scheduler Race — запустим 2 scheduler-а в docker-compose, и через streaming pg_locks запрос будем смотреть как они конкурируют в realtime.