Learning Platform
Глоссарий Troubleshooting
Урок 10.03 · 32 мин
Продвинутый
BaseTriggerTriggerEventasyncioCustom TriggersSerialization

BaseTrigger anatomy — препарирование custom trigger

Чтобы deeply понимать deferrable operators, нужно написать свой trigger. Это даёт точную ментальную модель того, что происходит между self.defer() и execute_complete(), какие contract обязан соблюдать любой trigger, какие подводные камни возникают при serialization и async execution.

Этот урок — препарирование BaseTrigger через создание собственного DateTimeTrigger: trigger, который засыпает до указанного момента времени и эмитит event. Это minimal, но complete example: показывает все 5 обязательных частей custom trigger.


BaseTrigger contract

Любой custom trigger наследует airflow.triggers.base.BaseTrigger:

from airflow.triggers.base import BaseTrigger, TriggerEvent
from typing import Any, AsyncIterator


class BaseTrigger:
    """Контракт минимального trigger."""

    def __init__(self, **kwargs: Any) -> None:
        """Все state-параметры. JSON-serializable."""

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Returns (classpath, kwargs) для DB storage.
        После deserialize __init__ должен получить эти kwargs.
        """
        raise NotImplementedError

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Async generator. Ждёт условие, yield TriggerEvent.
        ОБЯЗАН использовать await asyncio.sleep, не time.sleep.
        """
        raise NotImplementedError

    def __eq__(self, other: object) -> bool:
        """
        Используется для deduplication: одинаковые triggers
        не создаются дважды.
        """
        ...

5 обязательных кусков:

  1. __init__ — принимает все нужные state параметры
  2. serialize() — возвращает tuple (classpath, dict) — JSON-serializable
  3. run() — async generator с реальной логикой
  4. TriggerEvent emission — yield с payload
Generators: PyGenObject и saспендирование stack frame send / throw / close: двусторонняя коммуникация с генератором
  1. (optional) __eq__ — для deduplication

Step 1: minimal DateTimeTrigger

Начнём с простейшего trigger — ждёт указанного momentum времени.

from datetime import datetime, timezone
import asyncio
from airflow.triggers.base import BaseTrigger, TriggerEvent
from typing import Any, AsyncIterator


