BaseTrigger anatomy — препарирование custom trigger
Чтобы deeply понимать deferrable operators, нужно написать свой trigger. Это даёт точную ментальную модель того, что происходит между self.defer() и execute_complete(), какие contract обязан соблюдать любой trigger, какие подводные камни возникают при serialization и async execution.
Этот урок — препарирование BaseTrigger через создание собственного DateTimeTrigger: trigger, который засыпает до указанного момента времени и эмитит event. Это minimal, но complete example: показывает все 5 обязательных частей custom trigger.
BaseTrigger contract
Любой custom trigger наследует airflow.triggers.base.BaseTrigger:
from airflow.triggers.base import BaseTrigger, TriggerEvent
from typing import Any, AsyncIterator
class BaseTrigger:
"""Контракт минимального trigger."""
def __init__(self, **kwargs: Any) -> None:
"""Все state-параметры. JSON-serializable."""
def serialize(self) -> tuple[str, dict[str, Any]]:
"""
Returns (classpath, kwargs) для DB storage.
После deserialize __init__ должен получить эти kwargs.
"""
raise NotImplementedError
async def run(self) -> AsyncIterator[TriggerEvent]:
"""
Async generator. Ждёт условие, yield TriggerEvent.
ОБЯЗАН использовать await asyncio.sleep, не time.sleep.
"""
raise NotImplementedError
def __eq__(self, other: object) -> bool:
"""
Используется для deduplication: одинаковые triggers
не создаются дважды.
"""
...
5 обязательных кусков:
__init__— принимает все нужные state параметрыserialize()— возвращает tuple (classpath, dict) — JSON-serializablerun()— async generator с реальной логикой- TriggerEvent emission — yield с payload
- (optional)
__eq__— для deduplication
Step 1: minimal DateTimeTrigger
Начнём с простейшего trigger — ждёт указанного momentum времени.
from datetime import datetime, timezone
import asyncio
from airflow.triggers.base import BaseTrigger, TriggerEvent
from typing import Any, AsyncIterator
class DateTimeTrigger(BaseTrigger):
"""Триггер, который засыпает до указанного момента UTC."""
def __init__(self, moment: datetime) -> None:
super().__init__()
if moment.tzinfo is None:
raise ValueError("moment must be timezone-aware")
self.moment = moment
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
"my_package.triggers.DateTimeTrigger",
{"moment": self.moment.isoformat()},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
while self.moment > datetime.now(tz=timezone.utc):
# КРИТИЧНО: await asyncio.sleep, не time.sleep
await asyncio.sleep(1)
yield TriggerEvent(self.moment.isoformat())
Препарируем каждую часть.
__init__
def __init__(self, moment: datetime) -> None:
super().__init__()
if moment.tzinfo is None:
raise ValueError("moment must be timezone-aware")
self.moment = moment
- Validate input on construction time — fail fast при misconfiguration. Если caller передал naive datetime, мы сразу падаем; alternative — fail при serialize (поздно).
- Store как member —
self.momentбудет восстановлен после deserialization. - No I/O в init — конструктор должен быть cheap. Reading config, opening connections — делать в
run().
serialize()
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
"my_package.triggers.DateTimeTrigger",
{"moment": self.moment.isoformat()},
)
- classpath — full Python import path. Triggerer process делает
importlib.import_module()для restore. Должен быть accessible в Python path triggerer-а (важно для Docker images). - kwargs — JSON-serializable. Сложные объекты конвертируем в primitives:
datetime→isoformat()stringDecimal→str()- Custom class →
dictrepresentation
После serialize Airflow делает:
# JSON dumps в trigger.kwargs (JSONB column)
{"moment": "2026-05-12T15:00:00+00:00"}
При adopt-е triggerer делает:
# Deserialization
import_module(classpath)
cls = getattr(module, class_name)
instance = cls(**kwargs_from_db)
# Внимание: __init__ снова получит "moment" как string!
# Нужно реконструировать обратно:
Это частая ошибка: serialize даёт string, но __init__ ожидает datetime. Решение — handle both в __init__:
def __init__(self, moment: datetime | str) -> None:
super().__init__()
if isinstance(moment, str):
moment = datetime.fromisoformat(moment)
if moment.tzinfo is None:
raise ValueError("...")
self.moment = moment
run()
async def run(self) -> AsyncIterator[TriggerEvent]:
while self.moment > datetime.now(tz=timezone.utc):
await asyncio.sleep(1)
yield TriggerEvent(self.moment.isoformat())
- Async generator — ключевое слово
async def+yield. Это не coroutine, не regular generator. Это async generator — позволяет yield внутри await. - Loop — обычно
while condition: await asyncio.sleep. await asyncio.sleep(1)— НИКОГДАtime.sleep. Это блокирует весь event loop (см. урок 02).- yield TriggerEvent — заканчивает trigger. После yield triggerer удаляет trigger row и пробуждает TI.
Payload TriggerEvent может быть любым JSON-serializable объектом:
yield TriggerEvent({
"status": "success",
"value": 42,
"details": {"timestamp": "...", "extra": "..."},
})
Этот payload передастся в execute_complete(event) через next_kwargs в task_instance.
Step 2: Adding state for resumability
Что если triggerer перезапустится посреди ожидания? Trigger будет re-adopted другим triggerer-ом. __init__ получит те же kwargs что и изначально — состояние ожидания сбросится:
T=0: trigger created, moment=15:00:00
T=10s: triggerer started waiting, now=14:00:10
T=20s: triggerer crashes
T=30s: another triggerer adopts, __init__ same kwargs
→ re-init starts from now=14:00:30
→ still waits until 15:00:00 (correct!)
Для DateTimeTrigger resumability «бесплатна» — мы сравниваем moment > now(), не ведём счётчик. Для других triggers нужно явно сделать state stateless или включить partial progress в serialize.
Counter-example: polling trigger с retry budget:
# BAD: counter теряется при restart triggerer
class PollingTrigger(BaseTrigger):
def __init__(self, url, max_retries=10):
self.url = url
self.max_retries = max_retries
self.retries_done = 0 # ← это будет 0 при re-adopt
async def run(self):
while self.retries_done < self.max_retries:
...
self.retries_done += 1
def serialize(self):
return "...", {"url": self.url, "max_retries": self.max_retries}
# retries_done НЕ сериализуется
Правильно: stateless, использовать time-based budget:
class PollingTrigger(BaseTrigger):
def __init__(self, url, timeout: timedelta):
self.url = url
self.deadline = datetime.now(tz=timezone.utc) + timeout
async def run(self):
while datetime.now(tz=timezone.utc) < self.deadline:
...
await asyncio.sleep(60)
yield TriggerEvent({"status": "timeout"})
self.deadline устанавливается в __init__ (absolute time), serialize-ится, при re-adopt deadline остаётся правильный.
Lifecycle полный walkthrough
Use в operator-е
DateTimeTrigger используется внутри operator через self.defer():
from airflow.models import BaseOperator
from airflow.utils.context import Context
class WaitUntilOperator(BaseOperator):
"""Operator, который ждёт до указанного времени."""
def __init__(self, target_time: datetime, **kwargs):
super().__init__(**kwargs)
self.target_time = target_time
def execute(self, context: Context):
now = datetime.now(tz=timezone.utc)
# Если момент уже наступил — не deferring, return immediately
if now >= self.target_time:
return f"Time already passed: {self.target_time}"
# Defer до момента
self.defer(
trigger=DateTimeTrigger(moment=self.target_time),
method_name="execute_complete",
)
def execute_complete(self, context: Context, event=None):
"""Called после yield TriggerEvent."""
return f"Reached moment: {event}"
Использование внутри @dag создаёт task с target_time параметром. При execution: если момент в будущем — defer; triggerer держит trigger; в нужный момент yield event; worker resumes execute_complete.
При execution: 14:00 task starts → defer; 14:00-15:00 triggerer crapuет async wait; 15:00 yield event; 15:00:05 worker resumes execute_complete, returns.
Multiple yields — multi-stage triggers
Trigger может yield несколько events перед завершением. Use case: progress reporting.
class BatchPollingTrigger(BaseTrigger):
"""Polling с progress updates."""
async def run(self):
for batch_idx in range(10):
await asyncio.sleep(60)
# Intermediate progress (НЕ завершает trigger)
yield TriggerEvent({
"status": "progress",
"batch": batch_idx,
"completed_pct": (batch_idx + 1) * 10,
})
# Финальное event
yield TriggerEvent({"status": "complete"})
В Airflow 2.x первый yield завершает trigger — TI пробуждается, runs execute_complete с первым event. Дальнейшие yields игнорируются (в 2.x). Multi-yield зарезервировано для будущих 3.x features (например, sub-progress reporting через UI). В 2.10/2.11 — yield один раз.
__eq__ и deduplication
Если два operator вызывают defer с одинаковыми trigger-ами — Airflow может deduplicate:
def __eq__(self, other: object) -> bool:
return (
isinstance(other, DateTimeTrigger)
and self.moment == other.moment
)
def __hash__(self) -> int:
return hash(("DateTimeTrigger", self.moment))
Это optional, но полезно если вы знаете, что одинаковые triggers могут создаваться часто. Без __eq__ каждый defer создаёт отдельный row в trigger table.
Production-grade trigger — checklist
При написании custom trigger проверьте:
- Все state JSON-serializable. Datetime → isoformat, Decimal → str, set → list.
__init__принимает both raw and deserialized types.Union[datetime, str], проверкаisinstance.serialize()возвращает kwargs, которые__init__примет. Идемпотентный round-trip.- Все sleeps —
await asyncio.sleep. Никакихtime.sleep,requests.get, sync DB calls. - Resumability: state не теряется при triggerer restart. Использовать absolute time / external state, не in-memory counters.
- Error handling: exceptions в run() обрабатываются — yield TriggerEvent с error info или re-raise (это failures trigger, task получит failed event).
- Reasonable polling interval:
await asyncio.sleep(1)для precise timing,await asyncio.sleep(60)для external polls. Неsleep(0)(CPU thrash). - classpath доступен в triggerer image. Custom trigger должен быть installed в triggerer Docker image.
Production gotchas
-
JSON only, не pickle. Airflow использует JSON для trigger kwargs (security + portability). Custom типы должны быть JSON-friendly.
-
super().__init__()обязателен. BaseTrigger имеет internal state — пропуск вызовет AttributeError. -
Imports внутри
run()— best practice для heavy deps (aiohttp, asyncpg). Отложенный import экономит memory тысяч triggers. -
Testing через
pytest-asyncio:@pytest.mark.asyncio async def test_datetime_trigger(): trigger = DateTimeTrigger(moment=datetime.now(tz=timezone.utc)) events = [e async for e in trigger.run()] assert len(events) == 1 -
self.logдоступен из BaseTrigger — logs пишутся в triggerer log file, не в task log. -
TaskDeferredмаскируется bare except. Если operator.execute() ловитexcept Exception, defer mechanism сломается. Используйте specific catches или re-raise TaskDeferred.