Learning Platform
Глоссарий Troubleshooting
Урок 06.04 · 35 мин
Продвинутый
KubernetesExecutorpod-per-taskwatcher threadpod_overrideCold start

KubernetesExecutor deep — pod-per-task, watcher thread, pod_override, cold start

KubernetesExecutor — самый «современный» executor 2.x. Идея радикальная: каждая task = свой pod. Никаких shared workers, никаких prefetch pitfalls, полная изоляция Python deps / resources / network policies. Cost — startup latency (5-30s на pod) и сложность операционки.

В этом уроке мы разберём lifecycle pod-а от scheduler→K8s API→pod, препарируем watcher thread внутри scheduler-процесса, посмотрим как pod template работает и научимся override per-task через executor_config={"pod_override": V1Pod(...)} — например для GPU tasks или custom images.


Архитектура KubernetesExecutor

KubernetesExecutor: pod-per-task lifecycle
Scheduler processairflow scheduler. Внутри: SchedulerJobRunner + KubernetesExecutor + KubernetesJobWatcher thread. Все они делят interpreter. Scheduler сам не запускает tasks — он делает K8s API calls.
execute_async() → create pod
K8s API serverPOST /api/v1/namespaces/{ns}/pods. Body — pod manifest (из pod_template_file + executor_config override). API server валидирует, сохраняет в etcd, возвращает 201 Created. Pod state = Pending.
kube-scheduler + kubelet
Worker Pod (pod-per-task)K8s scheduler находит ноду, kubelet pulls image, запускает container. Внутри pod — Airflow image, container runs `airflow tasks run dag_id task_id ...`. Pod completes — kubelet shuts it down, scheduler watcher детектит.
Pod 2 / 3 / NN pods запускаются одновременно — limited [kubernetes_executor] worker_pods_creation_batch_size (default 16, max 128). На high parallelism до сотен pods одновременно.
watch events
Watcher thread (в scheduler)KubernetesJobWatcher — поток внутри scheduler-процесса. Subscribed на /api/v1/namespaces/{ns}/pods?watch=true&labelSelector=... Получает stream JSON events (ADDED, MODIFIED, DELETED). Парсит pod status, обновляет TI state в metadata DB.

Главные точки:

  1. Pod-per-task — каждый TaskInstance = свой K8s pod. Никаких shared workers.
  2. Watcher thread в scheduler-процессе — он подписан на K8s API events для нужных pods, парсит их status и обновляет metadata DB.
  3. Никакого broker — K8s API сам выполняет роль «task dispatcher».

Pod lifecycle: от scheduled до Completed

TI lifecycle через K8s API
Scheduler — TI scheduledCritical section enqueue: state=queued, executor.execute_async(key, command). Внутри KubernetesExecutor: запускается K8s API POST /pods с pod manifest. Никакой queue нет — direct call.
POST /pods
K8s API: Pod Pendingkubectl get pod показывает Pending. K8s scheduler ищет подходящую ноду (resource requests, nodeSelector, affinity). Если ресурсов нет — Pending зависает. worker_pods_pending_timeout (default 300s) — после этого scheduler reset TI scheduled.
schedule + kubelet pull image
Pod ContainerCreating → Runningkubelet pulls image (если первый раз на ноде — 5-20s). Init containers (если есть) — выполняются. Main container стартует, выполняет `airflow tasks run …`. Watcher детектит MODIFIED event с phase=Running, обновляет TI state=running в DB.
task completes
Pod Succeeded / FailedContainer exit code 0 → Pod Succeeded. Non-zero → Failed. Watcher получает MODIFIED event с phase, обновляет TI state. Logs — в stdout/stderr container-а, читаются через kubectl logs или log_aggregation.
delete_worker_pods=True
Pod deletedDefault delete_worker_pods=True. После завершения pod удаляется. delete_worker_pods_on_failure=False (default) — keep на failure для debug. На success — всегда delete.

Watcher thread — самая интересная деталь

kube-controller-manager: reconcile loops Жизненный цикл Pod в Kubernetes

Внутри airflow.providers.cncf.kubernetes.executors.kubernetes_executor есть класс KubernetesJobWatcher. Это поток внутри scheduler-процесса (не отдельный процесс!), который:

