Learning Platform
Глоссарий Troubleshooting
Урок 10.06 · 25 мин
Продвинутый
TaskSuccessEventHA TriggererQueue SeparationMulti-team AirflowTriggerer Monitoring

TaskSuccessEvent и HA triggerer — advanced patterns

Стандартный lifecycle deferrable task — это: defer → trigger ждёт → trigger yields → worker resumes для execute_complete(). Это elegant, но имеет overhead: после события TI снова попадает в worker, который только и делает что записывает «success». Для cases где execute_complete не делает реальной работы — это лишний worker startup.

В Airflow 2.10 появился TaskSuccessEvent — механизм, позволяющий triggerer самому завершить task без возврата на worker. Это закрывает цикл оптимизации deferrable architecture.

Также этот урок покрывает HA triggerer setup для production: multiple triggerer instances, queue separation для team isolation, monitoring metrics. Это всё, что нужно знать чтобы deploy production-grade triggerer infrastructure.


Проблема, которую решает TaskSuccessEvent

Стандартный deferrable flow:

1. Worker: execute() → self.defer()           [worker slot занят 1s]
2. Triggerer: trigger.run() yields event      [zero worker]
3. Worker: execute_complete(event) → success  [worker slot занят 1s]

Steps 1 и 3 занимают worker slot минимум по 1 секунде (плюс startup overhead на KubernetesExecutor — может быть 10-30 секунд). Для simple sensors, где execute_complete просто validates event status и returns — это pure waste.

Что если triggerer мог сам сказать «task done», и worker не нужен на step 3?

В 2.10 (AIP-72, частично implemented) появляется TaskSuccessEvent — special TriggerEvent subclass:

from airflow.triggers.base import BaseTrigger, TaskSuccessEvent

class MyTrigger(BaseTrigger):
    async def run(self):
        # ... ждём условие
        await asyncio.sleep(60)

        # Triggerer завершит task без worker resume
        yield TaskSuccessEvent({"key": "value"})

При yield TaskSuccessEvent triggerer:

  • UPDATE task_instance SET state='success', xcom_value={...}
  • DELETE FROM trigger
  • НЕ возвращает TI в scheduled — execute_complete пропускается

XCom value берётся из TaskSuccessEvent payload. Если payload — dict, он становится XCom directly. Если простое значение — XCom = это значение.


Lifecycle с TaskSuccessEvent

Optimized lifecycle через TaskSuccessEvent
T=0: Worker execute()Worker запускается, делает initial check, defers если нужно. Затем worker slot освобождается. То же самое что стандартный flow.
T=1s-N: Triggerer waitstrigger.run() в asyncio loop. await asyncio.sleep, check condition. Это эквивалентно standard deferrable flow.
yield TaskSuccessEvent
T=N+1s: Triggerer completes taskВместо стандартного TriggerEvent, trigger yields TaskSuccessEvent(xcom_value). Triggerer обрабатывает: UPDATE TI SET state=success, xcom={...}. НЕТ возврата к scheduler. НЕТ worker startup. Task done.
Downstream tasks scheduledScheduler видит TI=success, processes downstream dependencies, queues next TIs. Это уже standard scheduler flow — оптимизация была в skip worker step.

Когда использовать TaskSuccessEvent

TaskSuccessEvent оправдан только когда execute_complete тривиально просто:

# GOOD candidate для TaskSuccessEvent:
def execute_complete(self, context, event):
    if event["status"] == "success":
        return event["xcom_value"]
    raise AirflowException(event.get("message"))

Эту логику можно встроить в trigger — yield либо TaskSuccessEvent (success path), либо TriggerEvent (error path, для worker resume):

class OptimizedTrigger(BaseTrigger):
    async def run(self):
        try:
            result = await self.async_check()
            yield TaskSuccessEvent(result)  # ← skip worker
        except SomeError as e:
            yield TriggerEvent({"status": "error", "message": str(e)})
            # ↑ standard flow, worker resumes для error handling

Когда НЕ использовать TaskSuccessEvent:

  • execute_complete делает heavy CPU work (parsing, transformation)
  • execute_complete нужен access к Airflow context (variables, connections)
  • execute_complete может делать second defer (multi-stage operators)
  • execute_complete делает side effects (write to external system)

В этих cases standard TriggerEvent + worker resume — правильный pattern.


Multi-yield в 2.10+

В 2.10 multi-yield получил больше внимания — yield нескольких events за один trigger возможен для progress reporting. В стандартном flow первый yield по-прежнему завершает trigger, но с дополнительными API:

class ProgressTrigger(BaseTrigger):
    async def run(self):
        for i in range(10):
            await asyncio.sleep(60)
            # Progress update — не завершает trigger
            self.log.info(f"Progress: {i * 10}%")
            # В 3.x — это будет emit progress event без завершения

        yield TriggerEvent({"status": "complete"})

В 2.10/2.11 progress reporting обычно через self.log (видно в UI через triggerer logs), а формальный TriggerEvent — только финальный. В 3.x ожидается richer API для progress events.


HA triggerer setup

