Learning Platform
Глоссарий Troubleshooting
Урок 02.02 · 22 мин
Продвинутый
SchedulerDagFileProcessorParsingMain Loop

Scheduler deep — что внутри главного процесса

Запустив airflow scheduler, вы запускаете не один процесс, а целое дерево процессов. Главный процесс (parent) держит main loop, координирует state в БД и общается с executor. Внутри него крутится DagFileProcessorManager, который через multiprocessing создаёт пул дочерних процессов (parsing_processes) — они парсят DAG-файлы параллельно. Этот урок — карта этой иерархии и того, что делает каждый процесс.

Детальный walkthrough main loop (с псевдокодом и SQL) — в модуле 04. Здесь — overview архитектуры scheduler-процесса.


Дерево процессов scheduler

Когда вы делаете ps aux | grep airflow:

airflow scheduler                  # parent — SchedulerJob main loop
├── airflow scheduler (DAG processor manager)  # forked manager
│   ├── airflow scheduler (parsing process 1)  # DagFileProcessor #1
│   └── airflow scheduler (parsing process 2)  # DagFileProcessor #2

Это default 2.x. Если включён standalone DAG Processor (опциональная оптимизация), дерево разделяется:

airflow scheduler                          # parent — только main loop
airflow dag-processor                      # standalone manager
├── airflow dag-processor (parsing 1)
└── airflow dag-processor (parsing 2)
Дерево процессов scheduler в 2.x
SchedulerJob (parent process)Главный процесс. Запущен через `airflow scheduler`. Держит main event loop: create DagRun-ы, schedule TI, critical section enqueue, housekeeping. Heartbeat пишет в таблицу job каждый scheduler_heartbeat_sec.
multiprocessing.Process (fork)
DagFileProcessorManagerДочерний процесс, forked из parent. Manages пул DagFileProcessor-ов. Сканирует папку DAGs (dag_dir_list_interval), решает какой файл когда парсить (min_file_process_interval), передаёт задачи parsing процессам.
multiprocessing.Process × N
DagFileProcessor #1Дочерний процесс manager-а. Один из parsing_processes. Каждый раз когда manager даёт ему файл — этот процесс импортирует .py, инстанцирует DAG-объекты, сериализует в JSON и пишет в таблицу serialized_dag. После — завершается (один файл = один процесс, для изоляции).
DagFileProcessor #2То же что #1, параллельно. По default parsing_processes = 2. Astronomer рекомендует 2× vCPU для production.

Что делает каждый процесс

Parent — SchedulerJob main loop

Главный процесс, который вы запустили командой airflow scheduler. Он:

  • Не парсит DAGs сам — это делает manager + parsing processes.
  • Читает serialized_dag из БД для своих decisions.
  • Бежит main loop каждые scheduler_heartbeat_sec (default 5s):
    • Phase 1: создаёт DagRuns по timetable + asset events
    • Phase 2: schedule TI (none → scheduled)
    • Phase 3: critical section (scheduled → queued + push to executor)
    • Phase 4: housekeeping (zombies, orphans, metrics)
  • Пишет heartbeat в таблицу job для HA detection.
  • Конкурирует с другими scheduler-ами через row-level lock на slot_pool (Module 04 lesson 02).

DagFileProcessorManager

Дочерний процесс, отвечающий за координацию parsing. Forked в начале работы scheduler:

# Упрощённо в SchedulerJob.__init__
self.processor_agent = DagFileProcessorAgent(
    dag_directory=conf.get('core', 'dags_folder'),
    parsing_processes=conf.getint('scheduler', 'parsing_processes'),
)
self.processor_agent.start()  # forks DagFileProcessorManager

Manager:

  • Сканирует папку DAGs каждые dag_dir_list_interval секунд (default 300s = 5 минут) — ищет новые файлы.
  • Поддерживает list known DAG files с last_parsed_time.
  • Решает приоритет: какой файл обработать сейчас. Учитывает min_file_process_interval (не парсить файл чаще раз в 30s по default).
  • Распределяет работу между N parsing processes через очередь.
  • Собирает результаты: каждый parsing process завершает работу с info “сколько DAGs обработал, какие ошибки” — manager аггрегирует.

DagFileProcessor (parsing processes)

Каждый раз когда manager даёт файл — создаётся новый процесс через multiprocessing.Process. Этот процесс:

  1. Импортирует .py файл (как import в Python).
  2. Находит все DAG-объекты в module namespace.
  3. Валидирует DAG (нет cycle, has start_date, dag_id valid, etc).
  4. Сериализует в JSON и пишет в таблицу serialized_dag (через SQLAlchemy direct в 2.x).
  5. Завершается — короткоживущий процесс.

