Learning Platform
Глоссарий Troubleshooting
Урок 10.02 · 30 мин
Продвинутый
TriggererasyncioAIP-40HASKIP LOCKEDEvent Loop

Triggerer architecture — asyncio event loop и HA

Triggerer — один из самых недооценённых компонентов Airflow. Если scheduler и worker знакомы любому Airflow user, то triggerer часто остаётся «чёрной коробкой» — что-то крутится в фоне, держит deferrable triggers. Этот урок препарирует triggerer до строк кода: что это за процесс, как работает asyncio event loop, как несколько triggerer-ов координируются через PostgreSQL без consensus protocols.

Понимание архитектуры triggerer критично для production: неправильное deployment приводит к stuck triggers (timeout watchers вечны), потере events (crash без graceful shutdown), невозможности scale event-driven workloads.


Что такое triggerer

Triggerer — это отдельный Airflow process, запускаемый командой airflow triggerer. Его ответственность: исполнять deferrable triggers — асинхронные «ожидатели событий», вынесенные из worker-а.

Минимальное представление о месте triggerer в архитектуре:

ProcessЧто делаетConcurrency model
SchedulerСоздаёт DagRuns, переводит TI в queued stateSingle-threaded event loop с DB lock
Worker (Celery/K8s)Запускает execute() операторов в slotProcess per task
TriggererДержит triggers в memory, async-ожидает eventsSingle-threaded asyncio, тысячи concurrent triggers
DAG ProcessorПарсит DAG-файлы, обновляет serialized_dagMultiple processes
WebserverUI + REST APIGunicorn workers

Triggerer не запускает task code. Он запускает только trigger.run() методы — асинхронные generators, которые ожидают условие и yield TriggerEvent.


Зачем нужен triggerer

До deferrable operators (AIP-40, 2.2+) sensor с long timeout блокировал worker slot. Сценарий:

# Worker slot занят 4 часа, нет CPU работы — просто ожидание
S3KeySensor(
    task_id="wait_for_data"
    bucket_key="s3://bucket/{{ ds }}/data.parquet"
    timeout=14400,  # 4 hours
    poke_interval=60,  # check каждую минуту
    mode="poke",  # worker slot занят все 4 часа
)

Если у вас 100 таких sensors одновременно — 100 worker slots впустую заняты. На Celery cluster с 50 workers это catastrophe: фактическая capacity 0.

Альтернативы до deferrable:

  • mode="reschedule" — sensor освобождает slot между poke, но создаёт row в task_reschedule table → DB churn, scheduler overhead на каждый retry.
  • Custom polling DAGs — sensor реализован как отдельный hourly DAG → complex, hard to maintain.

Deferrable решает radically: sensor передаёт ожидание triggerer-у через self.defer(), освобождает slot, ждёт TriggerEvent. На triggerer-е тысячи таких ожиданий крутятся в одном asyncio loop практически без overhead.


asyncio — event loop overview

Anatomy asyncio loop в triggerer

Под капотом triggerer — простой Python script, который запускает один asyncio event loop в main thread:

# Псевдокод TriggererJobRunner._run_trigger_loop
async def _run_trigger_loop(self):
    self.task_set: set[asyncio.Task] = set()

    while not self.is_shutdown:
        # 1. Загрузить новые triggers из БД
        new_triggers = await self.load_triggers()

        for trigger_orm in new_triggers:
            # 2. Восстановить trigger из serialized state
            trigger_obj = self.deserialize(trigger_orm)

            # 3. Запустить asyncio task для каждого
            task = asyncio.create_task(
                self._run_trigger(trigger_obj, trigger_orm.id)
            )
            self.task_set.add(task)

        # 4. Cleanup завершённых
        done = {t for t in self.task_set if t.done()}
        self.task_set -= done

        await asyncio.sleep(1)  # tick interval

async def _run_trigger(self, trigger, trigger_id):
    try:
        async for event in trigger.run():
            # 5. Сохранить event в БД, разбудить TI
            await self.emit_event(trigger_id, event)
    except Exception as e:
        await self.emit_failure(trigger_id, e)

Ключевые свойства:

  1. Single-threaded — нет parallelism, нет GIL contention. Всё в одном thread на одном CPU core.
  2. Concurrency через asyncio — десятки тысяч triggers могут yield контроль через await, event loop переключается между ними.
  3. Cooperative scheduling — trigger обязан делать await asyncio.sleep(...). Если trigger.run() делает time.sleep(60) или blocking I/O — он блокирует весь loop.
WARNING

Самая частая ошибка при написании custom trigger — использовать blocking calls (requests.get(), time.sleep(), psycopg2.execute()). Это блокирует весь asyncio loop, тысячи других triggers замораживаются. Правильно — использовать async-friendly libraries: aiohttp, asyncio.sleep, asyncpg.


DB schema: trigger table

Triggers сериализуются и хранятся в metadata DB:

