Learning Platform
Глоссарий Troubleshooting
Урок 06.03 · 35 мин
Продвинутый
CeleryExecutorRedisRabbitMQprefetchKEDAResult backend

CeleryExecutor deep — broker, prefetch pitfall, result backend, KEDA autoscaling

CeleryExecutor — главный workhorse production Airflow. На нём гоняют 80% deployments в опросах Airflow Summit. Он зрелый, простой по операционке (Python + Redis), масштабируется горизонтально и даёт низкую latency (1-3s task startup). Но у него есть один pitfall, на который натыкаются практически все команды: worker_prefetch_multiplier.

В этом уроке мы препарируем CeleryExecutor от scheduler-а до Celery worker prefork pool, разберём message flow через broker, поймём почему prefetch_multiplier=4 это бомба замедленного действия для long-task workloads, и научимся правильно настраивать queues для priority routing.


Архитектура CeleryExecutor

Полная архитектура CeleryExecutor
Schedulerairflow scheduler с CeleryExecutor instance. На enqueue: вызывает app.send_task('airflow.executors.celery_executor.execute_command', args=[command], queue=queue). Это публикует message в broker.
send_task(execute_command, queue=default)
Broker (Redis или RabbitMQ)Message broker. Redis (most common) — LIST per queue, BLPOP для consumer. RabbitMQ — durable queues, ack/nack semantics. Broker = transport, не storage результата. Default broker_url=redis://redis:6379/0.
BRPOPLPUSH / basic.consume
Worker prefork poolairflow celery worker. Master process forks N child processes (worker_concurrency, default 16). Master pulls messages из broker, распределяет round-robin между children. Each child выполняет один task за раз.
Worker prefork poolВторой worker pod на другой ноде или контейнере. Идентичная архитектура. Чем больше workers — тем выше throughput. Конкурируют за messages из одной queue.
execute task + write state
Result Backend (Postgres)Postgres (recommended) или Redis. Airflow использует result_backend для двух вещей: (1) task state updates from worker → scheduler; (2) Celery internal task results. Production — db+postgresql, не redis (durability).
Metadata DBТот же Postgres, обычная Airflow metadata. Worker subprocess `airflow tasks run` пишет state=running, state=success прямо сюда. Scheduler читает изменения.

Важная деталь: result backend и metadata DB — это разные таблицы в одном Postgres, но семантически разные. Result backend это Celery’s view (celery_taskmeta table). Metadata DB — Airflow’s view (task_instance, etc).


Message flow: что реально летит через broker

Когда scheduler enqueue-ит task, он публикует Celery message через AMQP/Redis protocol. Содержимое:

{
  "id": "8a31b8e4-2c1c-4dfa-8e0a-9f7f6d3a2c1b",
  "task": "airflow.executors.celery_executor_utils.execute_command",
  "args": [
    ["airflow", "tasks", "run", "etl_dag", "extract", "manual__2026-05-12T10:00:00+00:00", "--local"]
  ],
  "kwargs": {},
  "retries": 0,
  "eta": null,
  "expires": null,
  "queue": "default",
  "delivery_mode": 2,
  "priority": 0
}

Worker подбирает message из своей subscribed queue (default, либо custom queue) и вызывает execute_command(command_list). Это запускает subprocess airflow tasks run …, как и LocalExecutor — только subprocess живёт на удалённом worker-host.

Kafka Consumer API: как консьюмеры читают из топиков
NOTE

Important — Celery worker НЕ выполняет user code напрямую. Он запускает subprocess airflow tasks run, который заново парсит DAG, импортирует callable, и вызывает его. Это даёт isolation и graceful failure handling, но добавляет ~1-2s overhead на task startup (Python interpreter warmup + DAG parse).


Worker prefork pool: концерн worker_concurrency

Celery worker — это prefork pool: master process + N forked children:

airflow celery worker (master, PID 1)
├─ child-1 (PID 2)  ← один task per child одновременно
├─ child-2 (PID 3)
├─ ...
└─ child-N (PID N+1)