Использование отдельного процесса на каждый файл — намеренная изоляция:

  • DAG-файл может крашнуть процесс (например, segfault в C extension)
  • Память не утекает между разными DAGs
  • Один битый DAG не блокирует другие
NOTE

Это 2.x behavior. В 3.x DagFileProcessor вынесен в standalone process (airflow dag-processor) и стал mandatory отдельным процессом со своим lifecycle. Это улучшает изоляцию: медленный DAG больше не мешает scheduler loop напрямую (хотя и в 2.x с отдельным DagFileProcessorManager это уже было mitigated).


Стандарт vs Standalone DAG Processor в 2.x

В 2.x есть два варианта запуска parsing:

Стандартный (default) — DAG Processor встроен в scheduler

airflow scheduler   # запускает scheduler + DagFileProcessorManager + N parsing processes

Это default. Простой, single-binary. Хорошо для маленьких deployments.

Standalone — отдельный процесс

airflow scheduler        # только main loop, не парсит
airflow dag-processor    # отдельно — manager + parsing processes

Конфиг:

[scheduler]
standalone_dag_processor = True

Преимущества для production:

  • Изоляция CPU/memory: parsing process не делит ресурсы с main loop.
  • Independent scaling: можно запустить 2 scheduler-а + 1 dag-processor (parsing не bottleneck).
  • Отдельные logs: видно отдельно, что делает parsing.
  • Migration path к 3.x: в 3.x этот режим стал mandatory.

Production recommendation: для >500 DAGs или сложных DAGs с тяжёлым parsing — включайте standalone.


Ключевые конфиги parsing

ПараметрDefaultЧто
parsing_processes2Сколько параллельных DagFileProcessor процессов
min_file_process_interval30Не парсить один файл чаще раз в N секунд
dag_dir_list_interval300Сканировать папку DAGs на новые/удалённые файлы
parsing_pre_import_modulesTruePre-import heavy modules в parent для speedup
dag_file_processor_timeout50Timeout на parse одного файла
print_stats_interval30Logs со статистикой parsing
standalone_dag_processorFalseИспользовать standalone (рекомендуется в production)

Тюнинг для большого числа DAGs

Если у вас 5000+ DAGs:

[scheduler]
parsing_processes = 4              # 2× vCPU (Astronomer recommendation)
min_file_process_interval = 60     # реже парсить, меньше CPU
dag_dir_list_interval = 600        # реже scan папки
standalone_dag_processor = True    # отдельный процесс

И не делайте top-level network calls в DAG-файлах! Каждый parse будет hit external service.


Что хранит scheduler в metadata DB

Каждый tick scheduler чтит/пишет в этих таблицах:

ТаблицаЧто
jobHeartbeat scheduler (для HA detection)
daglast_parsed_time, is_paused, next_dagrun
serialized_dagСериализованная версия DAG (читает)
dag_runCreate new runs, transition states
task_instanceSchedule TI (none → scheduled → queued)
slot_poolCritical section lock + concurrency limits
dataset_eventRead для asset-triggered runs
dataset_dag_run_queueProcess pending dataset triggers

В модуле 04 мы детально пройдёмся по SQL-запросам, которые scheduler выполняет на каждом tick.


Production observability scheduler

Ключевые метрики (через OTel с 2.10+ или StatsD):

scheduler.scheduler_loop_duration       # должно быть < scheduler_heartbeat_sec
scheduler.tasks.executable              # сколько TI готовы к queued
scheduler.dag_processing.total_parse_time  # общее время parsing
scheduler.dag_processing.last_runtime     # последний parsing run
scheduler.dag_processing.import_errors    # количество DAGs с ошибками
scheduler.executor.open_slots             # available worker capacity

Если scheduler_loop_duration > heartbeat_sec — scheduler не успевает. Причины: слишком много active DagRuns, медленный DB, parsing блокирует loop (миграция на standalone).


Проверка знанийKnowledge check
В Airflow 2.x при запуске `airflow scheduler` создаётся дерево процессов. Что делает каждый процесс, и почему DagFileProcessor — отдельный процесс на каждый файл (не thread)?
ОтветAnswer
Дерево: parent (SchedulerJob main loop) → DagFileProcessorManager (управление parsing) → N DagFileProcessor (один процесс на каждый файл, короткоживущий). Parent делает scheduling decisions, DAG Processor — parsing. Каждый parsing — отдельный multiprocessing.Process для трёх причин: (1) изоляция краша — если DAG-файл сегфолтит из-за битого C extension, упадёт только parsing process, не scheduler; (2) изоляция памяти — Python не освобождает память между imports, процесс завершается полностью и память освобождается; (3) cleanup state — globals и cached imports не утекают между DAGs. Альтернатива — threading — не работает: GIL ограничивает CPU parallelism, и изоляция краша не достигается.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что делает DagFileProcessor в Airflow 2.x?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6