Learning Platform
Глоссарий Troubleshooting
Урок 06.05 · 25 мин
Продвинутый
CeleryKubernetesExecutorLocalKubernetesExecutorHybridQueue routing

Hybrid executors — CeleryKubernetesExecutor и LocalKubernetesExecutor

До Airflow 2.3 deployment был ограничен одним executor. Команды с mixed workload (legкие SQL queries + тяжёлые ML inference) выбирали между:

  • CeleryExecutor everywhere — низкая latency, но ML tasks делят environment, нет per-task GPU
  • KubernetesExecutor everywhere — изоляция и GPU, но 15-30s overhead на короткие tasks

В 2.3 появились CeleryKubernetesExecutor и LocalKubernetesExecutor — гибридные модели, где queue parameter определяет, какой executor выполнит task. Это решение «один scheduler, два executor-а» осталось до Airflow 2.9. В 2.10 пришёл Multiple Executors (AIP-61) — более общий механизм с up to N executors одновременно (см. следующий урок).

Этот урок — гибридные executors 2.x: как они работают, когда их применять, и почему AIP-61 их частично заменяет.


Resource requests и limits — основа executor_config в K8s

Концепт: один scheduler, два executor-а

CeleryKubernetesExecutor routing
SchedulerОдин scheduler-процесс. Внутри: SchedulerJobRunner + CeleryKubernetesExecutor (wrapper) + два child executor instances — CeleryExecutor и KubernetesExecutor. Wrapper route-ит по queue.
enqueue based on queue
queue == 'kubernetes' ?CeleryKubernetesExecutor.queue_command() читает TI.queue. Если совпадает с [celery_kubernetes_executor] kubernetes_queue (default 'kubernetes') → KubernetesExecutor. Иначе → CeleryExecutor.
No → CeleryExecutorqueue в ('default', 'high_priority', 'team_a', ...) → CeleryExecutor.queue_command(). Message публикуется в Redis/RabbitMQ broker. Celery worker подбирает по своим subscribed queues.
Yes → KubernetesExecutorqueue == 'kubernetes' (configurable) → KubernetesExecutor.queue_command(). Создаётся pod через K8s API. Watcher thread следит за статусом.

Главные точки:

  • Один scheduler — никакой отдельной инфраструктуры для каждого executor-а
  • Routing через queue parameter@task(queue="kubernetes") или @task(queue="default")
  • Configurable trigger queue — какая queue идёт на K8s, конфигурируется

CeleryKubernetesExecutor

Конфигурация

[core]
executor = CeleryKubernetesExecutor

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 16
worker_prefetch_multiplier = 1
celery_acks_late = True

[kubernetes_executor]
namespace = airflow
pod_template_file = /opt/airflow/pod_template.yaml
worker_container_repository = my-registry.com/airflow
worker_container_tag = 2.10.5
delete_worker_pods = True

[celery_kubernetes_executor]
kubernetes_queue = kubernetes   # tasks с queue='kubernetes' → K8s

Использование в DAG

from airflow.decorators import dag, task
from datetime import datetime
from kubernetes.client import V1Pod, V1PodSpec, V1Container, V1ResourceRequirements

@dag(schedule="@hourly", start_date=datetime(2026, 1, 1), catchup=False)
def hybrid_ml_pipeline():

    @task(queue="default")
    def fetch_metadata():
        """Light SQL — Celery (быстрый старт, low overhead)."""
        return {"model_id": "v3", "data_path": "s3://..."}

    @task(queue="default")
    def validate_data(meta: dict):
        """Light pandas validation — Celery."""
        return meta

    @task(
        queue="kubernetes",   # → KubernetesExecutor
        executor_config={
            "pod_override": V1Pod(
                spec=V1PodSpec(
                    node_selector={"gpu-type": "nvidia-a100"},
                    containers=[V1Container(
                        name="base"
                        image="my-registry.com/airflow-ml:2.10.5"
                        resources=V1ResourceRequirements(
                            requests={"cpu": "4", "memory": "16Gi", "nvidia.com/gpu": "1"},
                            limits={"cpu": "8", "memory": "32Gi", "nvidia.com/gpu": "1"},
                        ),
                    )],
                ),
            ),
        },
    )
    def run_inference(meta: dict):
        """Heavy ML — KubernetesExecutor (isolation + GPU)."""
        import torch
        # heavy ML
        return "s3://output/results"

    @task(queue="default")
    def update_catalog(output_path: str):
        """Light catalog update — Celery."""
        pass

    update_catalog(run_inference(validate_data(fetch_metadata())))

hybrid_ml_pipeline()

В этом DAG:

  • fetch_metadata, validate_data, update_catalogCelery (быстрые SQL/REST calls)
  • run_inferenceKubernetesExecutor (GPU pod с custom image)

Profit: 3 коротких tasks через Celery (1-3s startup) + 1 heavy с full GPU isolation, без overhead на коротких.


LocalKubernetesExecutor

