Learning Platform
Глоссарий Troubleshooting
Урок 05.05 · 30 мин
Продвинутый
Zombie tasksOrphan tasksRecoveryHAHeartbeatAdoption

Zombies, orphans и scheduler recovery

В production Airflow worker pods crash, scheduler pods evicted, nodes terminated через spot interruption. В normal SQL-world это привело бы к зависшим строкам “task running forever”. Airflow имеет двухслойный recovery mechanism: zombie detection (мёртвые workers) и orphan adoption (мёртвые schedulers). Это не просто housekeeping — без этих механизмов production кластер деградирует за часы.

В этом уроке разберём оба mechanism до конкретного pseudocode, рассмотрим production failure modes (Celery OOM, K8s pod evicted, spot termination) и mitigations.


Zombies vs orphans — терминология

ТерминЧто этоКто detect-итRecovery
Zombie taskTI в state=running, но worker мёртв (heartbeat прокис)Scheduler housekeeping (фаза 4)Mark failed + maybe retry
Orphan taskTI в queued/running/deferred, но scheduler-owner мёртвScheduler housekeeping (фаза 4)Adopt or reset to scheduled

Разница субтильна: zombie — это worker died, orphan — scheduler died. Worker и scheduler — разные процессы, и у них разные heartbeat механизмы. Но в production они часто смешиваются — pod crash убивает оба.


Heartbeat infrastructure

Каждая выполняемая job (SchedulerJob, LocalTaskJob, TriggererJob) записывает heartbeat в таблицу job:

-- Структура job table
CREATE TABLE job (
    id SERIAL PRIMARY KEY,
    job_type VARCHAR(30),  -- 'SchedulerJob', 'LocalTaskJob', 'TriggererJob'
    state VARCHAR(20),     -- 'running', 'success', 'failed'
    hostname VARCHAR(500),
    unixname VARCHAR(1000),
    start_date TIMESTAMP,
    end_date TIMESTAMP,
    latest_heartbeat TIMESTAMP,
    executor_class VARCHAR(500),  -- 'CeleryExecutor', 'KubernetesExecutor', ...
    ...
);

Heartbeat thread в каждой job:

# Псевдокод BaseJob.heartbeat_callback
def heartbeat_callback(self, session):
    """Called каждые heartrate seconds в отдельном thread."""
    while not self.shutdown_event.is_set():
        try:
            with create_session() as session:
                job = session.query(Job).filter(Job.id == self.id).first()
                job.latest_heartbeat = timezone.utcnow()
                session.commit()
        except Exception as e:
            log.exception("Failed to heartbeat")
        sleep(self.heartrate)  # SchedulerJob: 5s, LocalTaskJob: 5s, TriggererJob: 5s

Если процесс жив — heartbeat обновляется. Если crash, kill -9, OOM kill, node terminate без graceful shutdown → heartbeat остаётся прокисшим. Scheduler в фазе 4 это detect-ит.


Zombie detection — фаза 4 housekeeping

В SchedulerJobRunner._find_zombies (2.x — SchedulerJob._find_zombies):

# Псевдокод _find_zombies
def _find_zombies(self, session):
    """Найти running TI, чей LocalTaskJob heartbeat прокис."""
    threshold = conf.getint('scheduler', 'scheduler_zombie_task_threshold')  # default 300s

    # Query: running TIs, чей job latest_heartbeat прокис
    zombie_query = (
        session.query(TaskInstance, Job)
        .join(Job, TaskInstance.job_id == Job.id)
        .filter(TaskInstance.state == State.RUNNING)
        .filter(Job.latest_heartbeat < timezone.utcnow() - timedelta(seconds=threshold))
        .filter(Job.state == State.RUNNING)
    )

    zombies = zombie_query.all()
    for ti, job in zombies:
        log.error(
            "Found zombie task %s.%s (job_id=%s, last heartbeat %s ago)",
            ti.dag_id, ti.task_id, job.id,
            timezone.utcnow() - job.latest_heartbeat,
        )

        # Создать callback request для DAG processor
        callback_request = TaskCallbackRequest(
            full_filepath=ti.dag_model.fileloc,
            simple_task_instance=SimpleTaskInstance.from_ti(ti),
            msg=f"Zombie detected: heartbeat expired"
            is_failure_callback=True,
        )
        callback_queue.send(callback_request)

        # Mark job failed
        job.state = State.FAILED
        # Mark TI failed (or up_for_retry if retries left)
        ti.handle_failure_with_callback(error="Zombie job")

    session.commit()