# Псевдокод KubernetesJobWatcher
class KubernetesJobWatcher(multiprocessing.Process):
    def run(self):
        watcher = kubernetes.watch.Watch()
        kube_client = client.CoreV1Api()

        while True:
            try:
                for event in watcher.stream(
                    kube_client.list_namespaced_pod,
                    namespace=self.namespace,
                    label_selector=f"airflow-worker={self.scheduler_job_id}"
                    resource_version=last_resource_version,
                ):
                    # event: {'type': 'ADDED'/'MODIFIED'/'DELETED', 'object': V1Pod}
                    pod = event['object']
                    phase = pod.status.phase
                    ti_key = self._extract_ti_key_from_labels(pod.metadata.labels)

                    # Result queue → scheduler picks up
                    self.result_queue.put((ti_key, phase, pod.metadata.resource_version))

            except ApiException as e:
                # 410 Gone — resource_version устарел, restart watch
                # 401 — credentials expired
                log.error(...)
                time.sleep(1)
                continue

Что важно:

  1. Watcher subscribes на ВСЕ pods с label airflow-worker=<scheduler_job_id>. Не на конкретные pods — на label selector. Один stream для всех pods данного scheduler-а.
  2. resource_version — K8s API мapper для consistent watch. Если watcher disconnect — резюмирует с последнего resourceVersion. Это 410 Gone error — typical, just restart watch.
  3. Watcher живёт в multiprocessing.Process (не thread!), общается с scheduler через multiprocessing.Queue. Это для изоляции от scheduler — если watcher crash, scheduler жив.
NOTE

Если watcher умер и не успел перезапуститься — TI «застревают» в running состоянии, потому что nobody updates DB. Scheduler детектит zombie через scheduler_zombie_task_threshold (default 5 min) и помечает TI failed. Это safety net на случай watcher failures.


Pod template: где описывается pod

Default — глобальный pod_template_file:

# /opt/airflow/pod_template.yaml
apiVersion: v1
kind: Pod
metadata:
  name: airflow-worker-placeholder
  namespace: airflow
spec:
  serviceAccountName: airflow-worker
  restartPolicy: Never
  containers:
    - name: base
      image: my-registry.com/airflow:2.10.5
      imagePullPolicy: IfNotPresent
      args: ["airflow", "tasks", "run"]
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor   # внутри pod — Local
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: postgres-conn
      resources:
        requests:
          cpu: 500m
          memory: 1Gi
        limits:
          cpu: 2
          memory: 4Gi
      volumeMounts:
        - name: logs
          mountPath: /opt/airflow/logs
        - name: dags
          mountPath: /opt/airflow/dags
  volumes:
    - name: logs
      persistentVolumeClaim:
        claimName: airflow-logs
    - name: dags
      configMap:
        name: airflow-dags

Этот шаблон применяется ко всем pod-ам по default. Scheduler берёт его, заполняет name, labels, args, и POST в K8s API.

Конфигурируется через:

[kubernetes_executor]
pod_template_file = /opt/airflow/pod_template.yaml
namespace = airflow
worker_container_repository = my-registry.com/airflow
worker_container_tag = 2.10.5
delete_worker_pods = True
delete_worker_pods_on_failure = False    # debug при failure
worker_pods_creation_batch_size = 16
worker_pods_pending_timeout = 300

Per-task pod override через executor_config

Здесь магия. Каждая task может override pod template через executor_config:

from kubernetes.client import (
    V1Pod, V1PodSpec, V1Container,
    V1ResourceRequirements, V1EnvVar, V1Toleration,
)
from airflow.decorators import task

@task(
    executor_config={
        "pod_override": V1Pod(
            spec=V1PodSpec(
                node_selector={"gpu-type": "nvidia-a100"},
                tolerations=[
                    V1Toleration(key="nvidia.com/gpu", operator="Exists", effect="NoSchedule")
                ],
                containers=[V1Container(
                    name="base"
                    image="my-registry.com/airflow-ml:2.10.5",   # override image!
                    resources=V1ResourceRequirements(
                        requests={
                            "cpu": "4",
                            "memory": "16Gi",
                            "nvidia.com/gpu": "1",
                        },
                        limits={
                            "cpu": "8",
                            "memory": "32Gi",
                            "nvidia.com/gpu": "1",
                        },
                    ),
                    env=[
                        V1EnvVar(name="CUDA_VISIBLE_DEVICES", value="0"),
                        V1EnvVar(name="HF_HOME", value="/cache/huggingface"),
                    ],
                )],
            ),
        ),
    },
)
def gpu_inference(model_path: str):
    import torch
    model = torch.load(model_path)
    # ...

