CeleryExecutor deep — broker, prefetch pitfall, result backend, KEDA autoscaling
CeleryExecutor — главный workhorse production Airflow. На нём гоняют 80% deployments в опросах Airflow Summit. Он зрелый, простой по операционке (Python + Redis), масштабируется горизонтально и даёт низкую latency (1-3s task startup). Но у него есть один pitfall, на который натыкаются практически все команды: worker_prefetch_multiplier.
В этом уроке мы препарируем CeleryExecutor от scheduler-а до Celery worker prefork pool, разберём message flow через broker, поймём почему prefetch_multiplier=4 это бомба замедленного действия для long-task workloads, и научимся правильно настраивать queues для priority routing.
Архитектура CeleryExecutor
Важная деталь: result backend и metadata DB — это разные таблицы в одном Postgres, но семантически разные. Result backend это Celery’s view (celery_taskmeta table). Metadata DB — Airflow’s view (task_instance, etc).
Message flow: что реально летит через broker
Когда scheduler enqueue-ит task, он публикует Celery message через AMQP/Redis protocol. Содержимое:
{
"id": "8a31b8e4-2c1c-4dfa-8e0a-9f7f6d3a2c1b",
"task": "airflow.executors.celery_executor_utils.execute_command",
"args": [
["airflow", "tasks", "run", "etl_dag", "extract", "manual__2026-05-12T10:00:00+00:00", "--local"]
],
"kwargs": {},
"retries": 0,
"eta": null,
"expires": null,
"queue": "default",
"delivery_mode": 2,
"priority": 0
}
Worker подбирает message из своей subscribed queue (default, либо custom queue) и вызывает execute_command(command_list). Это запускает subprocess airflow tasks run …, как и LocalExecutor — только subprocess живёт на удалённом worker-host.
Important — Celery worker НЕ выполняет user code напрямую. Он запускает subprocess airflow tasks run, который заново парсит DAG, импортирует callable, и вызывает его. Это даёт isolation и graceful failure handling, но добавляет ~1-2s overhead на task startup (Python interpreter warmup + DAG parse).
Worker prefork pool: концерн worker_concurrency
Celery worker — это prefork pool: master process + N forked children:
airflow celery worker (master, PID 1)
├─ child-1 (PID 2) ← один task per child одновременно
├─ child-2 (PID 3)
├─ ...
└─ child-N (PID N+1)
worker_concurrency (default 16) = N (количество children). Каждый child может выполнять один task за раз. Если все 16 заняты, новые messages ждут в prefetch queue (см. ниже).
Tuning rules:
- Bare worker (no heavy task) →
concurrency = 2 × CPU(CPU-bound) - Workers с heavy memory tasks (pandas, ML) →
concurrency = CPUили меньше - I/O-bound tasks (HTTP calls, DB queries) →
concurrency = 4 × CPU(хорошо переваривают)
★ worker_prefetch_multiplier — главный pitfall
Это самая популярная боль production Airflow. Препарируем.
Что делает prefetch
Когда worker подключается к broker, он не подбирает messages по одному — он берёт batch авансом:
prefetched = worker_concurrency × worker_prefetch_multiplier
= 16 × 4 (default)
= 64 messages, зарезервированных за этим worker-ом
Эти 64 messages не видны другим worker-ам. Они помечены unacked в broker и сидят в local prefetch queue worker-а.
Когда это pitfall
Сценарий: один worker, concurrency=16, prefetch_multiplier=4 → 64 prefetched. В очереди — 200 коротких tasks (1 минута каждая) и 1 long task (1 час).
Worker prefetched 64 messages: 60 коротких + 4 длинных. 16 child процессов берутся за работу. Один из них хитит long task → занят 1 час. Остальные 15 child-ов разбираются с короткими.
Через 30 минут: 60 коротких tasks завершены. 44 коротких task ВСЁ ЕЩЁ сидят в local prefetch worker-а, ждут пока освободятся 16 slots. Но 1 slot занят long task → только 15 slots для 4 prefetched длинных + 44 коротких = bottleneck.
Между тем — broker ещё имеет 136 коротких tasks неконсюмеров (если бы был второй worker — он бы их взял). Но первый worker «зажал» их через prefetch.
Fix: worker_prefetch_multiplier = 1
[celery]
worker_prefetch_multiplier = 1 # default 4 — это для super-short tasks
Это значит: worker prefetches только concurrency × 1 = 16 messages. Long task в slot не блокирует prefetched batch — остальные 16 slots сразу подбирают новые messages из broker (где их доступно всем workers).
Trade-off: больше round-trips к broker (network overhead). Для tasks < 1 second это заметно. Для tasks > 5 seconds — pure win.
Правила:
| Workload | prefetch_multiplier |
|---|---|
| Sub-second tasks (Redis ops, light JSON) | 4 (default) |
| Mixed light tasks (1-30s) | 2 |
| Long tasks (>30s), mixed long/short | 1 ← production Airflow норма |
| Очень long tasks (>10 min) с many workers | 1 + queue routing |
celery_acks_late: at-least-once delivery
Default Celery ACK behaviour:
- Worker получает message
- ACK сразу (default
acks_late = False) - Выполняет task
- Если crash — message потерян
Для Airflow это плохо: crash = lost task. Решение — acks_late:
[celery]
celery_acks_late = True # ACK только после успешного завершения
С acks_late = True:
- Worker получает message
- NOT ACK’d ещё
- Выполняет task
- Если success → ACK → broker удаляет message
- Если crash → no ACK → broker возвращает message в queue → другой worker подберёт
At-least-once delivery: message может быть выполнен > 1 раз (если worker crashed после execute, до ACK). Tasks должны быть idempotent.
Без celery_acks_late = True — потеря messages при worker crash. С ним — possible duplicate execution. В Airflow idempotency на уровне task_instance state уже обеспечивает корректность (если TI уже success — повторный run обновит state, не выполнит callable дважды).
Queues для priority routing
Celery поддерживает multiple queues. Worker подписан на N queues. Tasks routed по queue=...:
@task(queue="high_priority")
def critical_task():
pass
@task(queue="gpu")
def ml_inference():
pass
# default queue — если не указано
@task
def regular_task():
pass
Запуск workers с разными подписками:
# Worker A — only high_priority and default
airflow celery worker --queues high_priority,default --concurrency 8
# Worker B — only gpu (на GPU-ноде)
airflow celery worker --queues gpu --concurrency 2
Это даёт:
- Priority isolation — heavy GPU tasks не блокируют high-priority short tasks
- Hardware affinity — GPU workers только на GPU-нодах, обычные workers — на CPU-нодах
- Team isolation —
queue=team_a,queue=team_bдля разных команд
KEDA autoscaling для Celery workers
KEDA (Kubernetes Event-driven Autoscaling) — стандарт для масштабирования Celery workers по queue depth:
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: airflow-celery-worker
spec:
scaleTargetRef:
name: airflow-worker
minReplicaCount: 1
maxReplicaCount: 20
triggers:
- type: postgresql
metadata:
connection: <postgres-conn>
query: |
SELECT ceil(COUNT(*) / 16.0) FROM task_instance
WHERE state IN ('queued', 'running')
AND queue = 'default'
AND executor_config IS NULL
targetQueryValue: "1"
KEDA опрашивает Postgres каждые 30s (configurable), считает сколько workers нужно по queued + running tasks / concurrency, и масштабирует Deployment. Idle → minReplicaCount (можно даже 0 если cold-start ок).
Альтернативы:
airflow celery workerintegration с Prometheus / metrics- Direct Redis LLEN опрос
- Cluster autoscaler через CPU metric (хуже — reactive, не proactive)
Production observability: SQL queries
Сколько TI зависло в queued > 1 min (worker busy / down?)
SELECT
queue,
COUNT(*) AS stuck_count,
MIN(queued_dttm) AS oldest_queued,
now() - MIN(queued_dttm) AS oldest_age
FROM task_instance
WHERE state = 'queued'
AND queued_dttm < now() - interval '1 minute'
GROUP BY queue
ORDER BY stuck_count DESC;
Celery worker heartbeat (если result_backend = db)
-- celery_taskmeta — last update per task
SELECT
task_id,
status,
date_done,
now() - date_done AS age
FROM celery_taskmeta
WHERE status = 'SUCCESS'
ORDER BY date_done DESC
LIMIT 20;
TI throughput by queue (last hour)
SELECT
queue,
state,
COUNT(*) AS ti_count
FROM task_instance
WHERE start_date > now() - interval '1 hour'
GROUP BY queue, state
ORDER BY queue, state;
try_adopt_task_instances — HA для worker crash
Когда Celery worker crash, scheduler через scheduler tick детектит «orphan TI» (running state без активного worker).
В airflow.executors.celery_executor.CeleryExecutor:
def try_adopt_task_instances(self, tis):
# Pings broker, checks если task ещё в-flight
# Если да — adopt (продолжает следить)
# Если нет — reset to scheduled (другой worker подберёт)
pass
Это работает потому что Celery task_id persistent — scheduler знает Celery task_id для каждой Airflow TI и может query broker.
Limit: для broker=Redis adoption работает имбер. Для RabbitMQ — отлично.
Configuration cheat-sheet (production-grade)
[core]
executor = CeleryExecutor
[celery]
broker_url = redis://airflow-redis-master:6379/0
result_backend = db+postgresql://airflow:airflow@airflow-postgres:5432/airflow
worker_concurrency = 16 # tasks per worker
worker_prefetch_multiplier = 1 # PROTECTS от long-task starvation
worker_max_tasks_per_child = 100 # restart worker process после N tasks (memory leak mitigation)
celery_acks_late = True # at-least-once delivery
broker_pool_limit = 10 # connection pool к broker
operation_timeout = 1.0 # таймаут на broker operations
task_track_started = True # update state=STARTED при начале
# Pool / connection tuning
broker_transport_options = {'visibility_timeout': 21600} # 6h для long tasks (RabbitMQ specific)
[celery_kubernetes_executor]
kubernetes_queue = kubernetes # tasks с queue='kubernetes' → K8s (hybrid)
Production gotchas
Gotcha 1: Redis durability
Default Redis — in-memory, snapshot RDB каждые N seconds. На crash потеряете messages между snapshots. Для production:
- Enable AOF:
appendonly yes,appendfsync everysec— durability с минимальным perf hit - Либо RabbitMQ с durable queues — better для critical workflows
Gotcha 2: Broker и result_backend на одном Redis — bottleneck
Видел: broker_url = redis://X:6379/0, result_backend = redis://X:6379/0. На high throughput Redis CPU 100% от двойной нагрузки.
Fix:
result_backend = db+postgresql://...(Postgres — лучше для durability)- Если хотите Redis для results → отдельный Redis или другая DB number
Gotcha 3: worker_max_tasks_per_child — против memory leak
Long-living Celery worker накапливает memory leak (Python imports, large pandas DataFrame refs, etc). worker_max_tasks_per_child = 100 — после 100 tasks child процесс рестартится. Свежий child = чистая память.
Trade-off: на high throughput restart overhead заметен. Подбирать по profilirovaniyu memory growth.
Gotcha 4: visibility_timeout < task duration
Для broker=Redis, Celery использует visibility_timeout (default 1 hour). Если task длится > 1 час, Celery считает его lost и дублирует message. Получите double execution.
Fix:
broker_transport_options = {'visibility_timeout': 21600} # 6 hours
Gotcha 5: connection_pool exhaustion
broker_pool_limit = 10 — default. На высокой нагрузке (десятки workers + scheduler) → connection pool exhausted, errors OperationalError: too many clients.
Fix: увеличить до 30-50 для большого deployment, мониторить pg_stat_activity для Postgres.