CREATE TABLE trigger (
    id              BIGSERIAL PRIMARY KEY,
    classpath       VARCHAR(1000) NOT NULL,
    kwargs          JSONB NOT NULL,
    created_date    TIMESTAMPTZ NOT NULL DEFAULT now(),
    triggerer_id    INTEGER  -- FK to job.id (какой triggerer держит)
);

CREATE INDEX idx_trigger_triggerer_id ON trigger (triggerer_id);

Колонки:

  • classpath — full Python path: airflow.providers.amazon.aws.triggers.s3.S3KeyTrigger
  • kwargs — serialized __init__ params (JSON-serializable)
  • triggerer_id — какой triggerer instance currently держит trigger (NULL = available для adopt)

Lifecycle row в этой таблице:

  1. Worker вызывает self.defer(trigger=...) → создаётся row с triggerer_id=NULL.
  2. Triggerer на следующем tick делает SELECT ... FOR UPDATE SKIP LOCKED → adopts trigger (sets triggerer_id).
  3. Trigger runs в memory triggerer-а.
  4. Trigger yields TriggerEvent → INSERT в trigger_event table (otherwise: event delivered through callback channel в memory), DELETE row from trigger.
  5. TI state → scheduled, worker подбирает заново.

HA: multiple triggerers через SKIP LOCKED

Для production требуется HA triggerer setup — несколько triggerer instances работают параллельно. Координация — opera same pattern as scheduler critical section, но с SKIP LOCKED вместо NOWAIT.

-- Triggerer adopt-ит unowned triggers
SELECT id, classpath, kwargs
FROM trigger
WHERE triggerer_id IS NULL
FOR UPDATE SKIP LOCKED
LIMIT 100;

-- Затем:
UPDATE trigger
SET triggerer_id = :my_triggerer_job_id
WHERE id IN (selected_ids);

Что делает SKIP LOCKED:

  • Если row уже locked другим triggerer-ом — пропускает этот row и берёт следующий
  • В отличие от NOWAIT (error при contention) — возвращает available rows
  • Это идеально для work-stealing: каждый triggerer берёт свою порцию

Сравнение coordination patterns:

PatternSchedulerTriggerer
Use caseMutex over pool decisionsWork distribution
SQLFOR UPDATE NOWAITFOR UPDATE SKIP LOCKED
On conflictError → skip tickSkip locked rows → take available
ParallelismSerialized critical sectionParallel work distribution

Lifecycle deferrable task — полная диаграмма

Deferrable task lifecycle
Worker: task.execute()Worker запускает task. Inside operator code: проверка условия (например, S3 key exists?). Если еще не готово — вызов self.defer(trigger=MyTrigger(...), method_name='execute_complete').
raise TaskDeferred
TaskDeferred exceptionself.defer() raises TaskDeferred — это специальное исключение, не error. Worker ловит его в task runner, серализует trigger (classpath + kwargs), INSERT в trigger table, UPDATE task_instance SET state='deferred', next_method='execute_complete'.
worker slot freed
Triggerer tickTriggerer (separate process) на своём tick делает SELECT FOR UPDATE SKIP LOCKED FROM trigger WHERE triggerer_id IS NULL. Adopts batch (default 100), создаёт asyncio.Task для каждого. UPDATE trigger SET triggerer_id=my_job_id.
adopt batch
trigger.run() async loopВ asyncio event loop триггер выполняет своё условие. Например: while not s3_key_exists(): await asyncio.sleep(60). Тысячи triggers крутятся параллельно через cooperative scheduling. Один CPU core, но handle 5000+ concurrent waits.
condition met → yield TriggerEvent
TriggerEvent → DBTriggerer ловит yielded TriggerEvent, сохраняет payload, UPDATE task_instance SET state='scheduled', next_method=execute_complete, next_kwargs={event: ...}. DELETE FROM trigger WHERE id=?.
scheduler picks scheduled TI
Worker resumesWorker подбирает TI заново (новый slot). Вместо execute() вызывает execute_complete(event). Эта функция получает event payload и завершает task (или может снова defer для multi-stage waits).

Heartbeat и failover

Каждый triggerer instance является TriggererJob row в job table:

SELECT id, hostname, state, latest_heartbeat
FROM job
WHERE job_type = 'TriggererJob' AND state = 'running';

Triggerer обновляет latest_heartbeat каждые 5 секунд (config triggerer_heartrate). Если другой компонент видит stale heartbeat (older than triggerer_health_check_threshold, default 30s) — считает triggerer мёртвым.

При смерти triggerer-а его triggers становятся orphaned:

-- Найти orphaned triggers (triggerer dead, but trigger still owned)
SELECT t.* FROM trigger t
JOIN job j ON t.triggerer_id = j.id
WHERE j.latest_heartbeat < now() - interval '30 seconds';

Scheduler во время своего housekeeping видит это и:

-- Re-orphan triggers from dead triggerers
UPDATE trigger SET triggerer_id = NULL
WHERE triggerer_id IN (
    SELECT id FROM job
    WHERE job_type='TriggererJob'
      AND latest_heartbeat < now() - interval '30 seconds'
);

