Learning Platform
Глоссарий Troubleshooting
Урок 06.06 · 30 мин
Продвинутый
AIP-61Multiple ExecutorsAirflow 2.10executor overrideTeam routing

Multiple Executors (AIP-61, 2.10+) — несколько executors одновременно

До Airflow 2.10 deployment был ограничен одним executor (или специальной hybrid wrapper из двух). Это был чёткий operational constraint: команды с разными requirements (analytics + ML + ad-hoc) либо мирились с компромиссами, либо запускали несколько Airflow instances.

AIP-61 (Airflow Improvement Proposal #61) в 2.10 изменил это. Теперь можно сконфигурировать N executors одновременно в одном Airflow deployment:

[core]
executor = LocalExecutor,CeleryExecutor,KubernetesExecutor

И на уровне task — указать, какой executor выполнит её:

@task(executor="KubernetesExecutor")
def gpu_inference():
    pass

Этот урок — deep dive в AIP-61: конфигурация, routing logic в scheduler, сравнение с hybrid executors, и production patterns (team-specific routing, environment-specific scaling).


Зачем AIP-61

Реальные сценарии, которые решает AIP-61:

Сценарий 1: Mixed workload без compromise

Team имеет:

  • 80% short SQL tasks (1-10s) — нужна low latency
  • 15% medium Python tasks (30s - 5 min) — нужна middle isolation
  • 5% heavy ML/Spark tasks — нужны GPU, full isolation

До 2.10:

  • Pure Celery → ML tasks делят environment, нет GPU
  • Pure K8s → 30s overhead на коротких tasks (50%+ времени тратится впустую)
  • CeleryKubernetes → только 2 executor-а, всё non-K8s идёт на Celery (нет options для local)

С AIP-61:

executor = CeleryExecutor,KubernetesExecutor,LocalExecutor

Каждая task выбирает оптимально.

Сценарий 2: Team-specific routing

Большая организация: data team, ML team, analytics team. Каждая имеет:

  • Свои Python deps (CUDA для ML, dbt для analytics)
  • Свои resource requirements
  • Свои workers / pods configurations

С AIP-61:

  • ML team — KubernetesExecutor с GPU pod templates
  • Analytics team — CeleryExecutor с dbt-baked workers
  • Data team — CeleryExecutor с pandas-baked workers

Один Airflow instance, three executor pools.

Сценарий 3: Migration path

Migration from one executor to another. До AIP-61 — все-or-nothing migration. С AIP-61 — постепенный переход: новые tasks на новом executor, старые — на старом, без двух Airflow instances.


Конфигурация

Базовая

[core]
executor = LocalExecutor,CeleryExecutor,KubernetesExecutor
# Первый в списке — default. Если @task без executor= override → LocalExecutor.

Каждый executor требует свой config block:

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 16
worker_prefetch_multiplier = 1
celery_acks_late = True

[kubernetes_executor]
namespace = airflow
pod_template_file = /opt/airflow/pod_template.yaml
worker_container_repository = my-registry.com/airflow
worker_container_tag = 2.10.5
delete_worker_pods = True
worker_pods_creation_batch_size = 16

# LocalExecutor doesn't need a separate block — uses [core] parallelism

Custom executor с alias

Можно дать executor-у alias:

[core]
executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor:my_celery_high_priority,airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor:k8s_gpu

В DAG:

@task(executor="my_celery_high_priority")
def short_task(): ...

@task(executor="k8s_gpu")
def heavy_task(): ...

Полезно когда два экземпляра одного executor — например два Celery brokers (один Redis, другой RabbitMQ для critical workflows).


Routing logic в scheduler

Multiple Executors: routing decision flow
Scheduler — TI scheduledCritical section: TI deps satisfied, state=scheduled. Phase 3 main loop пытается enqueue. Здесь начинается routing decision.
resolve executor for TI
TI has executor override?Scheduler читает task definition (из serialized DAG): operator.executor_config? @task(executor=...)? Если override явный — использовать его.
Yes → use specified executorScheduler выбирает соответствующий executor instance из словаря. Если specified executor не сконфигурирован — ERROR (TI fail с понятным error).
No → use default (first in list)Если task не указала executor — используется default (первый в core.executor CSV). Полностью backward-compatible: старые DAG-и работают без изменений.
executor.queue_command()
Selected executor enqueues TIСтандартный flow для выбранного executor: Local → multiprocessing.Queue, Celery → broker, K8s → POST /pods. Watcher/result_queue для каждого executor работает independently.

Внутри: scheduler holds dict of executors

В airflow.jobs.scheduler_job_runner:

# Псевдокод
class SchedulerJobRunner:
    def __init__(self):
        executor_names = conf.get("core", "executor").split(",")
        self.executors = {
            name: ExecutorLoader.load_executor(name)
            for name in executor_names
        }
        self.default_executor = self.executors[executor_names[0]]

    def _enqueue_task_instance(self, ti):
        executor_name = ti.executor or self.default_executor.name
        executor = self.executors.get(executor_name)
        if not executor:
            raise UnknownExecutorException(f"{executor_name} not configured")
        executor.queue_command(ti, command, priority, queue=ti.queue)

    def heartbeat(self):
        for executor in self.executors.values():
            executor.heartbeat()

Каждый executor heartbeats independently. Watcher threads, broker connections — изолированы.


Per-task executor override

TaskFlow API

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule="@hourly", start_date=datetime(2026, 1, 1), catchup=False)
def multi_executor_pipeline():

    # Default → first in executor list (LocalExecutor or CeleryExecutor)
    @task
    def fetch_metadata():
        return "..."

    # Explicit Celery
    @task(executor="CeleryExecutor")
    def light_etl(meta):
        return "..."

    # Explicit K8s with GPU
    @task(
        executor="KubernetesExecutor"
        executor_config={
            "pod_override": V1Pod(spec=V1PodSpec(containers=[...]))
        },
    )
    def ml_inference(data):
        return "..."

    update_db(ml_inference(light_etl(fetch_metadata())))

