Scheduler main loop
Scheduler — это сердце Airflow. От его правильной работы зависят: когда DagRun будет создан, когда TaskInstance перейдёт в queued, когда executor получит команду, когда зомби-таски будут зачищены. Этот урок препарирует scheduler main loop до конкретного псевдокода и SQL-запросов, которые он отправляет в metadata DB.
Главный класс — SchedulerJob (в 2.x — airflow/jobs/scheduler_job.py; в 3.x переименован в SchedulerJobRunner).
Четыре фазы main loop
Scheduler работает в бесконечном цикле с периодом scheduler_heartbeat_sec (default 5 секунд). Каждый тик состоит из четырёх фаз:
Каждую фазу разберём детально.
Фаза 1: Create DagRuns
Scheduler определяет, какие DAGs нужно запустить сейчас:
# Псевдокод _create_dagruns_for_dags
def _create_dagruns_for_dags(self, session):
# Time-triggered DAGs
dags = session.query(DagModel).filter(
DagModel.is_paused == False,
DagModel.is_active == True,
DagModel.next_dagrun_create_after <= now()
).all()
for dag in dags:
dag_run = create_dagrun(
dag_id=dag.dag_id,
run_id=f"scheduled__{dag.next_dagrun}"
logical_date=dag.next_dagrun,
data_interval=dag.timetable.data_interval,
state=DagRunState.QUEUED,
run_type=DagRunType.SCHEDULED,
# В 3.x также: dag_version_id=dag.current_version_id (AIP-63 DAG Versioning)
)
session.add(dag_run)
# Advance next_dagrun
dag.next_dagrun = dag.timetable.next_dagrun_info(...).logical_date
# Asset-triggered DAGs (datasets/assets) — из dataset_dag_run_queue
asset_triggers = session.query(DatasetDagRunQueue).all()
for trigger in asset_triggers:
if all_assets_ready(trigger):
create_dagrun(...)
session.delete(trigger)
session.commit()
Ключевые моменты:
next_dagrun_create_after— момент, когда следующий run должен быть создан. Обновляется после каждого создания.- DagRun сразу попадает в state
queued— это значит «scheduler знает, что надо запускать, ждёт перехода в running». - Dataset-triggered runs — из таблицы
dataset_dag_run_queue(если все необходимые datasets обновлены — Datasets API, AIP-48 с 2.4+). - В 3.x добавилось
dag_version_id(AIP-63 DAG Versioning) — каждый DagRun привязан к версии bundle.
Фаза 2: Schedule TI
Для каждого DagRun(state in [queued, running]):
# Псевдокод _schedule_dag_run
def _schedule_dag_run(self, dag_run, session):
dag = serialized_dag_to_dag(dag_run.dag_id)
task_instances = dag_run.get_task_instances(session)
for ti in task_instances:
if ti.state in TERMINAL_STATES: # success, failed, skipped
continue
if ti.state == NONE:
# Check upstream deps
if all_upstreams_done(ti):
# All deps satisfied — set scheduled
ti.state = SCHEDULED
ti.scheduled_dttm = now()
session.merge(ti)
elif ti.state == UP_FOR_RETRY:
# Check retry_delay
if now() - ti.end_date >= ti.retry_delay:
ti.state = SCHEDULED
elif ti.state == UP_FOR_RESCHEDULE:
# Sensor reschedule mode
next_reschedule = get_next_reschedule_date(ti)
if now() >= next_reschedule:
ti.state = SCHEDULED
# If all TI done, transition DagRun
if all(ti.state in TERMINAL_STATES for ti in task_instances):
if any(ti.state == FAILED for ti in task_instances):
dag_run.state = DagRunState.FAILED
else:
dag_run.state = DagRunState.SUCCESS
elif dag_run.state == QUEUED and any(ti.state == SCHEDULED for ti in task_instances):
dag_run.state = DagRunState.RUNNING
session.commit()
Состояния TaskInstance на этой фазе:
none→scheduled(все deps satisfied)up_for_retry→scheduled(retry_delay прошёл)up_for_reschedule→scheduled(sensor reschedule timer)- DagRun
queued→running(хотя бы один TI scheduled)
Эта фаза O(N) по числу tasks в active DagRuns. Для DAG с 1000 mapped tasks через expand это становится bottleneck — scheduler loop может затянуться.
Фаза 3: Critical Section — переход в queued
Здесь начинается самое интересное. Critical section защищён row-level lock на slot_pool:
# Псевдокод critical_section
def _critical_section_enqueue_task_instances(self, session):
# Acquire row-level lock на ВСЕХ строках slot_pool
pools = session.query(Pool).with_for_update(nowait=True).all()
# ↑ SELECT * FROM slot_pool FOR UPDATE NOWAIT
# Если другой scheduler уже в critical section — NOWAIT → exception → skip tick
# (это нормально для HA scheduler)
# Получаем все scheduled TI, отсортированные по priority
task_instances = (
session.query(TaskInstance)
.filter(TaskInstance.state == SCHEDULED)
.order_by(TaskInstance.priority_weight.desc())
.with_for_update(skip_locked=True) # ← другие scheduler skip эти rows
.limit(MAX_TIS_PER_QUERY) # default 16
.all()
)
for ti in task_instances:
# Проверяем concurrency limits
if not check_concurrency_limits(ti, pools):
continue
# Проверяем pool slot availability
if not has_pool_slot(ti.pool, pools):
continue
# Atomic: переводим в queued
ti.state = QUEUED
ti.queued_dttm = now()
# Кладём в executor queue
executor.queue_command(ti.command())
# Critical section ends — commit
session.commit()
Что важно:
SELECT * FROM slot_pool FOR UPDATE NOWAIT— атомарная блокировка всех pool rows. Если второй scheduler пытается войти — получает ошибкуlock_not_available, тихо пропускает текущий тик.MAX_TIS_PER_QUERY(default 16) — batch size. Слишком маленький → много циклов; слишком большой → дольше в section.- После переключения TI →
queued, scheduler передаёт workload в executor. Executor сам отвечает заqueued → running.
Фаза 4: Housekeeping
Несколько вспомогательных операций:
Adopt orphans
-- Найти мёртвые scheduler-ы (heartbeat прокис)
UPDATE job SET state='failed'
WHERE job_type='SchedulerJob' AND state='running'
AND latest_heartbeat < now() - interval '30 seconds';
-- Adopt их TI (queued, running, restarting)
UPDATE task_instance SET state=NULL -- reset to scheduled
WHERE state IN ('queued', 'running', 'restarting')
AND ti.job_id IN (SELECT id FROM job WHERE state='failed');
Check zombies
-- Найти TI, чей heartbeat прокис
SELECT * FROM task_instance
WHERE state='running'
AND latest_heartbeat < now() - interval '300 seconds'; -- scheduler_zombie_task_threshold
Найденные zombie tasks помечаются failed и (если есть retries) переводятся в up_for_retry.
Emit metrics
# StatsD / OTel
metrics.gauge("scheduler.scheduler_loop_duration", elapsed_seconds)
metrics.gauge("executor.open_slots", executor.slots_available)
metrics.gauge("scheduler.tasks.executable", num_scheduled)
metrics.counter("scheduler.tasks.killed_externally", ...)
Идемпотентность пайплайнов — повтор не ломает данные
Ключевые конфиги
| Параметр | Default | Что |
|---|---|---|
scheduler_heartbeat_sec | 5 | Период main loop |
min_file_process_interval | 30 | Не парсить DAG-файл чаще |
parsing_processes | 2 | Parallel DagFileProcessor (рекомендация 2× vCPU, Astronomer) |
scheduler_health_check_threshold | 30 | Timeout для считать scheduler мёртвым |
max_dagruns_per_loop_to_schedule | 20 | Batch size for examine |
max_tis_per_query | 16 | Batch enqueue в critical section |
scheduler_zombie_task_threshold | 300 | Heartbeat для tasks |
dag_dir_list_interval | 300 | Scan папки DAGs на новые файлы |
Что наблюдать в metadata DB
В live system из psql:
-- Что делает scheduler сейчас
SELECT id, dag_id, state, queued_dttm, start_date, latest_heartbeat
FROM task_instance
WHERE state IN ('scheduled', 'queued', 'running')
ORDER BY queued_dttm DESC LIMIT 20;
-- Scheduler heartbeats (для HA debug)
SELECT id, job_type, state, latest_heartbeat,
now() - latest_heartbeat AS gap
FROM job WHERE job_type='SchedulerJob'
ORDER BY latest_heartbeat DESC;
-- Pool occupancy
SELECT pool, slots, used_slots, queued_slots, scheduled_slots
FROM slot_pool;
-- Active locks (для critical section анализа)
SELECT relation::regclass, mode, granted, query
FROM pg_locks pl
JOIN pg_stat_activity pa ON pl.pid = pa.pid
WHERE pl.relation = 'slot_pool'::regclass;
В следующем уроке мы запустим HA Scheduler Race lab — два scheduler-а в Docker, и через pg_locks будем смотреть, как они конкурируют за critical section.