worker_concurrency (default 16) = N (количество children). Каждый child может выполнять один task за раз. Если все 16 заняты, новые messages ждут в prefetch queue (см. ниже).

Tuning rules:

  • Bare worker (no heavy task) → concurrency = 2 × CPU (CPU-bound)
  • Workers с heavy memory tasks (pandas, ML) → concurrency = CPU или меньше
  • I/O-bound tasks (HTTP calls, DB queries) → concurrency = 4 × CPU (хорошо переваривают)

★ worker_prefetch_multiplier — главный pitfall

Это самая популярная боль production Airflow. Препарируем.

Что делает prefetch

Когда worker подключается к broker, он не подбирает messages по одному — он берёт batch авансом:

prefetched = worker_concurrency × worker_prefetch_multiplier
            = 16 × 4 (default)
            = 64 messages, зарезервированных за этим worker-ом

Эти 64 messages не видны другим worker-ам. Они помечены unacked в broker и сидят в local prefetch queue worker-а.

Когда это pitfall

Сценарий: один worker, concurrency=16, prefetch_multiplier=4 → 64 prefetched. В очереди — 200 коротких tasks (1 минута каждая) и 1 long task (1 час).

Worker prefetched 64 messages: 60 коротких + 4 длинных. 16 child процессов берутся за работу. Один из них хитит long task → занят 1 час. Остальные 15 child-ов разбираются с короткими.

Через 30 минут: 60 коротких tasks завершены. 44 коротких task ВСЁ ЕЩЁ сидят в local prefetch worker-а, ждут пока освободятся 16 slots. Но 1 slot занят long task → только 15 slots для 4 prefetched длинных + 44 коротких = bottleneck.

Между тем — broker ещё имеет 136 коротких tasks неконсюмеров (если бы был второй worker — он бы их взял). Но первый worker «зажал» их через prefetch.

Prefetch starvation: long task блокирует prefetched batch
Broker (200 short + 1 long)Изначально все 201 task в одной queue. Worker подключается и prefetches 64 messages (concurrency 16 × multiplier 4). Эти 64 включают long task с вероятностью ~32%.
prefetch 64 messages
Worker prefetched: 63 short + 1 longWorker зарезервировал 64 messages у broker. Остальные 137 short tasks доступны другим workers. Если их нет (single-worker setup) — они просто ждут.
dispatch к child процессам
16 children: 15 short + 1 long startedRound-robin dispatch к child процессам. 1 child получил long task → занят 1 час. Остальные 15 крутят short tasks (1 минута каждая).
через 30 мин — 1 long всё ещё running
Throughput: 15/16 slots active = 94%Кажется ок. НО: 48 prefetched short tasks стоят в очереди worker-а. Если у вас есть второй worker (или KEDA scaled up) — он не может их подобрать, потому что они уже в prefetch у первого. Throughput всей системы ограничен 15 slots на одном worker, хотя ресурсов больше.

Fix: worker_prefetch_multiplier = 1

[celery]
worker_prefetch_multiplier = 1   # default 4 — это для super-short tasks

Это значит: worker prefetches только concurrency × 1 = 16 messages. Long task в slot не блокирует prefetched batch — остальные 16 slots сразу подбирают новые messages из broker (где их доступно всем workers).

Trade-off: больше round-trips к broker (network overhead). Для tasks < 1 second это заметно. Для tasks > 5 seconds — pure win.

Правила:

Workloadprefetch_multiplier
Sub-second tasks (Redis ops, light JSON)4 (default)
Mixed light tasks (1-30s)2
Long tasks (>30s), mixed long/short1 ← production Airflow норма
Очень long tasks (>10 min) с many workers1 + queue routing

celery_acks_late: at-least-once delivery

Default Celery ACK behaviour:

  1. Worker получает message
  2. ACK сразу (default acks_late = False)
  3. Выполняет task
  4. Если crash — message потерян

Для Airflow это плохо: crash = lost task. Решение — acks_late:

[celery]
celery_acks_late = True   # ACK только после успешного завершения