multi_executor_pipeline()

Classic Operators

from airflow.operators.python import PythonOperator

heavy_task = PythonOperator(
    task_id="heavy"
    python_callable=my_func,
    executor="KubernetesExecutor",
)

executor — параметр BaseOperator с 2.10+. Доступен для всех operators.


Сравнение: AIP-61 vs Hybrid Executors

AspectMultiple Executors (AIP-61)CeleryKubernetesExecutor (Hybrid)
Сколько executorsДо N (любая комбинация)Фиксированно 2
APIexecutor="X" explicitqueue="kubernetes" indirect
Конфигурацияexecutor = X,Y,Z (CSV)executor = CeleryKubernetesExecutor
Default executorПервый в listCelery (или Local в LocalKubernetes)
AliasesYes — Class:alias syntaxNo
Multiple same executorYes — 2x CeleryExecutor с разными configsNo
Available since2.102.3
Backward compatYes — old DAGs work без changesYes

Преимущества AIP-61:

  1. Больше двух executors — Local + Celery + K8s одновременно
  2. Aliases — два экземпляра одного executor (2 Celery pools, например)
  3. Явный APIexecutor=X понятнее чем queue=X (queue был для другого — priority routing within executor)
  4. Cleaner mental model — каждая task знает свой executor

Hybrid (CeleryKubernetes) ещё имеет смысл:

  • Если у вас Airflow 2.3-2.9 (не upgrade)
  • Если ровно 2 executor-а нужны (без дополнительной flexibility)

Production patterns

Pattern 1: Standard mixed workload

[core]
executor = CeleryExecutor,KubernetesExecutor
@task                                      # default → Celery
def short_sql(): ...

@task(executor="KubernetesExecutor")        # heavy → K8s
def ml_inference(): ...

Pattern 2: Team-specific routing с aliases

[core]
executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor:analytics_celery,airflow.providers.celery.executors.celery_executor.CeleryExecutor:data_celery,airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor:ml_k8s

Два Celery executors с разными broker_url (через env vars / overrides) — analytics workers и data workers изолированы.

# Analytics team DAG
@task(executor="analytics_celery")
def analytics_task(): ...

# Data team DAG
@task(executor="data_celery")
def data_task(): ...

# ML team
@task(executor="ml_k8s")
def ml_task(): ...

Pattern 3: Migration in progress

Старый Airflow используется только CeleryExecutor. Хочется переехать на K8s, но постепенно:

[core]
executor = CeleryExecutor,KubernetesExecutor   # Celery — default
# Existing tasks — без изменений, Celery
@task
def legacy_task(): ...

# New tasks — на K8s
@task(executor="KubernetesExecutor")
def new_task(): ...

Со временем — добавлять executor="KubernetesExecutor" к existing tasks, тестируя поодиночке.

Pattern 4: Development vs Production environments

# Dev environment
[core]
executor = LocalExecutor,KubernetesExecutor

# Prod environment
[core]
executor = CeleryExecutor,KubernetesExecutor

DAG код одинаковый — но default behaviour разный (Local для dev, Celery для prod). Tasks с explicit executor override остаются consistent.


Configuration cheat-sheet

[core]
# Multiple executors — CSV, первый = default
executor = CeleryExecutor,KubernetesExecutor
parallelism = 32   # для LocalExecutor если есть

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 16
worker_prefetch_multiplier = 1
celery_acks_late = True
worker_max_tasks_per_child = 100