Что происходит:

  1. Scheduler находит TI с state=running, у которых job (LocalTaskJob) не heartbeated > 300s.
  2. Создаётся callback request — DAG processor выполнит on_failure_callback.
  3. TI переводится в failed или up_for_retry (зависит от try_number).
  4. Job mark-ится failed.

scheduler_zombie_task_threshold (default 300s = 5min) — критичный параметр:

  • Слишком маленький → false positives при network blips, GC pauses, slow disk
  • Слишком большой → реальные dead tasks висят, занимая pool slots

Production recommendation: 300-600s для Celery, 120-300s для K8s (там signals быстрее).


Orphan adoption — фаза 4 housekeeping

Orphan — это TI, чей scheduler died. Scheduler в каждом tick должен heartbeat-ить:

# Псевдокод _kill_zombies_and_adopt_orphans (упрощённый)
def adopt_or_reset_orphaned_tasks(self, session):
    """Найти TI, чей scheduler died, и adopt их."""
    threshold = conf.getint('scheduler', 'scheduler_health_check_threshold')  # default 30s

    # Найти мёртвые scheduler jobs
    dead_schedulers = session.query(Job).filter(
        Job.job_type == 'SchedulerJob',
        Job.state == State.RUNNING,
        Job.latest_heartbeat < timezone.utcnow() - timedelta(seconds=threshold),
    ).all()

    if not dead_schedulers:
        return

    log.warning("Found %d dead schedulers", len(dead_schedulers))

    for dead_job in dead_schedulers:
        dead_job.state = State.FAILED

    dead_scheduler_ids = [j.id for j in dead_schedulers]

    # Найти orphan TI — те, что были scheduled этими мёртвыми schedulers
    # И в state, который executor мог обрабатывать
    orphan_tis = session.query(TaskInstance).filter(
        TaskInstance.scheduler_id.in_(dead_scheduler_ids),  # legacy column
        TaskInstance.state.in_([State.QUEUED, State.RUNNING, State.DEFERRED, State.RESTARTING]),
    ).all()

    # Дать executor шанс adopt их (continue tracking)
    adopted_tis = self.job.executor.try_adopt_task_instances(orphan_tis)

    not_adopted = [ti for ti in orphan_tis if ti not in adopted_tis]

    # Не adopted → reset state, чтобы они попали обратно в фазу 2-3
    for ti in not_adopted:
        if ti.state == State.DEFERRED:
            # Особая логика — recreate Trigger row у нового triggerer
            ti.state = State.SCHEDULED
            ti.next_method = None  # потеряли deferred context
            log.warning("Reset deferred TI %s — context lost", ti)
        else:
            ti.state = State.SCHEDULED
            ti.queued_dttm = None
            ti.start_date = None
            ti.end_date = None

    session.commit()

Двушаговый recovery:

  1. executor.try_adopt_task_instances(orphans) — даёт executor шанс продолжить tracking без перезапуска (CeleryExecutor может re-attach к существующим Celery tasks через task UUID).
  2. Reset to scheduled — для тех, кого executor не смог adopt, state сбрасывается, и они попадут обратно в фазу 2 → 3.

Executor adopt: per-executor logic

Каждый executor реализует try_adopt_task_instances по-своему:

CeleryExecutor

