Learning Platform
Глоссарий Troubleshooting
Урок 05.06 · 28 мин
Продвинутый
BackfillBackfillJobCLIdepends_on_pastHistorical reruns

Backfill internals — BackfillJob в Airflow 2.x

Backfill — это запуск DAG-а за исторический период: “перепрогнать все daily runs с 2025-01-01 по 2025-12-31”. В 2.x это специальный режим работы, реализованный через отдельный класс BackfillJob (не scheduler). Backfill процесс конкурирует с обычным scheduler-ом за DB locks, pool slots, executor capacity. Понимание его внутренностей важно — backfills могут парализовать кластер, если запущены неправильно.

В 3.x backfill переехал в scheduler (AIP-78 Scheduler-Managed Backfill) и стал first-class citizen scheduling. В 2.x — отдельный legacy path с собственными проблемами.


Backfill в архитектуре

Где BackfillJob живёт
airflow schedulerMain loop: создаёт scheduled DagRuns, schedule-ит TIs, critical section. Living process в production deployment.
airflow dags backfillCLI command. Запускается на user machine или scheduled через CI/CD. Создаёт отдельный процесс BackfillJob, который работает изолированно.
Shared: metadata DBОбе jobs читают/пишут в одну metadata DB. Конкурируют за table_locks, pool.used_slots, max_active_runs.
Shared: executorЕсли backfill использует --local, у него собственный LocalExecutor. Иначе разделяет CeleryExecutor / KubernetesExecutor с scheduler-ом.

Backfill — это отдельный процесс с собственным main loop, который выглядит похоже на scheduler, но работает только над DagRuns в указанном диапазоне дат.


Batch-обработка — окна, расписание, идемпотентность

CLI: airflow dags backfill

# Базовый syntax
airflow dags backfill \
    --start-date 2025-01-01 \
    --end-date 2025-12-31 \
    my_dag_id

# Полный с options
airflow dags backfill \
    --start-date 2025-01-01 \
    --end-date 2025-12-31 \
    --reset-dagruns \
    --rerun-failed-tasks \
    --local \
    --pool backfill_pool \
    --delay-on-limit 30 \
    --donot-pickle \
    --ignore-first-depends-on-past \
    my_dag_id

Ключевые flags:

FlagЧто
--start-date / --end-dateДиапазон data_interval_start для runs
--reset-dagrunsУдалить существующие DagRuns в этом диапазоне и создать заново
--rerun-failed-tasksНе пересоздавать DagRun, только rerun TIs в failed state
--localUse LocalExecutor вместо configured executor (изолированно от prod)
--poolИспользовать указанный pool (изолировать от prod pool)
--delay-on-limitЗадержка в секундах, когда достигнут max_active_runs (default 1.0)
--ignore-first-depends-on-pastИгнорировать depends_on_past для первого run в диапазоне
--ignore-dependenciesПолностью игнорировать deps между tasks (DANGER!)
--continue-on-failuresПродолжать backfill, если предыдущие runs failed
--disable-retryОтключить retries для backfill runs

--reset-dagruns — самый important и dangerous flag. Он деструктивно удаляет DagRun rows в указанном диапазоне:

-- Что делает --reset-dagruns под капотом
BEGIN;
DELETE FROM task_instance WHERE dag_id='X' AND run_id IN (
    SELECT run_id FROM dag_run
    WHERE dag_id='X'
      AND data_interval_start BETWEEN '2025-01-01' AND '2025-12-31'
);
DELETE FROM dag_run WHERE dag_id='X'
  AND data_interval_start BETWEEN '2025-01-01' AND '2025-12-31';
COMMIT;

XCom для удалённых TI тоже удаляется. Audit trail теряется. Используйте с пониманием последствий.


BackfillJob lifecycle

В airflow/jobs/backfill_job.py:

