KubernetesExecutor deep — pod-per-task, watcher thread, pod_override, cold start
KubernetesExecutor — самый «современный» executor 2.x. Идея радикальная: каждая task = свой pod. Никаких shared workers, никаких prefetch pitfalls, полная изоляция Python deps / resources / network policies. Cost — startup latency (5-30s на pod) и сложность операционки.
В этом уроке мы разберём lifecycle pod-а от scheduler→K8s API→pod, препарируем watcher thread внутри scheduler-процесса, посмотрим как pod template работает и научимся override per-task через executor_config={"pod_override": V1Pod(...)} — например для GPU tasks или custom images.
Архитектура KubernetesExecutor
Главные точки:
- Pod-per-task — каждый TaskInstance = свой K8s pod. Никаких shared workers.
- Watcher thread в scheduler-процессе — он подписан на K8s API events для нужных pods, парсит их status и обновляет metadata DB.
- Никакого broker — K8s API сам выполняет роль «task dispatcher».
Pod lifecycle: от scheduled до Completed
Watcher thread — самая интересная деталь
kube-controller-manager: reconcile loops Жизненный цикл Pod в KubernetesВнутри airflow.providers.cncf.kubernetes.executors.kubernetes_executor есть класс KubernetesJobWatcher. Это поток внутри scheduler-процесса (не отдельный процесс!), который:
# Псевдокод KubernetesJobWatcher
class KubernetesJobWatcher(multiprocessing.Process):
def run(self):
watcher = kubernetes.watch.Watch()
kube_client = client.CoreV1Api()
while True:
try:
for event in watcher.stream(
kube_client.list_namespaced_pod,
namespace=self.namespace,
label_selector=f"airflow-worker={self.scheduler_job_id}"
resource_version=last_resource_version,
):
# event: {'type': 'ADDED'/'MODIFIED'/'DELETED', 'object': V1Pod}
pod = event['object']
phase = pod.status.phase
ti_key = self._extract_ti_key_from_labels(pod.metadata.labels)
# Result queue → scheduler picks up
self.result_queue.put((ti_key, phase, pod.metadata.resource_version))
except ApiException as e:
# 410 Gone — resource_version устарел, restart watch
# 401 — credentials expired
log.error(...)
time.sleep(1)
continue
Что важно:
- Watcher subscribes на ВСЕ pods с label
airflow-worker=<scheduler_job_id>. Не на конкретные pods — на label selector. Один stream для всех pods данного scheduler-а. resource_version— K8s API мapper для consistent watch. Если watcher disconnect — резюмирует с последнего resourceVersion. Это410 Goneerror — typical, just restart watch.- Watcher живёт в multiprocessing.Process (не thread!), общается с scheduler через
multiprocessing.Queue. Это для изоляции от scheduler — если watcher crash, scheduler жив.
Если watcher умер и не успел перезапуститься — TI «застревают» в running состоянии, потому что nobody updates DB. Scheduler детектит zombie через scheduler_zombie_task_threshold (default 5 min) и помечает TI failed. Это safety net на случай watcher failures.
Pod template: где описывается pod
Default — глобальный pod_template_file:
# /opt/airflow/pod_template.yaml
apiVersion: v1
kind: Pod
metadata:
name: airflow-worker-placeholder
namespace: airflow
spec:
serviceAccountName: airflow-worker
restartPolicy: Never
containers:
- name: base
image: my-registry.com/airflow:2.10.5
imagePullPolicy: IfNotPresent
args: ["airflow", "tasks", "run"]
env:
- name: AIRFLOW__CORE__EXECUTOR
value: LocalExecutor # внутри pod — Local
- name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
valueFrom:
secretKeyRef:
name: airflow-secrets
key: postgres-conn
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 2
memory: 4Gi
volumeMounts:
- name: logs
mountPath: /opt/airflow/logs
- name: dags
mountPath: /opt/airflow/dags
volumes:
- name: logs
persistentVolumeClaim:
claimName: airflow-logs
- name: dags
configMap:
name: airflow-dags
Этот шаблон применяется ко всем pod-ам по default. Scheduler берёт его, заполняет name, labels, args, и POST в K8s API.
Конфигурируется через:
[kubernetes_executor]
pod_template_file = /opt/airflow/pod_template.yaml
namespace = airflow
worker_container_repository = my-registry.com/airflow
worker_container_tag = 2.10.5
delete_worker_pods = True
delete_worker_pods_on_failure = False # debug при failure
worker_pods_creation_batch_size = 16
worker_pods_pending_timeout = 300
Per-task pod override через executor_config
Здесь магия. Каждая task может override pod template через executor_config:
from kubernetes.client import (
V1Pod, V1PodSpec, V1Container,
V1ResourceRequirements, V1EnvVar, V1Toleration,
)
from airflow.decorators import task
@task(
executor_config={
"pod_override": V1Pod(
spec=V1PodSpec(
node_selector={"gpu-type": "nvidia-a100"},
tolerations=[
V1Toleration(key="nvidia.com/gpu", operator="Exists", effect="NoSchedule")
],
containers=[V1Container(
name="base"
image="my-registry.com/airflow-ml:2.10.5", # override image!
resources=V1ResourceRequirements(
requests={
"cpu": "4",
"memory": "16Gi",
"nvidia.com/gpu": "1",
},
limits={
"cpu": "8",
"memory": "32Gi",
"nvidia.com/gpu": "1",
},
),
env=[
V1EnvVar(name="CUDA_VISIBLE_DEVICES", value="0"),
V1EnvVar(name="HF_HOME", value="/cache/huggingface"),
],
)],
),
),
},
)
def gpu_inference(model_path: str):
import torch
model = torch.load(model_path)
# ...
Что происходит на runtime:
- Scheduler читает
executor_configиз serialized DAG - Берёт
pod_template_fileкак base - Делает deep merge с
pod_override(specific fields override base) - POST результирующий manifest в K8s API
Use cases:
- GPU tasks —
nvidia.com/gpu: 1, node_selector для GPU нод - Custom images — ML task с CUDA, dbt task с dbt-core, lightweight task с Alpine
- Resource sizing — heavy ETL → 32Gi memory, light task → 256Mi
- Secrets / configmaps — мount specific secrets per-task
- Network policies — отдельные labels для NetworkPolicy
Cold start cost — главная боль K8s executor
Pod startup = 5-30 секунд:
| Stage | Время | Why |
|---|---|---|
| API call → pod created | 100-500ms | K8s API + etcd write |
| K8s scheduler — find node | 100ms - 5s | Зависит от cluster load + scheduling complexity |
| Pull image (если первый раз на ноде) | 5-20s | Самый большой компонент. После cache — 0s |
| Init containers (если есть) | 0-10s | Можно избежать |
| Container start + Python startup | 1-3s | Python interpreter + Airflow imports |
airflow tasks run cold start | 2-5s | DAG parse + import callable |
Итого: 8-43 секунды только до user code. Для task длительностью 5 секунд — overhead 80%+.
Mitigation strategies
1. Image pre-pull через DaemonSet
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: airflow-image-prepuller
spec:
selector:
matchLabels:
app: airflow-prepuller
template:
metadata:
labels:
app: airflow-prepuller
spec:
containers:
- name: prepull-airflow
image: my-registry.com/airflow:2.10.5
command: ["sleep", "infinity"]
resources:
requests:
cpu: 10m
memory: 32Mi
DaemonSet запускается на каждой ноде, pulls image, и держит pod (с минимальными ресурсами). Image остаётся в kubelet’s image cache. Когда worker pod нужен — image уже там, pull skipped → cold start 5-8s вместо 25s.
2. Slim images
Default apache/airflow:2.10.5 ~ 1.3 GB. Build slim image только с нужными provider-ами:
FROM python:3.11-slim
RUN pip install --no-cache-dir apache-airflow==2.10.5 \
apache-airflow-providers-postgres apache-airflow-providers-amazon
COPY dags/ /opt/airflow/dags/
Можно довести до ~ 500 MB. Pull в 2-3 раза быстрее.
3. ImagePullPolicy: IfNotPresent
containers:
- name: base
image: my-registry.com/airflow:2.10.5
imagePullPolicy: IfNotPresent # НЕ Always
Always — каждый раз re-pull (для :latest tag). IfNotPresent — если image в кеше, skip pull. Production — всегда конкретный tag + IfNotPresent.
4. K8s API rate limit
При burst (100 tasks за 1s) — POST /pods 100 раз. K8s API default rate limit может зайти. worker_pods_creation_batch_size = 16 контролирует scheduler batch — не делать > 16 одновременных API calls.
5. Не использовать для коротких tasks
Если task < 30 seconds — KubernetesExecutor — не правильный выбор. Используйте Celery, или Multiple Executors (AIP-61) с routing коротких на Celery.
RBAC requirements
K8s RBAC для KubernetesExecutor — scheduler service account должен иметь права на:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: airflow
name: airflow-worker
rules:
- apiGroups: [""]
resources: ["pods", "pods/log"]
verbs: ["get", "list", "watch", "create", "delete", "patch"]
- apiGroups: [""]
resources: ["pods/exec"]
verbs: ["create", "get"]
- apiGroups: [""]
resources: ["events"]
verbs: ["get", "list", "watch"]
Minimal — pods (CRUD), pods/log (для log retrieval), events (для diagnostics).
Production observability
Pending pods (resource pressure / scheduling issues)
-- TI in queued > pending_timeout (300s default)
SELECT
dag_id, task_id, run_id,
queued_dttm,
now() - queued_dttm AS pending_for
FROM task_instance
WHERE state = 'queued'
AND queued_dttm < now() - interval '5 minutes'
ORDER BY queued_dttm;
Если есть результаты — К8s scheduler не может разместить pods. Проверьте:
kubectl get pods -n airflow | grep Pendingkubectl describe pod <pod-name>→ события (insufficient memory/CPU/GPU)- Cluster autoscaler — добавляются ли nodes
Image pull failures
kubectl get pods -n airflow --field-selector=status.phase=Pending
kubectl describe pod <pod-name>
# Look for: ImagePullBackOff, ErrImagePull
Watcher disconnect frequency
# Logs scheduler
kubectl logs -n airflow deployment/airflow-scheduler -c scheduler | grep -i watcher
# Look for: "Watch stream closed", "Restarting watch"
Healthy — < 1 restart per hour. Частые disconnects — проблема с API server (load, network).
Configuration cheat-sheet (production-grade)
[core]
executor = KubernetesExecutor
[kubernetes_executor]
namespace = airflow
pod_template_file = /opt/airflow/pod_template.yaml
worker_container_repository = my-registry.com/airflow
worker_container_tag = 2.10.5
delete_worker_pods = True
delete_worker_pods_on_failure = False # keep на failure — debugging
worker_pods_creation_batch_size = 16 # max concurrent POST /pods
worker_pods_pending_timeout = 300 # 5 min pending → reset TI
worker_pods_pending_timeout_batch_size = 100
worker_pods_pending_timeout_check_interval = 120
multi_namespace_mode = False # single namespace для simplicity
in_cluster = True # scheduler runs внутри cluster
[kubernetes_secrets]
sql_alchemy_conn = airflow-secrets=postgres-conn # secret backend
fernet_key = airflow-secrets=fernet-key
Production gotchas
Gotcha 1: Pods «protected» by Pod Disruption Budget
Если у вас PDB на namespace, K8s может отказаться удалять completed pods (delete_worker_pods=True бесполезен). Result — pods накапливаются, eventually node ENOSPC.
Fix: PDB не нужен для ephemeral worker pods. Удалить PDB или exclude airflow-worker label.
Gotcha 2: PVC mount slow
volumes с PVC (logs, dags) — каждый mount = wait на K8s storage provisioner. На некоторых cloud providers (EBS attach) — 10-30s.
Fix:
- Logs → emptyDir + log_aggregation (S3/GCS via Airflow remote logs)
- DAGs → bake в image (вместо ConfigMap/PVC sync)
Gotcha 3: K8s API server rate limit
При scheduler restart + 200 tasks running → одновременная попытка create 200 pods. K8s API throttles (default 100 req/s).
Fix: worker_pods_creation_batch_size = 16 (default ok), monitor apiserver_request_total метрики.
Gotcha 4: Stuck zombie TI после kill scheduler
kill -9 scheduler → watcher thread мёртв → events lost → TI stuck в running, pods orphaned.
Fix:
- Graceful shutdown (SIGTERM)
scheduler_zombie_task_threshold = 300— fallback detection- Periodic
kubectl get pods -n airflow -l app=airflow-workercleanup script
Gotcha 5: Logs lost после pod delete
delete_worker_pods=True → pod delete → logs lost (если не aggregated).
Fix: configure [logging] remote logging:
[logging]
remote_logging = True
remote_base_log_folder = s3://my-bucket/airflow-logs/
remote_log_conn_id = aws_default
Workers пишут logs в S3 sync; UI читает оттуда. Pods можно удалять безопасно.