Single triggerer — single point of failure. Production требует minimum 2 instances для:

  • Failover при crash
  • Rolling upgrades без downtime
  • Load distribution (если нужно >5000 concurrent)

Deployment topology

Минимальная production setup:

# Helm values
triggerer:
  enabled: true
  replicas: 2  # минимум для HA
  
  podDisruptionBudget:
    enabled: true
    config:
      maxUnavailable: 1  # обеспечивает rolling updates
  
  resources:
    requests:
      cpu: 500m
      memory: 1Gi
    limits:
      cpu: 2
      memory: 2Gi
  
  livenessProbe:
    enabled: true
    initialDelaySeconds: 30
    periodSeconds: 60

Каждый triggerer независим, координация через PostgreSQL:

-- Каждый triggerer adopts свою порцию через SKIP LOCKED
SELECT * FROM trigger
WHERE triggerer_id IS NULL
FOR UPDATE SKIP LOCKED
LIMIT 100;

При crash одного: другой через triggerer_health_check_threshold (30s) видит stale heartbeat, scheduler re-orphans triggers, second triggerer adopts.

Capacity planning

Sizing:

Concurrent triggersReplicasTotal CPUTotal memory
< 1,0002 (HA)1 vCPU2GB
1,000-5,0002 (HA)2 vCPU4GB
5,000-15,00034 vCPU8GB
15,000+4+8+ vCPU16GB+

HA — это minimum 2. Adding replicas beyond это даёт linear scaling до capacity asyncio loop одного process (~10-20k concurrent triggers per CPU core).


Queue separation — team isolation

В large organizations c shared Airflow instance несколько teams могут иметь interfering workloads. Один team создал custom trigger с blocking call — все team’s triggers замораживаются. Решение — queue separation через separate triggerer instances per team.

Triggerer queue mechanism

В 2.10+ trigger может быть assigned to specific «queue» (фактически — capacity group):

class TeamSpecificTrigger(BaseTrigger):
    # Trigger будет обрабатываться только triggerer-ом, который
    # имеет этот queue в конфиге
    queue = "team-data-eng"

Triggerer запускается с list разрешённых queues:

# Триггерер team-data-eng обрабатывает только свою queue
airflow triggerer --queues "team-data-eng,default"

# Триггерер team-ml обрабатывает свою
airflow triggerer --queues "team-ml,default"

SQL для adoption фильтрует по queue:

SELECT * FROM trigger
WHERE triggerer_id IS NULL
  AND queue IN ('team-data-eng', 'default')
FOR UPDATE SKIP LOCKED
LIMIT 100;

Это даёт fault isolation: bad trigger в team-ml не влияет на team-data-eng.

NOTE

Queue separation в 2.10 — relatively new feature, не все providers поддерживают queue attribute в triggers. Эта pattern получит полную реализацию в 3.x (AIP-91 или later). В 2.10/2.11 — manually добавить queue field в custom triggers + соответствующий config triggerer-а.


Monitoring triggerer

Production triggerer требует мониторинга:

Key metrics

MetricЧто значитAlert threshold
airflow.triggerer.running_triggersActive triggers count> 5000 (rule of thumb)
airflow.triggerer.events_emittedEvents за периодSustained 0 → suspicious
airflow.triggerer.events_failedFailed triggersAny sustained > 0
airflow.triggerer.heartbeatLast heartbeat (job table)> 30s stale
airflow.triggerer.capacityTriggerer total capacity< running_triggers + 20%
triggerer container CPUProcess CPU> 80% sustained
triggerer container memoryMemory usage> 80% of limit

Prometheus / OTel setup

# airflow.cfg
[metrics]
otel_on = True
otel_host = otel-collector
otel_port = 4317
statsd_on = False

В Grafana dashboard:

# Active triggers per triggerer
airflow_triggerer_running_triggers{job="triggerer"}

# Events emission rate
rate(airflow_triggerer_events_emitted_total[5m])

# Failed triggers (should be 0)
rate(airflow_triggerer_events_failed_total[5m])

Health checks

Liveness probe — пингует triggerer через port (если configured):

# Kubernetes
livenessProbe:
  exec:
    command:
      - airflow
      - jobs
      - check
      - --job-type
      - TriggererJob
      - --hostname
      - $(hostname)
  initialDelaySeconds: 30
  periodSeconds: 60

При фейле probe — Kubernetes restarts pod, scheduler re-orphans triggers.


Disaster scenarios

Scenario 1: Все triggerers умерли

-- Триггеры в trigger table, ни один triggerer не работает
SELECT t.id, t.triggerer_id, j.latest_heartbeat
FROM trigger t
LEFT JOIN job j ON t.triggerer_id = j.id
WHERE j.id IS NULL OR j.latest_heartbeat < now() - interval '30 seconds';

Recovery:

  1. Scheduler через adopt_or_reset_orphaned_tasks (housekeeping job) re-orphan triggers (UPDATE triggerer_id = NULL).
  2. Когда любой triggerer возвращается online — adopts через SKIP LOCKED.
  3. Triggers resume execution. Pending events не теряются (они в trigger table).

