Triggerer architecture — asyncio event loop и HA
Triggerer — один из самых недооценённых компонентов Airflow. Если scheduler и worker знакомы любому Airflow user, то triggerer часто остаётся «чёрной коробкой» — что-то крутится в фоне, держит deferrable triggers. Этот урок препарирует triggerer до строк кода: что это за процесс, как работает asyncio event loop, как несколько triggerer-ов координируются через PostgreSQL без consensus protocols.
Понимание архитектуры triggerer критично для production: неправильное deployment приводит к stuck triggers (timeout watchers вечны), потере events (crash без graceful shutdown), невозможности scale event-driven workloads.
Что такое triggerer
Triggerer — это отдельный Airflow process, запускаемый командой airflow triggerer. Его ответственность: исполнять deferrable triggers — асинхронные «ожидатели событий», вынесенные из worker-а.
Минимальное представление о месте triggerer в архитектуре:
| Process | Что делает | Concurrency model |
|---|---|---|
| Scheduler | Создаёт DagRuns, переводит TI в queued state | Single-threaded event loop с DB lock |
| Worker (Celery/K8s) | Запускает execute() операторов в slot | Process per task |
| Triggerer | Держит triggers в memory, async-ожидает events | Single-threaded asyncio, тысячи concurrent triggers |
| DAG Processor | Парсит DAG-файлы, обновляет serialized_dag | Multiple processes |
| Webserver | UI + REST API | Gunicorn workers |
Triggerer не запускает task code. Он запускает только trigger.run() методы — асинхронные generators, которые ожидают условие и yield TriggerEvent.
Зачем нужен triggerer
До deferrable operators (AIP-40, 2.2+) sensor с long timeout блокировал worker slot. Сценарий:
# Worker slot занят 4 часа, нет CPU работы — просто ожидание
S3KeySensor(
task_id="wait_for_data"
bucket_key="s3://bucket/{{ ds }}/data.parquet"
timeout=14400, # 4 hours
poke_interval=60, # check каждую минуту
mode="poke", # worker slot занят все 4 часа
)
Если у вас 100 таких sensors одновременно — 100 worker slots впустую заняты. На Celery cluster с 50 workers это catastrophe: фактическая capacity 0.
Альтернативы до deferrable:
mode="reschedule"— sensor освобождает slot между poke, но создаёт row вtask_rescheduletable → DB churn, scheduler overhead на каждый retry.- Custom polling DAGs — sensor реализован как отдельный hourly DAG → complex, hard to maintain.
Deferrable решает radically: sensor передаёт ожидание triggerer-у через self.defer(), освобождает slot, ждёт TriggerEvent. На triggerer-е тысячи таких ожиданий крутятся в одном asyncio loop практически без overhead.
asyncio — event loop overview
Anatomy asyncio loop в triggerer
Под капотом triggerer — простой Python script, который запускает один asyncio event loop в main thread:
# Псевдокод TriggererJobRunner._run_trigger_loop
async def _run_trigger_loop(self):
self.task_set: set[asyncio.Task] = set()
while not self.is_shutdown:
# 1. Загрузить новые triggers из БД
new_triggers = await self.load_triggers()
for trigger_orm in new_triggers:
# 2. Восстановить trigger из serialized state
trigger_obj = self.deserialize(trigger_orm)
# 3. Запустить asyncio task для каждого
task = asyncio.create_task(
self._run_trigger(trigger_obj, trigger_orm.id)
)
self.task_set.add(task)
# 4. Cleanup завершённых
done = {t for t in self.task_set if t.done()}
self.task_set -= done
await asyncio.sleep(1) # tick interval
async def _run_trigger(self, trigger, trigger_id):
try:
async for event in trigger.run():
# 5. Сохранить event в БД, разбудить TI
await self.emit_event(trigger_id, event)
except Exception as e:
await self.emit_failure(trigger_id, e)
Ключевые свойства:
- Single-threaded — нет parallelism, нет GIL contention. Всё в одном thread на одном CPU core.
- Concurrency через asyncio — десятки тысяч triggers могут yield контроль через
await, event loop переключается между ними. - Cooperative scheduling — trigger обязан делать
await asyncio.sleep(...). Если trigger.run() делаетtime.sleep(60)или blocking I/O — он блокирует весь loop.
Самая частая ошибка при написании custom trigger — использовать blocking calls (requests.get(), time.sleep(), psycopg2.execute()). Это блокирует весь asyncio loop, тысячи других triggers замораживаются. Правильно — использовать async-friendly libraries: aiohttp, asyncio.sleep, asyncpg.
DB schema: trigger table
Triggers сериализуются и хранятся в metadata DB:
CREATE TABLE trigger (
id BIGSERIAL PRIMARY KEY,
classpath VARCHAR(1000) NOT NULL,
kwargs JSONB NOT NULL,
created_date TIMESTAMPTZ NOT NULL DEFAULT now(),
triggerer_id INTEGER -- FK to job.id (какой triggerer держит)
);
CREATE INDEX idx_trigger_triggerer_id ON trigger (triggerer_id);
Колонки:
classpath— full Python path:airflow.providers.amazon.aws.triggers.s3.S3KeyTriggerkwargs— serialized__init__params (JSON-serializable)triggerer_id— какой triggerer instance currently держит trigger (NULL = available для adopt)
Lifecycle row в этой таблице:
- Worker вызывает
self.defer(trigger=...)→ создаётся row сtriggerer_id=NULL. - Triggerer на следующем tick делает
SELECT ... FOR UPDATE SKIP LOCKED→ adopts trigger (sets triggerer_id). - Trigger runs в memory triggerer-а.
- Trigger yields TriggerEvent → INSERT в
trigger_eventtable (otherwise: event delivered through callback channel в memory), DELETE row fromtrigger. - TI state →
scheduled, worker подбирает заново.
HA: multiple triggerers через SKIP LOCKED
Для production требуется HA triggerer setup — несколько triggerer instances работают параллельно. Координация — opera same pattern as scheduler critical section, но с SKIP LOCKED вместо NOWAIT.
-- Triggerer adopt-ит unowned triggers
SELECT id, classpath, kwargs
FROM trigger
WHERE triggerer_id IS NULL
FOR UPDATE SKIP LOCKED
LIMIT 100;
-- Затем:
UPDATE trigger
SET triggerer_id = :my_triggerer_job_id
WHERE id IN (selected_ids);
Что делает SKIP LOCKED:
- Если row уже locked другим triggerer-ом — пропускает этот row и берёт следующий
- В отличие от
NOWAIT(error при contention) — возвращает available rows - Это идеально для work-stealing: каждый triggerer берёт свою порцию
Сравнение coordination patterns:
| Pattern | Scheduler | Triggerer |
|---|---|---|
| Use case | Mutex over pool decisions | Work distribution |
| SQL | FOR UPDATE NOWAIT | FOR UPDATE SKIP LOCKED |
| On conflict | Error → skip tick | Skip locked rows → take available |
| Parallelism | Serialized critical section | Parallel work distribution |
Lifecycle deferrable task — полная диаграмма
Heartbeat и failover
Каждый triggerer instance является TriggererJob row в job table:
SELECT id, hostname, state, latest_heartbeat
FROM job
WHERE job_type = 'TriggererJob' AND state = 'running';
Triggerer обновляет latest_heartbeat каждые 5 секунд (config triggerer_heartrate). Если другой компонент видит stale heartbeat (older than triggerer_health_check_threshold, default 30s) — считает triggerer мёртвым.
При смерти triggerer-а его triggers становятся orphaned:
-- Найти orphaned triggers (triggerer dead, but trigger still owned)
SELECT t.* FROM trigger t
JOIN job j ON t.triggerer_id = j.id
WHERE j.latest_heartbeat < now() - interval '30 seconds';
Scheduler во время своего housekeeping видит это и:
-- Re-orphan triggers from dead triggerers
UPDATE trigger SET triggerer_id = NULL
WHERE triggerer_id IN (
SELECT id FROM job
WHERE job_type='TriggererJob'
AND latest_heartbeat < now() - interval '30 seconds'
);
После этого живые triggerers adopt-ят orphaned через тот же SKIP LOCKED mechanism. Failover latency — обычно 30-60s.
Производительность: сколько triggers выдержит один process
Реальные benchmarks для triggerer (Airflow 2.10, типичный VM 4 cores, 8GB RAM):
| Concurrent triggers | CPU usage | Memory | Notes |
|---|---|---|---|
| 100 | 1% | 100MB | Triггерер бездельничает |
| 1,000 | 5% | 200MB | Лёгкая нагрузка |
| 5,000 | 25% | 400MB | Sweet spot |
| 10,000 | 60% | 800MB | High load, но OK |
| 20,000+ | 90%+ | 1.5GB+ | Bottleneck — нужен второй triggerer |
Numbers зависят от что делают triggers:
- Pure
await asyncio.sleeptriggers — cheap, can scale до 50k+ - HTTP polling через aiohttp — moderate cost
- Database polling (через asyncpg) — heavier, до 5k
Если вам нужно >10k concurrent — добавьте второй triggerer (HA setup). До этого один достаточно.
Comparison со scheduler architecture
| Aspect | Scheduler | Triggerer |
|---|---|---|
| Concurrency | Single-threaded в Python (multi-process для DAG parsing) | Single-threaded asyncio в main process |
| Workload | CPU-heavy (DAG decisions, dataset processing) | I/O-heavy (waits, polls) |
| DB pattern | FOR UPDATE NOWAIT (mutex) | FOR UPDATE SKIP LOCKED (work distribution) |
| Tick interval | 5s default | 1s default |
| HA model | Active-active, serialized critical section | Active-active, work stealing |
| Bottleneck | Critical section throughput | asyncio event loop capacity |
| Scaling | 2-4 instances (diminishing returns) | 1-3 typically enough |
Это два разных pattern, оптимизированные для разных workloads. Scheduler — coordinated state machine. Triggerer — embarrassingly parallel waits.
Production gotchas
-
Single-threaded blocks easily. Любой sync call в trigger.run() заблокирует весь event loop. Один
time.sleep(60)в одном trigger останавливает 5000 других. Всегдаawait asyncio.sleep. -
Triggerer обязателен для deferrable operators. Если deployment-е deferrable sensor, но triggerer не запущен — TI зависают в state=deferred forever. Не failure, не error — silent zombie.
-
memory leaks в triggers. Каждый trigger держит state в memory триггерера. Если 10k triggers по 1MB каждый — 10GB memory. Audit custom triggers на heavy state.
-
Triggers не удаляются при DAG paused. Если DAG paused во время deferred — trigger продолжает крутиться в triggerer-е. После unpause TI продолжит execution. Это feature, но может быть surprise.
-
Multiple triggerers могут конфликтовать через классы import. Если custom trigger требует package, который не installed в triggerer image — он failed при deserialization. Triggerer image должен содержать те же deps что workers.
-
Restart triggerer вызывает re-adoption. При graceful shutdown (SIGTERM) — triggerer flush events, release triggers (UPDATE SET triggerer_id=NULL). При kill -9 — triggerer dies, scheduler через 30s re-orphans triggers. В обоих случаях triggers не теряются.
-
Connection pool tuning. Triggerer тоже использует metadata DB connections — один per asyncio task в пиковые моменты. Размер pool:
[database] sql_alchemy_max_sizeминимум = expected concurrent triggers + scheduler/webserver consumption. Для 5k triggers — pool 200+. -
Monitoring metrics:
triggerer.running_triggers— текущее числоtriggerer.events_emitted— events за периодtriggerer.failed— failed triggers (важно alerting!)
Запуск triggerer
# Single instance
airflow triggerer
# С custom log level
airflow triggerer --log-file /var/log/airflow/triggerer.log
# Multiple instances для HA — просто запустить N таких commands
# на разных hosts (или Pods в Kubernetes):
airflow triggerer # на host-1
airflow triggerer # на host-2
# Координация через PostgreSQL SKIP LOCKED — никакой extra config
Helm chart Airflow:
triggerer:
enabled: true
replicas: 2
resources:
requests: { cpu: 500m, memory: 1Gi }
limits: { cpu: 2, memory: 2Gi }