Multiple Executors (AIP-61, 2.10+): running several executors at once
Before Airflow 2.10, a deployment was limited to a single executor (or a special hybrid wrapper around two). This was a hard operational constraint: teams with different requirements (analytics, ML, ad hoc) either accepted compromises or ran several Airflow instances.
AIP-61 (Airflow Improvement Proposal 61) changed this in 2.10. You can now configure N executors at once in a single Airflow deployment:
[core]
executor = LocalExecutor,CeleryExecutor,KubernetesExecutor
And at the task level, specify which executor runs it:
@task(executor="KubernetesExecutor")
def gpu_inference():
    pass
This lesson is a deep dive into AIP-61: configuration, routing logic in the scheduler, a comparison with hybrid executors, and production patterns (team-specific routing, environment-specific scaling).
Why AIP-61
Real scenarios that AIP-61 addresses:
Scenario 1: Mixed workload without compromise
A team has:
- 80% short SQL tasks (1-10s) that need low latency
- 15% medium Python tasks (30s - 5 min) that need moderate isolation
- 5% heavy ML/Spark tasks that need GPUs and full isolation
Before 2.10:
- Pure Celery → ML tasks share one environment and get no GPU
- Pure K8s → roughly 30s of pod startup overhead on short tasks (more than half of the wall-clock time is wasted)
- CeleryKubernetes → only 2 executors, and everything non-K8s goes to Celery (no local option)
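To make the overhead figure concrete, here is a small calculation of how much wall-clock time goes to startup rather than work. The 30-second startup cost and the 1-10s task durations are the figures quoted above; the function name `overhead_share` is illustrative only.

```python
def overhead_share(startup_s: float, runtime_s: float) -> float:
    """Fraction of total wall-clock time spent on startup overhead."""
    return startup_s / (startup_s + runtime_s)


# Short SQL tasks from the scenario run for 1-10 s; assume ~30 s of pod startup.
for runtime in (1, 10, 30):
    share = overhead_share(30, runtime)
    print(f"{runtime:>2}s task: {share:.0%} of wall-clock time is overhead")
```

Under these assumptions a task has to run for a full 30 seconds before overhead drops to half of the total, so every task in the 1-10s bucket spends most of its time waiting.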
With AIP-61:
executor = CeleryExecutor,KubernetesExecutor,LocalExecutor
Each task picks the executor that fits it best.
Scenario 2: Team-specific routing
A large organization has a data team, an ML team, and an analytics team. Each has:
- Its own Python dependencies (CUDA for ML, dbt for analytics)
- Its own resource requirements
- Its own worker / pod configurations
With AIP-61:
- ML team: KubernetesExecutor with GPU pod templates
- Analytics team: CeleryExecutor with dbt-baked workers
- Data team: CeleryExecutor with pandas-baked workers
One Airflow instance, three executor pools.
Scenario 3: Migration path
Migrating from one executor to another used to be all-or-nothing. With AIP-61 the move can be gradual: new tasks run on the new executor and old ones stay on the old one, with no need for a second Airflow instance.
Configuration
Basic
[core]
executor = LocalExecutor,CeleryExecutor,KubernetesExecutor
# The first entry in the list is the default: a @task without an executor= override runs on LocalExecutor.
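The "first entry is the default" rule can be sketched with the standard library alone. This is not Airflow's own loader, only an illustration of the rule; `CFG_TEXT` and `configured_executors` are hypothetical names, and the sketch handles plain executor names only.

```python
import configparser

# Hypothetical airflow.cfg fragment mirroring the example above.
CFG_TEXT = """
[core]
executor = LocalExecutor,CeleryExecutor,KubernetesExecutor
"""


def configured_executors(cfg_text: str) -> list[str]:
    """Return executor names in configured order, with whitespace stripped."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    raw = parser.get("core", "executor")
    return [name.strip() for name in raw.split(",") if name.strip()]


executors = configured_executors(CFG_TEXT)
default_executor = executors[0]  # the first entry acts as the default
print(executors, default_executor)
```

Because the default is purely positional, reordering the list changes behaviour for every task that does not name an executor.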
Each executor needs its own config block:
[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 16
worker_prefetch_multiplier = 1
task_acks_late = True
[kubernetes_executor]
namespace = airflow
pod_template_file = /opt/airflow/pod_template.yaml
worker_container_repository = my-registry.com/airflow
worker_container_tag = 2.10.5
delete_worker_pods = True
worker_pods_creation_batch_size = 16
# LocalExecutor doesn't need a separate block — uses [core] parallelism
Custom executor with an alias
An executor can be given an alias. The alias comes first, followed by a colon and the module path:
[core]
executor = my_celery_high_priority:airflow.providers.celery.executors.celery_executor.CeleryExecutor,k8s_gpu:airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor
In the DAG:
@task(executor="my_celery_high_priority")
def short_task(): ...
@task(executor="k8s_gpu")
def heavy_task(): ...
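An alias is mostly a shorthand for a long module path. It would also be the natural way to tell apart two instances of the same executor, for example two Celery brokers (Redis for regular work, RabbitMQ for critical workflows). However, the Airflow 2.10 documentation states that two instances of the same executor class are not supported, so check your version before relying on that setup.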
Routing logic in the scheduler
Under the hood: the scheduler holds a dict of executors
In airflow.jobs.scheduler_job_runner:
# Simplified pseudocode, not the actual Airflow source
class SchedulerJobRunner:
    def __init__(self):
        executor_names = [n.strip() for n in conf.get("core", "executor").split(",")]
        self.executors = {
            name: ExecutorLoader.load_executor(name)
            for name in executor_names
        }
        self.default_executor = self.executors[executor_names[0]]

    def _enqueue_task_instance(self, ti):
        # ti.executor is None when the task did not set executor=
        executor_name = ti.executor or self.default_executor.name
        executor = self.executors.get(executor_name)
        if not executor:
            raise UnknownExecutorException(f"{executor_name} not configured")
        command = ti.command_as_list()
        executor.queue_command(ti, command, priority=ti.priority_weight, queue=ti.queue)

    def heartbeat(self):
        for executor in self.executors.values():
            executor.heartbeat()
Each executor heartbeats independently. Watcher threads and broker connections are isolated from one another.
Per-task executor override
TaskFlow API
from datetime import datetime

from airflow.decorators import dag, task
from kubernetes.client import V1Pod, V1PodSpec


@dag(schedule="@hourly", start_date=datetime(2026, 1, 1), catchup=False)
def multi_executor_pipeline():
    # Default → first in executor list (LocalExecutor or CeleryExecutor)
    @task
    def fetch_metadata():
        return "..."

    # Explicit Celery
    @task(executor="CeleryExecutor")
    def light_etl(meta):
        return "..."

    # Explicit K8s with GPU
    @task(
        executor="KubernetesExecutor",
        executor_config={
            "pod_override": V1Pod(spec=V1PodSpec(containers=[...]))
        },
    )
    def ml_inference(data):
        return "..."

    # Stub for the final step called below; runs on the default executor
    @task
    def update_db(result):
        ...

    update_db(ml_inference(light_etl(fetch_metadata())))


multi_executor_pipeline()
Classic Operators
from airflow.operators.python import PythonOperator

heavy_task = PythonOperator(
    task_id="heavy",
    python_callable=my_func,  # my_func is assumed to be defined elsewhere in the DAG file
    executor="KubernetesExecutor",
)
executor has been a BaseOperator parameter since 2.10, so it is available on all operators.
Comparison: AIP-61 vs Hybrid Executors
| Aspect | Multiple Executors (AIP-61) | CeleryKubernetesExecutor (Hybrid) |
|---|---|---|
| Number of executors | Up to N (any combination) | Fixed at 2 |
| API | executor="X" (explicit) | queue="kubernetes" (indirect) |
| Configuration | executor = X,Y,Z (CSV) | executor = CeleryKubernetesExecutor |
| Default executor | First in the list | Celery (or Local in LocalKubernetes) |
| Aliases | Yes, alias:module.path.Class syntax | No |
| Multiple instances of the same executor | Listed as not supported in the 2.10 docs; verify for your version | No |
| Available since | 2.10 | 2.0 (LocalKubernetesExecutor: 2.3) |
| Backward compat | Yes, old DAGs work without changes | Yes |
Advantages of AIP-61:
- More than two executors: Local + Celery + K8s at the same time
- Aliases: short, readable names for executors. Two instances of the same executor (2 Celery pools, for example) would also rely on them, though the 2.10 docs list that setup as unsupported
- Explicit API: executor=X is clearer than queue=X (queue was designed for something else, namely routing work within a single executor)
- Cleaner mental model: each task knows its executor
The hybrid (CeleryKubernetes) still makes sense:
- If you are on a pre-2.10 Airflow release and are not upgrading
- If you need exactly 2 executors and no extra flexibility
Production patterns
Pattern 1: Standard mixed workload
[core]
executor = CeleryExecutor,KubernetesExecutor
@task # default → Celery
def short_sql(): ...
@task(executor="KubernetesExecutor") # heavy → K8s
def ml_inference(): ...
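Pattern 2: Team-specific routing with aliases
[core]
executor = analytics_celery:airflow.providers.celery.executors.celery_executor.CeleryExecutor,data_celery:airflow.providers.celery.executors.celery_executor.CeleryExecutor,ml_k8s:airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor
Two Celery executors with different broker_url values (set through env vars / overrides) keep analytics workers and data workers isolated. The Airflow 2.10 documentation describes two instances of the same executor class as unsupported, so validate this pattern on your version first. Separate Celery queues on a single CeleryExecutor are the established alternative.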
# Analytics team DAG
@task(executor="analytics_celery")
def analytics_task(): ...
# Data team DAG
@task(executor="data_celery")
def data_task(): ...
# ML team
@task(executor="ml_k8s")
def ml_task(): ...
Pattern 3: Migration in progress
The existing Airflow deployment uses only CeleryExecutor. The goal is to move to K8s gradually:
[core]
# Celery stays first, so it remains the default
executor = CeleryExecutor,KubernetesExecutor
# Existing tasks: unchanged, still on Celery
@task
def legacy_task(): ...
# New tasks: on K8s
@task(executor="KubernetesExecutor")
def new_task(): ...
Over time, add executor="KubernetesExecutor" to existing tasks one at a time, testing each.
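One way to manage the gradual rollout is to keep the list of migrated tasks in a single place instead of editing decorators across many files. The sketch below assumes a hypothetical allowlist `MIGRATED_TO_K8S` and helper `executor_for`. Neither is part of Airflow, and the task ids are made up for illustration.

```python
from typing import Optional

# Hypothetical allowlist: task_ids already validated on the new executor.
MIGRATED_TO_K8S = {"new_task", "nightly_export"}


def executor_for(task_id: str) -> Optional[str]:
    """Return the executor override for a task, or None to keep the default."""
    return "KubernetesExecutor" if task_id in MIGRATED_TO_K8S else None


print(executor_for("new_task"), executor_for("legacy_task"))
```

In a DAG file this could be wired in as `@task(executor=executor_for("nightly_export"))`. Passing `None` should leave the task on the default executor, since `None` is the parameter's default value, so moving a task back is a one-line change to the allowlist.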
Pattern 4: Development vs Production environments
# Dev environment
[core]
executor = LocalExecutor,KubernetesExecutor
# Prod environment
[core]
executor = CeleryExecutor,KubernetesExecutor
The DAG code is identical, but the default behaviour differs (Local in dev, Celery in prod). Tasks with an explicit executor override behave the same in both environments.
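The difference between the two environments can be illustrated with a small resolver. This is a sketch of the rule described here, not Airflow code: `DEV` and `PROD` mirror the two configs above, and `effective_executor` is a hypothetical helper.

```python
from typing import Optional

DEV = ["LocalExecutor", "KubernetesExecutor"]
PROD = ["CeleryExecutor", "KubernetesExecutor"]


def effective_executor(configured: list[str], override: Optional[str] = None) -> str:
    """Resolve which executor a task would run on in a given environment."""
    if override is None:
        return configured[0]  # no override: the environment default applies
    if override not in configured:
        raise ValueError(f"{override!r} is not configured in this environment")
    return override


for env_name, configured in (("dev", DEV), ("prod", PROD)):
    print(
        env_name,
        effective_executor(configured),
        effective_executor(configured, "KubernetesExecutor"),
    )
```

A useful side effect of writing the rule down this way is that an override naming an executor that exists only in prod fails loudly in dev, which is usually the behaviour you want from a pre-deployment check.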
Configuration cheat-sheet
[core]
# Multiple executors: CSV, the first one is the default
executor = CeleryExecutor,KubernetesExecutor
# Used by LocalExecutor, if one is configured
parallelism = 32
[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 16
worker_prefetch_multiplier = 1
task_acks_late = True
worker_max_tasks_per_child = 100
[kubernetes_executor]
namespace = airflow
pod_template_file = /opt/airflow/pod_template.yaml
worker_container_repository = my-registry.com/airflow
worker_container_tag = 2.10.5
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 16
worker_pods_pending_timeout = 300
Production observability
How many TIs went through each executor
-- TI by executor (last hour)
SELECT
executor,
state,
COUNT(*) AS ti_count,
AVG(EXTRACT(EPOCH FROM (end_date - start_date))) AS avg_duration_sec
FROM task_instance
WHERE start_date > now() - interval '1 hour'
AND executor IS NOT NULL -- the executor column exists in 2.10+
GROUP BY executor, state
ORDER BY executor, ti_count DESC;
Executor-specific health
-- Stuck queued TIs by executor
SELECT
executor,
queue,
COUNT(*) AS stuck,
MIN(queued_dttm) AS oldest_queued
FROM task_instance
WHERE state = 'queued'
AND queued_dttm < now() - interval '2 minutes'
GROUP BY executor, queue
ORDER BY stuck DESC;
Metrics
With AIP-61 the scheduler emits per-executor metrics (via StatsD/OTel). When more than one executor is configured, the executor class name is appended to the metric name:
airflow.executor.queued_tasks.CeleryExecutor
airflow.executor.running_tasks.CeleryExecutor
airflow.executor.queued_tasks.KubernetesExecutor
airflow.executor.running_tasks.KubernetesExecutor
These can feed per-executor dashboards in Grafana.
Production gotchas
Gotcha 1: Unknown executor in DAG
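If a DAG uses @task(executor="MyCustomExecutor") but MyCustomExecutor is not listed in [core] executor, Airflow raises an error along these lines:
airflow.exceptions.UnknownExecutorException:
Executor 'MyCustomExecutor' not configured in [core] executor
The failure is immediate. In 2.10 the check appears to run at DAG parse time, so the whole DAG fails to import rather than a single TI failing at run time. Catch it in CI:
- Lint DAGs and check that every executor= reference exists in the config
- Centralize executor names as constants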
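A lint step of this kind can be sketched with the standard `ast` module. The `CONFIGURED` set is assumed to mirror `[core] executor`, and `find_unknown_executors` is a hypothetical helper. It only sees string literals, so executor names passed through variables or constants would need extra handling.

```python
import ast

# Assumed to mirror the [core] executor setting of the target deployment.
CONFIGURED = {"CeleryExecutor", "KubernetesExecutor"}


def find_unknown_executors(source: str, configured: set[str]) -> list[tuple[int, str]]:
    """Return (line, name) for each literal executor="..." not in `configured`."""
    problems = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        for kw in node.keywords:
            if (
                kw.arg == "executor"
                and isinstance(kw.value, ast.Constant)
                and isinstance(kw.value.value, str)
                and kw.value.value not in configured
            ):
                problems.append((kw.value.lineno, kw.value.value))
    return sorted(problems)


SAMPLE = '''
@task(executor="KubernetesExecutor")
def ok(): ...

@task(executor="MyCustomExecutor")
def broken(): ...
'''

print(find_unknown_executors(SAMPLE, CONFIGURED))
```

Because the source is parsed rather than imported, the check runs without an Airflow installation, which keeps it cheap enough for a pre-commit hook.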
Gotcha 2: Default executor shift
Suppose executor = LocalExecutor,CeleryExecutor,KubernetesExecutor is changed to executor = CeleryExecutor,LocalExecutor,KubernetesExecutor. The default executor has changed, so every task without an override now runs on Celery. This can break DAGs that expect Local.
Fix: before changing the order, set executor= explicitly on all tasks, or keep the existing default first in the list.
Gotcha 3: Watcher / heartbeat — N executors = N times resource
Each K8s executor has its own watcher thread, and each Celery executor has its own broker connection pool. With 3 executors, expect the scheduler footprint to grow roughly in proportion.
Mitigation: monitor scheduler RES memory and keep to 2-3 executors per deployment.
Gotcha 4: K8s pod label collision
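With multiple K8s executors sharing the same [kubernetes_executor] namespace, the pods of both executors will have overlapping labels. One executor's watcher will see the other's pods, which leads to state corruption. This only arises when two instances of the same executor class are configured, which the 2.10 docs list as unsupported.
Fix: give each K8s executor its own namespace, or a unique label selector via config.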
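Gotcha 5: the executor column in task_instance
The executor column was added to the task_instance table in 2.10. It did not exist before 2.10, so older TIs have NULL there. Tasks that never set executor= also appear to be stored with NULL, since the column records the task-level override rather than the resolved default. Queries with WHERE executor = X therefore need to handle NULL explicitly.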
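The NULL handling can be tried out on an in-memory SQLite table. The two-column schema here is a toy stand-in for the real task_instance table, and the `(unknown)` bucket label is an arbitrary choice. The same COALESCE pattern applies in PostgreSQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT, executor TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?)",
    [
        ("old_ti", None),  # row with no executor recorded
        ("k8s_ti", "KubernetesExecutor"),
        ("celery_ti", "CeleryExecutor"),
        ("celery_ti_2", "CeleryExecutor"),
    ],
)

# COALESCE folds NULL into a visible bucket instead of silently dropping it.
rows = conn.execute(
    """
    SELECT COALESCE(executor, '(unknown)') AS executor_name, COUNT(*) AS ti_count
    FROM task_instance
    GROUP BY executor_name
    ORDER BY executor_name
    """
).fetchall()
print(rows)
```

A plain `WHERE executor = 'CeleryExecutor'` would skip the NULL row without any warning, whereas the grouped form keeps it visible in the report.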