То же, но базовый executor — LocalExecutor вместо Celery. Применимо если:

  • Single-node deployment (нет Celery infrastructure)
  • Маленький объём short tasks
  • Но иногда нужны heavy GPU tasks

Конфигурация

[core]
executor = LocalKubernetesExecutor
parallelism = 32   # для LocalExecutor part

[kubernetes_executor]
namespace = airflow
pod_template_file = /opt/airflow/pod_template.yaml
worker_container_repository = my-registry.com/airflow
worker_container_tag = 2.10.5

[local_kubernetes_executor]
kubernetes_queue = kubernetes   # tasks с queue='kubernetes' → K8s

Когда выбрать LocalKubernetesExecutor vs CeleryKubernetesExecutor

ПараметрLocalKubernetesExecutorCeleryKubernetesExecutor
Setup complexityМинимум (no Redis)Higher (Redis/RabbitMQ + workers)
Throughput for short tasksДо 100 tasks/minДо 10000 tasks/min (multi-worker)
Horizontal scalingНет (single-node)Да (add workers)
HAТолько schedulerScheduler + workers
Recommended scaleSmall (<100 short tasks/min)Medium-Large

Внутри: как routing работает в коде

В airflow.providers.celery.executors.celery_kubernetes_executor:

# Псевдокод
class CeleryKubernetesExecutor(BaseExecutor):
    KUBERNETES_QUEUE = "kubernetes"   # configurable

    def __init__(self):
        self.celery_executor = CeleryExecutor()
        self.kubernetes_executor = KubernetesExecutor()

    def queue_command(self, task_instance, command, priority, queue):
        if queue == self.KUBERNETES_QUEUE:
            self.kubernetes_executor.queue_command(
                task_instance, command, priority, queue=queue
            )
        else:
            self.celery_executor.queue_command(
                task_instance, command, priority, queue=queue
            )

    def heartbeat(self):
        self.celery_executor.heartbeat()
        self.kubernetes_executor.heartbeat()

    def end(self):
        self.celery_executor.end()
        self.kubernetes_executor.end()

Wrapper делает один thing: на каждый heartbeat вызывает heartbeat обоих child executors, на enqueue — routes по queue.


Use case patterns

Pattern 1: 90% short + 10% heavy

# All light tasks default queue
@task                       # queue="default" → Celery
def light_etl(): ...

# Heavy tasks explicit
@task(queue="kubernetes")   # → KubernetesExecutor
def heavy_processing(): ...

Типичная картина — Shopify, например: 90% tasks на Celery, 10% (Spark submissions, ML) на K8s.

Pattern 2: Per-team isolation

# Team A — analytics, runs on shared Celery
@task(queue="team_a")        # Celery, team A worker pool
def team_a_etl(): ...

# Team B — ML, needs custom image, K8s pod
@task(queue="kubernetes"
      executor_config={"pod_override": V1Pod(...)})
def team_b_ml(): ...

Pattern 3: Resource sizing

@task(queue="default")        # default 1 CPU / 1 GB
def quick(): ...

@task(queue="kubernetes",     # explicit large
      executor_config={"pod_override": V1Pod(
          spec=V1PodSpec(containers=[V1Container(
              name="base"
              resources=V1ResourceRequirements(
                  requests={"cpu": "8", "memory": "32Gi"}
              )
          )])
      )})
def heavy(): ...

Anti-pattern: всё через K8s

Если все tasks через queue="kubernetes" — зачем hybrid? Просто executor = KubernetesExecutor. Hybrid имеет смысл только когда большинство tasks выполняется через первый executor.


Hybrid vs Multiple Executors (AIP-61)

С Airflow 2.10 пришёл AIP-61 Multiple Executors — более общий механизм. Сравнение:

AspectHybrid (CeleryKubernetes / LocalKubernetes)Multiple Executors (AIP-61, 2.10+)
Сколько executorsФиксированно 2 (celery + k8s или local + k8s)До N (любая комбинация)
Routing APIqueue=... + специальная queue@task(executor="K8sExecutor")
Конфигурацияexecutor = CeleryKubernetesExecutorexecutor = Local,Celery,Kubernetes (CSV)
Введение2.32.10 (AIP-61)
Default executorПервый (Celery)Первый в list
Migrate pathHybrid → AIP-61 — относительно простой

Когда hybrid 2.x ещё имеет смысл (на 2.10+):

  • Если Airflow версия 2.3-2.9 (не upgrade-нут до 2.10)
  • Если ровно два executor-а (нет смысла в N)
  • Legacy deployments без compelling reason переезжать

В новых deployments на 2.10+ — используйте Multiple Executors (более явный routing, more flexible).


Production observability

TI counts by executor (via job_id of queued_by_job)

-- Сколько TI ушло через каждый executor (за последний час)
SELECT
    queue,
    state,
    COUNT(*) AS ti_count
FROM task_instance
WHERE start_date > now() - interval '1 hour'
GROUP BY queue, state
ORDER BY queue, state;

Если у вас kubernetes_queue = kubernetes:

  • queue='kubernetes' → K8s pods
  • остальные queues → Celery workers

