Learning Platform
Глоссарий Troubleshooting
Урок 03.04 · 24 мин
Продвинутый
TimetablesAIP-39ScheduleCustom

Custom Timetables (AIP-39, 2.2+)

Cron-выражения покрывают 95% use cases расписаний, но иногда вам нужно что-то более sophisticated: «запускать DAG только в business days, кроме holidays», «следовать fiscal calendar (April-March)», «бежать каждые 4 часа кроме maintenance window». Custom Timetables (AIP-39, появились в Airflow 2.2) позволяют это.

Этот урок — детальное препарирование Timetable interface, реализация production-grade custom timetables и примеры из реального мира.

Timetable interaction с scheduler
Scheduler main loopSchedulerJob каждый tick (default 5s) проходит по active DAGs и для каждого решает — нужно ли создать новый DagRun. Для этого вызывает timetable.next_dagrun_info().
next_dagrun_info(last_interval, restriction)
Timetable.next_dagrun_info()Главный метод. Принимает last_automated_data_interval (когда был последний run) и TimeRestriction (start_date, end_date, catchup). Возвращает DagRunInfo или None если runs больше не нужны.
DagRunInfoСтруктура с двумя полями: run_after (когда scheduler должен создать DagRun) и data_interval (DataInterval start, end — какие данные этот run обрабатывает). Scheduler ждёт до run_after момента.
DataInterval(start, end)Временное окно которое представляет этот DagRun. Для @daily: start=date 00:00, end=date+1 00:00. Доступно в task через context['data_interval_start'] и data_interval_end.
когда now() >= run_after
Scheduler creates DagRunINSERT в dag_run table: dag_id, run_id, data_interval_start/end, state='running'. Затем создаются TaskInstance rows для всех tasks DAG. Tasks становятся eligible для scheduling.

Batch-обработка — окна, расписание, идемпотентность

Что такое Timetable

В Airflow 2.2 introduced abstraction Timetable — класс, который scheduler спрашивает: «когда должен быть следующий DagRun?» и «какой data interval он представляет?». Это заменяет старый schedule_interval (deprecated в 2.2+, удалён в 3.x).

Каждое значение schedule под капотом конвертируется в Timetable:

# schedule="@daily" → CronTimetable
# schedule=timedelta(hours=1) → DeltaTimetable
# schedule="0 9 * * 1-5" → CronTimetable
# schedule=None → NullTimetable
# schedule=[my_dataset] → DatasetTriggeredTimetable
# schedule=MyCustomTimetable() → MyCustomTimetable

Timetable interface

airflow.timetables.base.Timetable — abstract base:

from airflow.timetables.base import Timetable, DagRunInfo, DataInterval, TimeRestriction
from pendulum import DateTime

