Custom Timetables (AIP-39, 2.2+)
Cron-выражения покрывают 95% use cases расписаний, но иногда вам нужно что-то более sophisticated: «запускать DAG только в business days, кроме holidays», «следовать fiscal calendar (April-March)», «бежать каждые 4 часа кроме maintenance window». Custom Timetables (AIP-39, появились в Airflow 2.2) позволяют это.
Этот урок — детальное препарирование Timetable interface, реализация production-grade custom timetables и примеры из реального мира.
Batch-обработка — окна, расписание, идемпотентность
Что такое Timetable
В Airflow 2.2 introduced abstraction Timetable — класс, который scheduler спрашивает: «когда должен быть следующий DagRun?» и «какой data interval он представляет?». Это заменяет старый schedule_interval (deprecated в 2.2+, удалён в 3.x).
Каждое значение schedule под капотом конвертируется в Timetable:
# schedule="@daily" → CronTimetable
# schedule=timedelta(hours=1) → DeltaTimetable
# schedule="0 9 * * 1-5" → CronTimetable
# schedule=None → NullTimetable
# schedule=[my_dataset] → DatasetTriggeredTimetable
# schedule=MyCustomTimetable() → MyCustomTimetable
Timetable interface
airflow.timetables.base.Timetable — abstract base:
from airflow.timetables.base import Timetable, DagRunInfo, DataInterval, TimeRestriction
from pendulum import DateTime
class Timetable:
"""Abstract Timetable interface."""
description: str = "" # Show in UI
periodic: bool = True # True для regular schedule, False для event-based
can_be_scheduled: bool = True # False для DatasetTriggered
def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
"""For manual runs — what data interval should this run represent?"""
def next_dagrun_info(
self,
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
"""The main method — when next DagRun should run, what interval?"""
def serialize(self) -> dict:
"""Serialize для DB storage."""
@classmethod
def deserialize(cls, data: dict) -> "Timetable":
"""Deserialize из DB."""
Концепции
DataInterval(start, end)— какой временной период данные представляет этот DagRunDagRunInfo(run_after, data_interval)— когда scheduler должен create этот run + intervalTimeRestriction(earliest, latest, catchup)— bounds от DAG (start_date,end_date,catchup)
Differences: data_interval vs execution_date
В Airflow 2.x было два понятия времени:
execution_date(legacy 1.x semantic) — start момент interval, использовался как labeldata_interval_start/data_interval_end— точные границы interval (2.2+)
Для @daily DAG, который запускается 2026-05-13 00:00:00:
execution_date=2026-05-13 00:00:00(legacy alias)data_interval_start=2026-05-13 00:00:00data_interval_end=2026-05-14 00:00:00
Этот run обрабатывает данные за 13 мая (полный день). Запуск 14 мая (next interval start).
В 3.x execution_date deprecated в пользу logical_date (alias для data_interval_start).
Простой пример: каждую субботу в 02:00
from pendulum import DateTime, Duration
from airflow.timetables.base import Timetable, DagRunInfo, DataInterval, TimeRestriction
class SaturdayTimetable(Timetable):
"""Запускать каждую субботу в 02:00."""
description = "On Saturdays at 02:00 UTC"
def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
# Для manual run — previous week ending Saturday morning
return DataInterval(
start=run_after - Duration(days=7),
end=run_after,
)
def next_dagrun_info(
self,
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
# Find next Saturday 02:00 after last run или start_date
if last_automated_data_interval is None:
# First run — from start_date
base = restriction.earliest
else:
base = last_automated_data_interval.end
# Compute next Saturday 02:00 UTC
days_until_saturday = (5 - base.weekday()) % 7
if days_until_saturday == 0 and base.hour >= 2:
days_until_saturday = 7
next_saturday = base.replace(hour=2, minute=0, second=0, microsecond=0).add(
days=days_until_saturday
)
# Check restriction
if restriction.latest and next_saturday > restriction.latest:
return None
return DagRunInfo.interval(
start=next_saturday - Duration(days=7),
end=next_saturday,
)
Использование в DAG:
@dag(
schedule=SaturdayTimetable(),
start_date=datetime(2026, 1, 1),
catchup=False,
)
def weekly_report(): ...
Production пример: business days excluding holidays
Самый частый use case — запускать только в рабочие дни, исключая holidays:
from pendulum import DateTime, Duration
from datetime import datetime, date
from airflow.timetables.base import Timetable, DagRunInfo, DataInterval, TimeRestriction
HOLIDAYS_2026 = {
date(2026, 1, 1), # New Year
date(2026, 1, 7), # Christmas (RU)
date(2026, 2, 23), # Defender of Fatherland Day
date(2026, 3, 8), # Women's Day
date(2026, 5, 1), # Labour Day
date(2026, 5, 9), # Victory Day
date(2026, 6, 12), # Russia Day
date(2026, 11, 4), # Unity Day
# ... can load from external service
}
class BusinessDayTimetable(Timetable):
"""Запускать в каждый business day в 06:00."""
description = "On business days (Mon-Fri excl. holidays) at 06:00 UTC"
def __init__(self, hour: int = 6, minute: int = 0):
self.hour = hour
self.minute = minute
def _is_business_day(self, d: DateTime) -> bool:
return d.weekday() < 5 and d.date() not in HOLIDAYS_2026
def _next_business_day(self, after: DateTime) -> DateTime:
candidate = after.replace(hour=self.hour, minute=self.minute, second=0, microsecond=0)
if candidate <= after:
candidate = candidate.add(days=1)
# Skip weekends and holidays
while not self._is_business_day(candidate):
candidate = candidate.add(days=1)
return candidate
def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
# Previous business day morning to this morning
prev_day = run_after - Duration(days=1)
while not self._is_business_day(prev_day):
prev_day = prev_day - Duration(days=1)
return DataInterval(start=prev_day, end=run_after)
def next_dagrun_info(
self,
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
base = (
last_automated_data_interval.end
if last_automated_data_interval is not None
else restriction.earliest
)
if base is None:
return None
next_run = self._next_business_day(base)
if restriction.latest and next_run > restriction.latest:
return None
# Data interval — от предыдущего business day до next_run
prev_business = next_run - Duration(days=1)
while not self._is_business_day(prev_business):
prev_business = prev_business - Duration(days=1)
return DagRunInfo.interval(
start=prev_business,
end=next_run,
)
def serialize(self) -> dict:
return {"hour": self.hour, "minute": self.minute}
@classmethod
def deserialize(cls, data: dict) -> "BusinessDayTimetable":
return cls(hour=data["hour"], minute=data["minute"])
Использование:
@dag(
schedule=BusinessDayTimetable(hour=6, minute=0),
start_date=datetime(2026, 1, 1),
catchup=False,
)
def daily_revenue_report(): ...
Регистрация custom Timetable как plugin
Чтобы Airflow распознал custom Timetable, нужно зарегистрировать его через plugin:
# plugins/custom_timetables.py
from airflow.plugins_manager import AirflowPlugin
from my_module.timetables import BusinessDayTimetable, SaturdayTimetable
class CustomTimetablePlugin(AirflowPlugin):
name = "custom_timetable_plugin"
timetables = [BusinessDayTimetable, SaturdayTimetable]
Положить в $AIRFLOW_HOME/plugins/ или регистрировать через entry points в provider package.
После этого DagFileProcessor может deserialize эти Timetable из БД (после restart serialized_dag).
Использование AssetTriggeredTimetable
С Airflow 2.4+ можно совместить cron и dataset-based scheduling через AssetOrTimeSchedule (старое название DatasetOrTimeSchedule):
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
from airflow import Dataset
dataset = Dataset("s3://bucket/source-data")
@dag(
schedule=DatasetOrTimeSchedule(
timetable=CronTriggerTimetable("0 6 * * *", timezone="UTC"),
datasets=[dataset],
),
start_date=datetime(2026, 1, 1),
catchup=False,
)
def my_dag(): ...
DAG запускается либо при cron (каждый день в 06:00), либо при update dataset.
Reuse via built-in timetables
Airflow ships с несколькими полезными built-ins:
| Timetable | Использование |
|---|---|
CronTimetable | Standard cron syntax — default для strings |
CronTriggerTimetable | Cron с пользователь-friendly timezone handling |
DeltaDataIntervalTimetable | timedelta-based с правильным data_interval |
EventsTimetable (2.5+) | По list of explicit datetime events |
DeltaTriggerTimetable (2.11+) | Trigger-based на delta |
ContinuousTimetable | Run continuously (для special use cases) |
EventsTimetable пример
from airflow.timetables.events import EventsTimetable
from pendulum import datetime as pdt
@dag(
schedule=EventsTimetable(
event_dates=[
pdt(2026, 1, 1), # New Year report
pdt(2026, 4, 1), # Q1 report
pdt(2026, 7, 1), # H1 report
pdt(2026, 10, 1), # Q3 report
],
description="Quarterly reports",
),
start_date=datetime(2026, 1, 1),
catchup=False,
)
def quarterly_report(): ...
Запускается ровно в указанные dates.
Тестирование Timetable
Best practice — unit-test Timetable отдельно от DAG:
import pytest
from pendulum import datetime as pdt, timezone
from airflow.timetables.base import TimeRestriction, DataInterval
from my_timetable import BusinessDayTimetable
def test_next_business_day_skips_weekend():
t = BusinessDayTimetable(hour=6)
# Last run на пятницу
last_interval = DataInterval(
start=pdt(2026, 5, 8, 6, tz="UTC"),
end=pdt(2026, 5, 9, 6, tz="UTC"),
)
restriction = TimeRestriction(
earliest=pdt(2026, 1, 1, tz="UTC"),
latest=None,
catchup=False,
)
info = t.next_dagrun_info(last_interval, restriction)
# Should skip Sat (May 9) and Sun (May 10) → Monday May 11
assert info.run_after == pdt(2026, 5, 11, 6, tz="UTC")
def test_skips_holidays():
t = BusinessDayTimetable(hour=6)
# Last run на 4 января 2026 (воскресенье)
last_interval = DataInterval(
start=pdt(2026, 1, 3, 6, tz="UTC"),
end=pdt(2026, 1, 4, 6, tz="UTC"),
)
restriction = TimeRestriction(
earliest=pdt(2026, 1, 1, tz="UTC"),
latest=None,
catchup=False,
)
info = t.next_dagrun_info(last_interval, restriction)
# Next: 5 января — но это понедельник, проверим что не holiday
# (в нашем HOLIDAYS_2026 7 января — Christmas RU)
# Так что 5 января должен быть business day
assert info.run_after == pdt(2026, 1, 5, 6, tz="UTC")
Production gotchas
1. Serialization
DAG serialization требует, чтобы Timetable.serialize() / deserialize() работали безошибочно. Custom Timetable с complex state может ломать parsing.
Best practice: keep state simple (str, int, list of primitives). Don’t store callable functions.
2. Holidays — внешний источник
Hardcoded set в коде — не масштабируется. Production: загружать holidays из:
holidaysPython package (community-maintained)- Internal calendar service (API call в
__init__) - Database table (Airflow Connection + query на parse)
⚠️ API call в __init__ будет hit на каждом DagBag refresh — cache aggressively.
3. Catchup и custom Timetable
Если catchup=True и custom Timetable, scheduler будет создавать все missing runs с начала. Если ваш Timetable создаёт много runs (например, business days за год = 250 runs) — будет storm.
Fix: catchup=False или max_active_runs ограничение.
4. Timezones
Custom Timetable должен правильно обрабатывать timezone-aware datetimes. Pendulum (Airflow uses it) — recommended.
import pendulum
dt = pendulum.datetime(2026, 5, 13, tz="Europe/Moscow")
Не используйте datetime.now() или datetime(...) без tzinfo — это вызовет ошибки validation.