Backfill-safe DAGs — datetime.now, hardcoded start_date, data_interval respect
Backfill — это запуск DAG за прошлые даты для восполнения исторических данных. Это критически важная функциональность Airflow: новый pipeline для orders нужно прогнать за последние 12 месяцев, или сбежавший pipeline восстановить за 3 дня. Backfill работает только если ваш DAG backfill-safe — то есть результат run(ds=2026-05-12) идентичен при запуске сейчас или через год.
Этот урок — конкретные правила и примеры: почему datetime.now() ломает backfill, как использовать data_interval, когда catchup=True/False, и как тестировать backfill-safety.
Что значит «backfill-safe»
Формальное определение:
DAG backfill-safe, если для любого
data_interval=[T1, T2], результат запуска DAG идентичен независимо от wall-clock time запуска.
Примеры:
Backfill-safe:
@task
def aggregate_revenue(data_interval_start, data_interval_end):
sql = f"""
INSERT INTO daily_revenue
SELECT date_trunc('day', created_at), SUM(amount)
FROM orders
WHERE created_at >= '{data_interval_start}' AND created_at < '{data_interval_end}'
GROUP BY 1
ON CONFLICT (day) DO UPDATE SET amount = EXCLUDED.amount;
"""
Запуск в 2026-05-12 за data_interval=[2026-05-11, 2026-05-12) или backfill в 2027-05-12 за тот же interval — identical SQL, identical result.
НЕ backfill-safe:
@task
def aggregate_revenue():
from datetime import datetime, timedelta
end = datetime.now() # ❌ wall-clock!
start = end - timedelta(days=1)
sql = f"SELECT ... WHERE created_at BETWEEN '{start}' AND '{end}'"
Запуск сейчас → за вчера. Backfill за 2024-01-15 → за вчера от 2027-05-12 (когда backfill запущен), а не за 2024-01-15 как должно быть.
Anti-pattern 1: datetime.now() в task
Самая частая ошибка:
# ❌ ПЛОХО
@task
def fetch_orders():
today = datetime.now().date()
yesterday = today - timedelta(days=1)
sql = f"SELECT * FROM orders WHERE created_date BETWEEN '{yesterday}' AND '{today}'"
Что происходит при backfill для 2024-06-15:
- Airflow scheduler creates DagRun с
execution_date=2024-06-15,data_interval=[2024-06-15, 2024-06-16) - Task запускается в 2027-05-12 (wall clock)
datetime.now()→ 2027-05-12, не 2024-06-15- Task fetch-ит данные за 2027-05-12, не за исторический 2024-06-15
- Backfill дал wrong data
Fix: использовать context переменные:
# ✅ ХОРОШО
@task
def fetch_orders(data_interval_start, data_interval_end):
sql = f"""
SELECT * FROM orders
WHERE created_at >= '{data_interval_start}' AND created_at < '{data_interval_end}'
"""
Или через get_current_context():
@task
def fetch_orders():
from airflow.operators.python import get_current_context
ctx = get_current_context()
start = ctx["data_interval_start"]
end = ctx["data_interval_end"]
...
При backfill для 2024-06-15 task получит data_interval_start=2024-06-15, data_interval_end=2024-06-16 — правильно.
Anti-pattern 2: start_date=datetime.now()
# ❌ КАТАСТРОФА
@dag(
start_date=datetime.now() - timedelta(days=7), # ❌
schedule="@daily",
)
datetime.now() выполняется на каждом DAG parse (~30s). Поведение:
- Parse в 09:00:00 → start_date=2026-05-05 09:00:00
- Parse в 09:00:30 → start_date=2026-05-05 09:00:30
- Scheduler видит non-deterministic start_date — может create runs, потом передумать
next_dagrunсдвигается → DagRuns могут пропускаться или дублироваться
Fix: hardcoded date.
# ✅ ХОРОШО
@dag(
start_date=datetime(2024, 1, 1), # hardcoded
schedule="@daily",
)
После change start_date в production — может потребоваться airflow db reset для DAG (drastic) или DagRun cleanup (less drastic).
Anti-pattern 3: catchup=True без понимания
@dag(
start_date=datetime(2020, 1, 1),
schedule="@daily"
catchup=True, # ❌ — DAG попытается прогнать с 2020!
)
При первом deployment Airflow попытается create DagRuns за каждый день с 2020-01-01 до сейчас. Это:
- 6 лет × 365 дней = ~2200 DagRuns одновременно
- Scheduler overload
- Resource exhaustion
- Inevitable failures cascade
Rule: для новых DAGs всегда catchup=False. Backfill делайте explicit:
# Explicit backfill за нужный range
airflow dags backfill my_dag \
--start-date 2024-01-01 \
--end-date 2024-12-31
Batch-обработка — окна, расписание, идемпотентность
Использование data_interval
data_interval — это центральный concept в Airflow 2.x. Каждый DagRun имеет:
data_interval_start— начало временного окна, которое run представляетdata_interval_end— конец окна
Для @daily schedule с execution_date=2026-05-12:
data_interval_start = 2026-05-12 00:00:00data_interval_end = 2026-05-13 00:00:00
Run для 2026-05-12 обрабатывает данные за 2026-05-12, и запускается в начале 2026-05-13 (когда interval завершён).
Это subtle but critical: execution_date для DAG = data_interval_start, не время запуска. Run для “12 мая” реально запускается ночью с 12 на 13.
@task
def process_day(data_interval_start, data_interval_end):
"""Process все события за data interval."""
sql = f"""
INSERT INTO daily_metrics
SELECT
'{data_interval_start.date()}' AS day,
COUNT(*) AS event_count
FROM events
WHERE event_time >= '{data_interval_start}'
AND event_time < '{data_interval_end}'
ON CONFLICT (day) DO UPDATE SET event_count = EXCLUDED.event_count;
"""
PostgresHook().run(sql)
При backfill за 2024-06-15:
data_interval_start = 2024-06-15data_interval_end = 2024-06-16- SQL фильтрует events strictly за этот день
- Result identical wall-clock independent
Backfill semantics — что Airflow гарантирует
При airflow dags backfill my_dag --start-date 2024-01-01 --end-date 2024-01-31:
- Airflow создаёт DagRuns за каждый scheduled interval в range
- DagRuns marked
run_type='backfill' - Запускаются параллельно (до
max_active_runslimit) - Каждый DagRun имеет свой
data_interval— изолирован
Что Airflow НЕ гарантирует:
- Порядок выполнения — DagRuns могут запускаться параллельно
- Уникальность wall-clock time — два runs могут happen одновременно
- Dependencies между DagRuns — DagRun за 2024-01-15 НЕ ждёт 2024-01-14 unless explicit
depends_on_past=True
depends_on_past — для sequential dependencies
Если task A для 2024-01-15 нужен complete A для 2024-01-14 (например, cumulative aggregation):
@task(depends_on_past=True)
def cumulative_total(data_interval_start):
"""Today's cumulative = yesterday's + today's increment."""
sql = f"""
INSERT INTO cumulative_totals
SELECT
'{data_interval_start.date()}' AS day,
(SELECT total FROM cumulative_totals
WHERE day = '{(data_interval_start.date() - timedelta(days=1))}') +
(SELECT SUM(amount) FROM orders WHERE created_date = '{data_interval_start.date()}')
"""
С depends_on_past=True:
- Task не запустится пока same task за prior data_interval не достиг success
- Backfill становится sequential — один-за-другим
- Slower, но safe для cumulative state
Caveat: depends_on_past может deadlock backfill если old runs missing. Better — design tasks так чтобы не нужен dependency on past (recompute from scratch когда возможно).
Anti-pattern 4: External state changing over time
@task
def fetch_top_products():
"""Use external API which returns 'top products today'."""
response = requests.get("https://api.example.com/top-products")
top_products = response.json()
# ❌ API возвращает today's top products, не historical
# Backfill за 2024-06-15 получит top products on day backfill running, не historical
Если backfill требует historical state — нужен:
- API endpoint с date parameter:
https://api.example.com/top-products?date={data_interval_start} - Local historical snapshot (loaded once, queried by date)
- Иначе — task fundamentally not backfill-safe
Тестирование backfill-safety
# tests/integration/test_backfill_safe.py
import subprocess
def test_backfill_deterministic():
"""Run task для historical date — verify it pulls historical data."""
# Setup test data — orders за 2024-06-15
setup_test_data_for_date("2024-06-15")
# Run task для 2024-06-15
result = subprocess.run(
["airflow", "tasks", "test", "orders_etl", "aggregate_revenue", "2024-06-15"],
capture_output=True, text=True,
)
assert result.returncode == 0
# Verify aggregation correct for that date
actual = query_db("SELECT amount FROM daily_revenue WHERE day = '2024-06-15'")
expected = sum(test_data_amounts_for("2024-06-15"))
assert actual == expected
Или через airflow dags test:
def test_full_dag_backfill():
"""Run full DAG за 2024-06-15 — все tasks должны быть backfill-safe."""
result = subprocess.run(
["airflow", "dags", "test", "orders_etl", "2024-06-15"],
capture_output=True, text=True,
)
assert result.returncode == 0
# Verify state correctly reflects 2024-06-15 data
Backfill best practices
1. Idempotent + deterministic = backfill-safe
Если task идемпотентен (модуль 17.02) и deterministic от data_interval — он автоматически backfill-safe.
2. Используйте deterministic paths
# Path определён data_interval — не wall-clock
s3_path = f"s3://datalake/orders/dt={data_interval_start.date()}/data.parquet"
3. catchup=False для новых DAGs
Backfill — explicit operation через CLI, не implicit при deploy.
4. max_active_runs для backfill control
@dag(max_active_runs=3, ...)
Без limit backfill за 365 дней запустит 365 DagRuns параллельно. С max_active_runs=3 — 3 одновременно, queue для остальных.
5. pool для resource limit
Если DAG использует limited resource (Spark cluster, API rate limit) — assign pool:
@task(pool="api_calls", pool_slots=1)
def call_external_api():
...
Backfill 365 runs — все запросы соберутся в pool, обработаются по очереди (до slots paralelizm).
Production gotchas
Timezone awareness. data_interval_start всегда UTC в Airflow 2.x. Если ваши данные в local timezone — convert explicitly. Иначе off-by-one bugs.
Schedule с irregular intervals. @hourly или cron 0 9 * * * — regular, easy. Business calendar (только weekdays, exclude holidays) — irregular. При backfill irregular schedule Airflow создаёт runs только для valid intervals. Test thoroughly.
end_date для DAG. Если DAG имеет end_date=datetime(2024, 12, 31), backfill за 2025 silently не создаст runs. Check end_date не expired.
Variable.get для config — может ломать backfill. Если task делает Variable.get('some_threshold') — value возможно изменилось с 2024-06-15. Если threshold matters для historical computation — use config table с date column instead Variable.
Partial data warnings. Backfill в реальном времени (run за yesterday) — данные могут быть incomplete (late-arriving events). Add validation: row count за data_interval >= expected_min, иначе fail задачу.
OpenLineage в backfill — генерируется normally, marked с run_type=backfill. Helps trace what was backfilled.