Learning Platform
Глоссарий Troubleshooting
Урок 06.02 · 22 мин
Продвинутый
LocalExecutorSequentialExecutormultiprocessingparallelismSingle-node

LocalExecutor internals — multiprocessing.Queue и параллелизм внутри scheduler-процесса

LocalExecutor — самый недооценённый executor 2.x. Многие смотрят на него как на «для разработки», но в реальности он отлично обслуживает small-to-medium production deployments (до 100 tasks/min) на одной машине. И главное — он самый простой для понимания: никаких brokers, никаких pod-ов, никаких external dependencies. Только multiprocessing Python и metadata DB.

В этом уроке мы препарируем его до multiprocessing.Queue, os.fork() и airflow run subprocess. Поймём, почему parallelism=32 иногда означает «32 forked процесса, каждый по 200MB RES» и почему single-node всегда упирается либо в memory, либо в DB connections.


Архитектура одной картинкой

LocalExecutor живёт в том же процессе, что и scheduler. Это критическое отличие от Celery (отдельные worker pods) и Kubernetes (pod-per-task). Workflow:

LocalExecutor: процессы и очередь
Scheduler processГлавный процесс airflow scheduler. Внутри него живут: SchedulerJobRunner, DagFileProcessorManager (или отдельный процесс), LocalExecutor instance. Все они делят один interpreter и память.
execute_async()
multiprocessing.QueueСтандартная Python очередь между процессами. Использует pipe + lock внутри. Scheduler кладёт TaskInstanceKey + command, worker процесс читает. Никаких внешних брокеров — это IPC через ядро ОС.
fork() worker processes
Worker-1Forked process. Получает (key, command) из queue, запускает subprocess.Popen('airflow tasks run dag_id task_id ...'). Ждёт завершения, кладёт result в result_queue.
Worker-2Аналогичный forked process. Параллельно подбирает следующую task из той же task_queue. N таких worker-процессов запускается по parallelism config.
Worker-NN = [core] parallelism из конфига (default 32). Каждый worker — отдельный Python process с собственным memory footprint. На больших parallelism — десятки GB RAM.
exit code + state
result_queue → DBWorker процесс пишет state (success/failed) обратно в multiprocessing.Queue, scheduler читает и обновляет task_instance в metadata DB. Это вторая Queue, отделена от task_queue.

Главные точки:

  • Один процесс scheduler = один LocalExecutor. Поднять второй scheduler с LocalExecutor можно (HA через row-locks работает), но у каждого свой набор worker-процессов.
  • Workers — forked Python processes, не threads. Это значит, что они полностью изолированы по памяти, но дорого по RES (~200MB on bare Airflow).
  • task_queue — multiprocessing.Queue, базовая Python примитива (pipe + lock). Никаких Redis, никаких RabbitMQ.

Что именно делает worker

В airflow.executors.local_executor worker процесс — это вечный loop:

# Псевдокод LocalWorker
def run(self):
    while True:
        try:
            key, command = self.task_queue.get(timeout=1)
        except Empty:
            continue

        if key is None:  # sentinel — shutdown
            break

        # command — list like ['airflow', 'tasks', 'run', 'dag_id', 'task_id', '2026-05-12']
        try:
            subprocess.check_call(command, close_fds=True)
            state = State.SUCCESS
        except subprocess.CalledProcessError:
            state = State.FAILED

        self.result_queue.put((key, state))
        self.task_queue.task_done()

Несколько важных деталей:

  1. subprocess.check_call — это ещё один процесс. Так что workflow: scheduler (1) → fork → worker (N processes) → subprocess → airflow tasks run (N processes). На parallelism=32 это 64+ процесса на ноде.
  2. Worker не выполняет user code сам. Он запускает airflow tasks run <args> через subprocess, который заново парсит DAG-файл, импортирует callable, и зовёт его. Это даёт изоляцию от crash user-кода — exception в task не убьёт worker.
  3. close_fds=True — закрывает file descriptors при fork, чтобы child не наследовал open connections к Postgres (типичная DB leak проблема).

SequentialExecutor — только для testing

SequentialExecutor — это вырожденный случай LocalExecutor с parallelism=1. На каждый tick scheduler запускает task синхронно (через subprocess.run), ждёт окончания, переходит к следующей. Никакого параллелизма.

[core]
executor = SequentialExecutor
sql_alchemy_conn = sqlite:////path/to/airflow.db

Зачем оставлен:

  • Только SQLite-совместим (SQLite не любит concurrent writes)
  • Для smoke-test setup, airflow standalone, CI tests
  • Удалён в Airflow 3.0 — там SQLite больше не поддерживается
