Learning Platform
Глоссарий Troubleshooting
Урок 05.01 · 30 мин
Продвинутый
SchedulerMain loopSchedulerJobRunnerTaskInstance lifecycle

Scheduler main loop

Scheduler — это сердце Airflow. От его правильной работы зависят: когда DagRun будет создан, когда TaskInstance перейдёт в queued, когда executor получит команду, когда зомби-таски будут зачищены. Этот урок препарирует scheduler main loop до конкретного псевдокода и SQL-запросов, которые он отправляет в metadata DB.

Главный класс — SchedulerJob (в 2.x — airflow/jobs/scheduler_job.py; в 3.x переименован в SchedulerJobRunner).


Четыре фазы main loop

Scheduler работает в бесконечном цикле с периодом scheduler_heartbeat_sec (default 5 секунд). Каждый тик состоит из четырёх фаз:

Scheduler main loop — четыре фазы
ФАЗА 1: Create DagRunsИз таблицы dag читаем DAGs с next_dagrun_create_after ≤ now(). Для каждого создаём новый DagRun (state=queued). Также — asset-triggered runs (из dataset_dag_run_queue).
ФАЗА 2: Schedule TIДля каждого active DagRun (state=running): проверяем deps между tasks, переводим готовые TI в state=scheduled. Это О(N) по числу tasks в active runs.
★ HA-критичная
ФАЗА 3: Critical Section — enqueueБерём row-level lock на slot_pool. Внутри section: select scheduled TI, проверяем concurrency и pool limits, переводим в queued, кладём в executor queue. Только один scheduler одновременно в critical section.
ФАЗА 4: HousekeepingAdopt orphans (TI у мёртвых scheduler-ов), check zombies (TI без heartbeat), emit metrics, cleanup stale jobs.
sleep(scheduler_heartbeat_sec)

Каждую фазу разберём детально.


Фаза 1: Create DagRuns

Scheduler определяет, какие DAGs нужно запустить сейчас:

# Псевдокод _create_dagruns_for_dags
def _create_dagruns_for_dags(self, session):
    # Time-triggered DAGs
    dags = session.query(DagModel).filter(
        DagModel.is_paused == False,
        DagModel.is_active == True,
        DagModel.next_dagrun_create_after <= now()
    ).all()

    for dag in dags:
        dag_run = create_dagrun(
            dag_id=dag.dag_id,
            run_id=f"scheduled__{dag.next_dagrun}"
            logical_date=dag.next_dagrun,
            data_interval=dag.timetable.data_interval,
            state=DagRunState.QUEUED,
            run_type=DagRunType.SCHEDULED,
            # В 3.x также: dag_version_id=dag.current_version_id (AIP-63 DAG Versioning)
        )
        session.add(dag_run)

        # Advance next_dagrun
        dag.next_dagrun = dag.timetable.next_dagrun_info(...).logical_date

    # Asset-triggered DAGs (datasets/assets) — из dataset_dag_run_queue
    asset_triggers = session.query(DatasetDagRunQueue).all()
    for trigger in asset_triggers:
        if all_assets_ready(trigger):
            create_dagrun(...)
            session.delete(trigger)

    session.commit()

Ключевые моменты:

  • next_dagrun_create_after — момент, когда следующий run должен быть создан. Обновляется после каждого создания.
  • DagRun сразу попадает в state queued — это значит «scheduler знает, что надо запускать, ждёт перехода в running».
  • Dataset-triggered runs — из таблицы dataset_dag_run_queue (если все необходимые datasets обновлены — Datasets API, AIP-48 с 2.4+).
  • В 3.x добавилось dag_version_id (AIP-63 DAG Versioning) — каждый DagRun привязан к версии bundle.

Фаза 2: Schedule TI

Для каждого DagRun(state in [queued, running]):

# Псевдокод _schedule_dag_run
def _schedule_dag_run(self, dag_run, session):
    dag = serialized_dag_to_dag(dag_run.dag_id)
    task_instances = dag_run.get_task_instances(session)

    for ti in task_instances:
        if ti.state in TERMINAL_STATES:  # success, failed, skipped
            continue

        if ti.state == NONE:
            # Check upstream deps
            if all_upstreams_done(ti):
                # All deps satisfied — set scheduled
                ti.state = SCHEDULED
                ti.scheduled_dttm = now()
                session.merge(ti)

        elif ti.state == UP_FOR_RETRY:
            # Check retry_delay
            if now() - ti.end_date >= ti.retry_delay:
                ti.state = SCHEDULED

        elif ti.state == UP_FOR_RESCHEDULE:
            # Sensor reschedule mode
            next_reschedule = get_next_reschedule_date(ti)
            if now() >= next_reschedule:
                ti.state = SCHEDULED

    # If all TI done, transition DagRun
    if all(ti.state in TERMINAL_STATES for ti in task_instances):
        if any(ti.state == FAILED for ti in task_instances):
            dag_run.state = DagRunState.FAILED
        else:
            dag_run.state = DagRunState.SUCCESS

    elif dag_run.state == QUEUED and any(ti.state == SCHEDULED for ti in task_instances):
        dag_run.state = DagRunState.RUNNING

    session.commit()

Состояния TaskInstance на этой фазе:

  • nonescheduled (все deps satisfied)
  • up_for_retryscheduled (retry_delay прошёл)
  • up_for_reschedulescheduled (sensor reschedule timer)
  • DagRun queuedrunning (хотя бы один TI scheduled)

Эта фаза O(N) по числу tasks в active DagRuns. Для DAG с 1000 mapped tasks через expand это становится bottleneck — scheduler loop может затянуться.


Фаза 3: Critical Section — переход в queued

