Zombies, orphans и scheduler recovery
В production Airflow worker pods crash, scheduler pods evicted, nodes terminated через spot interruption. В normal SQL-world это привело бы к зависшим строкам “task running forever”. Airflow имеет двухслойный recovery mechanism: zombie detection (мёртвые workers) и orphan adoption (мёртвые schedulers). Это не просто housekeeping — без этих механизмов production кластер деградирует за часы.
В этом уроке разберём оба mechanism до конкретного pseudocode, рассмотрим production failure modes (Celery OOM, K8s pod evicted, spot termination) и mitigations.
Zombies vs orphans — терминология
| Термин | Что это | Кто detect-ит | Recovery |
|---|---|---|---|
| Zombie task | TI в state=running, но worker мёртв (heartbeat прокис) | Scheduler housekeeping (фаза 4) | Mark failed + maybe retry |
| Orphan task | TI в queued/running/deferred, но scheduler-owner мёртв | Scheduler housekeeping (фаза 4) | Adopt or reset to scheduled |
Разница субтильна: zombie — это worker died, orphan — scheduler died. Worker и scheduler — разные процессы, и у них разные heartbeat механизмы. Но в production они часто смешиваются — pod crash убивает оба.
Heartbeat infrastructure
Каждая выполняемая job (SchedulerJob, LocalTaskJob, TriggererJob) записывает heartbeat в таблицу job:
-- Структура job table
CREATE TABLE job (
id SERIAL PRIMARY KEY,
job_type VARCHAR(30), -- 'SchedulerJob', 'LocalTaskJob', 'TriggererJob'
state VARCHAR(20), -- 'running', 'success', 'failed'
hostname VARCHAR(500),
unixname VARCHAR(1000),
start_date TIMESTAMP,
end_date TIMESTAMP,
latest_heartbeat TIMESTAMP,
executor_class VARCHAR(500), -- 'CeleryExecutor', 'KubernetesExecutor', ...
...
);
Heartbeat thread в каждой job:
# Псевдокод BaseJob.heartbeat_callback
def heartbeat_callback(self, session):
"""Called каждые heartrate seconds в отдельном thread."""
while not self.shutdown_event.is_set():
try:
with create_session() as session:
job = session.query(Job).filter(Job.id == self.id).first()
job.latest_heartbeat = timezone.utcnow()
session.commit()
except Exception as e:
log.exception("Failed to heartbeat")
sleep(self.heartrate) # SchedulerJob: 5s, LocalTaskJob: 5s, TriggererJob: 5s
Если процесс жив — heartbeat обновляется. Если crash, kill -9, OOM kill, node terminate без graceful shutdown → heartbeat остаётся прокисшим. Scheduler в фазе 4 это detect-ит.
Zombie detection — фаза 4 housekeeping
В SchedulerJobRunner._find_zombies (2.x — SchedulerJob._find_zombies):
# Псевдокод _find_zombies
def _find_zombies(self, session):
"""Найти running TI, чей LocalTaskJob heartbeat прокис."""
threshold = conf.getint('scheduler', 'scheduler_zombie_task_threshold') # default 300s
# Query: running TIs, чей job latest_heartbeat прокис
zombie_query = (
session.query(TaskInstance, Job)
.join(Job, TaskInstance.job_id == Job.id)
.filter(TaskInstance.state == State.RUNNING)
.filter(Job.latest_heartbeat < timezone.utcnow() - timedelta(seconds=threshold))
.filter(Job.state == State.RUNNING)
)
zombies = zombie_query.all()
for ti, job in zombies:
log.error(
"Found zombie task %s.%s (job_id=%s, last heartbeat %s ago)",
ti.dag_id, ti.task_id, job.id,
timezone.utcnow() - job.latest_heartbeat,
)
# Создать callback request для DAG processor
callback_request = TaskCallbackRequest(
full_filepath=ti.dag_model.fileloc,
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=f"Zombie detected: heartbeat expired"
is_failure_callback=True,
)
callback_queue.send(callback_request)
# Mark job failed
job.state = State.FAILED
# Mark TI failed (or up_for_retry if retries left)
ti.handle_failure_with_callback(error="Zombie job")
session.commit()
Что происходит:
- Scheduler находит TI с state=running, у которых job (LocalTaskJob) не heartbeated > 300s.
- Создаётся callback request — DAG processor выполнит on_failure_callback.
- TI переводится в
failedилиup_for_retry(зависит от try_number). - Job mark-ится failed.
scheduler_zombie_task_threshold (default 300s = 5min) — критичный параметр:
- Слишком маленький → false positives при network blips, GC pauses, slow disk
- Слишком большой → реальные dead tasks висят, занимая pool slots
Production recommendation: 300-600s для Celery, 120-300s для K8s (там signals быстрее).
Orphan adoption — фаза 4 housekeeping
Orphan — это TI, чей scheduler died. Scheduler в каждом tick должен heartbeat-ить:
# Псевдокод _kill_zombies_and_adopt_orphans (упрощённый)
def adopt_or_reset_orphaned_tasks(self, session):
"""Найти TI, чей scheduler died, и adopt их."""
threshold = conf.getint('scheduler', 'scheduler_health_check_threshold') # default 30s
# Найти мёртвые scheduler jobs
dead_schedulers = session.query(Job).filter(
Job.job_type == 'SchedulerJob',
Job.state == State.RUNNING,
Job.latest_heartbeat < timezone.utcnow() - timedelta(seconds=threshold),
).all()
if not dead_schedulers:
return
log.warning("Found %d dead schedulers", len(dead_schedulers))
for dead_job in dead_schedulers:
dead_job.state = State.FAILED
dead_scheduler_ids = [j.id for j in dead_schedulers]
# Найти orphan TI — те, что были scheduled этими мёртвыми schedulers
# И в state, который executor мог обрабатывать
orphan_tis = session.query(TaskInstance).filter(
TaskInstance.scheduler_id.in_(dead_scheduler_ids), # legacy column
TaskInstance.state.in_([State.QUEUED, State.RUNNING, State.DEFERRED, State.RESTARTING]),
).all()
# Дать executor шанс adopt их (continue tracking)
adopted_tis = self.job.executor.try_adopt_task_instances(orphan_tis)
not_adopted = [ti for ti in orphan_tis if ti not in adopted_tis]
# Не adopted → reset state, чтобы они попали обратно в фазу 2-3
for ti in not_adopted:
if ti.state == State.DEFERRED:
# Особая логика — recreate Trigger row у нового triggerer
ti.state = State.SCHEDULED
ti.next_method = None # потеряли deferred context
log.warning("Reset deferred TI %s — context lost", ti)
else:
ti.state = State.SCHEDULED
ti.queued_dttm = None
ti.start_date = None
ti.end_date = None
session.commit()
Двушаговый recovery:
- executor.try_adopt_task_instances(orphans) — даёт executor шанс продолжить tracking без перезапуска (CeleryExecutor может re-attach к существующим Celery tasks через task UUID).
- Reset to scheduled — для тех, кого executor не смог adopt, state сбрасывается, и они попадут обратно в фазу 2 → 3.
Executor adopt: per-executor logic
Каждый executor реализует try_adopt_task_instances по-своему:
CeleryExecutor
# Псевдокод CeleryExecutor.try_adopt_task_instances
def try_adopt_task_instances(self, tis):
"""Re-attach к Celery tasks по external_executor_id."""
not_adopted = []
for ti in tis:
celery_task_id = ti.external_executor_id # UUID Celery task
if not celery_task_id:
not_adopted.append(ti)
continue
# Probe Celery — task ещё существует?
async_result = AsyncResult(celery_task_id, app=app)
if async_result.state in ('PENDING', 'STARTED'):
# Task жива в Celery — adopt
self.running.add(ti.key)
self.adopted_task_timeouts[ti.key] = (
timezone.utcnow() + self.task_adoption_timeout
)
else:
# Task мертва — reset
not_adopted.append(ti)
return [ti for ti in tis if ti not in not_adopted]
CeleryExecutor умеет re-attach к Celery brokers через task UUID. Если worker крашнулся, но task в Redis ещё ack-нута, новый executor продолжит мониторить result.
KubernetesExecutor
# Псевдокод KubernetesExecutor.try_adopt_task_instances
def try_adopt_task_instances(self, tis):
"""Re-attach к existing pods по labels."""
pod_list = self.kube_client.list_namespaced_pod(
namespace=self.namespace,
label_selector="airflow-worker"
)
adopted = []
for ti in tis:
pod = find_pod_by_ti_labels(pod_list, ti)
if pod and pod.status.phase in ('Running', 'Pending'):
# Pod ещё жив — adopt
self.running_pods[ti.key] = pod
adopted.append(ti)
# Update pod labels для нового scheduler
patch_pod_labels(pod, scheduler_job_id=self.scheduler_job_id)
return adopted
KubernetesExecutor использует pod labels для tracking. Pod с labels dag_id, task_id, run_id — adoptable, если он Running/Pending.
LocalExecutor / SequentialExecutor
Не имеют adoption — они in-process, при scheduler crash worker subprocess тоже умирает. Все TIs reset to scheduled.
Полная картина recovery
В steady-state — orphan recovery занимает 30-60 секунд после scheduler crash. Это acceptable для most workloads.
Production failure scenarios
Scenario 1: Celery worker OOM kill
Symptom: Worker процесс получил kernel SIGKILL за превышение memory limit. LocalTaskJob heartbeat не успел финализировать. Celery broker (Redis) считает task acknowledged.
Recovery:
- После 300s (scheduler_zombie_task_threshold) — scheduler detects zombie.
- TI marked failed (or up_for_retry).
- Celery task UUID dangling в Redis — eventually garbage collected.
Mitigation:
- Resource requests/limits в K8s — kernel kill вместо silent corruption.
airflow.cfg [celery] worker_pod_pending_timeout— kill stuck Celery tasks.- Memory monitoring на DAG level —
metrics.celery_task_memory_high_watermark.
Scenario 2: K8s pod evicted (node pressure)
Symptom: Pod получил SIGTERM, потом SIGKILL через 30s grace period. KubernetesExecutor видит pod как Evicted.
Recovery:
- KubernetesExecutor watch loop ловит pod status change Failed.
- Сразу (без waiting на zombie threshold!) сообщает scheduler — task failed.
- TI → failed (or up_for_retry).
K8s recovery быстрее Celery — благодаря watch API. Это одна из причин предпочтительности K8s executor для critical workloads.
Mitigation:
- Pod priorities + PodDisruptionBudget на critical DAGs.
tolerations+nodeSelectorдля dedicated nodes.- Resource requests чтобы pod не был evicted first.
Scenario 3: AWS spot termination
Symptom: Spot instance terminate notice (2 minutes). Pod выдан SIGTERM. Если не done за 2 минуты — SIGKILL.
Recovery:
- Если task завершилась в grace period — normal success/failed.
- Если не — same как pod evicted.
Mitigation:
- Karpenter / cluster autoscaler с spot interrupt handler — drain node заранее.
- Tasks с long runtime — НЕ на spot, либо с checkpointing.
AIRFLOW__CORE__DAG_DEFAULT_VIEW=gridдля quick zombie spotting.
Scenario 4: Scheduler GC pause / DB connection storm
Symptom: Scheduler жив, но heartbeat thread заблокирован: long DB query, lock wait, Python GC pause > 30s.
Recovery:
- Другой scheduler ошибочно считает его dead, orphan-ит его TIs.
- Реальный scheduler “просыпается” и продолжает работать с теми же TIs — double scheduling.
Это самый dangerous failure mode — двойная активность scheduler-ов. Mitigation:
scheduler_health_check_thresholdне меньше 30s (default).- PgBouncer для DB connection pooling.
- Monitoring GC pauses через JFR (Java) / cProfile (Python).
Scenario 5: Deferred task triggerer crashed
Symptom: Triggerer pod crashed, TI в state=deferred с Trigger row pointing к dead triggerer.
Recovery:
- После
triggerer_health_check_threshold(default 30s) — другой triggerer adopt-ит trigger row черезTrigger.assign_unassigned. - Если single triggerer и crash — TI deferred forever (до restart).
Mitigation:
- Минимум 2 triggerer replicas в production.
[triggerer] default_capacity1000 — баланс concurrency.
Inspect commands
-- Все мёртвые schedulers (latest_heartbeat прокис)
SELECT id, hostname, latest_heartbeat,
now() - latest_heartbeat AS gap
FROM job
WHERE job_type = 'SchedulerJob' AND state = 'running'
AND latest_heartbeat < now() - interval '30 seconds';
-- Zombie tasks (running, но job heartbeat прокис)
SELECT ti.dag_id, ti.task_id, ti.run_id, ti.start_date,
j.latest_heartbeat,
now() - j.latest_heartbeat AS heartbeat_gap
FROM task_instance ti
JOIN job j ON ti.job_id = j.id
WHERE ti.state = 'running'
AND j.latest_heartbeat < now() - interval '300 seconds'
ORDER BY heartbeat_gap DESC;
-- Orphan tasks (queued/running с failed scheduler)
SELECT ti.dag_id, ti.task_id, ti.state, ti.scheduler_id,
j.state AS scheduler_state, j.latest_heartbeat
FROM task_instance ti
JOIN job j ON ti.scheduler_id = j.id
WHERE ti.state IN ('queued', 'running', 'deferred')
AND j.state = 'failed';
-- Deferred tasks без active triggerer
SELECT ti.dag_id, ti.task_id, t.id AS trigger_id,
t.triggerer_id, j.latest_heartbeat
FROM task_instance ti
JOIN trigger t ON ti.trigger_id = t.id
LEFT JOIN job j ON t.triggerer_id = j.id
WHERE ti.state = 'deferred'
AND (j.id IS NULL OR j.latest_heartbeat < now() - interval '60 seconds');
Configuration tunables
| Параметр | Default | Что |
|---|---|---|
[scheduler] scheduler_health_check_threshold | 30s | Когда scheduler считается мёртвым |
[scheduler] scheduler_zombie_task_threshold | 300s | Когда TI считается zombie |
[triggerer] triggerer_health_check_threshold | 30s | Когда triggerer считается мёртвым |
[scheduler] orphaned_tasks_check_interval | 300s | Как часто scheduler проверяет orphans |
[scheduler] zombie_detection_interval | 10s | Как часто проверка zombies |
[celery] task_adoption_timeout | 600s | Сколько ждать adopted task завершения |
Production sweet spot:
- scheduler_health_check_threshold = 30s (default OK, не уменьшайте — false positives на GC pauses)
- scheduler_zombie_task_threshold = 300s (default OK для Celery, можно 120s для K8s)
- orphaned_tasks_check_interval = 60s (агрессивнее, чтобы recovery быстрее)
Production gotchas
1. Zombie threshold ниже task duration
Если у вас task running 10 минут, а zombie_threshold=300s (5 мин) — нормальные tasks могут быть mislabeled как zombies при network latency.
Fix: убедитесь threshold > max expected task duration + 50% buffer.
2. Double-execution after orphan reset
После reset state=scheduled, TI попадёт в queue заново — но оригинальная task в Celery/K8s могла всё ещё complete. Результат — task running дважды (idempotency violation).
Fix:
- Idempotent tasks (re-runnable без side effects)
- Critical workloads — use
Variable.getlock в начале task - Или K8s executor (быстрее detection)
3. Deferred reset → context lost
Если deferred task reset (triggerer crash), next_method обнуляется — task начнётся с execute(), а не с resume_from_deferred. Operator может re-defer, но state inside execute() лост.
Fix:
- Idempotent execute() в deferrable operators
- Сохраняйте state в XCom или external store, а не в local variables
4. orphaned_tasks_check_interval > heartbeat threshold
Если orphaned_tasks_check_interval (default 300s) больше scheduler_health_check_threshold (30s), может быть окно 30-300s, когда orphans лежат, но никто не recovery.
Fix: orphaned_tasks_check_interval ≤ scheduler_health_check_threshold * 5.
5. PostgreSQL VACUUM на job table
job table grows: одна row на каждый scheduler heartbeat, каждый LocalTaskJob start. За год — миллионы rows. Без VACUUM index bloat → slow queries → slow heartbeat → false zombies.
Fix:
airflow db clean --table job --clean-before-timestamp ...в cron- pg_repack для index bloat
- Autovacuum tuning (
autovacuum_naptime,autovacuum_vacuum_scale_factor)
В следующем уроке
Lesson 06 — Backfill internals. Backfill в 2.x использует отдельный BackfillJob, который параллелит работу со scheduler-ом и имеет свои гонки. Разберём, как airflow dags backfill --reset-dagruns работает изнутри, и почему в 3.x backfill переехал в scheduler (AIP-78).