[kubernetes_executor]
namespace = airflow
pod_template_file = /opt/airflow/pod_template.yaml
worker_container_repository = my-registry.com/airflow
worker_container_tag = 2.10.5
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 16
worker_pods_pending_timeout = 300

Production observability

Сколько TI ушло через каждый executor

-- TI by executor (last hour)
SELECT
    executor,
    state,
    COUNT(*) AS ti_count,
    AVG(EXTRACT(EPOCH FROM (end_date - start_date))) AS avg_duration_sec
FROM task_instance
WHERE start_date > now() - interval '1 hour'
  AND executor IS NOT NULL  -- 2.10+ имеет executor column
GROUP BY executor, state
ORDER BY executor, ti_count DESC;

Executor-specific health

-- Stuck queued по executor
SELECT
    executor,
    queue,
    COUNT(*) AS stuck,
    MIN(queued_dttm) AS oldest_queued
FROM task_instance
WHERE state = 'queued'
  AND queued_dttm < now() - interval '2 minutes'
GROUP BY executor, queue
ORDER BY stuck DESC;

Метрики

С AIP-61 scheduler emit-ит per-executor метрики (через StatsD/OTel):

airflow.executor.CeleryExecutor.queued_tasks
airflow.executor.CeleryExecutor.running_tasks
airflow.executor.KubernetesExecutor.queued_tasks
airflow.executor.KubernetesExecutor.running_tasks

Можно строить per-executor dashboards в Grafana.


Production gotchas

Gotcha 1: Unknown executor in DAG

Если @task(executor="MyCustomExecutor"), но MyCustomExecutor не в [core] executor:

airflow.exceptions.UnknownExecutorException: 
Executor 'MyCustomExecutor' not configured in [core] executor

TI fails immediately. Detect в CI:

  • Lint DAG-и проверить, что все executor= references существуют в config
  • Centralize executor names как constants

Gotcha 2: Default executor shift

Изменили executor = LocalExecutor,Celery,K8sexecutor = CeleryExecutor,Local,K8s. Default executor сменился — все tasks без override теперь идут через Celery. Может сломать deployments, ожидающих Local.

Fix: при изменении order — explicit executor= для всех tasks. Или sticky к existing default через config.

Gotcha 3: Watcher / heartbeat — N executors = N times resource

Каждый K8s executor имеет свой watcher thread. Каждый Celery executor — свой broker connection pool. На 3 executor-ах — 3x scheduler footprint.

Mitigation: monitor scheduler RES, не больше 2-3 executors в одном deployment.

Gotcha 4: K8s pod label collision

Multiple K8s executors с одинаковым [kubernetes_executor] namespace — pods обоих executors будут иметь pересекающиеся labels. Watcher одного увидит pods другого → state corruption.

Fix: each K8s executor — свой namespace. Или unique label selector через config.

Gotcha 5: executor column в task_instance

executor column был добавлен в task_instance table в 2.10. До 2.10 это поле не существовало — у старых TI оно NULL. Поэтому queries с WHERE executor = X должны handle NULL для historical data.


Проверка знанийKnowledge check
Большая компания, Airflow 2.10. Три data teams (analytics, ML, finance) каждая со своими requirements. Analytics — short SQL/dbt tasks; ML — GPU pods, custom CUDA images; Finance — strict isolation (compliance), audit trail. Спроектируйте Multiple Executors конфигурацию.
ОтветAnswer
**Конфиг**: `executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor:analytics_celery,airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor:ml_k8s,airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor:finance_k8s`. **Analytics_celery** — worker_concurrency=32, prefetch=1, image baked с dbt-core + pandas + duckdb. KEDA scale по queue depth. Default executor (первый в list) → большинство tasks по умолчанию идут сюда. **ml_k8s** — namespace=ml-airflow, pod_template_file=/configs/ml-pod.yaml с CUDA image, GPU node_selector, audit labels. `@task(executor='ml_k8s')` для всех ML DAG-ов. **finance_k8s** — namespace=finance-airflow с NetworkPolicy isolated (нет доступа кроме approved data sources), audit through K8s events + Falco, immutable image (signed). RBAC жёсткий — отдельный ServiceAccount. **Migration**: analytics tasks default → Celery (без changes), ML — добавить `executor='ml_k8s'` per task, Finance team — мигрирует свои DAG-и c `executor='finance_k8s'`. Observability: per-executor Grafana dashboards (queued/running/duration per executor), стороны pg_stat_activity tagged by executor. **Альтернатива** — 3 отдельных Airflow instances, но это 3x maintenance overhead и нет shared scheduler infrastructure.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. С Airflow 2.10 (AIP-61), как конфигурировать несколько executors?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7