Backfill internals — BackfillJob в Airflow 2.x
Backfill — это запуск DAG-а за исторический период: “перепрогнать все daily runs с 2025-01-01 по 2025-12-31”. В 2.x это специальный режим работы, реализованный через отдельный класс BackfillJob (не scheduler). Backfill процесс конкурирует с обычным scheduler-ом за DB locks, pool slots, executor capacity. Понимание его внутренностей важно — backfills могут парализовать кластер, если запущены неправильно.
В 3.x backfill переехал в scheduler (AIP-78 Scheduler-Managed Backfill) и стал first-class citizen scheduling. В 2.x — отдельный legacy path с собственными проблемами.
Backfill в архитектуре
Backfill — это отдельный процесс с собственным main loop, который выглядит похоже на scheduler, но работает только над DagRuns в указанном диапазоне дат.
Batch-обработка — окна, расписание, идемпотентность
CLI: airflow dags backfill
# Базовый syntax
airflow dags backfill \
--start-date 2025-01-01 \
--end-date 2025-12-31 \
my_dag_id
# Полный с options
airflow dags backfill \
--start-date 2025-01-01 \
--end-date 2025-12-31 \
--reset-dagruns \
--rerun-failed-tasks \
--local \
--pool backfill_pool \
--delay-on-limit 30 \
--donot-pickle \
--ignore-first-depends-on-past \
my_dag_id
Ключевые flags:
| Flag | Что |
|---|---|
--start-date / --end-date | Диапазон data_interval_start для runs |
--reset-dagruns | Удалить существующие DagRuns в этом диапазоне и создать заново |
--rerun-failed-tasks | Не пересоздавать DagRun, только rerun TIs в failed state |
--local | Use LocalExecutor вместо configured executor (изолированно от prod) |
--pool | Использовать указанный pool (изолировать от prod pool) |
--delay-on-limit | Задержка в секундах, когда достигнут max_active_runs (default 1.0) |
--ignore-first-depends-on-past | Игнорировать depends_on_past для первого run в диапазоне |
--ignore-dependencies | Полностью игнорировать deps между tasks (DANGER!) |
--continue-on-failures | Продолжать backfill, если предыдущие runs failed |
--disable-retry | Отключить retries для backfill runs |
--reset-dagruns — самый important и dangerous flag. Он деструктивно удаляет DagRun rows в указанном диапазоне:
-- Что делает --reset-dagruns под капотом
BEGIN;
DELETE FROM task_instance WHERE dag_id='X' AND run_id IN (
SELECT run_id FROM dag_run
WHERE dag_id='X'
AND data_interval_start BETWEEN '2025-01-01' AND '2025-12-31'
);
DELETE FROM dag_run WHERE dag_id='X'
AND data_interval_start BETWEEN '2025-01-01' AND '2025-12-31';
COMMIT;
XCom для удалённых TI тоже удаляется. Audit trail теряется. Используйте с пониманием последствий.
BackfillJob lifecycle
В airflow/jobs/backfill_job.py:
# Псевдокод BackfillJob._execute
class BackfillJob(BaseJob):
def _execute(self):
"""Main loop backfill."""
# 1. Создать или найти DagRuns для всего диапазона
if self.reset_dagruns:
self._delete_existing_dagruns()
dagruns_to_run = self._create_dagruns_for_range()
# 2. Цикл обработки — пока есть active dagruns
while dagruns_to_run:
# Запустить batch dagruns параллельно (max_active_runs limit)
batch = self._next_batch(dagruns_to_run, max_active=self.dag.max_active_runs)
# Schedule TIs для batch (аналог scheduler фаза 2-3)
for dagrun in batch:
self._schedule_dagrun_tasks(dagrun)
# Wait for batch completion
self._wait_for_batch(batch)
# Remove done dagruns from list
dagruns_to_run = [dr for dr in dagruns_to_run
if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED)]
if self.delay_on_limit:
sleep(self.delay_on_limit)
# 3. Verdict
if any(dr.state == DagRunState.FAILED for dr in self.original_dagruns):
if not self.continue_on_failures:
raise AirflowException("Backfill failed")
Key differences от scheduler:
- Backfill создаёт все DagRuns upfront (или находит существующие, если не reset).
- Backfill синхронно ждёт completion каждого batch (scheduler — нет, он fire-and-forget).
- Backfill завершается, когда все DagRuns terminal (scheduler — бесконечный loop).
- Backfill управляет executor сам — если
--local, поднимает свой LocalExecutor; иначе shared.
Конкурирование со scheduler — гонки и проблемы
Когда backfill running одновременно с production scheduler, возможны конфликты:
1. Pool exhaustion
Backfill использует тот же pool (default_pool или указанный --pool). Если backfill enqueue 200 TIs одновременно — production scheduling блокируется.
Mitigation: всегда используйте --pool backfill_pool с dedicated pool slots (например 5), чтобы оставить место production-у.
2. max_active_runs collision
Если у DAG max_active_runs=3, и backfill уже использует 3 active runs, production scheduler не может создать новый scheduled DagRun. Production lag-ает за backfill.
Mitigation: запускайте backfill в окно низкой production активности, или временно увеличьте max_active_runs.
3. Critical section contention
Backfill тоже захватывает row lock на slot_pool (через with_for_update). Если backfill держит lock долго, production scheduler пропускает tick-и (через NOWAIT — graceful).
Mitigation: обычно не проблема — backfill держит lock короткие моменты. Но при огромных backfills (>1000 TIs/batch) — есть наблюдаемый latency на production scheduling.
4. Order of execution — sequential vs parallel
Backfill по умолчанию обрабатывает DagRuns в date order (oldest first). Это важно для DAGs с depends_on_past=True — без этого order broken.
# Псевдокод _create_dagruns_for_range
def _create_dagruns_for_range(self):
dates = list(self.dag.timetable.iter_between(self.start_date, self.end_date))
dates.sort() # ← strictly ascending
dagruns = []
for date in dates:
dr = self.dag.create_dagrun(
run_id=f"backfill__{date}"
run_type=DagRunType.BACKFILL,
data_interval_start=date,
data_interval_end=self.dag.timetable.data_interval_end(date),
state=DagRunState.QUEUED,
)
dagruns.append(dr)
return dagruns
Если у DAG depends_on_past=True и backfill даёт 365 days, 365-й day будет ждать 364-го — общая duration линейная по количеству dates.
run_type=backfill — где это видно
В UI и API можно отличить backfill runs от scheduled:
-- Все backfill runs
SELECT dag_id, run_id, state, start_date, end_date,
end_date - start_date AS duration
FROM dag_run
WHERE run_type = 'backfill'
ORDER BY start_date DESC;
В UI Grid view backfill runs имеют отдельную иконку. В REST API: dag_runs?run_type=backfill.
depends_on_past поведение при backfill
depends_on_past=True означает “current TI ждёт success previous run’s TI того же task”. При backfill это создаёт chain:
Без --ignore-first-depends-on-past для первого run в backfill range и если в DB нет previous successful run — backfill застрянет с TI в state none ждущим несуществующий previous.
wait_for_downstream=True — ещё сложнее. Это означает “previous run’s ALL downstream tasks of this task должны быть success”. При backfill это создаёт plait dependencies — task A current run depends_on_past, и ALL of A’s downstream tasks predecessor run должны быть success. Backfill это honor-ит, но debugging сложен.
—rerun-failed-tasks vs —reset-dagruns
Два разных способа “перезапустить” historical data:
| Flag | Что делает | Когда использовать |
|---|---|---|
--rerun-failed-tasks | Найти DagRun-ы в диапазоне, для каждого clear failed TIs, re-run только их | Точечный rerun после fix bug в DAG, не хочу терять успешные TIs |
--reset-dagruns | Удалить все DagRun-ы и TIs в диапазоне, создать заново | Полностью очистить historical data, например после schema change |
Rule of thumb: 95% случаев — --rerun-failed-tasks. --reset-dagruns только когда нужно полностью пересоздать (TI structure changed — added/removed tasks, и existing TIs incoherent).
Backfill при изменённом DAG-коде
Самый опасный gotcha. Допустим:
- На 2025-01-01 DAG имел tasks A → B → C.
- На 2026-01-01 DAG имеет tasks A → B → D (C удалён, D добавлен).
- Запускаем backfill за 2025-01-01.
Что произойдёт в 2.x:
- Backfill использует текущую версию DAG (A → B → D).
- Создаётся DagRun с data_interval_start=2025-01-01 и TIs текущей структуры (A, B, D).
- НО! Если у вас есть
serialized_dagот 2026-01-01, оригинальная structure для 2025-01-01 потеряна. - Историческая правильность нарушена.
В 3.x с DAG Versioning (AIP-63) каждый DagRun привязан к dag_version_id — backfill за 2025-01-01 использует версию DAG от 2025-01-01. В 2.x этого нет.
Mitigation в 2.x:
- Git tag versions DAG-кода. Перед backfill — checkout к старой версии, restart DAG processor.
- Использовать отдельный staging env с правильной версией кода для backfill.
- Или принять, что backfill — это “rerun с текущим кодом”, не “historical replay”.
Production patterns
Pattern 1: Dedicated backfill pool
# В Airflow UI / via CLI
airflow pools set backfill_pool 5 "Backfill execution pool"
# При backfill использовать
airflow dags backfill --pool backfill_pool ...
Backfill не сможет занять больше 5 slots — production safe.
Pattern 2: —local execution для smoke test
# Запустить backfill на 1 день локально с LocalExecutor
airflow dags backfill \
--local \
--start-date 2025-01-01 \
--end-date 2025-01-01 \
my_dag
Не использует production executor (Celery / K8s). Все TIs выполняются в subprocess текущего CLI. Полезно для smoke test перед massive backfill.
Pattern 3: Backfill через PythonOperator
Иногда удобнее запустить backfill из airflow DAG (мета-DAG):
@task
def trigger_backfill(start_date: str, end_date: str, dag_id: str):
"""Trigger backfill для target DAG через Python API."""
from airflow.models import DagBag
from airflow.api.common.experimental.mark_tasks import set_dag_run_state_to_running
# Используйте airflow CLI или Python API
subprocess.run([
"airflow", "dags", "backfill",
"--start-date", start_date,
"--end-date", end_date,
"--pool", "backfill_pool",
dag_id,
], check=True)
Позволяет schedule backfill через cron.
Inspection и monitoring
-- Активные backfill jobs
SELECT id, hostname, state, start_date, latest_heartbeat,
now() - latest_heartbeat AS heartbeat_gap
FROM job
WHERE job_type = 'BackfillJob' AND state = 'running';
-- Progress конкретного backfill
SELECT state, count(*) AS cnt
FROM dag_run
WHERE dag_id = 'my_dag'
AND run_type = 'backfill'
AND queued_at >= now() - interval '24 hours'
GROUP BY state;
-- TIs от backfill, которые long-running
SELECT ti.dag_id, ti.task_id, ti.run_id, ti.state,
now() - ti.start_date AS running_time
FROM task_instance ti
JOIN dag_run dr ON ti.dag_id = dr.dag_id AND ti.run_id = dr.run_id
WHERE dr.run_type = 'backfill'
AND ti.state = 'running'
AND ti.start_date < now() - interval '1 hour'
ORDER BY ti.start_date;
Production gotchas
1. Backfill убивает production через pool contention
Без --pool backfill_pool backfill использует default_pool — тот же, что и production. 100 backfill TIs в queue → production не получает slots → SLA breach.
Always: --pool backfill_pool с dedicated slots.
2. --ignore-dependencies — last resort only
Этот флаг пропускает task dependencies — A → B → C запустятся параллельно. Может казаться удобным для speedup, но:
- Ломает data correctness (B запустится без output A)
- Side effects parallelism (двойная запись)
Never use в production data pipelines. Только для smoke tests с idempotent tasks.
3. Backfill после code change даёт неправильный data
Описано выше (changed DAG section). В 2.x нет защиты — backfill использует текущий код. Это часто причина “почему данные 2025 года выглядят как структуру 2026 года”.
4. CLI backfill завершается с ошибкой через Ctrl+C — DagRuns остаются
Backfill завершается, но созданные DagRuns в state running или queued остаются. При restart — backfill начнётся сначала, конфликтуя с existing.
Fix перед restart: airflow dags delete my_dag или --reset-dagruns при restart.
5. depends_on_past=True + большой range = forever
Backfill за год daily с depends_on_past=True и 5 минут на run = 365 * 5 = 30 hours. Нельзя параллелизовать (sequential by design).
Workaround:
- Если order не критичен —
--ignore-first-depends-on-past+ small parallel runs - Если критичен — accept long duration, run на dedicated worker
6. Backfill memory usage в scheduler process
BackfillJob создаёт все DagRun rows upfront и держит их в memory. Backfill за 5 лет daily = 1800 DagRuns + ~100 TIs each = 180k objects в memory. На 100-task DAG — 18 million objects. OOM на scheduler-машине.
Fix:
- Run backfill в batches: 1 месяц за раз, в шелл-loop.
- 3.x AIP-78 scheduler-managed решает это — батчи обрабатываются streaming.
Comparison с 3.x scheduler-managed backfill (AIP-78)
| Aspect | 2.x BackfillJob | 3.x scheduler-managed |
|---|---|---|
| Process | Отдельный BackfillJob | Scheduler сам обрабатывает |
| API trigger | CLI / airflow dags backfill | REST API + UI button |
| Conflict resolution | Через pools / max_active_runs | First-class scheduling priority |
| DAG version | Use current code (broken) | Pinned к dag_version_id |
| Memory | All upfront | Streaming batches |
| Pause / resume | Не поддерживается | Native pause / resume / cancel |
| Observability | Логи отдельно от scheduler | В scheduler logs / metrics |
В 3.x backfill стал first-class citizen — это главное упрощение operational story Airflow 3.x.
Migration tips для 2.x → 3.x
Если планируете upgrade к 3.x:
- Не используйте
--reset-dagrunsмассово в 2.x — в 3.x это будет другой API. - Использовать
--pool backfill_poolуже сейчас — в 3.x backfill также respects pools. - Документировать your backfill workflows — пригодится при migration к scheduler-managed API.
- DAG Versioning в 3.x — начните думать о tagging DAG versions сейчас.