# Псевдокод CeleryExecutor.try_adopt_task_instances
def try_adopt_task_instances(self, tis):
    """Re-attach к Celery tasks по external_executor_id."""
    not_adopted = []
    for ti in tis:
        celery_task_id = ti.external_executor_id  # UUID Celery task
        if not celery_task_id:
            not_adopted.append(ti)
            continue

        # Probe Celery — task ещё существует?
        async_result = AsyncResult(celery_task_id, app=app)
        if async_result.state in ('PENDING', 'STARTED'):
            # Task жива в Celery — adopt
            self.running.add(ti.key)
            self.adopted_task_timeouts[ti.key] = (
                timezone.utcnow() + self.task_adoption_timeout
            )
        else:
            # Task мертва — reset
            not_adopted.append(ti)
    return [ti for ti in tis if ti not in not_adopted]

CeleryExecutor умеет re-attach к Celery brokers через task UUID. Если worker крашнулся, но task в Redis ещё ack-нута, новый executor продолжит мониторить result.

KubernetesExecutor

# Псевдокод KubernetesExecutor.try_adopt_task_instances
def try_adopt_task_instances(self, tis):
    """Re-attach к existing pods по labels."""
    pod_list = self.kube_client.list_namespaced_pod(
        namespace=self.namespace,
        label_selector="airflow-worker"
    )

    adopted = []
    for ti in tis:
        pod = find_pod_by_ti_labels(pod_list, ti)
        if pod and pod.status.phase in ('Running', 'Pending'):
            # Pod ещё жив — adopt
            self.running_pods[ti.key] = pod
            adopted.append(ti)
            # Update pod labels для нового scheduler
            patch_pod_labels(pod, scheduler_job_id=self.scheduler_job_id)
    return adopted

KubernetesExecutor использует pod labels для tracking. Pod с labels dag_id, task_id, run_id — adoptable, если он Running/Pending.

LocalExecutor / SequentialExecutor

Не имеют adoption — они in-process, при scheduler crash worker subprocess тоже умирает. Все TIs reset to scheduled.


Полная картина recovery

Recovery flow при scheduler death
Scheduler-1 alive, schedulingScheduler-1 heartbeat обновляется каждые 5s. У него 50 TIs в queued/running от его scheduling decisions.
kill -9 / node crash / OOM
Scheduler-1 deadPod crashed без SIGTERM. heartbeat не обновляется. job.latest_heartbeat остался last value. 50 TIs orphan-нуты (assigned to dead scheduler).
30s threshold elapsed
Scheduler-2 housekeepingScheduler-2 в фазе 4 видит: scheduler-1 job latest_heartbeat < now() - 30s. Mark job state=failed. Найдены 50 orphan TIs.
executor.try_adopt_task_instances(50 TIs)
35 adopted, 15 notCeleryExecutor: 35 tasks в Redis ещё с PENDING/STARTED. Adopted — продолжаем track их status. 15 tasks revoked / worker дед — not adopted.
35 продолжают normal flowAdopted TIs продолжают running по их Celery task UUID. Когда worker finish — scheduler-2 получит result.
15 reset → scheduledNot adopted TIs: state=scheduled, queued_dttm=null, попадут обратно в фазу 3 (critical section) и будут re-enqueued. try_number сохраняется.

В steady-state — orphan recovery занимает 30-60 секунд после scheduler crash. Это acceptable для most workloads.


Production failure scenarios

Scenario 1: Celery worker OOM kill

Symptom: Worker процесс получил kernel SIGKILL за превышение memory limit. LocalTaskJob heartbeat не успел финализировать. Celery broker (Redis) считает task acknowledged.

Recovery:

  1. После 300s (scheduler_zombie_task_threshold) — scheduler detects zombie.
  2. TI marked failed (or up_for_retry).
  3. Celery task UUID dangling в Redis — eventually garbage collected.

Mitigation:

  • Resource requests/limits в K8s — kernel kill вместо silent corruption.
  • airflow.cfg [celery] worker_pod_pending_timeout — kill stuck Celery tasks.
  • Memory monitoring на DAG level — metrics.celery_task_memory_high_watermark.

Scenario 2: K8s pod evicted (node pressure)

Symptom: Pod получил SIGTERM, потом SIGKILL через 30s grace period. KubernetesExecutor видит pod как Evicted.

