LocalExecutor internals — multiprocessing.Queue и параллелизм внутри scheduler-процесса
LocalExecutor — самый недооценённый executor 2.x. Многие смотрят на него как на «для разработки», но в реальности он отлично обслуживает small-to-medium production deployments (до 100 tasks/min) на одной машине. И главное — он самый простой для понимания: никаких brokers, никаких pod-ов, никаких external dependencies. Только multiprocessing Python и metadata DB.
В этом уроке мы препарируем его до multiprocessing.Queue, os.fork() и airflow run subprocess. Поймём, почему parallelism=32 иногда означает «32 forked процесса, каждый по 200MB RES» и почему single-node всегда упирается либо в memory, либо в DB connections.
Архитектура одной картинкой
LocalExecutor живёт в том же процессе, что и scheduler. Это критическое отличие от Celery (отдельные worker pods) и Kubernetes (pod-per-task). Workflow:
Главные точки:
- Один процесс scheduler = один LocalExecutor. Поднять второй scheduler с LocalExecutor можно (HA через row-locks работает), но у каждого свой набор worker-процессов.
- Workers — forked Python processes, не threads. Это значит, что они полностью изолированы по памяти, но дорого по RES (~200MB on bare Airflow).
- task_queue —
multiprocessing.Queue, базовая Python примитива (pipe + lock). Никаких Redis, никаких RabbitMQ.
Что именно делает worker
В airflow.executors.local_executor worker процесс — это вечный loop:
# Псевдокод LocalWorker
def run(self):
while True:
try:
key, command = self.task_queue.get(timeout=1)
except Empty:
continue
if key is None: # sentinel — shutdown
break
# command — list like ['airflow', 'tasks', 'run', 'dag_id', 'task_id', '2026-05-12']
try:
subprocess.check_call(command, close_fds=True)
state = State.SUCCESS
except subprocess.CalledProcessError:
state = State.FAILED
self.result_queue.put((key, state))
self.task_queue.task_done()
Несколько важных деталей:
subprocess.check_call— это ещё один процесс. Так что workflow: scheduler (1) → fork → worker (N processes) → subprocess →airflow tasks run(N processes). Наparallelism=32это 64+ процесса на ноде.- Worker не выполняет user code сам. Он запускает
airflow tasks run <args>через subprocess, который заново парсит DAG-файл, импортирует callable, и зовёт его. Это даёт изоляцию от crash user-кода — exception в task не убьёт worker. close_fds=True— закрывает file descriptors при fork, чтобы child не наследовал open connections к Postgres (типичная DB leak проблема).
SequentialExecutor — только для testing
SequentialExecutor — это вырожденный случай LocalExecutor с parallelism=1. На каждый tick scheduler запускает task синхронно (через subprocess.run), ждёт окончания, переходит к следующей. Никакого параллелизма.
[core]
executor = SequentialExecutor
sql_alchemy_conn = sqlite:////path/to/airflow.db
Зачем оставлен:
- Только SQLite-совместим (SQLite не любит concurrent writes)
- Для smoke-test setup,
airflow standalone, CI tests - Удалён в Airflow 3.0 — там SQLite больше не поддерживается
SequentialExecutor категорически нельзя в production. Даже для small deployment используйте LocalExecutor + Postgres. SequentialExecutor не масштабируется и блокируется на первой long task.
Конфигурация: parallelism, dag_concurrency, max_active_runs
В [core] несколько параметров одновременности, которые работают одновременно (применяется минимум):
[core]
executor = LocalExecutor
parallelism = 32 # total tasks одновременно на all DAGs
max_active_tasks_per_dag = 16 # max running tasks per DAG (default 16)
max_active_runs_per_dag = 16 # max parallel DagRuns per DAG (default 16)
[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
Сценарий: DAG с 100 tasks, parallelism=32, max_active_tasks_per_dag=16 → одновременно крутятся 16 (минимум из двух). Если запустить второй DAG, второй тоже получит до 16 → итого 32 → блок на parallelism=32.
| Параметр | Действие | Когда хитит |
|---|---|---|
parallelism | Глобальный кап на all running TI | Много DAG-ов одновременно |
max_active_tasks_per_dag | Per-DAG кап | Wide DAG с многими parallel tasks |
max_active_runs_per_dag | Per-DAG кап на active DagRuns | Backfill истории |
| Pool slots | Per-pool кап | Shared resource (DB pool, API rate limit) |
Ограничения LocalExecutor
1. Single-node
LocalExecutor живёт в одном процессе scheduler-а. Нельзя масштабировать tasks горизонтально:
- Если scheduler на машине с 8 CPU / 32 GB RAM → потолок ~16 параллельных tasks (зависит от task footprint)
- Поднять scheduler на другой машине = поднять второй LocalExecutor. Они не делят workload — у каждого свой queue.
2. Memory-bound
Каждый worker процесс + subprocess = ~150-300 MB RES на bare Airflow. На parallelism=32:
32 worker processes × 200 MB = 6.4 GB
32 subprocesses (airflow tasks run) × 250 MB = 8 GB
─────────────────────────────────────────────
Total LocalExecutor footprint: ~14.4 GB
Плюс сам scheduler-процесс (~500 MB) и DAG-парсинг (~1 GB). На 16 GB ноде уже впритык. Если task делает heavy lifting (pandas DataFrame, ML model load) — добавьте ещё.
3. DB connection limit
Каждый subprocess airflow tasks run открывает свой connection к Postgres. На parallelism=32 — 32+ active connections, плюс scheduler, плюс webserver. PostgreSQL default max_connections=100 — упирается.
Решение: PgBouncer in transaction mode перед Postgres, или поднимать max_connections.
Когда LocalExecutor — правильный выбор
LocalExecutor отлично работает в следующих случаях:
| Сценарий | Почему ок |
|---|---|
| Dev environment | Минимум setup — одна машина + PostgreSQL |
| Small production (<100 tasks/min) | Простая операция, нет overhead Celery/K8s |
| One team, one machine | Не нужно межтимовое isolation |
| Cost-sensitive deployment | Нет idle workers, нет K8s cluster |
| Onprem без K8s | Альтернативы — Celery (требует Redis), Local — самый дешёвый |
И — не подходит когда:
- >200 tasks/min — single node bottleneck
- Mixed Python deps — LocalExecutor делит environment scheduler-а
- GPU/heavy isolation — каждая task в общем процессе
- HA для workers — нет (HA только для scheduler)
Production observability: SQL для LocalExecutor
Несмотря на простоту, надо мониторить. Полезные queries к metadata DB:
-- Сколько TI в каждом state, by executor (если multiple)
SELECT
state,
queued_by_job_id,
COUNT(*) AS ti_count
FROM task_instance
WHERE start_date > now() - interval '1 hour'
GROUP BY state, queued_by_job_id
ORDER BY ti_count DESC;
-- Tasks, которые «застряли» в queued > 1 minute
-- Признак, что worker занят или executor zombie
SELECT
dag_id,
task_id,
run_id,
queued_dttm,
now() - queued_dttm AS queued_for
FROM task_instance
WHERE state = 'queued'
AND queued_dttm < now() - interval '1 minute'
ORDER BY queued_dttm
LIMIT 20;
-- Active connections от Airflow к Postgres
SELECT
application_name,
state,
COUNT(*) AS conn_count
FROM pg_stat_activity
WHERE application_name LIKE '%airflow%'
GROUP BY application_name, state
ORDER BY conn_count DESC;
На LocalExecutor: scheduler + workers + subprocesses + webserver — общее число активных connections не должно превышать max_connections * 0.7. Иначе — PgBouncer.
Жизненный цикл TaskInstance в LocalExecutor
Production gotchas
Gotcha 1: parallelism слишком высокий → OOM scheduler-а
Видел случай: команда подняла parallelism=64 на ноде с 16 GB RAM. Через час scheduler упал по OOM, потому что каждый worker процесс заюзал 250 MB, subprocess ещё 250 MB. 64 × 500 = 32 GB — больше чем доступно.
Fix: parallelism = min(2 × CPU, RAM_GB / 0.5). Для 8 CPU, 16 GB → parallelism=16-32, но мониторить RES.
Gotcha 2: stale subprocess после scheduler restart
При kill -9 scheduler worker и subprocess остаются работать (не получили SIGTERM от parent). User-code продолжает писать в DB. Когда новый scheduler стартует, он находит TI в running, но без активного scheduler-а.
Mitigation:
- Graceful shutdown через
SIGTERM(handled by scheduler) - При
scheduler_zombie_task_threshold(default 300s) zombie task будет detected и reset - Не использовать
kill -9, толькоkill(SIGTERM)
Gotcha 3: shared module imports → globals contamination
Один worker импортирует module my_lib, другой тоже. Если в my_lib есть mutable global state — он шарится между tasks (одного worker-а). Это редко проблема, но иногда видим: «task случайно увидел state от предыдущего run».
Fix: Не использовать mutable globals в DAG-коде. Если уж надо — worker_max_tasks_per_child нет в LocalExecutor (это Celery setting), но можно ограничить через container restart.
Gotcha 4: DB connection leak
Если subprocess не закрывает DB connection (например, exception до finally), connection остаётся в idle. На parallelism=32 и долгом uptime — accumulating idle connections.
Detection:
SELECT state, count(*) FROM pg_stat_activity
WHERE application_name LIKE 'airflow%' GROUP BY state;
-- watch для idle in transaction
Сравнение с Celery (preview)
| Aspect | LocalExecutor | CeleryExecutor (для контраста) |
|---|---|---|
| Архитектура | Один процесс | Distributed |
| Workers | multiprocessing.fork() | Long-living Celery workers |
| Queue | multiprocessing.Queue (IPC) | Redis / RabbitMQ |
| Result backend | Прямой DB write от subprocess | Result backend (DB) |
| Setup complexity | Минимум — только Airflow | Redis + workers + config |
| Scaling | Vertical (RAM/CPU ноды) | Horizontal (add workers) |
| Failure isolation | Subprocess crash изолирован | Worker crash может потерять task (acks_late) |
| Best for | <100 tasks/min, single-node | High throughput, multi-node |
В следующем уроке — CeleryExecutor deep dive — препарируем broker, prefetch pitfall и worker autoscaling до уровня message flow.