С acks_late = True:

  1. Worker получает message
  2. NOT ACK’d ещё
  3. Выполняет task
  4. Если success → ACK → broker удаляет message
  5. Если crash → no ACK → broker возвращает message в queue → другой worker подберёт

At-least-once delivery: message может быть выполнен > 1 раз (если worker crashed после execute, до ACK). Tasks должны быть idempotent.

WARNING

Без celery_acks_late = True — потеря messages при worker crash. С ним — possible duplicate execution. В Airflow idempotency на уровне task_instance state уже обеспечивает корректность (если TI уже success — повторный run обновит state, не выполнит callable дважды).


Queues для priority routing

Celery поддерживает multiple queues. Worker подписан на N queues. Tasks routed по queue=...:

@task(queue="high_priority")
def critical_task():
    pass

@task(queue="gpu")
def ml_inference():
    pass

# default queue — если не указано
@task
def regular_task():
    pass

Запуск workers с разными подписками:

# Worker A — only high_priority and default
airflow celery worker --queues high_priority,default --concurrency 8

# Worker B — only gpu (на GPU-ноде)
airflow celery worker --queues gpu --concurrency 2

Это даёт:

  • Priority isolation — heavy GPU tasks не блокируют high-priority short tasks
  • Hardware affinity — GPU workers только на GPU-нодах, обычные workers — на CPU-нодах
  • Team isolationqueue=team_a, queue=team_b для разных команд

KEDA autoscaling для Celery workers

KEDA (Kubernetes Event-driven Autoscaling) — стандарт для масштабирования Celery workers по queue depth:

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: airflow-celery-worker
spec:
  scaleTargetRef:
    name: airflow-worker
  minReplicaCount: 1
  maxReplicaCount: 20
  triggers:
    - type: postgresql
      metadata:
        connection: <postgres-conn>
        query: |
          SELECT ceil(COUNT(*) / 16.0) FROM task_instance
          WHERE state IN ('queued', 'running')
            AND queue = 'default'
            AND executor_config IS NULL
        targetQueryValue: "1"

KEDA опрашивает Postgres каждые 30s (configurable), считает сколько workers нужно по queued + running tasks / concurrency, и масштабирует Deployment. Idle → minReplicaCount (можно даже 0 если cold-start ок).

Альтернативы:

  • airflow celery worker integration с Prometheus / metrics
  • Direct Redis LLEN опрос
  • Cluster autoscaler через CPU metric (хуже — reactive, не proactive)

Production observability: SQL queries

Сколько TI зависло в queued > 1 min (worker busy / down?)

SELECT
    queue,
    COUNT(*) AS stuck_count,
    MIN(queued_dttm) AS oldest_queued,
    now() - MIN(queued_dttm) AS oldest_age
FROM task_instance
WHERE state = 'queued'
  AND queued_dttm < now() - interval '1 minute'
GROUP BY queue
ORDER BY stuck_count DESC;

Celery worker heartbeat (если result_backend = db)

-- celery_taskmeta — last update per task
SELECT
    task_id,
    status,
    date_done,
    now() - date_done AS age
FROM celery_taskmeta
WHERE status = 'SUCCESS'
ORDER BY date_done DESC
LIMIT 20;

TI throughput by queue (last hour)

SELECT
    queue,
    state,
    COUNT(*) AS ti_count
FROM task_instance
WHERE start_date > now() - interval '1 hour'
GROUP BY queue, state
ORDER BY queue, state;

try_adopt_task_instances — HA для worker crash

Когда Celery worker crash, scheduler через scheduler tick детектит «orphan TI» (running state без активного worker).

В airflow.executors.celery_executor.CeleryExecutor:

def try_adopt_task_instances(self, tis):
    # Pings broker, checks если task ещё в-flight
    # Если да — adopt (продолжает следить)
    # Если нет — reset to scheduled (другой worker подберёт)
    pass

Это работает потому что Celery task_id persistent — scheduler знает Celery task_id для каждой Airflow TI и может query broker.