Здесь начинается самое интересное. Critical section защищён row-level lock на slot_pool:

# Псевдокод critical_section
def _critical_section_enqueue_task_instances(self, session):
    # Acquire row-level lock на ВСЕХ строках slot_pool
    pools = session.query(Pool).with_for_update(nowait=True).all()
    # ↑ SELECT * FROM slot_pool FOR UPDATE NOWAIT

    # Если другой scheduler уже в critical section — NOWAIT → exception → skip tick
    # (это нормально для HA scheduler)

    # Получаем все scheduled TI, отсортированные по priority
    task_instances = (
        session.query(TaskInstance)
        .filter(TaskInstance.state == SCHEDULED)
        .order_by(TaskInstance.priority_weight.desc())
        .with_for_update(skip_locked=True)  # ← другие scheduler skip эти rows
        .limit(MAX_TIS_PER_QUERY)  # default 16
        .all()
    )

    for ti in task_instances:
        # Проверяем concurrency limits
        if not check_concurrency_limits(ti, pools):
            continue

        # Проверяем pool slot availability
        if not has_pool_slot(ti.pool, pools):
            continue

        # Atomic: переводим в queued
        ti.state = QUEUED
        ti.queued_dttm = now()

        # Кладём в executor queue
        executor.queue_command(ti.command())

    # Critical section ends — commit
    session.commit()

Что важно:

  1. SELECT * FROM slot_pool FOR UPDATE NOWAIT — атомарная блокировка всех pool rows. Если второй scheduler пытается войти — получает ошибку lock_not_available, тихо пропускает текущий тик.
  2. MAX_TIS_PER_QUERY (default 16) — batch size. Слишком маленький → много циклов; слишком большой → дольше в section.
  3. После переключения TI → queued, scheduler передаёт workload в executor. Executor сам отвечает за queued → running.

Фаза 4: Housekeeping

Несколько вспомогательных операций:

Adopt orphans

-- Найти мёртвые scheduler-ы (heartbeat прокис)
UPDATE job SET state='failed'
WHERE job_type='SchedulerJob' AND state='running'
  AND latest_heartbeat < now() - interval '30 seconds';

-- Adopt их TI (queued, running, restarting)
UPDATE task_instance SET state=NULL  -- reset to scheduled
WHERE state IN ('queued', 'running', 'restarting')
  AND ti.job_id IN (SELECT id FROM job WHERE state='failed');

Check zombies

-- Найти TI, чей heartbeat прокис
SELECT * FROM task_instance
WHERE state='running'
  AND latest_heartbeat < now() - interval '300 seconds';  -- scheduler_zombie_task_threshold

Найденные zombie tasks помечаются failed и (если есть retries) переводятся в up_for_retry.

Emit metrics

# StatsD / OTel
metrics.gauge("scheduler.scheduler_loop_duration", elapsed_seconds)
metrics.gauge("executor.open_slots", executor.slots_available)
metrics.gauge("scheduler.tasks.executable", num_scheduled)
metrics.counter("scheduler.tasks.killed_externally", ...)

Идемпотентность пайплайнов — повтор не ломает данные

Ключевые конфиги

ПараметрDefaultЧто
scheduler_heartbeat_sec5Период main loop
min_file_process_interval30Не парсить DAG-файл чаще
parsing_processes2Parallel DagFileProcessor (рекомендация 2× vCPU, Astronomer)
scheduler_health_check_threshold30Timeout для считать scheduler мёртвым
max_dagruns_per_loop_to_schedule20Batch size for examine
max_tis_per_query16Batch enqueue в critical section
scheduler_zombie_task_threshold300Heartbeat для tasks
dag_dir_list_interval300Scan папки DAGs на новые файлы

Что наблюдать в metadata DB

В live system из psql:

-- Что делает scheduler сейчас
SELECT id, dag_id, state, queued_dttm, start_date, latest_heartbeat
FROM task_instance
WHERE state IN ('scheduled', 'queued', 'running')
ORDER BY queued_dttm DESC LIMIT 20;

-- Scheduler heartbeats (для HA debug)
SELECT id, job_type, state, latest_heartbeat,
       now() - latest_heartbeat AS gap
FROM job WHERE job_type='SchedulerJob'
ORDER BY latest_heartbeat DESC;

-- Pool occupancy
SELECT pool, slots, used_slots, queued_slots, scheduled_slots
FROM slot_pool;

-- Active locks (для critical section анализа)
SELECT relation::regclass, mode, granted, query
FROM pg_locks pl
JOIN pg_stat_activity pa ON pl.pid = pa.pid
WHERE pl.relation = 'slot_pool'::regclass;

В следующем уроке мы запустим HA Scheduler Race lab — два scheduler-а в Docker, и через pg_locks будем смотреть, как они конкурируют за critical section.


Проверка знанийKnowledge check
Почему в фазе 3 (critical section) scheduler берёт row-level lock именно на slot_pool, а не на task_instance напрямую?
ОтветAnswer
Slot_pool — это discriminator, через который scheduler-ы координируются. Локая slot_pool гарантирует: (1) одновременно только один scheduler делает enqueue decisions, что предотвращает race condition при проверке concurrency limits (если два scheduler-а одновременно увидят 1 free slot, оба могут попытаться занять его); (2) lock на маленькой таблице (slot_pool обычно ≤ 100 строк) дёшев и быстр, в отличие от lock на task_instance (миллионы строк); (3) NOWAIT гарантирует, что второй scheduler получит ошибку и просто пропустит tick, не блокируясь. Это elegant design — distributed coordination через RDBMS lock без Raft/ZK.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какие четыре фазы main loop SchedulerJob в правильном порядке?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6