Learning Platform
Глоссарий Troubleshooting
Урок 18.05 · 22 мин
Продвинутый
BackfillData Intervalstart_datecatchupAnti-Patterns

Backfill-safe DAGs — datetime.now, hardcoded start_date, data_interval respect

Backfill — это запуск DAG за прошлые даты для восполнения исторических данных. Это критически важная функциональность Airflow: новый pipeline для orders нужно прогнать за последние 12 месяцев, или сбежавший pipeline восстановить за 3 дня. Backfill работает только если ваш DAG backfill-safe — то есть результат run(ds=2026-05-12) идентичен при запуске сейчас или через год.

Этот урок — конкретные правила и примеры: почему datetime.now() ломает backfill, как использовать data_interval, когда catchup=True/False, и как тестировать backfill-safety.


Что значит «backfill-safe»

Формальное определение:

DAG backfill-safe, если для любого data_interval=[T1, T2], результат запуска DAG идентичен независимо от wall-clock time запуска.

Примеры:

Backfill-safe:

@task
def aggregate_revenue(data_interval_start, data_interval_end):
    sql = f"""
    INSERT INTO daily_revenue
    SELECT date_trunc('day', created_at), SUM(amount)
    FROM orders
    WHERE created_at >= '{data_interval_start}' AND created_at < '{data_interval_end}'
    GROUP BY 1
    ON CONFLICT (day) DO UPDATE SET amount = EXCLUDED.amount;
    """

Запуск в 2026-05-12 за data_interval=[2026-05-11, 2026-05-12) или backfill в 2027-05-12 за тот же interval — identical SQL, identical result.

НЕ backfill-safe:

@task
def aggregate_revenue():
    from datetime import datetime, timedelta
    end = datetime.now()                          # ❌ wall-clock!
    start = end - timedelta(days=1)
    sql = f"SELECT ... WHERE created_at BETWEEN '{start}' AND '{end}'"

Запуск сейчас → за вчера. Backfill за 2024-01-15 → за вчера от 2027-05-12 (когда backfill запущен), а не за 2024-01-15 как должно быть.


Anti-pattern 1: datetime.now() в task

Самая частая ошибка:

# ❌ ПЛОХО
@task
def fetch_orders():
    today = datetime.now().date()
    yesterday = today - timedelta(days=1)
    sql = f"SELECT * FROM orders WHERE created_date BETWEEN '{yesterday}' AND '{today}'"

Что происходит при backfill для 2024-06-15:

  • Airflow scheduler creates DagRun с execution_date=2024-06-15, data_interval=[2024-06-15, 2024-06-16)
  • Task запускается в 2027-05-12 (wall clock)
  • datetime.now() → 2027-05-12, не 2024-06-15
  • Task fetch-ит данные за 2027-05-12, не за исторический 2024-06-15
  • Backfill дал wrong data

Fix: использовать context переменные:

# ✅ ХОРОШО
@task
def fetch_orders(data_interval_start, data_interval_end):
    sql = f"""
    SELECT * FROM orders
    WHERE created_at >= '{data_interval_start}' AND created_at < '{data_interval_end}'
    """

Или через get_current_context():

@task
def fetch_orders():
    from airflow.operators.python import get_current_context
    ctx = get_current_context()
    start = ctx["data_interval_start"]
    end = ctx["data_interval_end"]
    ...

При backfill для 2024-06-15 task получит data_interval_start=2024-06-15, data_interval_end=2024-06-16 — правильно.


Anti-pattern 2: start_date=datetime.now()

# ❌ КАТАСТРОФА
@dag(
    start_date=datetime.now() - timedelta(days=7),  # ❌
    schedule="@daily",
)

datetime.now() выполняется на каждом DAG parse (~30s). Поведение:

  • Parse в 09:00:00 → start_date=2026-05-05 09:00:00
  • Parse в 09:00:30 → start_date=2026-05-05 09:00:30
  • Scheduler видит non-deterministic start_date — может create runs, потом передумать
  • next_dagrun сдвигается → DagRuns могут пропускаться или дублироваться

Fix: hardcoded date.