После этого живые triggerers adopt-ят orphaned через тот же SKIP LOCKED mechanism. Failover latency — обычно 30-60s.


Производительность: сколько triggers выдержит один process

Реальные benchmarks для triggerer (Airflow 2.10, типичный VM 4 cores, 8GB RAM):

Concurrent triggersCPU usageMemoryNotes
1001%100MBTriггерер бездельничает
1,0005%200MBЛёгкая нагрузка
5,00025%400MBSweet spot
10,00060%800MBHigh load, но OK
20,000+90%+1.5GB+Bottleneck — нужен второй triggerer

Numbers зависят от что делают triggers:

  • Pure await asyncio.sleep triggers — cheap, can scale до 50k+
  • HTTP polling через aiohttp — moderate cost
  • Database polling (через asyncpg) — heavier, до 5k

Если вам нужно >10k concurrent — добавьте второй triggerer (HA setup). До этого один достаточно.


Comparison со scheduler architecture

AspectSchedulerTriggerer
ConcurrencySingle-threaded в Python (multi-process для DAG parsing)Single-threaded asyncio в main process
WorkloadCPU-heavy (DAG decisions, dataset processing)I/O-heavy (waits, polls)
DB patternFOR UPDATE NOWAIT (mutex)FOR UPDATE SKIP LOCKED (work distribution)
Tick interval5s default1s default
HA modelActive-active, serialized critical sectionActive-active, work stealing
BottleneckCritical section throughputasyncio event loop capacity
Scaling2-4 instances (diminishing returns)1-3 typically enough

Это два разных pattern, оптимизированные для разных workloads. Scheduler — coordinated state machine. Triggerer — embarrassingly parallel waits.


Production gotchas

  1. Single-threaded blocks easily. Любой sync call в trigger.run() заблокирует весь event loop. Один time.sleep(60) в одном trigger останавливает 5000 других. Всегда await asyncio.sleep.

  2. Triggerer обязателен для deferrable operators. Если deployment-е deferrable sensor, но triggerer не запущен — TI зависают в state=deferred forever. Не failure, не error — silent zombie.

  3. memory leaks в triggers. Каждый trigger держит state в memory триггерера. Если 10k triggers по 1MB каждый — 10GB memory. Audit custom triggers на heavy state.

  4. Triggers не удаляются при DAG paused. Если DAG paused во время deferred — trigger продолжает крутиться в triggerer-е. После unpause TI продолжит execution. Это feature, но может быть surprise.

  5. Multiple triggerers могут конфликтовать через классы import. Если custom trigger требует package, который не installed в triggerer image — он failed при deserialization. Triggerer image должен содержать те же deps что workers.

  6. Restart triggerer вызывает re-adoption. При graceful shutdown (SIGTERM) — triggerer flush events, release triggers (UPDATE SET triggerer_id=NULL). При kill -9 — triggerer dies, scheduler через 30s re-orphans triggers. В обоих случаях triggers не теряются.

  7. Connection pool tuning. Triggerer тоже использует metadata DB connections — один per asyncio task в пиковые моменты. Размер pool: [database] sql_alchemy_max_size минимум = expected concurrent triggers + scheduler/webserver consumption. Для 5k triggers — pool 200+.

  8. Monitoring metrics:

    • triggerer.running_triggers — текущее число
    • triggerer.events_emitted — events за период
    • triggerer.failed — failed triggers (важно alerting!)

Запуск triggerer

# Single instance
airflow triggerer

# С custom log level
airflow triggerer --log-file /var/log/airflow/triggerer.log

# Multiple instances для HA — просто запустить N таких commands
# на разных hosts (или Pods в Kubernetes):
airflow triggerer  # на host-1
airflow triggerer  # на host-2
# Координация через PostgreSQL SKIP LOCKED — никакой extra config

Helm chart Airflow:

triggerer:
  enabled: true
  replicas: 2
  resources:
    requests: { cpu: 500m, memory: 1Gi }
    limits: { cpu: 2, memory: 2Gi }

Проверка знанийKnowledge check
Custom DateTimeTrigger использует time.sleep(60) вместо await asyncio.sleep(60). Что произойдёт на production триггерере с 1000 active triggers?
ОтветAnswer
Catastrophic blocking. asyncio event loop single-threaded — sync call time.sleep(60) блокирует main thread на 60 секунд. Все 999 других triggers замораживаются — никаких ожиданий, никаких yields, никаких events emitted. После 60s loop resume на 1ms (другой trigger получит свой sleep) — и опять блок. В практике: triggers exceed timeout, TI marked failed, downstream cascade ломается, в логах ничего — 'просто' медленно. Это самый классический bug. Защита: code review всех custom triggers на blocking calls (time.sleep, requests.get, psycopg2 sync). Использовать asyncio.sleep, aiohttp, asyncpg. Linting через ruff async checks (ASYNC101 — blocking-call-in-async).

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Почему scheduler использует FOR UPDATE NOWAIT, а triggerer — FOR UPDATE SKIP LOCKED?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6