class DateTimeTrigger(BaseTrigger):
    """Триггер, который засыпает до указанного момента UTC."""

    def __init__(self, moment: datetime) -> None:
        super().__init__()
        if moment.tzinfo is None:
            raise ValueError("moment must be timezone-aware")
        self.moment = moment

    def serialize(self) -> tuple[str, dict[str, Any]]:
        return (
            "my_package.triggers.DateTimeTrigger",
            {"moment": self.moment.isoformat()},
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        while self.moment > datetime.now(tz=timezone.utc):
            # КРИТИЧНО: await asyncio.sleep, не time.sleep
            await asyncio.sleep(1)
        yield TriggerEvent(self.moment.isoformat())

Препарируем каждую часть.

__init__

def __init__(self, moment: datetime) -> None:
    super().__init__()
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    self.moment = moment
  • Validate input on construction time — fail fast при misconfiguration. Если caller передал naive datetime, мы сразу падаем; alternative — fail при serialize (поздно).
  • Store как memberself.moment будет восстановлен после deserialization.
  • No I/O в init — конструктор должен быть cheap. Reading config, opening connections — делать в run().

serialize()

def serialize(self) -> tuple[str, dict[str, Any]]:
    return (
        "my_package.triggers.DateTimeTrigger",
        {"moment": self.moment.isoformat()},
    )
  • classpath — full Python import path. Triggerer process делает importlib.import_module() для restore. Должен быть accessible в Python path triggerer-а (важно для Docker images).
  • kwargs — JSON-serializable. Сложные объекты конвертируем в primitives:
    • datetimeisoformat() string
    • Decimalstr()
    • Custom class → dict representation

После serialize Airflow делает:

# JSON dumps в trigger.kwargs (JSONB column)
{"moment": "2026-05-12T15:00:00+00:00"}

При adopt-е triggerer делает:

# Deserialization
import_module(classpath)
cls = getattr(module, class_name)
instance = cls(**kwargs_from_db)
# Внимание: __init__ снова получит "moment" как string!
# Нужно реконструировать обратно:

Это частая ошибка: serialize даёт string, но __init__ ожидает datetime. Решение — handle both в __init__:

def __init__(self, moment: datetime | str) -> None:
    super().__init__()
    if isinstance(moment, str):
        moment = datetime.fromisoformat(moment)
    if moment.tzinfo is None:
        raise ValueError("...")
    self.moment = moment

run()

async def run(self) -> AsyncIterator[TriggerEvent]:
    while self.moment > datetime.now(tz=timezone.utc):
        await asyncio.sleep(1)
    yield TriggerEvent(self.moment.isoformat())
  • Async generator — ключевое слово async def + yield. Это не coroutine, не regular generator. Это async generator — позволяет yield внутри await.
  • Loop — обычно while condition: await asyncio.sleep.
  • await asyncio.sleep(1) — НИКОГДА time.sleep. Это блокирует весь event loop (см. урок 02).
  • yield TriggerEvent — заканчивает trigger. После yield triggerer удаляет trigger row и пробуждает TI.

Payload TriggerEvent может быть любым JSON-serializable объектом:

yield TriggerEvent({
    "status": "success",
    "value": 42,
    "details": {"timestamp": "...", "extra": "..."},
})

Этот payload передастся в execute_complete(event) через next_kwargs в task_instance.


Step 2: Adding state for resumability

Что если triggerer перезапустится посреди ожидания? Trigger будет re-adopted другим triggerer-ом. __init__ получит те же kwargs что и изначально — состояние ожидания сбросится:

T=0:   trigger created, moment=15:00:00
T=10s: triggerer started waiting, now=14:00:10
T=20s: triggerer crashes
T=30s: another triggerer adopts, __init__ same kwargs
       → re-init starts from now=14:00:30
       → still waits until 15:00:00 (correct!)

Для DateTimeTrigger resumability «бесплатна» — мы сравниваем moment > now(), не ведём счётчик. Для других triggers нужно явно сделать state stateless или включить partial progress в serialize.

Counter-example: polling trigger с retry budget:

# BAD: counter теряется при restart triggerer
class PollingTrigger(BaseTrigger):
    def __init__(self, url, max_retries=10):
        self.url = url
        self.max_retries = max_retries
        self.retries_done = 0  # ← это будет 0 при re-adopt

    async def run(self):
        while self.retries_done < self.max_retries:
            ...
            self.retries_done += 1

    def serialize(self):
        return "...", {"url": self.url, "max_retries": self.max_retries}
        # retries_done НЕ сериализуется

Правильно: stateless, использовать time-based budget:

class PollingTrigger(BaseTrigger):
    def __init__(self, url, timeout: timedelta):
        self.url = url
        self.deadline = datetime.now(tz=timezone.utc) + timeout

    async def run(self):
        while datetime.now(tz=timezone.utc) < self.deadline:
            ...
            await asyncio.sleep(60)
        yield TriggerEvent({"status": "timeout"})

self.deadline устанавливается в __init__ (absolute time), serialize-ится, при re-adopt deadline остаётся правильный.


Lifecycle полный walkthrough

Lifecycle деferrable task с custom trigger
T=0: Worker execute()Worker запускает execute() operator-а. Внутри: проверка готовности (например, check S3). Если не готово — вызов self.defer(trigger=DateTimeTrigger(moment), method_name='execute_complete').
self.defer() raises TaskDeferred
T=10ms: Serializeself.defer() вызывает trigger.serialize() → ('package.DateTimeTrigger', {'moment': '2026-05-12T15:00:00+00:00'}). Worker INSERT INTO trigger (classpath, kwargs) VALUES (?, ?). triggerer_id=NULL — trigger 'unowned', ждёт adoption.
T=15ms: TI state updateUPDATE task_instance SET state='deferred', next_method='execute_complete', trigger_id=:trigger_id. Worker slot освобождается. Task в state=deferred — invisible для других scheduler operations, кроме adoption.
up to 1s tick
T=1s: Triggerer adoptTriggerer tick. SELECT * FROM trigger WHERE triggerer_id IS NULL FOR UPDATE SKIP LOCKED LIMIT 100. Adopt batch. importlib.import_module(classpath), cls(**kwargs) → restore DateTimeTrigger object.
asyncio.create_task
T=1s-15:00: run() executesTriggerer создаёт asyncio.Task(trigger.run()). Event loop крутит async generator: проверяет moment > now, await asyncio.sleep(1). Тысячи других triggers крутятся параллельно — cooperative scheduling между ними. CPU usage ~0% потому что просто waits.
moment reached, yield TriggerEvent
T=15:00: emit eventTrigger yields TriggerEvent(payload). Triggerer обрабатывает: UPDATE task_instance SET state='scheduled', next_kwargs={'event': payload}, trigger_id=NULL. DELETE FROM trigger WHERE id=:id. Trigger лишается своей жизни.
scheduler picks scheduled TI
T=15:00+5s: execute_completeScheduler видит TI state=scheduled с next_method. Создаёт TI run на worker. Worker вместо execute() вызывает getattr(operator, next_method)(context, event=next_kwargs['event']) — это execute_complete(context, event). Финальное завершение task.

Use в operator-е

DateTimeTrigger используется внутри operator через self.defer():

from airflow.models import BaseOperator
from airflow.utils.context import Context


class WaitUntilOperator(BaseOperator):
    """Operator, который ждёт до указанного времени."""

    def __init__(self, target_time: datetime, **kwargs):
        super().__init__(**kwargs)
        self.target_time = target_time

    def execute(self, context: Context):
        now = datetime.now(tz=timezone.utc)

        # Если момент уже наступил — не deferring, return immediately
        if now >= self.target_time:
            return f"Time already passed: {self.target_time}"

        # Defer до момента
        self.defer(
            trigger=DateTimeTrigger(moment=self.target_time),
            method_name="execute_complete",
        )

    def execute_complete(self, context: Context, event=None):
        """Called после yield TriggerEvent."""
        return f"Reached moment: {event}"

Использование внутри @dag создаёт task с target_time параметром. При execution: если момент в будущем — defer; triggerer держит trigger; в нужный момент yield event; worker resumes execute_complete.

При execution: 14:00 task starts → defer; 14:00-15:00 triggerer crapuет async wait; 15:00 yield event; 15:00:05 worker resumes execute_complete, returns.


Multiple yields — multi-stage triggers

Trigger может yield несколько events перед завершением. Use case: progress reporting.

class BatchPollingTrigger(BaseTrigger):
    """Polling с progress updates."""

    async def run(self):
        for batch_idx in range(10):
            await asyncio.sleep(60)
            # Intermediate progress (НЕ завершает trigger)
            yield TriggerEvent({
                "status": "progress",
                "batch": batch_idx,
                "completed_pct": (batch_idx + 1) * 10,
            })

        # Финальное event
        yield TriggerEvent({"status": "complete"})
WARNING

В Airflow 2.x первый yield завершает trigger — TI пробуждается, runs execute_complete с первым event. Дальнейшие yields игнорируются (в 2.x). Multi-yield зарезервировано для будущих 3.x features (например, sub-progress reporting через UI). В 2.10/2.11 — yield один раз.


__eq__ и deduplication

Если два operator вызывают defer с одинаковыми trigger-ами — Airflow может deduplicate:

def __eq__(self, other: object) -> bool:
    return (
        isinstance(other, DateTimeTrigger)
        and self.moment == other.moment
    )

def __hash__(self) -> int:
    return hash(("DateTimeTrigger", self.moment))

Это optional, но полезно если вы знаете, что одинаковые triggers могут создаваться часто. Без __eq__ каждый defer создаёт отдельный row в trigger table.


Production-grade trigger — checklist

При написании custom trigger проверьте:

  1. Все state JSON-serializable. Datetime → isoformat, Decimal → str, set → list.
  2. __init__ принимает both raw and deserialized types. Union[datetime, str], проверка isinstance.
  3. serialize() возвращает kwargs, которые __init__ примет. Идемпотентный round-trip.
  4. Все sleeps — await asyncio.sleep. Никаких time.sleep, requests.get, sync DB calls.
  5. Resumability: state не теряется при triggerer restart. Использовать absolute time / external state, не in-memory counters.
  6. Error handling: exceptions в run() обрабатываются — yield TriggerEvent с error info или re-raise (это failures trigger, task получит failed event).
  7. Reasonable polling interval: await asyncio.sleep(1) для precise timing, await asyncio.sleep(60) для external polls. Не sleep(0) (CPU thrash).
  8. classpath доступен в triggerer image. Custom trigger должен быть installed в triggerer Docker image.

Production gotchas

  1. JSON only, не pickle. Airflow использует JSON для trigger kwargs (security + portability). Custom типы должны быть JSON-friendly.

  2. super().__init__() обязателен. BaseTrigger имеет internal state — пропуск вызовет AttributeError.

  3. Imports внутри run() — best practice для heavy deps (aiohttp, asyncpg). Отложенный import экономит memory тысяч triggers.

  4. Testing через pytest-asyncio:

    @pytest.mark.asyncio
    async def test_datetime_trigger():
        trigger = DateTimeTrigger(moment=datetime.now(tz=timezone.utc))
        events = [e async for e in trigger.run()]
        assert len(events) == 1
  5. self.log доступен из BaseTrigger — logs пишутся в triggerer log file, не в task log.

  6. TaskDeferred маскируется bare except. Если operator.execute() ловит except Exception, defer mechanism сломается. Используйте specific catches или re-raise TaskDeferred.


Проверка знанийKnowledge check
Custom trigger PollingHttpTrigger делает: in run(): for i in range(100): response = requests.get(self.url); if response.ok: yield TriggerEvent(...); await asyncio.sleep(60). Какие 2 проблемы и как fix?
ОтветAnswer
Проблема 1: requests.get — SYNC blocking call. Заблокирует весь asyncio loop на duration HTTP request (могут быть секунды). Тысячи других triggers замораживаются. Fix: использовать aiohttp: async with aiohttp.ClientSession() as session: async with session.get(self.url) as response: ... Проблема 2: counter range(100) не сериализуется — при triggerer restart re-adopt начнёт с i=0 заново. Если trigger ловит first response.ok после 50 iterations — restart перезапустит counter. Не критично для idempotent polls (просто повтор), но для time-based budget важно. Fix: использовать absolute deadline: self.deadline = datetime.now() + timeout (set в __init__, serialize), check while datetime.now() < self.deadline. Bonus issue: requests НЕ установлен в minimal Airflow triggerer image — нужен в triggerer Dockerfile.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что возвращает BaseTrigger.serialize()?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6