Recovery:

  1. KubernetesExecutor watch loop ловит pod status change Failed.
  2. Сразу (без waiting на zombie threshold!) сообщает scheduler — task failed.
  3. TI → failed (or up_for_retry).

K8s recovery быстрее Celery — благодаря watch API. Это одна из причин предпочтительности K8s executor для critical workloads.

Mitigation:

  • Pod priorities + PodDisruptionBudget на critical DAGs.
  • tolerations + nodeSelector для dedicated nodes.
  • Resource requests чтобы pod не был evicted first.

Scenario 3: AWS spot termination

Symptom: Spot instance terminate notice (2 minutes). Pod выдан SIGTERM. Если не done за 2 минуты — SIGKILL.

Recovery:

  • Если task завершилась в grace period — normal success/failed.
  • Если не — same как pod evicted.

Mitigation:

  • Karpenter / cluster autoscaler с spot interrupt handler — drain node заранее.
  • Tasks с long runtime — НЕ на spot, либо с checkpointing.
  • AIRFLOW__CORE__DAG_DEFAULT_VIEW=grid для quick zombie spotting.

Scenario 4: Scheduler GC pause / DB connection storm

Symptom: Scheduler жив, но heartbeat thread заблокирован: long DB query, lock wait, Python GC pause > 30s.

Recovery:

  • Другой scheduler ошибочно считает его dead, orphan-ит его TIs.
  • Реальный scheduler “просыпается” и продолжает работать с теми же TIs — double scheduling.

Это самый dangerous failure mode — двойная активность scheduler-ов. Mitigation:

  • scheduler_health_check_threshold не меньше 30s (default).
  • PgBouncer для DB connection pooling.
  • Monitoring GC pauses через JFR (Java) / cProfile (Python).

Scenario 5: Deferred task triggerer crashed

Symptom: Triggerer pod crashed, TI в state=deferred с Trigger row pointing к dead triggerer.

Recovery:

  1. После triggerer_health_check_threshold (default 30s) — другой triggerer adopt-ит trigger row через Trigger.assign_unassigned.
  2. Если single triggerer и crash — TI deferred forever (до restart).

Mitigation:

  • Минимум 2 triggerer replicas в production.
  • [triggerer] default_capacity 1000 — баланс concurrency.

Inspect commands

-- Все мёртвые schedulers (latest_heartbeat прокис)
SELECT id, hostname, latest_heartbeat,
       now() - latest_heartbeat AS gap
FROM job
WHERE job_type = 'SchedulerJob' AND state = 'running'
  AND latest_heartbeat < now() - interval '30 seconds';

-- Zombie tasks (running, но job heartbeat прокис)
SELECT ti.dag_id, ti.task_id, ti.run_id, ti.start_date,
       j.latest_heartbeat,
       now() - j.latest_heartbeat AS heartbeat_gap
FROM task_instance ti
JOIN job j ON ti.job_id = j.id
WHERE ti.state = 'running'
  AND j.latest_heartbeat < now() - interval '300 seconds'
ORDER BY heartbeat_gap DESC;

-- Orphan tasks (queued/running с failed scheduler)
SELECT ti.dag_id, ti.task_id, ti.state, ti.scheduler_id,
       j.state AS scheduler_state, j.latest_heartbeat
FROM task_instance ti
JOIN job j ON ti.scheduler_id = j.id
WHERE ti.state IN ('queued', 'running', 'deferred')
  AND j.state = 'failed';

-- Deferred tasks без active triggerer
SELECT ti.dag_id, ti.task_id, t.id AS trigger_id,
       t.triggerer_id, j.latest_heartbeat
FROM task_instance ti
JOIN trigger t ON ti.trigger_id = t.id
LEFT JOIN job j ON t.triggerer_id = j.id
WHERE ti.state = 'deferred'
  AND (j.id IS NULL OR j.latest_heartbeat < now() - interval '60 seconds');

Configuration tunables

