TaskSuccessEvent и HA triggerer — advanced patterns
Стандартный lifecycle deferrable task — это: defer → trigger ждёт → trigger yields → worker resumes для execute_complete(). Это elegant, но имеет overhead: после события TI снова попадает в worker, который только и делает что записывает «success». Для cases где execute_complete не делает реальной работы — это лишний worker startup.
В Airflow 2.10 появился TaskSuccessEvent — механизм, позволяющий triggerer самому завершить task без возврата на worker. Это закрывает цикл оптимизации deferrable architecture.
Также этот урок покрывает HA triggerer setup для production: multiple triggerer instances, queue separation для team isolation, monitoring metrics. Это всё, что нужно знать чтобы deploy production-grade triggerer infrastructure.
Проблема, которую решает TaskSuccessEvent
Стандартный deferrable flow:
1. Worker: execute() → self.defer() [worker slot занят 1s]
2. Triggerer: trigger.run() yields event [zero worker]
3. Worker: execute_complete(event) → success [worker slot занят 1s]
Steps 1 и 3 занимают worker slot минимум по 1 секунде (плюс startup overhead на KubernetesExecutor — может быть 10-30 секунд). Для simple sensors, где execute_complete просто validates event status и returns — это pure waste.
Что если triggerer мог сам сказать «task done», и worker не нужен на step 3?
В 2.10 (AIP-72, частично implemented) появляется TaskSuccessEvent — special TriggerEvent subclass:
from airflow.triggers.base import BaseTrigger, TaskSuccessEvent
class MyTrigger(BaseTrigger):
async def run(self):
# ... ждём условие
await asyncio.sleep(60)
# Triggerer завершит task без worker resume
yield TaskSuccessEvent({"key": "value"})
При yield TaskSuccessEvent triggerer:
UPDATE task_instance SET state='success', xcom_value={...}- DELETE FROM trigger
- НЕ возвращает TI в scheduled — execute_complete пропускается
XCom value берётся из TaskSuccessEvent payload. Если payload — dict, он становится XCom directly. Если простое значение — XCom = это значение.
Lifecycle с TaskSuccessEvent
Когда использовать TaskSuccessEvent
TaskSuccessEvent оправдан только когда execute_complete тривиально просто:
# GOOD candidate для TaskSuccessEvent:
def execute_complete(self, context, event):
if event["status"] == "success":
return event["xcom_value"]
raise AirflowException(event.get("message"))
Эту логику можно встроить в trigger — yield либо TaskSuccessEvent (success path), либо TriggerEvent (error path, для worker resume):
class OptimizedTrigger(BaseTrigger):
async def run(self):
try:
result = await self.async_check()
yield TaskSuccessEvent(result) # ← skip worker
except SomeError as e:
yield TriggerEvent({"status": "error", "message": str(e)})
# ↑ standard flow, worker resumes для error handling
Когда НЕ использовать TaskSuccessEvent:
execute_completeделает heavy CPU work (parsing, transformation)execute_completeнужен access к Airflow context (variables, connections)execute_completeможет делать second defer (multi-stage operators)execute_completeделает side effects (write to external system)
В этих cases standard TriggerEvent + worker resume — правильный pattern.
Multi-yield в 2.10+
В 2.10 multi-yield получил больше внимания — yield нескольких events за один trigger возможен для progress reporting. В стандартном flow первый yield по-прежнему завершает trigger, но с дополнительными API:
class ProgressTrigger(BaseTrigger):
async def run(self):
for i in range(10):
await asyncio.sleep(60)
# Progress update — не завершает trigger
self.log.info(f"Progress: {i * 10}%")
# В 3.x — это будет emit progress event без завершения
yield TriggerEvent({"status": "complete"})
В 2.10/2.11 progress reporting обычно через self.log (видно в UI через triggerer logs), а формальный TriggerEvent — только финальный. В 3.x ожидается richer API для progress events.
HA triggerer setup
Single triggerer — single point of failure. Production требует minimum 2 instances для:
- Failover при crash
- Rolling upgrades без downtime
- Load distribution (если нужно >5000 concurrent)
Deployment topology
Минимальная production setup:
# Helm values
triggerer:
enabled: true
replicas: 2 # минимум для HA
podDisruptionBudget:
enabled: true
config:
maxUnavailable: 1 # обеспечивает rolling updates
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 2
memory: 2Gi
livenessProbe:
enabled: true
initialDelaySeconds: 30
periodSeconds: 60
Каждый triggerer независим, координация через PostgreSQL:
-- Каждый triggerer adopts свою порцию через SKIP LOCKED
SELECT * FROM trigger
WHERE triggerer_id IS NULL
FOR UPDATE SKIP LOCKED
LIMIT 100;
При crash одного: другой через triggerer_health_check_threshold (30s) видит stale heartbeat, scheduler re-orphans triggers, second triggerer adopts.
Capacity planning
Sizing:
| Concurrent triggers | Replicas | Total CPU | Total memory |
|---|---|---|---|
| < 1,000 | 2 (HA) | 1 vCPU | 2GB |
| 1,000-5,000 | 2 (HA) | 2 vCPU | 4GB |
| 5,000-15,000 | 3 | 4 vCPU | 8GB |
| 15,000+ | 4+ | 8+ vCPU | 16GB+ |
HA — это minimum 2. Adding replicas beyond это даёт linear scaling до capacity asyncio loop одного process (~10-20k concurrent triggers per CPU core).
Queue separation — team isolation
В large organizations c shared Airflow instance несколько teams могут иметь interfering workloads. Один team создал custom trigger с blocking call — все team’s triggers замораживаются. Решение — queue separation через separate triggerer instances per team.
Triggerer queue mechanism
В 2.10+ trigger может быть assigned to specific «queue» (фактически — capacity group):
class TeamSpecificTrigger(BaseTrigger):
# Trigger будет обрабатываться только triggerer-ом, который
# имеет этот queue в конфиге
queue = "team-data-eng"
Triggerer запускается с list разрешённых queues:
# Триггерер team-data-eng обрабатывает только свою queue
airflow triggerer --queues "team-data-eng,default"
# Триггерер team-ml обрабатывает свою
airflow triggerer --queues "team-ml,default"
SQL для adoption фильтрует по queue:
SELECT * FROM trigger
WHERE triggerer_id IS NULL
AND queue IN ('team-data-eng', 'default')
FOR UPDATE SKIP LOCKED
LIMIT 100;
Это даёт fault isolation: bad trigger в team-ml не влияет на team-data-eng.
Queue separation в 2.10 — relatively new feature, не все providers поддерживают queue attribute в triggers. Эта pattern получит полную реализацию в 3.x (AIP-91 или later). В 2.10/2.11 — manually добавить queue field в custom triggers + соответствующий config triggerer-а.
Monitoring triggerer
Production triggerer требует мониторинга:
Key metrics
| Metric | Что значит | Alert threshold |
|---|---|---|
airflow.triggerer.running_triggers | Active triggers count | > 5000 (rule of thumb) |
airflow.triggerer.events_emitted | Events за период | Sustained 0 → suspicious |
airflow.triggerer.events_failed | Failed triggers | Any sustained > 0 |
airflow.triggerer.heartbeat | Last heartbeat (job table) | > 30s stale |
airflow.triggerer.capacity | Triggerer total capacity | < running_triggers + 20% |
| triggerer container CPU | Process CPU | > 80% sustained |
| triggerer container memory | Memory usage | > 80% of limit |
Prometheus / OTel setup
# airflow.cfg
[metrics]
otel_on = True
otel_host = otel-collector
otel_port = 4317
statsd_on = False
В Grafana dashboard:
# Active triggers per triggerer
airflow_triggerer_running_triggers{job="triggerer"}
# Events emission rate
rate(airflow_triggerer_events_emitted_total[5m])
# Failed triggers (should be 0)
rate(airflow_triggerer_events_failed_total[5m])
Health checks
Liveness probe — пингует triggerer через port (если configured):
# Kubernetes
livenessProbe:
exec:
command:
- airflow
- jobs
- check
- --job-type
- TriggererJob
- --hostname
- $(hostname)
initialDelaySeconds: 30
periodSeconds: 60
При фейле probe — Kubernetes restarts pod, scheduler re-orphans triggers.
Disaster scenarios
Scenario 1: Все triggerers умерли
-- Триггеры в trigger table, ни один triggerer не работает
SELECT t.id, t.triggerer_id, j.latest_heartbeat
FROM trigger t
LEFT JOIN job j ON t.triggerer_id = j.id
WHERE j.id IS NULL OR j.latest_heartbeat < now() - interval '30 seconds';
Recovery:
- Scheduler через
adopt_or_reset_orphaned_tasks(housekeeping job) re-orphan triggers (UPDATE triggerer_id = NULL). - Когда любой triggerer возвращается online — adopts через SKIP LOCKED.
- Triggers resume execution. Pending events не теряются (они в trigger table).
Worst case latency: 30s heartbeat threshold + 60s housekeeping interval = ~90s total.
Scenario 2: Triggerer память leak
# Triggerer container memory growing
kubectl top pod airflow-triggerer-0
# 1.5GB → 1.8GB → 2.0GB → OOMKilled
Causes:
- Custom triggers держат heavy state (large data structures)
- Не закрытые async connections (aiohttp sessions, asyncpg pools)
- Memory leak в aiobotocore / других async libraries
Mitigation:
- Audit custom triggers: какие держат большие objects?
- Use
async withдля resource cleanup:async def run(self): async with aiohttp.ClientSession() as session: async with session.get(...) as resp: ... # Сессия закрылась автоматически - Set memory limits + restart policy на OOM:
resources: limits: memory: 4Gi # Hard limit, OOMKill restart
Scenario 3: Triggerer не может adopt — DB connections exhausted
# Logs:
# sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached
Cause: triggerer открывает DB connection per concurrent operation, default pool слишком мал.
Fix:
[database]
sql_alchemy_pool_size = 50
sql_alchemy_max_overflow = 100
sql_alchemy_pool_pre_ping = true
sql_alchemy_pool_recycle = 1800
Plus tune Postgres max_connections (default 100 → 500+ для production).
Production gotchas
-
TaskSuccessEvent skips execute_complete. Если вы зависели от XCom push в execute_complete или Airflow context — нужно делать всё в trigger.run(). Не all data accessible в trigger context — например, Variables, Connections надо resolve через airflow imports.
-
Multiple triggerers могут конфликтовать через image versions. Если deploy 2 triggerer pods, одна старая image (с одной trigger class), другая новая (modified trigger class) — deserialization может failed на старом instance. Use rolling updates с PodDisruptionBudget.
-
Triggerer logs split по pod. Если 2 triggerer-а, и не очевидно, какой adopt-ил конкретный trigger. Production: aggregate logs через Loki/ELK с label triggerer_id, поиск через UI Airflow покажет relevant pod.
-
Queue separation не bullet-proof. Если default queue не есть в any triggerer config — все default triggers зависают. Always include “default” в at least one triggerer’s queues list.
-
Auto-scaling triggerers сложно. Unlike workers, которые скалируются по CPU load, triggerers больше memory-bound (держат много state). Horizontal Pod Autoscaler требует custom metric (running_triggers), не CPU.
-
Graceful shutdown crucial. Если triggerer killed без SIGTERM — triggers становятся orphaned (triggerer_id = dead job_id). Recovery занимает ~30s. Graceful: send SIGTERM, triggerer flushes events, UPDATE triggerer_id=NULL для всех своих triggers, exits.
-
Network policies в Kubernetes. Triggerer needs access к Airflow metadata DB + external systems (S3, custom APIs). Lock down с NetworkPolicy:
apiVersion: networking.k8s.io/v1 kind: NetworkPolicy metadata: name: triggerer-egress spec: podSelector: matchLabels: component: triggerer egress: - to: - podSelector: matchLabels: component: postgres - to: - ipBlock: cidr: 10.0.0.0/8 # внутренние APIs -
Triggerer не worker — heavy compute плохо. Если ваш trigger.run() делает heavy CPU work (parsing large JSON в memory, computing hashes), он замедляет весь event loop. Best practice: trigger держит только I/O wait + light validation, heavy work — execute_complete на worker-е.