WARNING

SequentialExecutor категорически нельзя в production. Даже для small deployment используйте LocalExecutor + Postgres. SequentialExecutor не масштабируется и блокируется на первой long task.


Конфигурация: parallelism, dag_concurrency, max_active_runs

В [core] несколько параметров одновременности, которые работают одновременно (применяется минимум):

[core]
executor = LocalExecutor
parallelism = 32                       # total tasks одновременно на all DAGs
max_active_tasks_per_dag = 16          # max running tasks per DAG (default 16)
max_active_runs_per_dag = 16           # max parallel DagRuns per DAG (default 16)

[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5

Сценарий: DAG с 100 tasks, parallelism=32, max_active_tasks_per_dag=16 → одновременно крутятся 16 (минимум из двух). Если запустить второй DAG, второй тоже получит до 16 → итого 32 → блок на parallelism=32.

ПараметрДействиеКогда хитит
parallelismГлобальный кап на all running TIМного DAG-ов одновременно
max_active_tasks_per_dagPer-DAG капWide DAG с многими parallel tasks
max_active_runs_per_dagPer-DAG кап на active DagRunsBackfill истории
Pool slotsPer-pool капShared resource (DB pool, API rate limit)

Ограничения LocalExecutor

1. Single-node

LocalExecutor живёт в одном процессе scheduler-а. Нельзя масштабировать tasks горизонтально:

  • Если scheduler на машине с 8 CPU / 32 GB RAM → потолок ~16 параллельных tasks (зависит от task footprint)
  • Поднять scheduler на другой машине = поднять второй LocalExecutor. Они не делят workload — у каждого свой queue.

2. Memory-bound

Каждый worker процесс + subprocess = ~150-300 MB RES на bare Airflow. На parallelism=32:

32 worker processes  × 200 MB =  6.4 GB
32 subprocesses (airflow tasks run) × 250 MB = 8 GB
─────────────────────────────────────────────
Total LocalExecutor footprint:    ~14.4 GB

Плюс сам scheduler-процесс (~500 MB) и DAG-парсинг (~1 GB). На 16 GB ноде уже впритык. Если task делает heavy lifting (pandas DataFrame, ML model load) — добавьте ещё.

3. DB connection limit

Каждый subprocess airflow tasks run открывает свой connection к Postgres. На parallelism=32 — 32+ active connections, плюс scheduler, плюс webserver. PostgreSQL default max_connections=100 — упирается.

Решение: PgBouncer in transaction mode перед Postgres, или поднимать max_connections.


Когда LocalExecutor — правильный выбор

LocalExecutor отлично работает в следующих случаях:

СценарийПочему ок
Dev environmentМинимум setup — одна машина + PostgreSQL
Small production (<100 tasks/min)Простая операция, нет overhead Celery/K8s
One team, one machineНе нужно межтимовое isolation
Cost-sensitive deploymentНет idle workers, нет K8s cluster
Onprem без K8sАльтернативы — Celery (требует Redis), Local — самый дешёвый

И — не подходит когда:

  • >200 tasks/min — single node bottleneck
  • Mixed Python deps — LocalExecutor делит environment scheduler-а
  • GPU/heavy isolation — каждая task в общем процессе
  • HA для workers — нет (HA только для scheduler)

Production observability: SQL для LocalExecutor

Несмотря на простоту, надо мониторить. Полезные queries к metadata DB:

-- Сколько TI в каждом state, by executor (если multiple)
SELECT
    state,
    queued_by_job_id,
    COUNT(*) AS ti_count
FROM task_instance
WHERE start_date > now() - interval '1 hour'
GROUP BY state, queued_by_job_id
ORDER BY ti_count DESC;
-- Tasks, которые «застряли» в queued > 1 minute
-- Признак, что worker занят или executor zombie
SELECT
    dag_id,
    task_id,
    run_id,
    queued_dttm,
    now() - queued_dttm AS queued_for
FROM task_instance
WHERE state = 'queued'
  AND queued_dttm < now() - interval '1 minute'
ORDER BY queued_dttm
LIMIT 20;
-- Active connections от Airflow к Postgres
SELECT
    application_name,
    state,
    COUNT(*) AS conn_count
FROM pg_stat_activity
WHERE application_name LIKE '%airflow%'
GROUP BY application_name, state
ORDER BY conn_count DESC;

На LocalExecutor: scheduler + workers + subprocesses + webserver — общее число активных connections не должно превышать max_connections * 0.7. Иначе — PgBouncer.


Жизненный цикл TaskInstance в LocalExecutor

State machine TaskInstance в LocalExecutor
scheduledScheduler решил, что TI можно запустить (deps satisfied). Phase 2 main loop поставила state=scheduled. Ещё не в executor queue.
critical section enqueue
queuedCritical section перевела state в queued и вызвала executor.queue_command(). Внутри LocalExecutor — put в multiprocessing.Queue. Worker подберёт в порядке FIFO.
worker.get() + subprocess.run
runningWorker fork-нул subprocess `airflow tasks run …`. Subprocess открыл DB connection, UPDATE task_instance SET state='running'. Запускает user callable.
callable return / raise
success / failedSubprocess завершается. Worker читает exit code: 0 → SUCCESS, non-zero → FAILED. Кладёт (key, state) в result_queue. Scheduler читает, обновляет TI в DB. Если failed + retries — переход в up_for_retry.

Production gotchas

Gotcha 1: parallelism слишком высокий → OOM scheduler-а

Видел случай: команда подняла parallelism=64 на ноде с 16 GB RAM. Через час scheduler упал по OOM, потому что каждый worker процесс заюзал 250 MB, subprocess ещё 250 MB. 64 × 500 = 32 GB — больше чем доступно.

Fix: parallelism = min(2 × CPU, RAM_GB / 0.5). Для 8 CPU, 16 GB → parallelism=16-32, но мониторить RES.

Gotcha 2: stale subprocess после scheduler restart

При kill -9 scheduler worker и subprocess остаются работать (не получили SIGTERM от parent). User-code продолжает писать в DB. Когда новый scheduler стартует, он находит TI в running, но без активного scheduler-а.

Mitigation:

  • Graceful shutdown через SIGTERM (handled by scheduler)
  • При scheduler_zombie_task_threshold (default 300s) zombie task будет detected и reset
  • Не использовать kill -9, только kill (SIGTERM)

Gotcha 3: shared module imports → globals contamination

Один worker импортирует module my_lib, другой тоже. Если в my_lib есть mutable global state — он шарится между tasks (одного worker-а). Это редко проблема, но иногда видим: «task случайно увидел state от предыдущего run».

Fix: Не использовать mutable globals в DAG-коде. Если уж надо — worker_max_tasks_per_child нет в LocalExecutor (это Celery setting), но можно ограничить через container restart.

Gotcha 4: DB connection leak

Если subprocess не закрывает DB connection (например, exception до finally), connection остаётся в idle. На parallelism=32 и долгом uptime — accumulating idle connections.

Detection:

SELECT state, count(*) FROM pg_stat_activity
WHERE application_name LIKE 'airflow%' GROUP BY state;
-- watch для idle in transaction

Сравнение с Celery (preview)

AspectLocalExecutorCeleryExecutor (для контраста)
АрхитектураОдин процессDistributed
Workersmultiprocessing.fork()Long-living Celery workers
Queuemultiprocessing.Queue (IPC)Redis / RabbitMQ
Result backendПрямой DB write от subprocessResult backend (DB)
Setup complexityМинимум — только AirflowRedis + workers + config
ScalingVertical (RAM/CPU ноды)Horizontal (add workers)
Failure isolationSubprocess crash изолированWorker crash может потерять task (acks_late)
Best for<100 tasks/min, single-nodeHigh throughput, multi-node

В следующем уроке — CeleryExecutor deep dive — препарируем broker, prefetch pitfall и worker autoscaling до уровня message flow.


Проверка знанийKnowledge check
На ноде с 8 CPU / 16 GB RAM настроили LocalExecutor с parallelism=64. После 30 минут scheduler падает с OOM. Что произошло и как починить?
ОтветAnswer
LocalExecutor создаёт N worker процессов и каждый запускает subprocess `airflow tasks run`. Footprint: worker ~200 MB + subprocess ~250 MB = ~450 MB на task. При parallelism=64 → 64 × 450 = 28.8 GB, плюс scheduler + DAG parsing + webserver. На 16 GB машине это в 2 раза превышает доступную память. Postgres connection limit (default 100) тоже хитится. Fix: (1) snizit parallelism до 16-24 для этого железа; (2) формула parallelism ≈ min(2 × CPU, RAM_GB × 2); (3) поставить PgBouncer перед Postgres; (4) мониторить RES процессов через `ps aux | grep airflow`; (5) если throughput не хватает — переход на CeleryExecutor (workers на отдельных нодах) или KubernetesExecutor (pod-per-task).

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Где живёт LocalExecutor с точки зрения процессов?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7