Что происходит на runtime:

  1. Scheduler читает executor_config из serialized DAG
  2. Берёт pod_template_file как base
  3. Делает deep merge с pod_override (specific fields override base)
  4. POST результирующий manifest в K8s API

Use cases:

  • GPU tasksnvidia.com/gpu: 1, node_selector для GPU нод
  • Custom images — ML task с CUDA, dbt task с dbt-core, lightweight task с Alpine
  • Resource sizing — heavy ETL → 32Gi memory, light task → 256Mi
  • Secrets / configmaps — мount specific secrets per-task
  • Network policies — отдельные labels для NetworkPolicy

Cold start cost — главная боль K8s executor

Pod startup = 5-30 секунд:

StageВремяWhy
API call → pod created100-500msK8s API + etcd write
K8s scheduler — find node100ms - 5sЗависит от cluster load + scheduling complexity
Pull image (если первый раз на ноде)5-20sСамый большой компонент. После cache — 0s
Init containers (если есть)0-10sМожно избежать
Container start + Python startup1-3sPython interpreter + Airflow imports
airflow tasks run cold start2-5sDAG parse + import callable

Итого: 8-43 секунды только до user code. Для task длительностью 5 секунд — overhead 80%+.

Mitigation strategies

1. Image pre-pull через DaemonSet

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: airflow-image-prepuller
spec:
  selector:
    matchLabels:
      app: airflow-prepuller
  template:
    metadata:
      labels:
        app: airflow-prepuller
    spec:
      containers:
        - name: prepull-airflow
          image: my-registry.com/airflow:2.10.5
          command: ["sleep", "infinity"]
          resources:
            requests:
              cpu: 10m
              memory: 32Mi

DaemonSet запускается на каждой ноде, pulls image, и держит pod (с минимальными ресурсами). Image остаётся в kubelet’s image cache. Когда worker pod нужен — image уже там, pull skipped → cold start 5-8s вместо 25s.

2. Slim images

Default apache/airflow:2.10.5 ~ 1.3 GB. Build slim image только с нужными provider-ами:

FROM python:3.11-slim
RUN pip install --no-cache-dir apache-airflow==2.10.5 \
    apache-airflow-providers-postgres apache-airflow-providers-amazon
COPY dags/ /opt/airflow/dags/

Можно довести до ~ 500 MB. Pull в 2-3 раза быстрее.

3. ImagePullPolicy: IfNotPresent

containers:
  - name: base
    image: my-registry.com/airflow:2.10.5
    imagePullPolicy: IfNotPresent   # НЕ Always

Always — каждый раз re-pull (для :latest tag). IfNotPresent — если image в кеше, skip pull. Production — всегда конкретный tag + IfNotPresent.

4. K8s API rate limit

При burst (100 tasks за 1s) — POST /pods 100 раз. K8s API default rate limit может зайти. worker_pods_creation_batch_size = 16 контролирует scheduler batch — не делать > 16 одновременных API calls.

5. Не использовать для коротких tasks

Если task < 30 seconds — KubernetesExecutor — не правильный выбор. Используйте Celery, или Multiple Executors (AIP-61) с routing коротких на Celery.


RBAC requirements