# Псевдокод BackfillJob._execute
class BackfillJob(BaseJob):
    def _execute(self):
        """Main loop backfill."""
        # 1. Создать или найти DagRuns для всего диапазона
        if self.reset_dagruns:
            self._delete_existing_dagruns()

        dagruns_to_run = self._create_dagruns_for_range()

        # 2. Цикл обработки — пока есть active dagruns
        while dagruns_to_run:
            # Запустить batch dagruns параллельно (max_active_runs limit)
            batch = self._next_batch(dagruns_to_run, max_active=self.dag.max_active_runs)

            # Schedule TIs для batch (аналог scheduler фаза 2-3)
            for dagrun in batch:
                self._schedule_dagrun_tasks(dagrun)

            # Wait for batch completion
            self._wait_for_batch(batch)

            # Remove done dagruns from list
            dagruns_to_run = [dr for dr in dagruns_to_run
                              if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED)]

            if self.delay_on_limit:
                sleep(self.delay_on_limit)

        # 3. Verdict
        if any(dr.state == DagRunState.FAILED for dr in self.original_dagruns):
            if not self.continue_on_failures:
                raise AirflowException("Backfill failed")

Key differences от scheduler:

  1. Backfill создаёт все DagRuns upfront (или находит существующие, если не reset).
  2. Backfill синхронно ждёт completion каждого batch (scheduler — нет, он fire-and-forget).
  3. Backfill завершается, когда все DagRuns terminal (scheduler — бесконечный loop).
  4. Backfill управляет executor сам — если --local, поднимает свой LocalExecutor; иначе shared.

Конкурирование со scheduler — гонки и проблемы

Когда backfill running одновременно с production scheduler, возможны конфликты:

1. Pool exhaustion

Backfill использует тот же pool (default_pool или указанный --pool). Если backfill enqueue 200 TIs одновременно — production scheduling блокируется.

Mitigation: всегда используйте --pool backfill_pool с dedicated pool slots (например 5), чтобы оставить место production-у.

2. max_active_runs collision

Если у DAG max_active_runs=3, и backfill уже использует 3 active runs, production scheduler не может создать новый scheduled DagRun. Production lag-ает за backfill.

Mitigation: запускайте backfill в окно низкой production активности, или временно увеличьте max_active_runs.

3. Critical section contention

Backfill тоже захватывает row lock на slot_pool (через with_for_update). Если backfill держит lock долго, production scheduler пропускает tick-и (через NOWAIT — graceful).

Mitigation: обычно не проблема — backfill держит lock короткие моменты. Но при огромных backfills (>1000 TIs/batch) — есть наблюдаемый latency на production scheduling.

4. Order of execution — sequential vs parallel

Backfill по умолчанию обрабатывает DagRuns в date order (oldest first). Это важно для DAGs с depends_on_past=True — без этого order broken.

# Псевдокод _create_dagruns_for_range
def _create_dagruns_for_range(self):
    dates = list(self.dag.timetable.iter_between(self.start_date, self.end_date))
    dates.sort()  # ← strictly ascending

    dagruns = []
    for date in dates:
        dr = self.dag.create_dagrun(
            run_id=f"backfill__{date}"
            run_type=DagRunType.BACKFILL,
            data_interval_start=date,
            data_interval_end=self.dag.timetable.data_interval_end(date),
            state=DagRunState.QUEUED,
        )
        dagruns.append(dr)
    return dagruns

Если у DAG depends_on_past=True и backfill даёт 365 days, 365-й day будет ждать 364-го — общая duration линейная по количеству dates.


run_type=backfill — где это видно

В UI и API можно отличить backfill runs от scheduled:

-- Все backfill runs
SELECT dag_id, run_id, state, start_date, end_date,
       end_date - start_date AS duration
FROM dag_run
WHERE run_type = 'backfill'
ORDER BY start_date DESC;

В UI Grid view backfill runs имеют отдельную иконку. В REST API: dag_runs?run_type=backfill.


depends_on_past поведение при backfill

depends_on_past=True означает “current TI ждёт success previous run’s TI того же task”. При backfill это создаёт chain:

depends_on_past при backfill (5 days)
Day 1: backfill__2025-01-01Первый run в backfill. previous_run отсутствует или нет в backfill range. Без --ignore-first-depends-on-past — TI висит forever, ждёт previous. С --ignore-first — выполняется как обычно.
success
Day 2: backfill__2025-01-02Зависит от Day 1. Backfill дождётся success Day 1, потом enqueue Day 2.
success
Day 3-5Sequential chain. Каждый ждёт previous. Total time = sum (per-day execution time).

Без --ignore-first-depends-on-past для первого run в backfill range и если в DB нет previous successful run — backfill застрянет с TI в state none ждущим несуществующий previous.

