Scheduler deep — что внутри главного процесса
Запустив airflow scheduler, вы запускаете не один процесс, а целое дерево процессов. Главный процесс (parent) держит main loop, координирует state в БД и общается с executor. Внутри него крутится DagFileProcessorManager, который через multiprocessing создаёт пул дочерних процессов (parsing_processes) — они парсят DAG-файлы параллельно. Этот урок — карта этой иерархии и того, что делает каждый процесс.
Детальный walkthrough main loop (с псевдокодом и SQL) — в модуле 04. Здесь — overview архитектуры scheduler-процесса.
Дерево процессов scheduler
Когда вы делаете ps aux | grep airflow:
airflow scheduler # parent — SchedulerJob main loop
├── airflow scheduler (DAG processor manager) # forked manager
│ ├── airflow scheduler (parsing process 1) # DagFileProcessor #1
│ └── airflow scheduler (parsing process 2) # DagFileProcessor #2
Это default 2.x. Если включён standalone DAG Processor (опциональная оптимизация), дерево разделяется:
airflow scheduler # parent — только main loop
airflow dag-processor # standalone manager
├── airflow dag-processor (parsing 1)
└── airflow dag-processor (parsing 2)
Что делает каждый процесс
Parent — SchedulerJob main loop
Главный процесс, который вы запустили командой airflow scheduler. Он:
- Не парсит DAGs сам — это делает manager + parsing processes.
- Читает serialized_dag из БД для своих decisions.
- Бежит main loop каждые
scheduler_heartbeat_sec(default 5s):- Phase 1: создаёт DagRuns по timetable + asset events
- Phase 2: schedule TI (none → scheduled)
- Phase 3: critical section (scheduled → queued + push to executor)
- Phase 4: housekeeping (zombies, orphans, metrics)
- Пишет heartbeat в таблицу
jobдля HA detection. - Конкурирует с другими scheduler-ами через row-level lock на
slot_pool(Module 04 lesson 02).
DagFileProcessorManager
Дочерний процесс, отвечающий за координацию parsing. Forked в начале работы scheduler:
# Упрощённо в SchedulerJob.__init__
self.processor_agent = DagFileProcessorAgent(
dag_directory=conf.get('core', 'dags_folder'),
parsing_processes=conf.getint('scheduler', 'parsing_processes'),
)
self.processor_agent.start() # forks DagFileProcessorManager
Manager:
- Сканирует папку DAGs каждые
dag_dir_list_intervalсекунд (default 300s = 5 минут) — ищет новые файлы. - Поддерживает list known DAG files с last_parsed_time.
- Решает приоритет: какой файл обработать сейчас. Учитывает
min_file_process_interval(не парсить файл чаще раз в 30s по default). - Распределяет работу между N parsing processes через очередь.
- Собирает результаты: каждый parsing process завершает работу с info “сколько DAGs обработал, какие ошибки” — manager аггрегирует.
DagFileProcessor (parsing processes)
Каждый раз когда manager даёт файл — создаётся новый процесс через multiprocessing.Process. Этот процесс:
- Импортирует .py файл (как
importв Python). - Находит все DAG-объекты в module namespace.
- Валидирует DAG (нет cycle, has start_date, dag_id valid, etc).
- Сериализует в JSON и пишет в таблицу
serialized_dag(через SQLAlchemy direct в 2.x). - Завершается — короткоживущий процесс.
Использование отдельного процесса на каждый файл — намеренная изоляция:
- DAG-файл может крашнуть процесс (например, segfault в C extension)
- Память не утекает между разными DAGs
- Один битый DAG не блокирует другие
Это 2.x behavior. В 3.x DagFileProcessor вынесен в standalone process (airflow dag-processor) и стал mandatory отдельным процессом со своим lifecycle. Это улучшает изоляцию: медленный DAG больше не мешает scheduler loop напрямую (хотя и в 2.x с отдельным DagFileProcessorManager это уже было mitigated).
Стандарт vs Standalone DAG Processor в 2.x
В 2.x есть два варианта запуска parsing:
Стандартный (default) — DAG Processor встроен в scheduler
airflow scheduler # запускает scheduler + DagFileProcessorManager + N parsing processes
Это default. Простой, single-binary. Хорошо для маленьких deployments.
Standalone — отдельный процесс
airflow scheduler # только main loop, не парсит
airflow dag-processor # отдельно — manager + parsing processes
Конфиг:
[scheduler]
standalone_dag_processor = True
Преимущества для production:
- Изоляция CPU/memory: parsing process не делит ресурсы с main loop.
- Independent scaling: можно запустить 2 scheduler-а + 1 dag-processor (parsing не bottleneck).
- Отдельные logs: видно отдельно, что делает parsing.
- Migration path к 3.x: в 3.x этот режим стал mandatory.
Production recommendation: для >500 DAGs или сложных DAGs с тяжёлым parsing — включайте standalone.
Ключевые конфиги parsing
| Параметр | Default | Что |
|---|---|---|
parsing_processes | 2 | Сколько параллельных DagFileProcessor процессов |
min_file_process_interval | 30 | Не парсить один файл чаще раз в N секунд |
dag_dir_list_interval | 300 | Сканировать папку DAGs на новые/удалённые файлы |
parsing_pre_import_modules | True | Pre-import heavy modules в parent для speedup |
dag_file_processor_timeout | 50 | Timeout на parse одного файла |
print_stats_interval | 30 | Logs со статистикой parsing |
standalone_dag_processor | False | Использовать standalone (рекомендуется в production) |
Тюнинг для большого числа DAGs
Если у вас 5000+ DAGs:
[scheduler]
parsing_processes = 4 # 2× vCPU (Astronomer recommendation)
min_file_process_interval = 60 # реже парсить, меньше CPU
dag_dir_list_interval = 600 # реже scan папки
standalone_dag_processor = True # отдельный процесс
И не делайте top-level network calls в DAG-файлах! Каждый parse будет hit external service.
Что хранит scheduler в metadata DB
Каждый tick scheduler чтит/пишет в этих таблицах:
| Таблица | Что |
|---|---|
job | Heartbeat scheduler (для HA detection) |
dag | last_parsed_time, is_paused, next_dagrun |
serialized_dag | Сериализованная версия DAG (читает) |
dag_run | Create new runs, transition states |
task_instance | Schedule TI (none → scheduled → queued) |
slot_pool | Critical section lock + concurrency limits |
dataset_event | Read для asset-triggered runs |
dataset_dag_run_queue | Process pending dataset triggers |
В модуле 04 мы детально пройдёмся по SQL-запросам, которые scheduler выполняет на каждом tick.
Production observability scheduler
Ключевые метрики (через OTel с 2.10+ или StatsD):
scheduler.scheduler_loop_duration # должно быть < scheduler_heartbeat_sec
scheduler.tasks.executable # сколько TI готовы к queued
scheduler.dag_processing.total_parse_time # общее время parsing
scheduler.dag_processing.last_runtime # последний parsing run
scheduler.dag_processing.import_errors # количество DAGs с ошибками
scheduler.executor.open_slots # available worker capacity
Если scheduler_loop_duration > heartbeat_sec — scheduler не успевает. Причины: слишком много active DagRuns, медленный DB, parsing блокирует loop (миграция на standalone).