ПараметрDefaultЧто
[scheduler] scheduler_health_check_threshold30sКогда scheduler считается мёртвым
[scheduler] scheduler_zombie_task_threshold300sКогда TI считается zombie
[triggerer] triggerer_health_check_threshold30sКогда triggerer считается мёртвым
[scheduler] orphaned_tasks_check_interval300sКак часто scheduler проверяет orphans
[scheduler] zombie_detection_interval10sКак часто проверка zombies
[celery] task_adoption_timeout600sСколько ждать adopted task завершения

Production sweet spot:

  • scheduler_health_check_threshold = 30s (default OK, не уменьшайте — false positives на GC pauses)
  • scheduler_zombie_task_threshold = 300s (default OK для Celery, можно 120s для K8s)
  • orphaned_tasks_check_interval = 60s (агрессивнее, чтобы recovery быстрее)

Production gotchas

1. Zombie threshold ниже task duration

Если у вас task running 10 минут, а zombie_threshold=300s (5 мин) — нормальные tasks могут быть mislabeled как zombies при network latency.

Fix: убедитесь threshold > max expected task duration + 50% buffer.

2. Double-execution after orphan reset

После reset state=scheduled, TI попадёт в queue заново — но оригинальная task в Celery/K8s могла всё ещё complete. Результат — task running дважды (idempotency violation).

Fix:

  • Idempotent tasks (re-runnable без side effects)
  • Critical workloads — use Variable.get lock в начале task
  • Или K8s executor (быстрее detection)

3. Deferred reset → context lost

Если deferred task reset (triggerer crash), next_method обнуляется — task начнётся с execute(), а не с resume_from_deferred. Operator может re-defer, но state inside execute() лост.

Fix:

  • Idempotent execute() в deferrable operators
  • Сохраняйте state в XCom или external store, а не в local variables

4. orphaned_tasks_check_interval > heartbeat threshold

Если orphaned_tasks_check_interval (default 300s) больше scheduler_health_check_threshold (30s), может быть окно 30-300s, когда orphans лежат, но никто не recovery.

Fix: orphaned_tasks_check_interval ≤ scheduler_health_check_threshold * 5.

5. PostgreSQL VACUUM на job table

job table grows: одна row на каждый scheduler heartbeat, каждый LocalTaskJob start. За год — миллионы rows. Без VACUUM index bloat → slow queries → slow heartbeat → false zombies.

Fix:

  • airflow db clean --table job --clean-before-timestamp ... в cron
  • pg_repack для index bloat
  • Autovacuum tuning (autovacuum_naptime, autovacuum_vacuum_scale_factor)

В следующем уроке

Lesson 06 — Backfill internals. Backfill в 2.x использует отдельный BackfillJob, который параллелит работу со scheduler-ом и имеет свои гонки. Разберём, как airflow dags backfill --reset-dagruns работает изнутри, и почему в 3.x backfill переехал в scheduler (AIP-78).


Проверка знанийKnowledge check
Scheduler в K8s deployment имеет GC pause 45 секунд. Что произойдёт с его TIs, и какие могут быть последствия для production?
ОтветAnswer
GC pause > scheduler_health_check_threshold (30s default) → другой scheduler считает его мёртвым → orphan_or_adopt TIs запускается. Executor пытается adopt — например, CeleryExecutor re-attach к Celery tasks по external_executor_id. Те, кого не смог adopt — reset state=scheduled. Через 15s GC завершается, original scheduler возобновляет работу, но его TIs уже потеряны / reset / двойной tracking. Опасность: **double execution** — original Celery task всё ещё running, но scheduler 2 повторно enqueue-ил task. Если task не idempotent (INSERT INTO без ON CONFLICT, файлы без overwrite, payment processing) — duplicate side effects. Mitigation: (1) increase scheduler_health_check_threshold до 60-120s; (2) PgBouncer pooling чтобы heartbeat не страдал от connection storm; (3) tune JVM/Python GC (для Python — gc.set_threshold tuning, или PyPy); (4) idempotent tasks как absolute requirement; (5) monitoring Python GC pauses через monitoring agent.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Разница между zombie task и orphan task?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6