Triggerer и Deferrable Operators — обзор модуля
Deferrable operators (AIP-40, 2.2+) — фундаментальное улучшение для долгоживущих sensors. Раньше sensor в poke mode блокировал worker slot на часы, или в reschedule mode попадал в queue каждые N секунд. Deferrable — async event loop держит тысячи triggers без блокировки worker.
Уроки модуля
| # | Урок | Что внутри |
|---|---|---|
| 01 | Обзор модуля | Текущий урок |
| 02 | Triggerer architecture | asyncio event loop, single process, HA через SKIP LOCKED |
| 03 | BaseTrigger anatomy | serialize(), run() (async gen), TriggerEvent |
| 04 | Custom deferrable operator | Conversion of sync sensor → async, real-world example |
| 05 | TaskSuccessEvent (2.10+) | Триггер завершает task без возврата на worker |
| 06 | HA triggerer | Multiple triggerer instances, queue separation |
Ключевые концепты
Lifecycle deferrable task
- Task в worker вызывает
self.defer(trigger=MyTrigger(...), method_name="execute_complete"). - Worker сериализует trigger → пишет в таблицу
trigger→ TI state=deferred → освобождает slot. - Triggerer берёт trigger (
SELECT ... FOR UPDATE SKIP LOCKED— multiple triggerer-ов работают параллельно). - asyncio loop запускает
trigger.run()— async generator. - Trigger yields
TriggerEvent→ triggerer пишет event в БД, TI →scheduled. - Worker подбирает task заново, вызывает
execute_complete(event).
Custom Trigger пример
class DateTimeTrigger(BaseTrigger):
def __init__(self, moment):
self.moment = moment
def serialize(self):
return ("path.to.DateTimeTrigger", {"moment": self.moment})
async def run(self):
while self.moment > utcnow():
await asyncio.sleep(1) # NB: asyncio, не time.sleep
yield TriggerEvent(self.moment)
Scaling сравнение
| Sensor mode | 1 worker slot, 1 sensor | 1000 sensors |
|---|---|---|
poke | 1 worker slot занят | 1000 worker slots занято — катастрофа |
reschedule | 0 worker slot между | 1000 ti в task_reschedule table, постоянный churn |
deferred | 0 worker slot, 1 trigger в asyncio | 1 triggerer process, 1000 active triggers — норм |
Killer takeaway
Любой sensor с timeout > 1 час должен быть deferrable=True. Это фундаментальное правило production Airflow. Triggerer asyncio scale handles 5000+ одновременных triggers на одном процессе.