Limit: для broker=Redis adoption работает имбер. Для RabbitMQ — отлично.


Configuration cheat-sheet (production-grade)

[core]
executor = CeleryExecutor

[celery]
broker_url = redis://airflow-redis-master:6379/0
result_backend = db+postgresql://airflow:airflow@airflow-postgres:5432/airflow

worker_concurrency = 16                  # tasks per worker
worker_prefetch_multiplier = 1           # PROTECTS от long-task starvation
worker_max_tasks_per_child = 100         # restart worker process после N tasks (memory leak mitigation)
celery_acks_late = True                  # at-least-once delivery
broker_pool_limit = 10                   # connection pool к broker
operation_timeout = 1.0                  # таймаут на broker operations
task_track_started = True                # update state=STARTED при начале

# Pool / connection tuning
broker_transport_options = {'visibility_timeout': 21600}   # 6h для long tasks (RabbitMQ specific)

[celery_kubernetes_executor]
kubernetes_queue = kubernetes            # tasks с queue='kubernetes' → K8s (hybrid)

Production gotchas

Gotcha 1: Redis durability

Default Redis — in-memory, snapshot RDB каждые N seconds. На crash потеряете messages между snapshots. Для production:

  • Enable AOF: appendonly yes, appendfsync everysec — durability с минимальным perf hit
  • Либо RabbitMQ с durable queues — better для critical workflows

Gotcha 2: Broker и result_backend на одном Redis — bottleneck

Видел: broker_url = redis://X:6379/0, result_backend = redis://X:6379/0. На high throughput Redis CPU 100% от двойной нагрузки.

Fix:

  • result_backend = db+postgresql://... (Postgres — лучше для durability)
  • Если хотите Redis для results → отдельный Redis или другая DB number

Gotcha 3: worker_max_tasks_per_child — против memory leak

Long-living Celery worker накапливает memory leak (Python imports, large pandas DataFrame refs, etc). worker_max_tasks_per_child = 100 — после 100 tasks child процесс рестартится. Свежий child = чистая память.

Trade-off: на high throughput restart overhead заметен. Подбирать по profilirovaniyu memory growth.

Gotcha 4: visibility_timeout < task duration

Для broker=Redis, Celery использует visibility_timeout (default 1 hour). Если task длится > 1 час, Celery считает его lost и дублирует message. Получите double execution.

Fix:

broker_transport_options = {'visibility_timeout': 21600}   # 6 hours

Gotcha 5: connection_pool exhaustion

broker_pool_limit = 10 — default. На высокой нагрузке (десятки workers + scheduler) → connection pool exhausted, errors OperationalError: too many clients.

Fix: увеличить до 30-50 для большого deployment, мониторить pg_stat_activity для Postgres.


Проверка знанийKnowledge check
Production Airflow с CeleryExecutor. Workload: 90% short tasks (1-5 sec) и 10% long tasks (30 мин - 2 часа). На single worker pool stuck queued TI копятся, second worker pool idle. Что не так и как починить?
ОтветAnswer
Это classic prefetch starvation. Default `worker_prefetch_multiplier=4` + `worker_concurrency=16` → worker prefetches 64 messages из broker (помечает unacked). Эти 64 невидимы для других workers. Long tasks занимают slots, prefetched short tasks ждут локально, broker имеет ещё tasks но другой worker idle (не имеет подписки или нет подходящих). Fix: (1) `worker_prefetch_multiplier=1` — worker берёт только концурент-сколько-есть; (2) Separate queues — `queue='long_tasks'` для heavy, `queue='default'` для short, отдельные worker pools; (3) Для long tasks > 1h обязательно `broker_transport_options.visibility_timeout` > duration (Redis broker), иначе duplicate execution; (4) Альтернатива — Multiple Executors (2.10+, AIP-61): long tasks → KubernetesExecutor, short → CeleryExecutor. (5) Не забыть KEDA scaling по queue depth для elasticity.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Worker концурент=16, prefetch_multiplier=4. В очереди 200 short (1 min) + 1 long (1 hour) task. Что произойдёт?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7