Hybrid executors — CeleryKubernetesExecutor и LocalKubernetesExecutor
До Airflow 2.3 deployment был ограничен одним executor. Команды с mixed workload (legкие SQL queries + тяжёлые ML inference) выбирали между:
- CeleryExecutor everywhere — низкая latency, но ML tasks делят environment, нет per-task GPU
- KubernetesExecutor everywhere — изоляция и GPU, но 15-30s overhead на короткие tasks
В 2.3 появились CeleryKubernetesExecutor и LocalKubernetesExecutor — гибридные модели, где queue parameter определяет, какой executor выполнит task. Это решение «один scheduler, два executor-а» осталось до Airflow 2.9. В 2.10 пришёл Multiple Executors (AIP-61) — более общий механизм с up to N executors одновременно (см. следующий урок).
Этот урок — гибридные executors 2.x: как они работают, когда их применять, и почему AIP-61 их частично заменяет.
Resource requests и limits — основа executor_config в K8s
Концепт: один scheduler, два executor-а
Главные точки:
- Один scheduler — никакой отдельной инфраструктуры для каждого executor-а
- Routing через
queueparameter —@task(queue="kubernetes")или@task(queue="default") - Configurable trigger queue — какая queue идёт на K8s, конфигурируется
CeleryKubernetesExecutor
Конфигурация
[core]
executor = CeleryKubernetesExecutor
[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 16
worker_prefetch_multiplier = 1
celery_acks_late = True
[kubernetes_executor]
namespace = airflow
pod_template_file = /opt/airflow/pod_template.yaml
worker_container_repository = my-registry.com/airflow
worker_container_tag = 2.10.5
delete_worker_pods = True
[celery_kubernetes_executor]
kubernetes_queue = kubernetes # tasks с queue='kubernetes' → K8s
Использование в DAG
from airflow.decorators import dag, task
from datetime import datetime
from kubernetes.client import V1Pod, V1PodSpec, V1Container, V1ResourceRequirements
@dag(schedule="@hourly", start_date=datetime(2026, 1, 1), catchup=False)
def hybrid_ml_pipeline():
@task(queue="default")
def fetch_metadata():
"""Light SQL — Celery (быстрый старт, low overhead)."""
return {"model_id": "v3", "data_path": "s3://..."}
@task(queue="default")
def validate_data(meta: dict):
"""Light pandas validation — Celery."""
return meta
@task(
queue="kubernetes", # → KubernetesExecutor
executor_config={
"pod_override": V1Pod(
spec=V1PodSpec(
node_selector={"gpu-type": "nvidia-a100"},
containers=[V1Container(
name="base"
image="my-registry.com/airflow-ml:2.10.5"
resources=V1ResourceRequirements(
requests={"cpu": "4", "memory": "16Gi", "nvidia.com/gpu": "1"},
limits={"cpu": "8", "memory": "32Gi", "nvidia.com/gpu": "1"},
),
)],
),
),
},
)
def run_inference(meta: dict):
"""Heavy ML — KubernetesExecutor (isolation + GPU)."""
import torch
# heavy ML
return "s3://output/results"
@task(queue="default")
def update_catalog(output_path: str):
"""Light catalog update — Celery."""
pass
update_catalog(run_inference(validate_data(fetch_metadata())))
hybrid_ml_pipeline()
В этом DAG:
fetch_metadata,validate_data,update_catalog— Celery (быстрые SQL/REST calls)run_inference— KubernetesExecutor (GPU pod с custom image)
Profit: 3 коротких tasks через Celery (1-3s startup) + 1 heavy с full GPU isolation, без overhead на коротких.
LocalKubernetesExecutor
То же, но базовый executor — LocalExecutor вместо Celery. Применимо если:
- Single-node deployment (нет Celery infrastructure)
- Маленький объём short tasks
- Но иногда нужны heavy GPU tasks
Конфигурация
[core]
executor = LocalKubernetesExecutor
parallelism = 32 # для LocalExecutor part
[kubernetes_executor]
namespace = airflow
pod_template_file = /opt/airflow/pod_template.yaml
worker_container_repository = my-registry.com/airflow
worker_container_tag = 2.10.5
[local_kubernetes_executor]
kubernetes_queue = kubernetes # tasks с queue='kubernetes' → K8s
Когда выбрать LocalKubernetesExecutor vs CeleryKubernetesExecutor
| Параметр | LocalKubernetesExecutor | CeleryKubernetesExecutor |
|---|---|---|
| Setup complexity | Минимум (no Redis) | Higher (Redis/RabbitMQ + workers) |
| Throughput for short tasks | До 100 tasks/min | До 10000 tasks/min (multi-worker) |
| Horizontal scaling | Нет (single-node) | Да (add workers) |
| HA | Только scheduler | Scheduler + workers |
| Recommended scale | Small (<100 short tasks/min) | Medium-Large |
Внутри: как routing работает в коде
В airflow.providers.celery.executors.celery_kubernetes_executor:
# Псевдокод
class CeleryKubernetesExecutor(BaseExecutor):
KUBERNETES_QUEUE = "kubernetes" # configurable
def __init__(self):
self.celery_executor = CeleryExecutor()
self.kubernetes_executor = KubernetesExecutor()
def queue_command(self, task_instance, command, priority, queue):
if queue == self.KUBERNETES_QUEUE:
self.kubernetes_executor.queue_command(
task_instance, command, priority, queue=queue
)
else:
self.celery_executor.queue_command(
task_instance, command, priority, queue=queue
)
def heartbeat(self):
self.celery_executor.heartbeat()
self.kubernetes_executor.heartbeat()
def end(self):
self.celery_executor.end()
self.kubernetes_executor.end()
Wrapper делает один thing: на каждый heartbeat вызывает heartbeat обоих child executors, на enqueue — routes по queue.
Use case patterns
Pattern 1: 90% short + 10% heavy
# All light tasks default queue
@task # queue="default" → Celery
def light_etl(): ...
# Heavy tasks explicit
@task(queue="kubernetes") # → KubernetesExecutor
def heavy_processing(): ...
Типичная картина — Shopify, например: 90% tasks на Celery, 10% (Spark submissions, ML) на K8s.
Pattern 2: Per-team isolation
# Team A — analytics, runs on shared Celery
@task(queue="team_a") # Celery, team A worker pool
def team_a_etl(): ...
# Team B — ML, needs custom image, K8s pod
@task(queue="kubernetes"
executor_config={"pod_override": V1Pod(...)})
def team_b_ml(): ...
Pattern 3: Resource sizing
@task(queue="default") # default 1 CPU / 1 GB
def quick(): ...
@task(queue="kubernetes", # explicit large
executor_config={"pod_override": V1Pod(
spec=V1PodSpec(containers=[V1Container(
name="base"
resources=V1ResourceRequirements(
requests={"cpu": "8", "memory": "32Gi"}
)
)])
)})
def heavy(): ...
Anti-pattern: всё через K8s
Если все tasks через queue="kubernetes" — зачем hybrid? Просто executor = KubernetesExecutor. Hybrid имеет смысл только когда большинство tasks выполняется через первый executor.
Hybrid vs Multiple Executors (AIP-61)
С Airflow 2.10 пришёл AIP-61 Multiple Executors — более общий механизм. Сравнение:
| Aspect | Hybrid (CeleryKubernetes / LocalKubernetes) | Multiple Executors (AIP-61, 2.10+) |
|---|---|---|
| Сколько executors | Фиксированно 2 (celery + k8s или local + k8s) | До N (любая комбинация) |
| Routing API | queue=... + специальная queue | @task(executor="K8sExecutor") |
| Конфигурация | executor = CeleryKubernetesExecutor | executor = Local,Celery,Kubernetes (CSV) |
| Введение | 2.3 | 2.10 (AIP-61) |
| Default executor | Первый (Celery) | Первый в list |
| Migrate path | Hybrid → AIP-61 — относительно простой | — |
Когда hybrid 2.x ещё имеет смысл (на 2.10+):
- Если Airflow версия 2.3-2.9 (не upgrade-нут до 2.10)
- Если ровно два executor-а (нет смысла в N)
- Legacy deployments без compelling reason переезжать
В новых deployments на 2.10+ — используйте Multiple Executors (более явный routing, more flexible).
Production observability
TI counts by executor (via job_id of queued_by_job)
-- Сколько TI ушло через каждый executor (за последний час)
SELECT
queue,
state,
COUNT(*) AS ti_count
FROM task_instance
WHERE start_date > now() - interval '1 hour'
GROUP BY queue, state
ORDER BY queue, state;
Если у вас kubernetes_queue = kubernetes:
queue='kubernetes'→ K8s pods- остальные queues → Celery workers
K8s pod count vs Celery message rate
# K8s pods active
kubectl get pods -n airflow -l app=airflow-worker --field-selector=status.phase=Running | wc -l
# Celery message rate (Redis LLEN)
redis-cli -h airflow-redis llen celery
Health — pods ~ heavy queue size, Celery LLEN low (workers consume fast).
Configuration cheat-sheet
# CeleryKubernetesExecutor
[core]
executor = CeleryKubernetesExecutor
[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 16
worker_prefetch_multiplier = 1
celery_acks_late = True
[kubernetes_executor]
namespace = airflow
pod_template_file = /opt/airflow/pod_template.yaml
worker_container_repository = my-registry.com/airflow
worker_container_tag = 2.10.5
delete_worker_pods = True
delete_worker_pods_on_failure = False
[celery_kubernetes_executor]
kubernetes_queue = kubernetes # ← название queue, которое идёт на K8s
# LocalKubernetesExecutor (single-node + occasional K8s)
[core]
executor = LocalKubernetesExecutor
parallelism = 32
[kubernetes_executor]
namespace = airflow
# ... same as above
[local_kubernetes_executor]
kubernetes_queue = kubernetes
Migration path
From CeleryExecutor → CeleryKubernetesExecutor
- Add
[kubernetes_executor]config block, deploy pod_template_file - Update
executor = CeleryKubernetesExecutor - Запустить scheduler — оба executor working
- Один за другим перенести heavy tasks через
@task(queue="kubernetes") - Test, monitor cold start
- Roll out
Старые tasks (без queue override) продолжают идти через Celery — backwards compatible.
From CeleryKubernetesExecutor → Multiple Executors (AIP-61)
- Upgrade Airflow 2.9 → 2.10
- Изменить
executor = CeleryExecutor,KubernetesExecutor - Optionally заменить
queue="kubernetes"→executor="KubernetesExecutor" - Старые DAG-и работают (queue routing fallback)
- Со временем мигрировать на explicit
executor=...
Production gotchas
Gotcha 1: Default queue для K8s tasks
Если задать kubernetes_queue = kubernetes, но забыть pollut в task queue="kubernetes" — task пойдёт через Celery (с pod_override игнорируется). Frustrating debug.
Fix: always check that task.queue matches kubernetes_queue config.
Gotcha 2: Two heartbeats — 2x DB calls
Wrapper делает heartbeat для обоих child executors. Это 2x DB load для executor state checks. На high throughput — заметно. Mitigate через connection pooling.
Gotcha 3: K8s часть может быть disabled (по ошибке)
Если [kubernetes_executor] blocks неправильно сконфигурирован (missing namespace, invalid pod_template_file) — KubernetesExecutor instance может не стартануть, но Celery part working. Task с queue="kubernetes" будет stuck в queued.
Fix: monitor scheduler logs на startup для K8s executor errors.
Gotcha 4: Hybrid не уменьшает cold start
Hybrid не магически делает K8s pods быстрее. Cold start = 5-30s остаётся для K8s tasks. Hybrid просто избегает этого для коротких tasks (Celery).
Gotcha 5: Migrate path к 2.10 AIP-61
Hybrid executor (CeleryKubernetesExecutor) и Multiple Executors (CeleryExecutor,KubernetesExecutor) — разные настройки. Нельзя use одновременно. Migrate: hybrid → выбрать AIP-61, конвертировать queue="kubernetes" → executor="KubernetesExecutor" (опционально, queue routing работает в AIP-61 как fallback).