Worst case latency: 30s heartbeat threshold + 60s housekeeping interval = ~90s total.

Scenario 2: Triggerer память leak

# Triggerer container memory growing
kubectl top pod airflow-triggerer-0
# 1.5GB → 1.8GB → 2.0GB → OOMKilled

Causes:

  • Custom triggers держат heavy state (large data structures)
  • Не закрытые async connections (aiohttp sessions, asyncpg pools)
  • Memory leak в aiobotocore / других async libraries

Mitigation:

  1. Audit custom triggers: какие держат большие objects?
  2. Use async with для resource cleanup:
    async def run(self):
        async with aiohttp.ClientSession() as session:
            async with session.get(...) as resp:
                ...
        # Сессия закрылась автоматически
  3. Set memory limits + restart policy на OOM:
    resources:
      limits:
        memory: 4Gi  # Hard limit, OOMKill restart

Scenario 3: Triggerer не может adopt — DB connections exhausted

# Logs:
# sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached

Cause: triggerer открывает DB connection per concurrent operation, default pool слишком мал.

Fix:

[database]
sql_alchemy_pool_size = 50
sql_alchemy_max_overflow = 100
sql_alchemy_pool_pre_ping = true
sql_alchemy_pool_recycle = 1800

Plus tune Postgres max_connections (default 100 → 500+ для production).


Production gotchas

  1. TaskSuccessEvent skips execute_complete. Если вы зависели от XCom push в execute_complete или Airflow context — нужно делать всё в trigger.run(). Не all data accessible в trigger context — например, Variables, Connections надо resolve через airflow imports.

  2. Multiple triggerers могут конфликтовать через image versions. Если deploy 2 triggerer pods, одна старая image (с одной trigger class), другая новая (modified trigger class) — deserialization может failed на старом instance. Use rolling updates с PodDisruptionBudget.

  3. Triggerer logs split по pod. Если 2 triggerer-а, и не очевидно, какой adopt-ил конкретный trigger. Production: aggregate logs через Loki/ELK с label triggerer_id, поиск через UI Airflow покажет relevant pod.

  4. Queue separation не bullet-proof. Если default queue не есть в any triggerer config — все default triggers зависают. Always include “default” в at least one triggerer’s queues list.

  5. Auto-scaling triggerers сложно. Unlike workers, которые скалируются по CPU load, triggerers больше memory-bound (держат много state). Horizontal Pod Autoscaler требует custom metric (running_triggers), не CPU.

  6. Graceful shutdown crucial. Если triggerer killed без SIGTERM — triggers становятся orphaned (triggerer_id = dead job_id). Recovery занимает ~30s. Graceful: send SIGTERM, triggerer flushes events, UPDATE triggerer_id=NULL для всех своих triggers, exits.

  7. Network policies в Kubernetes. Triggerer needs access к Airflow metadata DB + external systems (S3, custom APIs). Lock down с NetworkPolicy:

    apiVersion: networking.k8s.io/v1
    kind: NetworkPolicy
    metadata:
      name: triggerer-egress
    spec:
      podSelector:
        matchLabels:
          component: triggerer
      egress:
        - to:
            - podSelector:
                matchLabels:
                  component: postgres
        - to:
            - ipBlock:
                cidr: 10.0.0.0/8  # внутренние APIs
  8. Triggerer не worker — heavy compute плохо. Если ваш trigger.run() делает heavy CPU work (parsing large JSON в memory, computing hashes), он замедляет весь event loop. Best practice: trigger держит только I/O wait + light validation, heavy work — execute_complete на worker-е.


Проверка знанийKnowledge check
Production multi-team Airflow setup: team-A и team-B имеют свои deferrable operators. team-A написал custom trigger с redis pubsub блокировкой (sync redis client вместо aredis). Как защитить team-B и каковы trade-offs?
ОтветAnswer
Защита через queue separation (2.10+): (1) team-A triggers получают queue='team-a' (custom field в BaseTrigger subclass или config). (2) Deploy 2 sets of triggerers: airflow triggerer --queues='team-a' для team-A, airflow triggerer --queues='team-b,default' для team-B. (3) Bad trigger от team-A блокирует только team-A triggerer event loop — team-B не affected. (4) Mitigation root cause: code review всех custom triggers на blocking I/O, audit для async-friendly libraries (aredis instead of redis, aiohttp instead of requests, asyncpg instead of psycopg2). Trade-offs queue separation: (a) Extra infrastructure cost (2x triggerer pods, +CPU/memory); (b) Operational complexity (separate dashboards, logs, alerts per queue); (c) Risk uneven utilization — team-A queue underloaded, team-B overloaded; (d) Default queue handling — careful not orphan tasks без assigned triggerer. Without queue separation alternatives: (1) Strict code review process (everyone learns async patterns); (2) Linting rules (ruff ASYNC101) catch blocking calls в CI; (3) Monitoring per-trigger latency, alert on > threshold; (4) Sandbox testing custom triggers перед production. Queue separation = operational safety net, not replacement for engineering rigor.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. TaskSuccessEvent (2.10+) — что это и когда оправдан?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6