K8s RBAC для KubernetesExecutor — scheduler service account должен иметь права на:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-worker
rules:
  - apiGroups: [""]
    resources: ["pods", "pods/log"]
    verbs: ["get", "list", "watch", "create", "delete", "patch"]
  - apiGroups: [""]
    resources: ["pods/exec"]
    verbs: ["create", "get"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["get", "list", "watch"]

Minimal — pods (CRUD), pods/log (для log retrieval), events (для diagnostics).


Production observability

Pending pods (resource pressure / scheduling issues)

-- TI in queued > pending_timeout (300s default)
SELECT
    dag_id, task_id, run_id,
    queued_dttm,
    now() - queued_dttm AS pending_for
FROM task_instance
WHERE state = 'queued'
  AND queued_dttm < now() - interval '5 minutes'
ORDER BY queued_dttm;

Если есть результаты — К8s scheduler не может разместить pods. Проверьте:

  • kubectl get pods -n airflow | grep Pending
  • kubectl describe pod <pod-name> → события (insufficient memory/CPU/GPU)
  • Cluster autoscaler — добавляются ли nodes

Image pull failures

kubectl get pods -n airflow --field-selector=status.phase=Pending
kubectl describe pod <pod-name>
# Look for: ImagePullBackOff, ErrImagePull

Watcher disconnect frequency

# Logs scheduler
kubectl logs -n airflow deployment/airflow-scheduler -c scheduler | grep -i watcher
# Look for: "Watch stream closed", "Restarting watch"

Healthy — < 1 restart per hour. Частые disconnects — проблема с API server (load, network).


Configuration cheat-sheet (production-grade)

[core]
executor = KubernetesExecutor

[kubernetes_executor]
namespace = airflow
pod_template_file = /opt/airflow/pod_template.yaml
worker_container_repository = my-registry.com/airflow
worker_container_tag = 2.10.5

delete_worker_pods = True
delete_worker_pods_on_failure = False    # keep на failure — debugging

worker_pods_creation_batch_size = 16     # max concurrent POST /pods
worker_pods_pending_timeout = 300        # 5 min pending → reset TI
worker_pods_pending_timeout_batch_size = 100
worker_pods_pending_timeout_check_interval = 120

multi_namespace_mode = False             # single namespace для simplicity
in_cluster = True                        # scheduler runs внутри cluster

[kubernetes_secrets]
sql_alchemy_conn = airflow-secrets=postgres-conn   # secret backend
fernet_key = airflow-secrets=fernet-key

Production gotchas

Gotcha 1: Pods «protected» by Pod Disruption Budget

Если у вас PDB на namespace, K8s может отказаться удалять completed pods (delete_worker_pods=True бесполезен). Result — pods накапливаются, eventually node ENOSPC.

Fix: PDB не нужен для ephemeral worker pods. Удалить PDB или exclude airflow-worker label.

Gotcha 2: PVC mount slow

volumes с PVC (logs, dags) — каждый mount = wait на K8s storage provisioner. На некоторых cloud providers (EBS attach) — 10-30s.

Fix:

  • Logs → emptyDir + log_aggregation (S3/GCS via Airflow remote logs)
  • DAGs → bake в image (вместо ConfigMap/PVC sync)

Gotcha 3: K8s API server rate limit

При scheduler restart + 200 tasks running → одновременная попытка create 200 pods. K8s API throttles (default 100 req/s).

Fix: worker_pods_creation_batch_size = 16 (default ok), monitor apiserver_request_total метрики.

Gotcha 4: Stuck zombie TI после kill scheduler

kill -9 scheduler → watcher thread мёртв → events lost → TI stuck в running, pods orphaned.

Fix:

  • Graceful shutdown (SIGTERM)
  • scheduler_zombie_task_threshold = 300 — fallback detection
  • Periodic kubectl get pods -n airflow -l app=airflow-worker cleanup script

Gotcha 5: Logs lost после pod delete

delete_worker_pods=True → pod delete → logs lost (если не aggregated).

Fix: configure [logging] remote logging:

[logging]
remote_logging = True
remote_base_log_folder = s3://my-bucket/airflow-logs/
remote_log_conn_id = aws_default

Workers пишут logs в S3 sync; UI читает оттуда. Pods можно удалять безопасно.


Проверка знанийKnowledge check
Production Airflow с KubernetesExecutor. Замеряете cold start — от scheduled до running проходит 35 секунд для ML inference task. Tasks длятся 2 минуты. Cold start = 28% overhead. Список действий для оптимизации?
ОтветAnswer
1) **Pre-pull images** через DaemonSet — DaemonSet с image, держится sleep infinity, image остаётся в kubelet cache всех нодов; первый pull исключается. 2) **Slim image** — собрать custom image только с needed providers (apache/airflow:slim ~600 MB вместо 1.3 GB), pull в 2x быстрее. 3) **`imagePullPolicy: IfNotPresent`** + конкретный tag (не :latest) — re-pull не делается, если image в кеше. 4) **Reserved node pool** для ML workloads — `nodeSelector` + tolerations на always-on GPU node, K8s scheduling latency 0.1s вместо ожидания node scale-up. 5) **Pod warm pool** через alternative tools (Knative, custom controller) — keep N idle pods, assign к task на запросе. 6) **Multiple Executors (AIP-61, 2.10+)** — короткие tasks (<30s) routed на CeleryExecutor (cold start 1s), heavy ML — K8sExecutor; на 35s startup для 5s task — это 86% overhead, явный сигнал перейти на hybrid. 7) Профилирование: измерить через `kubectl get events -n airflow --sort-by=lastTimestamp` каждую фазу (Scheduled→Pulled→Created→Started) — понять, что доминирует.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Что такое watcher thread в KubernetesExecutor?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7