wait_for_downstream=True — ещё сложнее. Это означает “previous run’s ALL downstream tasks of this task должны быть success”. При backfill это создаёт plait dependencies — task A current run depends_on_past, и ALL of A’s downstream tasks predecessor run должны быть success. Backfill это honor-ит, но debugging сложен.


—rerun-failed-tasks vs —reset-dagruns

Два разных способа “перезапустить” historical data:

FlagЧто делаетКогда использовать
--rerun-failed-tasksНайти DagRun-ы в диапазоне, для каждого clear failed TIs, re-run только ихТочечный rerun после fix bug в DAG, не хочу терять успешные TIs
--reset-dagrunsУдалить все DagRun-ы и TIs в диапазоне, создать зановоПолностью очистить historical data, например после schema change

Rule of thumb: 95% случаев — --rerun-failed-tasks. --reset-dagruns только когда нужно полностью пересоздать (TI structure changed — added/removed tasks, и existing TIs incoherent).


Backfill при изменённом DAG-коде

Самый опасный gotcha. Допустим:

  1. На 2025-01-01 DAG имел tasks A → B → C.
  2. На 2026-01-01 DAG имеет tasks A → B → D (C удалён, D добавлен).
  3. Запускаем backfill за 2025-01-01.

Что произойдёт в 2.x:

  • Backfill использует текущую версию DAG (A → B → D).
  • Создаётся DagRun с data_interval_start=2025-01-01 и TIs текущей структуры (A, B, D).
  • НО! Если у вас есть serialized_dag от 2026-01-01, оригинальная structure для 2025-01-01 потеряна.
  • Историческая правильность нарушена.

В 3.x с DAG Versioning (AIP-63) каждый DagRun привязан к dag_version_id — backfill за 2025-01-01 использует версию DAG от 2025-01-01. В 2.x этого нет.

Mitigation в 2.x:

  • Git tag versions DAG-кода. Перед backfill — checkout к старой версии, restart DAG processor.
  • Использовать отдельный staging env с правильной версией кода для backfill.
  • Или принять, что backfill — это “rerun с текущим кодом”, не “historical replay”.

Production patterns

Pattern 1: Dedicated backfill pool

# В Airflow UI / via CLI
airflow pools set backfill_pool 5 "Backfill execution pool"

# При backfill использовать
airflow dags backfill --pool backfill_pool ...

Backfill не сможет занять больше 5 slots — production safe.

Pattern 2: —local execution для smoke test

# Запустить backfill на 1 день локально с LocalExecutor
airflow dags backfill \
    --local \
    --start-date 2025-01-01 \
    --end-date 2025-01-01 \
    my_dag

Не использует production executor (Celery / K8s). Все TIs выполняются в subprocess текущего CLI. Полезно для smoke test перед massive backfill.

Pattern 3: Backfill через PythonOperator

Иногда удобнее запустить backfill из airflow DAG (мета-DAG):

@task
def trigger_backfill(start_date: str, end_date: str, dag_id: str):
    """Trigger backfill для target DAG через Python API."""
    from airflow.models import DagBag
    from airflow.api.common.experimental.mark_tasks import set_dag_run_state_to_running

    # Используйте airflow CLI или Python API
    subprocess.run([
        "airflow", "dags", "backfill",
        "--start-date", start_date,
        "--end-date", end_date,
        "--pool", "backfill_pool",
        dag_id,
    ], check=True)

Позволяет schedule backfill через cron.


Inspection и monitoring

-- Активные backfill jobs
SELECT id, hostname, state, start_date, latest_heartbeat,
       now() - latest_heartbeat AS heartbeat_gap
FROM job
WHERE job_type = 'BackfillJob' AND state = 'running';

-- Progress конкретного backfill
SELECT state, count(*) AS cnt
FROM dag_run
WHERE dag_id = 'my_dag'
  AND run_type = 'backfill'
  AND queued_at >= now() - interval '24 hours'
GROUP BY state;

-- TIs от backfill, которые long-running
SELECT ti.dag_id, ti.task_id, ti.run_id, ti.state,
       now() - ti.start_date AS running_time
FROM task_instance ti
JOIN dag_run dr ON ti.dag_id = dr.dag_id AND ti.run_id = dr.run_id
WHERE dr.run_type = 'backfill'
  AND ti.state = 'running'
  AND ti.start_date < now() - interval '1 hour'
ORDER BY ti.start_date;

Production gotchas

1. Backfill убивает production через pool contention

