Celery vs Kubernetes Executor — обзорное сравнение
Два главных executor-а для production Airflow 2.x: CeleryExecutor (зрелый, быстрый, shared workers) и KubernetesExecutor (pod-per-task isolation, dynamic scaling). Этот урок — высокоуровневое сравнение перед deep dive в Module 05. Цель — дать вам framework для выбора, какой подходит в вашем случае.
Также в 2.x доступны CeleryKubernetesExecutor и LocalKubernetesExecutor (hybrid) и Multiple Executors (AIP-61, 2.10+) — несколько одновременно.
Архитектурный обзор
CeleryExecutor — главные характеристики
Архитектура
Scheduler → Redis/RabbitMQ broker → Celery workers (N) → Postgres result backend
- Workers: Long-living processes (Celery prefork pool)
- Broker: Redis (most common) или RabbitMQ
- Result backend: Postgres (default) — для status updates
- Concurrency per worker:
worker_concurrency(default 16) — N processes per worker - Prefetch:
worker_prefetch_multiplier(default 4) — сколько messages worker берёт авансом
Pros
✅ Низкая latency: task startup 1-3s (worker уже греется) ✅ High throughput: тысячи tasks/min на одном worker ✅ Простота: workers — обычные Python processes, легко дебажить ✅ Autoscaling: через KEDA (Kubernetes Event-driven Autoscaling) можно масштабировать workers по queue depth
Deployment в Kubernetes: основной workload для stateless appsCons
❌ Shared environment: все tasks делят Python deps, librarys на worker. Конфликты пакетов → use venv в task, или KubernetesPodOperator.
❌ Долгие workers = долгая утечка памяти. Periodic restart через worker_max_tasks_per_child (Celery setting).
❌ Worker prefetch pitfall: long task на worker с prefetch=4 блокирует 4 task slots для всех (Module 05 lesson 03).
❌ Resource pooling: 1 worker = фиксированный CPU/memory budget. GPU? Большие задачи? — через K8s.
Configuration cheat-sheet
[core]
executor = CeleryExecutor
[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 16 # tasks per worker
worker_prefetch_multiplier = 1 # против long task starvation
worker_max_tasks_per_child = 100 # restart worker после N tasks (mem leak)
celery_acks_late = True # ACK после выполнения (no message loss)
broker_pool_limit = 10 # connection pool size
KubernetesExecutor — главные характеристики
Архитектура
Scheduler → K8s API → создаёт pod per task → pod выполняет task → удаляется
↓
Watcher thread в scheduler
подписан на K8s events
- Pod-per-task: каждая TaskInstance = свой pod
- Pod template: default через
pod_template_file, override per-task черезexecutor_config - Watcher: thread в scheduler-процессе слушает K8s API events (pod started/failed/completed)
Pros
✅ Полная изоляция: каждая task в своём pod с своими deps, image, resources, network policies
✅ Per-task resources: CPU/memory limits отдельно для каждой task. GPU? — executor_config={"pod_override": V1Pod(...)}
✅ Custom images: разные tasks могут использовать разные images (ML inference vs data extraction)
✅ Dynamic scaling: cluster autoscaler добавляет nodes по требованию, удаляет когда idle
✅ Cost efficiency: нет idle workers (для batch workloads)
Cons
❌ Cold start cost: 5-30s на task (pull image + K8s scheduling + init container + Python startup) ❌ Сложность: K8s knowledge нужно. RBAC, network policies, image registry. ❌ K8s API rate limits: при тысячах коротких tasks/min может hit limit ❌ Logging: per-pod logs — нужен log aggregator (S3, ELK, GCS) ❌ Не для коротких tasks: 10s task + 15s startup = 60% overhead
Configuration cheat-sheet
[core]
executor = KubernetesExecutor
[kubernetes_executor]
namespace = airflow
pod_template_file = /opt/airflow/pod_template.yaml
worker_container_repository = my-registry.com/airflow
worker_container_tag = 2.10.5
delete_worker_pods = True
delete_worker_pods_on_failure = False # keep pods для debugging при fail
worker_pods_creation_batch_size = 16 # max pods создавать одновременно
worker_pods_pending_timeout = 300 # timeout если pod в pending state
Pod override per-task
from kubernetes.client import V1Pod, V1PodSpec, V1Container, V1ResourceRequirements
@task(
executor_config={
"pod_override": V1Pod(
spec=V1PodSpec(
containers=[V1Container(
name="base"
resources=V1ResourceRequirements(
requests={"cpu": "2", "memory": "8Gi", "nvidia.com/gpu": "1"},
limits={"cpu": "4", "memory": "16Gi", "nvidia.com/gpu": "1"},
),
)]
)
)
}
)
def gpu_inference():
pass
Сравнительная таблица
| Aspect | CeleryExecutor | KubernetesExecutor |
|---|---|---|
| Task startup latency | 1-3s (worker готов) | 5-30s (pull image, K8s schedule, Python startup) |
| Throughput (tasks/min/worker) | 1000+ (short tasks) | ~100-300 (limited by K8s API) |
| Memory per task | Shared worker memory | Pod memory limit (per-task isolation) |
| CPU per task | Shared worker CPU | Pod CPU limit |
| Python deps | Shared environment | Per-task image |
| Custom images | ❌ (только через KubernetesPodOperator) | ✅ Native |
| GPU support | ❌ | ✅ Через resource_requests |
| Scaling | Manual workers count (или KEDA) | Auto через cluster autoscaler |
| Cost (idle) | Workers running 24/7 (paid) | Zero idle cost |
| Cost (high load) | Linear (workers × time) | Per-task overhead expensive |
| Operational complexity | Простая (Python + Redis) | Высокая (K8s expertise) |
| Best for | High throughput, short tasks | Heavy isolated tasks, ML, GPU |
Когда какой
Используйте CeleryExecutor когда:
- Тысячи коротких tasks (<1 minute) в час
- Все tasks делят одинаковую Python environment
- Хочется simple operational model (Python + Redis)
- Cost matters для idle time (можно держать минимум workers)
- Low latency critical
Используйте KubernetesExecutor когда:
- Tasks требуют разные Python deps / images
- Tasks нужны GPU или специфичные resources
- ML inference, data transformation на больших данных
- Уже есть Kubernetes infrastructure
- Per-task isolation важна (security, compliance)
Используйте CeleryKubernetesExecutor (hybrid) когда:
- Большинство tasks — light, нужна Celery скорость
- Несколько tasks — heavy/GPU, нужен K8s isolation
- Хотите best of both worlds
@task(queue="default") # default → Celery
def light_task(): ...
@task(queue="kubernetes") # → KubernetesExecutor
def heavy_gpu_task(): ...
Используйте Multiple Executors AIP-61 (2.10+) когда:
- Нужна максимальная гибкость
- Разные команды имеют разные runtime requirements
- Готовы к operational complexity
[core]
executor = LocalExecutor,CeleryExecutor,KubernetesExecutor
@task(executor="KubernetesExecutor")
def specific_task(): ...
Real-world примеры
Shopify (~10k DAGs, 150k TI/day)
- CeleryKubernetesExecutor: 90% tasks на Celery (light dbt, SQL queries), 10% на K8s (Spark submissions, ML)
- Multiple Celery worker pools per team queues
Lyft
- KubernetesExecutor only: каждая task в своём pod, full isolation
- Custom images per team, GPU для ML workloads
Astronomer customers (опрос Airflow Summit 2024)
- 40% — CeleryExecutor only
- 25% — KubernetesExecutor only
- 30% — CeleryKubernetesExecutor (hybrid)
- 5% — другие
Что дальше
В Module 05 мы детально разберём:
- LocalExecutor internals (multiprocessing.Queue)
- CeleryExecutor deep — broker, result backend, prefetch pitfall
- KubernetesExecutor — watcher thread, pod template, cold start mitigation
- CeleryKubernetesExecutor / LocalKubernetesExecutor — routing
- Multiple Executors (AIP-61) — production patterns