class Timetable:
    """Abstract Timetable interface."""

    description: str = ""           # Show in UI
    periodic: bool = True           # True для regular schedule, False для event-based
    can_be_scheduled: bool = True   # False для DatasetTriggered

    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        """For manual runs — what data interval should this run represent?"""

    def next_dagrun_info(
        self,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """The main method — when next DagRun should run, what interval?"""

    def serialize(self) -> dict:
        """Serialize для DB storage."""

    @classmethod
    def deserialize(cls, data: dict) -> "Timetable":
        """Deserialize из DB."""

Концепции

  • DataInterval(start, end) — какой временной период данные представляет этот DagRun
  • DagRunInfo(run_after, data_interval) — когда scheduler должен create этот run + interval
  • TimeRestriction(earliest, latest, catchup) — bounds от DAG (start_date, end_date, catchup)

Differences: data_interval vs execution_date

В Airflow 2.x было два понятия времени:

  • execution_date (legacy 1.x semantic) — start момент interval, использовался как label
  • data_interval_start / data_interval_end — точные границы interval (2.2+)

Для @daily DAG, который запускается 2026-05-13 00:00:00:

  • execution_date = 2026-05-13 00:00:00 (legacy alias)
  • data_interval_start = 2026-05-13 00:00:00
  • data_interval_end = 2026-05-14 00:00:00

Этот run обрабатывает данные за 13 мая (полный день). Запуск 14 мая (next interval start).

В 3.x execution_date deprecated в пользу logical_date (alias для data_interval_start).


Простой пример: каждую субботу в 02:00

from pendulum import DateTime, Duration
from airflow.timetables.base import Timetable, DagRunInfo, DataInterval, TimeRestriction

class SaturdayTimetable(Timetable):
    """Запускать каждую субботу в 02:00."""

    description = "On Saturdays at 02:00 UTC"

    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        # Для manual run — previous week ending Saturday morning
        return DataInterval(
            start=run_after - Duration(days=7),
            end=run_after,
        )

    def next_dagrun_info(
        self,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        # Find next Saturday 02:00 after last run или start_date
        if last_automated_data_interval is None:
            # First run — from start_date
            base = restriction.earliest
        else:
            base = last_automated_data_interval.end

        # Compute next Saturday 02:00 UTC
        days_until_saturday = (5 - base.weekday()) % 7
        if days_until_saturday == 0 and base.hour >= 2:
            days_until_saturday = 7
        next_saturday = base.replace(hour=2, minute=0, second=0, microsecond=0).add(
            days=days_until_saturday
        )

        # Check restriction
        if restriction.latest and next_saturday > restriction.latest:
            return None

        return DagRunInfo.interval(
            start=next_saturday - Duration(days=7),
            end=next_saturday,
        )

Использование в DAG:

@dag(
    schedule=SaturdayTimetable(),
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def weekly_report(): ...

Production пример: business days excluding holidays

Самый частый use case — запускать только в рабочие дни, исключая holidays:

from pendulum import DateTime, Duration
from datetime import datetime, date
from airflow.timetables.base import Timetable, DagRunInfo, DataInterval, TimeRestriction

HOLIDAYS_2026 = {
    date(2026, 1, 1),   # New Year
    date(2026, 1, 7),   # Christmas (RU)
    date(2026, 2, 23),  # Defender of Fatherland Day
    date(2026, 3, 8),   # Women's Day
    date(2026, 5, 1),   # Labour Day
    date(2026, 5, 9),   # Victory Day
    date(2026, 6, 12),  # Russia Day
    date(2026, 11, 4),  # Unity Day
    # ... can load from external service
}

class BusinessDayTimetable(Timetable):
    """Запускать в каждый business day в 06:00."""

    description = "On business days (Mon-Fri excl. holidays) at 06:00 UTC"

    def __init__(self, hour: int = 6, minute: int = 0):
        self.hour = hour
        self.minute = minute

    def _is_business_day(self, d: DateTime) -> bool:
        return d.weekday() < 5 and d.date() not in HOLIDAYS_2026

    def _next_business_day(self, after: DateTime) -> DateTime:
        candidate = after.replace(hour=self.hour, minute=self.minute, second=0, microsecond=0)
        if candidate <= after:
            candidate = candidate.add(days=1)

        # Skip weekends and holidays
        while not self._is_business_day(candidate):
            candidate = candidate.add(days=1)

        return candidate

    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        # Previous business day morning to this morning
        prev_day = run_after - Duration(days=1)
        while not self._is_business_day(prev_day):
            prev_day = prev_day - Duration(days=1)
        return DataInterval(start=prev_day, end=run_after)

    def next_dagrun_info(
        self,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        base = (
            last_automated_data_interval.end
            if last_automated_data_interval is not None
            else restriction.earliest
        )

        if base is None:
            return None

        next_run = self._next_business_day(base)

        if restriction.latest and next_run > restriction.latest:
            return None

        # Data interval — от предыдущего business day до next_run
        prev_business = next_run - Duration(days=1)
        while not self._is_business_day(prev_business):
            prev_business = prev_business - Duration(days=1)

        return DagRunInfo.interval(
            start=prev_business,
            end=next_run,
        )

    def serialize(self) -> dict:
        return {"hour": self.hour, "minute": self.minute}

    @classmethod
    def deserialize(cls, data: dict) -> "BusinessDayTimetable":
        return cls(hour=data["hour"], minute=data["minute"])

Использование:

@dag(
    schedule=BusinessDayTimetable(hour=6, minute=0),
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def daily_revenue_report(): ...

Регистрация custom Timetable как plugin

Чтобы Airflow распознал custom Timetable, нужно зарегистрировать его через plugin:

# plugins/custom_timetables.py
from airflow.plugins_manager import AirflowPlugin
from my_module.timetables import BusinessDayTimetable, SaturdayTimetable

class CustomTimetablePlugin(AirflowPlugin):
    name = "custom_timetable_plugin"
    timetables = [BusinessDayTimetable, SaturdayTimetable]

Положить в $AIRFLOW_HOME/plugins/ или регистрировать через entry points в provider package.

После этого DagFileProcessor может deserialize эти Timetable из БД (после restart serialized_dag).


Использование AssetTriggeredTimetable

С Airflow 2.4+ можно совместить cron и dataset-based scheduling через AssetOrTimeSchedule (старое название DatasetOrTimeSchedule):

from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
from airflow import Dataset

dataset = Dataset("s3://bucket/source-data")

@dag(
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 6 * * *", timezone="UTC"),
        datasets=[dataset],
    ),
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def my_dag(): ...

DAG запускается либо при cron (каждый день в 06:00), либо при update dataset.


Reuse via built-in timetables

Airflow ships с несколькими полезными built-ins:

TimetableИспользование
CronTimetableStandard cron syntax — default для strings
CronTriggerTimetableCron с пользователь-friendly timezone handling
DeltaDataIntervalTimetabletimedelta-based с правильным data_interval
EventsTimetable (2.5+)По list of explicit datetime events
DeltaTriggerTimetable (2.11+)Trigger-based на delta
ContinuousTimetableRun continuously (для special use cases)

EventsTimetable пример

from airflow.timetables.events import EventsTimetable
from pendulum import datetime as pdt

@dag(
    schedule=EventsTimetable(
        event_dates=[
            pdt(2026, 1, 1),   # New Year report
            pdt(2026, 4, 1),   # Q1 report
            pdt(2026, 7, 1),   # H1 report
            pdt(2026, 10, 1),  # Q3 report
        ],
        description="Quarterly reports",
    ),
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def quarterly_report(): ...

Запускается ровно в указанные dates.


Тестирование Timetable

Best practice — unit-test Timetable отдельно от DAG:

import pytest
from pendulum import datetime as pdt, timezone
from airflow.timetables.base import TimeRestriction, DataInterval
from my_timetable import BusinessDayTimetable

def test_next_business_day_skips_weekend():
    t = BusinessDayTimetable(hour=6)

    # Last run на пятницу
    last_interval = DataInterval(
        start=pdt(2026, 5, 8, 6, tz="UTC"),
        end=pdt(2026, 5, 9, 6, tz="UTC"),
    )

    restriction = TimeRestriction(
        earliest=pdt(2026, 1, 1, tz="UTC"),
        latest=None,
        catchup=False,
    )

    info = t.next_dagrun_info(last_interval, restriction)

    # Should skip Sat (May 9) and Sun (May 10) → Monday May 11
    assert info.run_after == pdt(2026, 5, 11, 6, tz="UTC")

def test_skips_holidays():
    t = BusinessDayTimetable(hour=6)

    # Last run на 4 января 2026 (воскресенье)
    last_interval = DataInterval(
        start=pdt(2026, 1, 3, 6, tz="UTC"),
        end=pdt(2026, 1, 4, 6, tz="UTC"),
    )
    restriction = TimeRestriction(
        earliest=pdt(2026, 1, 1, tz="UTC"),
        latest=None,
        catchup=False,
    )

    info = t.next_dagrun_info(last_interval, restriction)

    # Next: 5 января — но это понедельник, проверим что не holiday
    # (в нашем HOLIDAYS_2026 7 января — Christmas RU)
    # Так что 5 января должен быть business day
    assert info.run_after == pdt(2026, 1, 5, 6, tz="UTC")

Production gotchas

1. Serialization

DAG serialization требует, чтобы Timetable.serialize() / deserialize() работали безошибочно. Custom Timetable с complex state может ломать parsing.

Best practice: keep state simple (str, int, list of primitives). Don’t store callable functions.

2. Holidays — внешний источник

Hardcoded set в коде — не масштабируется. Production: загружать holidays из:

  • holidays Python package (community-maintained)
  • Internal calendar service (API call в __init__)
  • Database table (Airflow Connection + query на parse)

⚠️ API call в __init__ будет hit на каждом DagBag refresh — cache aggressively.

3. Catchup и custom Timetable

Если catchup=True и custom Timetable, scheduler будет создавать все missing runs с начала. Если ваш Timetable создаёт много runs (например, business days за год = 250 runs) — будет storm.

Fix: catchup=False или max_active_runs ограничение.

4. Timezones

Custom Timetable должен правильно обрабатывать timezone-aware datetimes. Pendulum (Airflow uses it) — recommended.

import pendulum

dt = pendulum.datetime(2026, 5, 13, tz="Europe/Moscow")

Не используйте datetime.now() или datetime(...) без tzinfo — это вызовет ошибки validation.


Проверка знанийKnowledge check
Когда стоит писать Custom Timetable, а когда достаточно cron в schedule string?
ОтветAnswer
**Cron достаточно когда**: (1) расписание выражается стандартным cron (5/15/30 минут, час дня, день недели/месяца) — например `0 6 * * *`, `*/15 * * * 1-5`; (2) data_interval совпадает с natural cron interval (hour/day/week); (3) timezone fixed. **Custom Timetable когда**: (1) расписание зависит от business calendar — fiscal year (April-March), business days с holidays, market hours; (2) непрерывные интервалы с irregular gaps (skip Sundays, run every 4h except 02:00-04:00 maintenance); (3) расписание привязано к events (quarterly reports на конкретные dates); (4) сложный data_interval — например 'обработать данные с предыдущего рабочего дня', а не просто 'предыдущий день'. Если можно описать cron-ом — используйте cron. Timetable добавляет complexity (требует unit tests, serialization, может ломать parsing).

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какой главный метод Custom Timetable нужно реализовать?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8