Без --pool backfill_pool backfill использует default_pool — тот же, что и production. 100 backfill TIs в queue → production не получает slots → SLA breach.

Always: --pool backfill_pool с dedicated slots.

2. --ignore-dependencies — last resort only

Этот флаг пропускает task dependencies — A → B → C запустятся параллельно. Может казаться удобным для speedup, но:

  • Ломает data correctness (B запустится без output A)
  • Side effects parallelism (двойная запись)

Never use в production data pipelines. Только для smoke tests с idempotent tasks.

3. Backfill после code change даёт неправильный data

Описано выше (changed DAG section). В 2.x нет защиты — backfill использует текущий код. Это часто причина “почему данные 2025 года выглядят как структуру 2026 года”.

4. CLI backfill завершается с ошибкой через Ctrl+C — DagRuns остаются

Backfill завершается, но созданные DagRuns в state running или queued остаются. При restart — backfill начнётся сначала, конфликтуя с existing.

Fix перед restart: airflow dags delete my_dag или --reset-dagruns при restart.

5. depends_on_past=True + большой range = forever

Backfill за год daily с depends_on_past=True и 5 минут на run = 365 * 5 = 30 hours. Нельзя параллелизовать (sequential by design).

Workaround:

  • Если order не критичен — --ignore-first-depends-on-past + small parallel runs
  • Если критичен — accept long duration, run на dedicated worker

6. Backfill memory usage в scheduler process

BackfillJob создаёт все DagRun rows upfront и держит их в memory. Backfill за 5 лет daily = 1800 DagRuns + ~100 TIs each = 180k objects в memory. На 100-task DAG — 18 million objects. OOM на scheduler-машине.

Fix:

  • Run backfill в batches: 1 месяц за раз, в шелл-loop.
  • 3.x AIP-78 scheduler-managed решает это — батчи обрабатываются streaming.

Comparison с 3.x scheduler-managed backfill (AIP-78)

Aspect2.x BackfillJob3.x scheduler-managed
ProcessОтдельный BackfillJobScheduler сам обрабатывает
API triggerCLI / airflow dags backfillREST API + UI button
Conflict resolutionЧерез pools / max_active_runsFirst-class scheduling priority
DAG versionUse current code (broken)Pinned к dag_version_id
MemoryAll upfrontStreaming batches
Pause / resumeНе поддерживаетсяNative pause / resume / cancel
ObservabilityЛоги отдельно от schedulerВ scheduler logs / metrics

В 3.x backfill стал first-class citizen — это главное упрощение operational story Airflow 3.x.


Migration tips для 2.x → 3.x

Если планируете upgrade к 3.x:

  1. Не используйте --reset-dagruns массово в 2.x — в 3.x это будет другой API.
  2. Использовать --pool backfill_pool уже сейчас — в 3.x backfill также respects pools.
  3. Документировать your backfill workflows — пригодится при migration к scheduler-managed API.
  4. DAG Versioning в 3.x — начните думать о tagging DAG versions сейчас.

Проверка знанийKnowledge check
Production: запустили backfill за 6 месяцев daily DAG с 50 tasks, max_active_runs=3, depends_on_past=True. Через час scheduler-логи показывают огромный backlog scheduled TIs, production DAGs visually 'застряли'. Что произошло и как починить?
ОтветAnswer
Сразу несколько проблем сложились: (1) depends_on_past=True заставляет backfill быть sequential — даже max_active_runs=3 не даёт параллелизм для одного DAG, потому что каждый day ждёт предыдущий. (2) Backfill использует default_pool — занял все slots (50 TIs * 3 active = 150 в queue). (3) Production scheduler не получает pool slots, scheduled TIs накапливаются. (4) Если backfill не закончит за reasonable time, SLAs нарушены. Fix: (a) ABORT backfill сейчас, через kill CLI process. (b) Создать dedicated backfill_pool с 5-10 slots. (c) Запустить backfill заново с --pool backfill_pool, --start-date / --end-date в меньших batches (1 месяц за раз). (d) Если depends_on_past не нужен для backfill — добавить --ignore-first-depends-on-past и обработать каждый день параллельно (но careful с data correctness). (e) Run backfill в off-hours когда production lull. (f) Долгосрочно: upgrade к 3.x с scheduler-managed backfill — он не делает upfront create DagRuns и батч-friendly.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Backfill в Airflow 2.x — как реализован?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6