# ✅ ХОРОШО
@dag(
    start_date=datetime(2024, 1, 1),  # hardcoded
    schedule="@daily",
)

После change start_date в production — может потребоваться airflow db reset для DAG (drastic) или DagRun cleanup (less drastic).


Anti-pattern 3: catchup=True без понимания

@dag(
    start_date=datetime(2020, 1, 1),
    schedule="@daily"
    catchup=True,  # ❌ — DAG попытается прогнать с 2020!
)

При первом deployment Airflow попытается create DagRuns за каждый день с 2020-01-01 до сейчас. Это:

  • 6 лет × 365 дней = ~2200 DagRuns одновременно
  • Scheduler overload
  • Resource exhaustion
  • Inevitable failures cascade

Rule: для новых DAGs всегда catchup=False. Backfill делайте explicit:

# Explicit backfill за нужный range
airflow dags backfill my_dag \
  --start-date 2024-01-01 \
  --end-date 2024-12-31

Batch-обработка — окна, расписание, идемпотентность

Использование data_interval

data_interval — это центральный concept в Airflow 2.x. Каждый DagRun имеет:

  • data_interval_start — начало временного окна, которое run представляет
  • data_interval_end — конец окна

Для @daily schedule с execution_date=2026-05-12:

  • data_interval_start = 2026-05-12 00:00:00
  • data_interval_end = 2026-05-13 00:00:00

Run для 2026-05-12 обрабатывает данные за 2026-05-12, и запускается в начале 2026-05-13 (когда interval завершён).

Это subtle but critical: execution_date для DAG = data_interval_start, не время запуска. Run для “12 мая” реально запускается ночью с 12 на 13.

data_interval semantics: start_date, data_interval_start/end
@dag(start_date=datetime(2024, 1, 1), schedule='@daily')start_date — hardcoded дата first scheduling window. Не datetime.now() (non-deterministic, ломает DagRun creation). Не должна быть в будущем. Обычно ставится на дату deployment или раньше. Менять start_date в production — drastic operation (требует cleanup DagRuns).
scheduler creates DagRun for each interval
execution_date = 2026-05-12 00:00Marker для DagRun — обычно = data_interval_start. В 2.x это term still used. В 3.x replaced на logical_date. Для @daily schedule: каждый день создаётся DagRun с execution_date в начале window. Это marker, не время запуска.
data_interval_start = 2026-05-12 00:00Начало временного окна которое run представляет. Tasks должны фильтровать data WHERE created_at >= data_interval_start AND created_at < data_interval_end. Это foundation backfill-safety: при backfill за 2024-06-15 получит data_interval_start=2024-06-15, не сегодняшнюю date.
data_interval_end = 2026-05-13 00:00Конец временного окна (exclusive). Для @daily — следующий день в начале. Это момент когда interval 'closes' и run actually starts wall-clock. Tasks используют < data_interval_end (не <=) для избежания overlap с next run.
window completes → wall-clock trigger
actual run starts: 2026-05-13 00:00:30 (wall-clock)Scheduler запускает run после того как data_interval_end достигнут. Это ~30 секунд delay в зависимости от scheduler tick. Run для 'May 12' реально стартует ночью с 12 на 13. Tasks fill data за прошедший интервал.
task receives context
task(data_interval_start, data_interval_end)Best practice: declare params в signature. Airflow auto-injects values. Или get_current_context()['data_interval_start']. ВАЖНО: НЕ datetime.now() — break backfill. Backfill за 2024-06-15 inject 2024-06-15/2024-06-16, не сегодня. Task deterministic относительно interval, независимо от wall-clock.
backfill scenario
airflow dags backfill orders_etl --start-date 2024-03-01 --end-date 2024-05-31Explicit CLI backfill за 3 месяца. Airflow создаёт DagRuns marked run_type='backfill' за каждый scheduled interval. data_interval для каждого — historical. Параллельно до max_active_runs. Backfill-safe DAG → correct historical data. Non-safe (datetime.now()) → catastrophe (current data inserted с historical execution_date).
@task
def process_day(data_interval_start, data_interval_end):
    """Process все события за data interval."""
    sql = f"""
    INSERT INTO daily_metrics
    SELECT
        '{data_interval_start.date()}' AS day,
        COUNT(*) AS event_count
    FROM events
    WHERE event_time >= '{data_interval_start}'
      AND event_time < '{data_interval_end}'
    ON CONFLICT (day) DO UPDATE SET event_count = EXCLUDED.event_count;
    """
    PostgresHook().run(sql)

