Lesson 06.07 · 35 min
Advanced
Benchmark · Lab · docker-compose · kind · Decision framework

Executor comparison lab: benchmarking Local vs Celery vs K8s on a single DAG

The theory is covered; now it is time to get hands-on. In this lab we run the same DAG on three executors (Local, Celery, K8s) and measure:

  1. Startup latency: time from queued to running
  2. Throughput: tasks/min
  3. Resource cost: RAM/CPU footprint
  4. Total time for the full DAG

The result: concrete numbers plus a decision framework for choosing an executor.


Test DAG

A standard benchmark DAG: 20 short tasks (sleep 2s) + 5 medium tasks (sleep 30s):

# dags/benchmark_dag.py
from airflow.decorators import dag, task
from datetime import datetime
import time

@dag(
    dag_id="executor_benchmark",
    schedule=None,
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["benchmark"],
)
def benchmark():
    @task
    def short_task(idx: int):
        time.sleep(2)
        return idx

    @task
    def medium_task(idx: int):
        time.sleep(30)
        return idx

    @task
    def aggregator(short_results: list, medium_results: list):
        return {"total_short": len(short_results), "total_medium": len(medium_results)}

    shorts = short_task.expand(idx=list(range(20)))
    mediums = medium_task.expand(idx=list(range(5)))
    aggregator(shorts, mediums)

benchmark()

Total work: 20 × 2s + 5 × 30s = 40 + 150 = 190 seconds of pure work. With parallelism=8 and zero overhead, the whole run could in theory finish in 30-40 seconds, because a single 30s medium task sets the floor.
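As a sanity check on that estimate, the zero-overhead floor can be simulated with a small greedy scheduler. This is a minimal sketch, not part of the lab repo: `simulate_makespan` is a hypothetical helper that hands the longest tasks first to whichever slot frees up earliest, and it ignores the final aggregator task and all scheduling overhead.

```python
import heapq

def simulate_makespan(durations, parallelism):
    """Greedy longest-first scheduling on identical slots with zero overhead."""
    slots = [0.0] * parallelism            # time at which each slot becomes free
    heapq.heapify(slots)
    for duration in sorted(durations, reverse=True):
        free_at = heapq.heappop(slots)     # earliest-free slot takes the next task
        heapq.heappush(slots, free_at + duration)
    return max(slots)                      # the run ends when the last slot finishes

# The benchmark DAG's mapped tasks: 20 short (2s) and 5 medium (30s)
durations = [2] * 20 + [30] * 5

for parallelism in (1, 8, 16):
    print(parallelism, simulate_makespan(durations, parallelism))
```

Under these assumptions the sequential case takes 190 seconds, while parallelism=8 and parallelism=16 both bottom out at 30 seconds, the length of one medium task.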


Setup 1: LocalExecutor (docker-compose)

docker-compose.local.yml:

version: "3.8"
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow

  scheduler:
    image: apache/airflow:2.10.5
    depends_on: [postgres]
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__PARALLELISM: 16
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__LOAD_EXAMPLES: "False"
    volumes:
      - ./dags:/opt/airflow/dags
    command: bash -c "airflow db migrate && airflow standalone"
    ports:
      - "8080:8080"

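Run:

docker compose -f docker-compose.local.yml up -d
# Wait ~30s for the DAG to appear in the UI

# Unpause the DAG (new DAGs are paused by default), then trigger it
docker compose -f docker-compose.local.yml exec scheduler airflow dags unpause executor_benchmark
docker compose -f docker-compose.local.yml exec scheduler airflow dags trigger executor_benchmark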

Setup 2: CeleryExecutor (docker-compose)

docker-compose.celery.yml:

version: "3.8"
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow

  redis:
    image: redis:7
    command: redis-server --appendonly yes

  scheduler:
    image: apache/airflow:2.10.5
    depends_on: [postgres, redis]
    environment: &airflow-env
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
      AIRFLOW__CELERY__WORKER_PREFETCH_MULTIPLIER: 1
      AIRFLOW__CELERY__WORKER_CONCURRENCY: 8
      AIRFLOW__CELERY__TASK_ACKS_LATE: "True"
      AIRFLOW__CORE__LOAD_EXAMPLES: "False"
    volumes:
      - ./dags:/opt/airflow/dags
    command: scheduler

  worker:
    image: apache/airflow:2.10.5
    depends_on: [postgres, redis]
    environment: *airflow-env
    volumes:
      - ./dags:/opt/airflow/dags
    command: celery worker
    deploy:
      replicas: 2

  webserver:
    image: apache/airflow:2.10.5
    depends_on: [postgres, redis]
    environment: *airflow-env
    volumes:
      - ./dags:/opt/airflow/dags
    command: webserver
    ports:
      - "8080:8080"

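Run:

# Apply DB migrations first so the scheduler does not start against an empty schema
docker compose -f docker-compose.celery.yml run --rm scheduler airflow db migrate
docker compose -f docker-compose.celery.yml up -d
docker compose -f docker-compose.celery.yml exec scheduler airflow users create --username admin --password admin --email admin@example.com --firstname a --lastname a --role Admin

# Unpause the DAG (new DAGs are paused by default), then trigger it
docker compose -f docker-compose.celery.yml exec scheduler airflow dags unpause executor_benchmark
docker compose -f docker-compose.celery.yml exec scheduler airflow dags trigger executor_benchmark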

Setup 3: KubernetesExecutor (kind)

kind is a lightweight Kubernetes distribution for local use. Setup:

# Install kind + kubectl + helm
brew install kind kubectl helm

# Create cluster
kind create cluster --name airflow-bench --config=- <<EOF
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
  - role: control-plane
  - role: worker
  - role: worker
EOF

# Install Airflow via helm
helm repo add apache-airflow https://airflow.apache.org
helm repo update

cat > values.yaml <<EOF
executor: "KubernetesExecutor"
images:
  airflow:
    tag: "2.10.5"
dags:
  gitSync:
    enabled: false
  persistence:
    enabled: false
data:
  metadataConnection:
    user: postgres
    pass: postgres
config:
  core:
    executor: "KubernetesExecutor"
    parallelism: "32"
    load_examples: "False"
  kubernetes_executor:
    # Worker pods run in the release namespace (see --namespace in helm install)
    namespace: "airflow"
    delete_worker_pods: "True"
    worker_pods_creation_batch_size: "16"
EOF

helm install airflow apache-airflow/airflow \
  --namespace airflow --create-namespace \
  -f values.yaml

# Wait for scheduler + webserver
kubectl wait --for=condition=ready pod -l component=scheduler -n airflow --timeout=300s

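# Copy DAG into the scheduler pod
# Note: with gitSync and persistence disabled, worker pods do not see this file;
# for task pods to load the DAG, bake it into the image or enable dags.persistence
kubectl cp dags/benchmark_dag.py airflow/$(kubectl get pod -n airflow -l component=scheduler -o name | head -1 | cut -d/ -f2):/opt/airflow/dags/ -c scheduler

# Unpause the DAG (new DAGs are paused by default), then trigger it
kubectl exec -n airflow deployment/airflow-scheduler -- airflow dags unpause executor_benchmark
kubectl exec -n airflow deployment/airflow-scheduler -- airflow dags trigger executor_benchmark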

Metrics we collect

Metric 1: Startup latency (per task)

-- Average time from queued_dttm to start_date
SELECT
    AVG(EXTRACT(EPOCH FROM (start_date - queued_dttm))) AS avg_startup_sec,
    MAX(EXTRACT(EPOCH FROM (start_date - queued_dttm))) AS max_startup_sec,
    COUNT(*) AS ti_count
FROM task_instance
WHERE dag_id = 'executor_benchmark'
  AND run_id = '<run_id>'
  AND state = 'success';
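The same aggregation can be done client-side once the rows are fetched, which helps when comparing runs that live in three separate metadata databases. A minimal sketch, assuming each row is a `(queued_dttm, start_date)` pair of `datetime` objects; `startup_stats` and the sample timestamps are illustrative and not taken from the benchmark.

```python
from datetime import datetime, timedelta

def startup_stats(rows):
    """Return avg/max startup latency in seconds for (queued_dttm, start_date) pairs."""
    waits = [
        (start - queued).total_seconds()
        for queued, start in rows
        if queued is not None and start is not None   # skip rows that never queued or started
    ]
    if not waits:
        return None
    return {"avg": sum(waits) / len(waits), "max": max(waits), "count": len(waits)}

# Illustrative rows only: two tasks that waited 1s and 3s, one with no queue timestamp
t0 = datetime(2026, 1, 1, 12, 0, 0)
sample_rows = [
    (t0, t0 + timedelta(seconds=1)),
    (t0, t0 + timedelta(seconds=3)),
    (None, t0),
]
print(startup_stats(sample_rows))
```

Skipping rows with a missing timestamp matches what the SQL `AVG` does with NULL differences.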

Metric 2: Throughput (tasks/min)

-- Tasks completed per minute (one row per minute bucket)
SELECT
    date_trunc('minute', end_date) AS minute,
    COUNT(*) AS completed
FROM task_instance
WHERE dag_id = 'executor_benchmark'
  AND state = 'success'
GROUP BY minute
ORDER BY minute;
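Per-minute buckets are noisy on a run that lasts only a few minutes, so a single overall rate is often easier to compare. The sketch below mirrors the SQL bucketing in Python and also derives an overall tasks/min figure from a run's total duration. The helper names are hypothetical, and the count of 26 task instances per run (25 mapped tasks plus the aggregator) is an assumption about how the run is counted.

```python
from collections import Counter
from datetime import datetime

def completed_per_minute(end_dates):
    """Bucket task end times by minute, like date_trunc('minute', end_date)."""
    buckets = Counter(ts.replace(second=0, microsecond=0) for ts in end_dates)
    return dict(sorted(buckets.items()))

def overall_tasks_per_min(task_count, total_seconds):
    """Average throughput over the whole DAG run."""
    return task_count / total_seconds * 60

# Illustrative end times: two tasks finish in the first minute, one in the second
ends = [
    datetime(2026, 1, 1, 0, 0, 5),
    datetime(2026, 1, 1, 0, 0, 50),
    datetime(2026, 1, 1, 0, 1, 10),
]
print(completed_per_minute(ends))

# 26 task instances over a 210s run and over a 480s run
print(round(overall_tasks_per_min(26, 210), 2))
print(round(overall_tasks_per_min(26, 480), 2))
```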

Metric 3: Total DagRun duration

SELECT
    run_id,
    EXTRACT(EPOCH FROM (end_date - start_date)) AS total_sec,
    state
FROM dag_run
WHERE dag_id = 'executor_benchmark'
ORDER BY start_date DESC
LIMIT 5;

Metric 4: Resource cost

# LocalExecutor (single host)
docker stats --no-stream | grep airflow

# Celery (multiple hosts)
docker stats --no-stream | grep -E "(scheduler|worker)"

# K8s
kubectl top pods -n airflow
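To turn those readings into a single RAM figure per setup, the per-container memory can be summed. This is a rough sketch that assumes rows shaped like `name<TAB>used / limit`, which is what `docker stats --no-stream --format "{{.Name}}\t{{.MemUsage}}"` is expected to print; the container names and values below are made up for illustration, and `parse_mem` / `total_mem_gib` are hypothetical helpers.

```python
import re

# Multipliers for the unit suffixes this sketch understands
_UNITS = {
    "B": 1,
    "KIB": 1024, "MIB": 1024 ** 2, "GIB": 1024 ** 3,
    "KB": 1000, "MB": 1000 ** 2, "GB": 1000 ** 3,
}

def parse_mem(value):
    """Convert a string such as '512MiB' or '1.5GiB' to bytes."""
    match = re.fullmatch(r"\s*([\d.]+)\s*([A-Za-z]+)\s*", value)
    if not match:
        raise ValueError(f"unrecognized memory value: {value!r}")
    number, unit = match.groups()
    return float(number) * _UNITS[unit.upper()]

def total_mem_gib(lines):
    """Sum the 'used' side of 'name<TAB>used / limit' rows, in GiB."""
    total_bytes = 0.0
    for line in lines:
        _name, usage = line.split("\t", 1)
        used = usage.split("/")[0]          # keep 'used', drop the limit
        total_bytes += parse_mem(used)
    return total_bytes / 1024 ** 3

# Illustrative rows, not measured output
sample = [
    "bench-scheduler-1\t512MiB / 15.6GiB",
    "bench-worker-1\t1.5GiB / 15.6GiB",
]
print(total_mem_gib(sample))
```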

Benchmark results (typical)

Run on a MacBook M1 16 GB (roughly equivalent to a single AWS c5.2xlarge):

Benchmark results: same DAG, three executors

LocalExecutor
parallelism=16, single scheduler process. No broker overhead. Each worker is a forked process, startup ~1s per task. Total DAG (40s of short tasks + 150s of sequential medium tasks at parallelism=16): ~210s. RAM 4GB peak. CPU ~30% avg.

CeleryExecutor (2 workers × concurrency 8)
parallelism=16 in total (8 × 2 workers). Broker overhead ~1s per task. Total ~215s (slightly slower than Local because of the broker round-trip). RAM 5GB (scheduler + 2 workers + Redis + Postgres). CPU ~35%. The advantages show up on multi-node setups and are not realized here.

KubernetesExecutor (kind, 3 nodes)
Pod per task. Each pod cold-starts in ~15-25s (the first pod pulls the image in kind, later pods use the cache). Total DAG ~480s (190s work + 25 tasks × 15s overhead). RAM 6GB (including kind nodes). CPU spikes during batch pod creation. Cold start dominates.

Detailed numbers table

| Metric | LocalExecutor | CeleryExecutor | KubernetesExecutor |
|---|---|---|---|
| Total DAG duration | ~210s | ~215s | ~480s |
| Avg task startup | 0.8s | 1.5s | 14.5s |
| Max task startup | 2s | 3s | 28s |
| Throughput (tasks/min) | ~7 | ~7 | ~3 |
| RAM peak | 4GB | 5GB | 6GB |
| Setup complexity | 1 container | 4 containers | K8s cluster + helm |
| HA (worker) | No | Yes (2 replicas) | Yes (pods) |
| Cost at idle | Always-on scheduler | + always-on workers + Redis | + control plane |

Observations

  1. LocalExecutor is the fastest by time-to-completion on this workload, with no broker or pod overhead.
  2. CeleryExecutor is close: broker overhead stays under a second per task (1.5s vs 0.8s average startup), negligible for tasks that run 2-30s.
  3. KubernetesExecutor is the slowest on short tasks: for a 2s task, 80% or more of the wall time goes to cold start. Its advantages (isolation, GPU) are not realized on this workload.
  4. With task length > 5 min, all three perform similarly (cold start is amortized).
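The amortization argument can be made concrete with one ratio: the share of a task's wall time spent waiting to start. A minimal sketch using the 14.5s average K8s startup from the table; `overhead_share` is a hypothetical helper and the model ignores everything except startup and run time.

```python
def overhead_share(task_seconds, startup_seconds):
    """Fraction of a task's wall time that is startup rather than work."""
    return startup_seconds / (startup_seconds + task_seconds)

K8S_AVG_STARTUP = 14.5  # seconds, from the results table

for task_seconds in (2, 30, 300):
    share = overhead_share(task_seconds, K8S_AVG_STARTUP)
    print(f"{task_seconds:>3}s task: {share:.0%} startup")
```

For a 2s task almost all of the wall time is startup, for a 30s task about a third, and for a 5-minute task only a few percent, which is why the executors converge on long tasks.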

Decision framework

Decision tree: which executor to choose

Single-node deployment?
If you have one machine (dev, a small company, on-prem without K8s), LocalExecutor is the right choice. Minimal setup, everything works out of the box. Handles up to ~100 tasks/min on decent hardware.
  ↓ No: multi-node

Do all tasks need isolation (deps, resources)?
ML pipelines with different CUDA versions, ETL with different pandas/spark versions, multi-team isolation, security/compliance: all of these need per-task isolation. Only K8s provides it natively. Cold start is an acceptable price.
  ↓ No: most tasks share an environment

High throughput of short tasks (>500/min)?
Throughput-critical pipelines (real-time-ish dashboards, frequent micro-batches) call for CeleryExecutor. The worker prefork pool handles thousands of tasks per minute with low latency. KEDA scaling adds elasticity.
  ↓ Mixed workload

Mixed: light + heavy?
80% short + 20% heavy → Multiple Executors (AIP-61, 2.10+) or CeleryKubernetesExecutor (2.3-2.9). Short → Celery (low overhead), heavy → K8s (isolation). Best of both worlds.
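The tree can be read top to bottom as a chain of early returns. The sketch below is one possible encoding, not an official rule set: `choose_executor` and its parameter names are hypothetical, and the final fallback to CeleryExecutor is an assumption borrowed from the shared-environment row of the rules of thumb, since the tree itself has no explicit last leaf.

```python
def choose_executor(single_node, all_tasks_need_isolation, short_tasks_per_min,
                    mixed_light_and_heavy, airflow_version=(2, 10)):
    """Walk the decision tree in order and return an executor name."""
    if single_node:
        return "LocalExecutor"
    if all_tasks_need_isolation:
        return "KubernetesExecutor"
    if short_tasks_per_min > 500:
        return "CeleryExecutor"
    if mixed_light_and_heavy:
        if airflow_version >= (2, 10):
            return "Multiple Executors (Celery + Kubernetes)"
        return "CeleryKubernetesExecutor"
    # Assumption: no explicit leaf in the tree; multi-node workloads that
    # share one environment fall back to Celery.
    return "CeleryExecutor"

# A dev laptop
print(choose_executor(True, False, 5, False))
# Multi-node, mostly short tasks plus a few heavy ones, on Airflow 2.9
print(choose_executor(False, False, 50, True, airflow_version=(2, 9)))
```

Keeping the checks in the same order as the tree matters: a single-node deployment returns LocalExecutor even if the workload is mixed.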

Rules of thumb

| Condition | Executor |
|---|---|
| Dev environment | LocalExecutor |
| < 100 tasks/min, single-node, simple | LocalExecutor |
| > 100 tasks/min, shared environment | CeleryExecutor |
| Per-task isolation, GPU, custom images | KubernetesExecutor |
| Mixed light + heavy (on 2.10+) | Multiple Executors AIP-61 |
| Mixed light + heavy (on 2.3-2.9) | CeleryKubernetesExecutor |
| Multi-team, different requirements | Multiple Executors with aliases |

Cost analysis (cloud, monthly)

Rough estimate for a medium workload (1000 tasks/day, mix of short + heavy):

LocalExecutor

  • 1 × c5.2xlarge (8 CPU / 16 GB): ~$240/mo
  • Always on (idle and active)
  • RDS PostgreSQL db.t3.medium: ~$60/mo
  • Total: ~$300/mo

CeleryExecutor

  • Scheduler: 1 × c5.large: ~$60/mo
  • 2 × c5.xlarge workers: ~$240/mo
  • Redis (ElastiCache cache.t3.medium): ~$50/mo
  • RDS: ~$60/mo
  • Total: ~$410/mo

KubernetesExecutor (on EKS)

  • EKS control plane: ~$70/mo
  • 2 × c5.large nodes (always-on for system pods + scheduler): ~$120/mo
  • Spot/on-demand for worker pods: ~$50-150/mo (variable)
  • RDS: ~$60/mo
  • Total: ~$300-400/mo

Multiple Executors (Celery + K8s)

  • Scheduler + Celery workers: ~$300/mo
  • K8s cluster overhead: ~$120/mo
  • RDS: ~$60/mo
  • Total: ~$480/mo
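The totals above are plain sums of the line items, so they are easy to recompute when a price changes. A small sketch with the same rounded figures; each item carries a (low, high) pair so the variable worker-pod cost stays a range. The dictionary layout and `monthly_range` are illustrative choices, not part of the lab repo.

```python
# (item, low $/mo, high $/mo) using the rounded estimates from the lists above
COSTS = {
    "LocalExecutor": [
        ("1 x c5.2xlarge", 240, 240),
        ("RDS", 60, 60),
    ],
    "CeleryExecutor": [
        ("scheduler c5.large", 60, 60),
        ("2 x c5.xlarge workers", 240, 240),
        ("Redis", 50, 50),
        ("RDS", 60, 60),
    ],
    "KubernetesExecutor": [
        ("EKS control plane", 70, 70),
        ("2 x c5.large nodes", 120, 120),
        ("worker pods", 50, 150),
        ("RDS", 60, 60),
    ],
    "Multiple Executors": [
        ("scheduler + Celery workers", 300, 300),
        ("K8s cluster overhead", 120, 120),
        ("RDS", 60, 60),
    ],
}

def monthly_range(items):
    """Return (low, high) monthly totals for a list of cost items."""
    return (sum(low for _, low, _ in items), sum(high for _, _, high in items))

for name, items in COSTS.items():
    low, high = monthly_range(items)
    label = f"${low}" if low == high else f"${low}-{high}"
    print(f"{name}: ~{label}/mo")
```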

TL;DR

  • Lowest cost: LocalExecutor (if the throughput fits)
  • Best throughput / $: CeleryExecutor (for shared-environment workloads)
  • Best flexibility: K8s or Multiple Executors (but cost +30-50%)

Production gotchas (apply to all executors)

Gotcha 1: A single-node benchmark does not show horizontal advantages

Local looks fastest on a single node, but it does not scale. In a larger benchmark with 200 tasks in one DAG run, Local hits OOM on its single node, while multi-node Celery/K8s keep working.

Gotcha 2: Cold start cache effects

The first K8s benchmark run shows ~30s startup because of the image pull. Subsequent runs take ~5s (the image is cached on the node). For a realistic benchmark, measure on a pre-warmed cluster.
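One simple way to keep the image pull out of the reported number is to drop the first run before averaging. A minimal sketch, assuming one average startup value per run; the sample list reuses the ~30s and ~5s figures from the paragraph above purely as an illustration, and `warm_average` is a hypothetical helper.

```python
from statistics import mean

def warm_average(startup_by_run, warmup_runs=1):
    """Average startup latency after discarding the first `warmup_runs` runs."""
    warm = startup_by_run[warmup_runs:]
    if not warm:
        raise ValueError("need at least one run after the warm-up runs")
    return mean(warm)

# Illustrative: the first run pays for the image pull, later runs hit the node cache
runs = [30.0, 5.0, 5.0, 5.0]

print("all runs:  ", mean(runs))
print("warm only: ", warm_average(runs))
```

Reporting both numbers is often useful: the cold figure matters after node scale-up, the warm figure describes steady state.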

Gotcha 3: DAG parse time dominates short runs

The benchmark DAG is simple (~1ms parse). Real DAGs with heavy imports take 1-5s to parse. This shows up as a "pre-startup" delay (DAG processor reload).
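A quick way to see how much a DAG file's top-level code costs is to time its import outside Airflow. The sketch below is a rough approximation of parse time, not what the DAG processor does internally; `measure_import_seconds` is a hypothetical helper, and the temporary module with a `time.sleep` stands in for a heavy top-level import.

```python
import importlib.util
import os
import tempfile
import time

def measure_import_seconds(path, module_name="dag_under_test"):
    """Time how long it takes to execute a Python file's top-level code."""
    spec = importlib.util.spec_from_file_location(module_name, path)
    module = importlib.util.module_from_spec(spec)
    started = time.perf_counter()
    spec.loader.exec_module(module)      # runs every top-level statement in the file
    return time.perf_counter() - started

# Demo on a throwaway file so the example runs without Airflow installed
with tempfile.TemporaryDirectory() as tmp:
    slow_file = os.path.join(tmp, "slow_dag.py")
    with open(slow_file, "w") as fh:
        fh.write("import time\ntime.sleep(0.2)  # stand-in for a heavy import\n")
    elapsed = measure_import_seconds(slow_file)

print(f"top-level code took {elapsed:.2f}s")
```

Pointing the same function at a real DAG file requires an environment where Airflow and the DAG's dependencies are importable.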

Gotcha 4: Network latency between the scheduler and the broker/K8s API

In docker-compose everything runs on one host (localhost). In production, traffic crosses nodes and sometimes AZs. Add 10-50ms to every round trip, and the cost grows with throughput.
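A back-of-the-envelope model shows how that per-round-trip cost adds up. This is only a sketch: the number of round trips per task (4 here) is a made-up placeholder, not a measured Airflow figure, and the result is cumulative latency summed across tasks, not wall-clock delay, because round trips for different tasks overlap.

```python
def added_latency_seconds(tasks_per_min, roundtrips_per_task, rtt_ms):
    """Cumulative network latency added per minute of scheduling, in seconds."""
    return tasks_per_min * roundtrips_per_task * rtt_ms / 1000

ROUNDTRIPS_PER_TASK = 4   # assumption for illustration only

for rtt_ms in (10, 50):
    total = added_latency_seconds(500, ROUNDTRIPS_PER_TASK, rtt_ms)
    print(f"{rtt_ms}ms RTT at 500 tasks/min: {total:.0f}s of added latency per minute")
```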


Lab repo

The full benchmark setup is available in the reference repo:

benchmark/
├── dags/benchmark_dag.py
├── docker-compose.local.yml
├── docker-compose.celery.yml
├── kind/values.yaml
├── scripts/
│   ├── run_local.sh
│   ├── run_celery.sh
│   ├── run_k8s.sh
│   └── collect_metrics.py
└── results/
    └── (CSVs with run times)

Run it on your own hardware and collect your own numbers. The decision framework is a guideline; real measurements are the only source of truth for your situation.


Knowledge check
A team is planning production Airflow for the following workload: 5000 short SQL/dbt tasks per day (avg 10s each), 50 ML inference tasks per day (avg 5 min, GPU required), 200 ad-hoc data exploration jobs (varying from 30s to 30 min). Which executor, and why?
Answer
**Multiple Executors (AIP-61, 2.10+): CeleryExecutor + KubernetesExecutor.** Reasoning:
  1. 5000 short tasks/day is ~3.5/min on average, but bursts can hit 50/min. CeleryExecutor with 2-4 workers (concurrency 8 each) handles this easily, with low latency (~1.5s startup). Add KEDA scaling for bursts. Default queue → Celery, default executor → CeleryExecutor.
  2. 50 ML tasks/day with GPU: `@task(executor='KubernetesExecutor', executor_config={'pod_override': V1Pod(...)})`. Pod-per-task isolation, a GPU resource request, a custom CUDA image. A 15s cold start is acceptable overhead for 5-min tasks (5%).
  3. Ad-hoc jobs vary in resource needs. Short → default Celery. Long/heavy → `executor='KubernetesExecutor'`.
  4. Do not use pure K8s: 5000 × 15s startup = ~21 hours wasted per day on short tasks.
  5. Do not use pure Celery: ML tasks would have no GPU, would share an environment with other tasks, and would hit CUDA version conflicts.
  6. **Cost**: ~$500/mo at medium load (Celery workers + EKS spot for K8s + RDS).
  7. **Migration path**: start with pure Celery, then gradually move ML tasks to K8s via the `executor=` override, with no infrastructure changes.
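The three numbers the answer leans on (average rate, wasted startup time, and overhead share for ML tasks) follow directly from the workload description. A short sketch that recomputes them, taking the 15s K8s cold start as the assumed per-task overhead; the variable names are illustrative.

```python
MINUTES_PER_DAY = 24 * 60
K8S_COLD_START_S = 15            # assumed per-task startup on KubernetesExecutor

short_tasks_per_day = 5000
ml_task_seconds = 5 * 60

# Average arrival rate of short tasks
avg_per_min = short_tasks_per_day / MINUTES_PER_DAY

# Cumulative startup time if every short task ran as its own pod
wasted_hours = short_tasks_per_day * K8S_COLD_START_S / 3600

# Startup overhead relative to a 5-minute ML task
ml_overhead_share = K8S_COLD_START_S / ml_task_seconds

print(f"short tasks: {avg_per_min:.2f}/min on average")
print(f"pure K8s startup cost: {wasted_hours:.1f} pod-hours/day")
print(f"ML task overhead: {ml_overhead_share:.0%}")
```

The wasted time is cumulative across pods rather than wall-clock, but it still translates into cluster capacity spent on starting containers instead of running SQL.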

Check your understanding

Question: the same DAG (20 short × 2s + 5 medium × 30s) took 210s on Local, 215s on Celery, and 480s on K8s. Why is K8s so slow?