K8s pod count vs Celery message rate

# K8s pods active
kubectl get pods -n airflow -l app=airflow-worker --field-selector=status.phase=Running | wc -l

# Celery message rate (Redis LLEN)
redis-cli -h airflow-redis llen celery

Health — pods ~ heavy queue size, Celery LLEN low (workers consume fast).


Configuration cheat-sheet

# CeleryKubernetesExecutor
[core]
executor = CeleryKubernetesExecutor

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 16
worker_prefetch_multiplier = 1
celery_acks_late = True

[kubernetes_executor]
namespace = airflow
pod_template_file = /opt/airflow/pod_template.yaml
worker_container_repository = my-registry.com/airflow
worker_container_tag = 2.10.5
delete_worker_pods = True
delete_worker_pods_on_failure = False

[celery_kubernetes_executor]
kubernetes_queue = kubernetes  # ← название queue, которое идёт на K8s
# LocalKubernetesExecutor (single-node + occasional K8s)
[core]
executor = LocalKubernetesExecutor
parallelism = 32

[kubernetes_executor]
namespace = airflow
# ... same as above

[local_kubernetes_executor]
kubernetes_queue = kubernetes

Migration path

From CeleryExecutor → CeleryKubernetesExecutor

  1. Add [kubernetes_executor] config block, deploy pod_template_file
  2. Update executor = CeleryKubernetesExecutor
  3. Запустить scheduler — оба executor working
  4. Один за другим перенести heavy tasks через @task(queue="kubernetes")
  5. Test, monitor cold start
  6. Roll out

Старые tasks (без queue override) продолжают идти через Celery — backwards compatible.

From CeleryKubernetesExecutor → Multiple Executors (AIP-61)

  1. Upgrade Airflow 2.9 → 2.10
  2. Изменить executor = CeleryExecutor,KubernetesExecutor
  3. Optionally заменить queue="kubernetes"executor="KubernetesExecutor"
  4. Старые DAG-и работают (queue routing fallback)
  5. Со временем мигрировать на explicit executor=...

Production gotchas

Gotcha 1: Default queue для K8s tasks

Если задать kubernetes_queue = kubernetes, но забыть pollut в task queue="kubernetes" — task пойдёт через Celery (с pod_override игнорируется). Frustrating debug.

Fix: always check that task.queue matches kubernetes_queue config.

Gotcha 2: Two heartbeats — 2x DB calls

Wrapper делает heartbeat для обоих child executors. Это 2x DB load для executor state checks. На high throughput — заметно. Mitigate через connection pooling.

Gotcha 3: K8s часть может быть disabled (по ошибке)

Если [kubernetes_executor] blocks неправильно сконфигурирован (missing namespace, invalid pod_template_file) — KubernetesExecutor instance может не стартануть, но Celery part working. Task с queue="kubernetes" будет stuck в queued.

Fix: monitor scheduler logs на startup для K8s executor errors.

Gotcha 4: Hybrid не уменьшает cold start

Hybrid не магически делает K8s pods быстрее. Cold start = 5-30s остаётся для K8s tasks. Hybrid просто избегает этого для коротких tasks (Celery).

Gotcha 5: Migrate path к 2.10 AIP-61

Hybrid executor (CeleryKubernetesExecutor) и Multiple Executors (CeleryExecutor,KubernetesExecutor) — разные настройки. Нельзя use одновременно. Migrate: hybrid → выбрать AIP-61, конвертировать queue="kubernetes"executor="KubernetesExecutor" (опционально, queue routing работает в AIP-61 как fallback).


Проверка знанийKnowledge check
У вас 95% short tasks (data validations, SQL queries) и 5% heavy (Spark submissions, иногда GPU ML). На Airflow 2.9. Стоит ли использовать CeleryKubernetesExecutor или ждать upgrade до 2.10 для AIP-61?
ОтветAnswer
**Сейчас (на 2.9): CeleryKubernetesExecutor — лучший выбор.** Конфиг: `executor = CeleryKubernetesExecutor`, `kubernetes_queue = kubernetes`. 95% short tasks default `queue='default'` → Celery (1-3s startup, high throughput). 5% heavy — `@task(queue='kubernetes', executor_config={'pod_override': V1Pod(...)})` → K8s pod-per-task с GPU/big memory. Это даёт low latency для большинства tasks + isolation для heavy. **При upgrade до 2.10 — переход на AIP-61 несложный**: (1) `executor = CeleryExecutor,KubernetesExecutor`; (2) опционально `@task(executor='KubernetesExecutor', ...)` instead of `queue='kubernetes'`; (3) queue routing продолжает работать backward-compat. **Не ждите**: CeleryKubernetesExecutor working уже в 2.3, прокачка миграции на 2.10 — отдельный проект (release notes, breaking changes review). Migrate когда есть compelling reason (AIP-61 features — больше двух executors одновременно, например team-specific routing).

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. В CeleryKubernetesExecutor как scheduler определяет, какой executor выполнит task?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7