При backfill за 2024-06-15:

  • data_interval_start = 2024-06-15
  • data_interval_end = 2024-06-16
  • SQL фильтрует events strictly за этот день
  • Result identical wall-clock independent

Backfill semantics — что Airflow гарантирует

При airflow dags backfill my_dag --start-date 2024-01-01 --end-date 2024-01-31:

  1. Airflow создаёт DagRuns за каждый scheduled interval в range
  2. DagRuns marked run_type='backfill'
  3. Запускаются параллельно (до max_active_runs limit)
  4. Каждый DagRun имеет свой data_interval — изолирован

Что Airflow НЕ гарантирует:

  • Порядок выполнения — DagRuns могут запускаться параллельно
  • Уникальность wall-clock time — два runs могут happen одновременно
  • Dependencies между DagRuns — DagRun за 2024-01-15 НЕ ждёт 2024-01-14 unless explicit depends_on_past=True

depends_on_past — для sequential dependencies

Если task A для 2024-01-15 нужен complete A для 2024-01-14 (например, cumulative aggregation):

@task(depends_on_past=True)
def cumulative_total(data_interval_start):
    """Today's cumulative = yesterday's + today's increment."""
    sql = f"""
    INSERT INTO cumulative_totals
    SELECT
        '{data_interval_start.date()}' AS day,
        (SELECT total FROM cumulative_totals
         WHERE day = '{(data_interval_start.date() - timedelta(days=1))}') +
        (SELECT SUM(amount) FROM orders WHERE created_date = '{data_interval_start.date()}')
    """

С depends_on_past=True:

  • Task не запустится пока same task за prior data_interval не достиг success
  • Backfill становится sequential — один-за-другим
  • Slower, но safe для cumulative state

Caveat: depends_on_past может deadlock backfill если old runs missing. Better — design tasks так чтобы не нужен dependency on past (recompute from scratch когда возможно).


Anti-pattern 4: External state changing over time

@task
def fetch_top_products():
    """Use external API which returns 'top products today'."""
    response = requests.get("https://api.example.com/top-products")
    top_products = response.json()
    # ❌ API возвращает today's top products, не historical
    # Backfill за 2024-06-15 получит top products on day backfill running, не historical

Если backfill требует historical state — нужен:

  • API endpoint с date parameter: https://api.example.com/top-products?date={data_interval_start}
  • Local historical snapshot (loaded once, queried by date)
  • Иначе — task fundamentally not backfill-safe

Тестирование backfill-safety

# tests/integration/test_backfill_safe.py
import subprocess

def test_backfill_deterministic():
    """Run task для historical date — verify it pulls historical data."""
    # Setup test data — orders за 2024-06-15
    setup_test_data_for_date("2024-06-15")

    # Run task для 2024-06-15
    result = subprocess.run(
        ["airflow", "tasks", "test", "orders_etl", "aggregate_revenue", "2024-06-15"],
        capture_output=True, text=True,
    )
    assert result.returncode == 0

    # Verify aggregation correct for that date
    actual = query_db("SELECT amount FROM daily_revenue WHERE day = '2024-06-15'")
    expected = sum(test_data_amounts_for("2024-06-15"))
    assert actual == expected

Или через airflow dags test:

def test_full_dag_backfill():
    """Run full DAG за 2024-06-15 — все tasks должны быть backfill-safe."""
    result = subprocess.run(
        ["airflow", "dags", "test", "orders_etl", "2024-06-15"],
        capture_output=True, text=True,
    )
    assert result.returncode == 0
    # Verify state correctly reflects 2024-06-15 data

Backfill best practices

1. Idempotent + deterministic = backfill-safe

Если task идемпотентен (модуль 17.02) и deterministic от data_interval — он автоматически backfill-safe.

2. Используйте deterministic paths

# Path определён data_interval — не wall-clock
s3_path = f"s3://datalake/orders/dt={data_interval_start.date()}/data.parquet"

3. catchup=False для новых DAGs

Backfill — explicit operation через CLI, не implicit при deploy.

4. max_active_runs для backfill control

@dag(max_active_runs=3, ...)

Без limit backfill за 365 дней запустит 365 DagRuns параллельно. С max_active_runs=3 — 3 одновременно, queue для остальных.

5. pool для resource limit

Если DAG использует limited resource (Spark cluster, API rate limit) — assign pool:

@task(pool="api_calls", pool_slots=1)
def call_external_api():
    ...

Backfill 365 runs — все запросы соберутся в pool, обработаются по очереди (до slots paralelizm).


Production gotchas

Timezone awareness. data_interval_start всегда UTC в Airflow 2.x. Если ваши данные в local timezone — convert explicitly. Иначе off-by-one bugs.

Schedule с irregular intervals. @hourly или cron 0 9 * * * — regular, easy. Business calendar (только weekdays, exclude holidays) — irregular. При backfill irregular schedule Airflow создаёт runs только для valid intervals. Test thoroughly.

end_date для DAG. Если DAG имеет end_date=datetime(2024, 12, 31), backfill за 2025 silently не создаст runs. Check end_date не expired.

Variable.get для config — может ломать backfill. Если task делает Variable.get('some_threshold') — value возможно изменилось с 2024-06-15. Если threshold matters для historical computation — use config table с date column instead Variable.

Partial data warnings. Backfill в реальном времени (run за yesterday) — данные могут быть incomplete (late-arriving events). Add validation: row count за data_interval >= expected_min, иначе fail задачу.

OpenLineage в backfill — генерируется normally, marked с run_type=backfill. Helps trace what was backfilled.


Проверка знанийKnowledge check
Production DAG `orders_etl` running 6 months использует `datetime.now()` в одном из tasks для определения date range. Сейчас понадобилось backfill за 3 месяца (старый bug fixed). Какие шаги нужны и какие risks?
ОтветAnswer
Backfill 3 months с non-deterministic DAG — minefield. Steps + risks: (1) **STOP — не backfill пока не fix code**. Если запустить backfill сейчас, task для каждой historical date выполнится с datetime.now() = wall-clock, обработает CURRENT data, INSERT/UPDATE с historical execution_date. Result: 90 days × inserts с identical content → массовая corruption данных. (2) **Fix code** — replace `datetime.now()` на `data_interval_start` / `data_interval_end` from context. Use `get_current_context()` или signature `def task(data_interval_start, data_interval_end)`. (3) **Deploy fix** через standard CI/CD — verify в staging что fix работает на historical date (airflow tasks test orders_etl my_task 2024-03-15 — should query data for 2024-03-15, not now). (4) **Verify idempotency** — backfill будет overwrite (надеемся). Если используется INSERT — придётся cleanup duplicates first. Check для current ON CONFLICT clause. (5) **Test backfill на single date** перед full range — `airflow tasks test orders_etl my_task 2024-03-01`, verify данные за 2024-03-01 specifically. (6) **Run backfill с max_active_runs limit** — `airflow dags backfill orders_etl --start-date 2024-03-01 --end-date 2024-05-31 --max-active-runs 5` — не overwhelm scheduler. (7) **Monitor**: alert на failed TI, scheduler.scheduler_loop_duration spike, pool exhaustion. (8) **Verify**: spot-check 5-10 dates manually — daily_revenue для 2024-03-15 должно match orders в orders table за 2024-03-15. (9) **Document**: blameless postmortem — почему `datetime.now()` дожило 6 months? Add static check в CI (custom ruff rule или grep 'datetime.now()' в dags/). Add test_backfill_deterministic. Long-term: каждый new DAG обязательно review для backfill-safety перед production deploy. Cost lesson: 6 months bug → days of recovery. Один lint rule + один unit test предотвратили бы всё.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Task использует `datetime.now()` для определения date range. Какая проблема при backfill?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8