Learning Platform
Глоссарий Troubleshooting
Урок 10.05 · 24 мин
Продвинутый
Sensor ModespokerescheduledeferrableProduction CostBenchmarks

Sensor modes — три способа ждать в Airflow

Sensors — операторы, которые ждут условия (key в S3, row в DB, time moment). У Airflow 2.x три фундаментально разные модели execution для sensors, каждая с уникальным trade-off между latency, resource cost и operational complexity. Знание различий критично — неправильный выбор mode на 1000 sensors превращает Airflow в неработоспособный.

Этот урок — production-grade benchmark тhree modes на realistic scale (1000 одновременных sensors), с конкретными числами CPU/memory/DB cost. После — guideline когда какой mode mandatory.


Three modes — quick definition

Каждый sensor в Airflow имеет attribute mode:

S3KeySensor(
    task_id="wait_for_data"
    bucket_key="..."
    mode="poke",  # или "reschedule" или ... (deferrable не через mode)
    poke_interval=60,
    timeout=86400,
)
ModeГде живёт sensorWorker slotDB activity
poke (default)Worker threadЗанят весь timeoutМинимальная (heartbeats)
rescheduleWorker между poke, scheduler-managedFree между pokeВысокая (task_reschedule rows)
deferrable=TrueTriggerer asyncio loopFree всё времяНизкая (одна row в trigger)

Deferrable — НЕ режим, а separate execution path. Активируется через deferrable=True parameter (2.7+) или конкретный async operator class.


Mode 1: poke — classic sync mode

Worker thread выполняет poke() метод каждые poke_interval секунд:

class S3KeySensor(BaseSensorOperator):
    def poke(self, context) -> bool:
        return self.hook.check_for_key(...)

BaseSensorOperator.execute() крутит loop:

def execute(self, context):
    started_at = monotonic()
    while not self.poke(context):
        if monotonic() - started_at > self.timeout:
            raise AirflowSensorTimeout(...)
        time.sleep(self.poke_interval)
    return self.xcom_value

Worker thread заблокирован весь timeout. На Celery с 50 workers (50 slots) — 50 одновременных sensors максимум.

Pros: Simplest, low DB overhead, sub-second latency между poke и detection.

Cons: Worker slot занят, catastrophic при большом числе sensors.


Mode 2: reschedule — release slot между poke

Sensor отдаёт worker slot между poke calls. Реализация:

def execute(self, context):
    ti = context["ti"]

    if self.poke(context):
        return self.xcom_value
    else:
        # Раскрываем slot, scheduler пере-запустит через poke_interval
        next_poke_at = utcnow() + timedelta(seconds=self.poke_interval)
        raise AirflowRescheduleException(reschedule_date=next_poke_at)

AirflowRescheduleException ловится worker-ом, который:

  1. UPDATE TI state=‘up_for_reschedule’
  2. INSERT INTO task_reschedule (ti_id, reschedule_date)
  3. Освобождает worker slot

Scheduler видит task_reschedule rows, на следующем tick перепуливает TI назад в queued state.

Pros: Worker slot free между poke. На 50 workers — поддерживает сотни sensors.

Cons:

  • DB churn: каждый poke создаёт row в task_reschedule. На 1000 sensors с poke_interval=60s — 60k INSERTs/час.
  • Scheduler overhead: scheduler должен обрабатывать task_reschedule на каждом tick.
  • Latency: после rescheduled state minimum 5-15 секунд до re-execution (scheduler tick + queueing).
  • Worker startup overhead: каждый poke = new task instance startup, может быть 5-10s в KubernetesExecutor.

Mode 3: deferrable — async через triggerer

С deferrable=True (или specific async operator) sensor:

def execute(self, context):
    if self.hook.check_for_key(...):
        return  # already exists
    self.defer(trigger=S3KeyTrigger(...), method_name="execute_complete")

Triggerer держит trigger в asyncio loop, polls async без блокировки.

Pros:

  • Zero worker slots во время ожидания
  • Low DB overhead — одна row в trigger table за весь wait
  • Scales horizontally — один triggerer держит 5000+ triggers

Cons:

  • Triggerer deployment required
  • Latency: trigger adoption + scheduling может быть 5-15s
  • Code complexity: написание custom trigger требует async expertise

Production benchmark: 1000 concurrent sensors

Конкретные числа для 1000 одновременных S3KeySensor, timeout=4h, poke_interval=60s, на Kubernetes Executor (10 worker pods, 4 cores each):

mode="poke" — катастрофа

MetricValue
Worker pods needed40+ (4 sensors per slot)
CPU usage (workers)5% sustained (mostly sleep)
Memory40+ pods × 500MB = 20GB
DB ops/sec~0 (workers heartbeat only)
Failed sensors at scaleHigh (worker OOM при N>50)
Cost per hour~$2-3 (cloud worker compute)

Verdict: non-viable. Worker fleet must scale до 40+ только для одних sensors. ETL workloads страдают.

mode="reschedule" — DB burden

MetricValue
Worker pods needed4-6 (короткие poke executions)
CPU usage (workers)15% sustained (startup overhead)
Memory6 pods × 500MB = 3GB
DB ops/sec1000 inserts/min в task_reschedule
Scheduler latency+30% slower (busy with reschedule processing)
Cost per hour~$0.50 + DB upgrade required

Verdict: borderline. Requires powerful DB instance (db.r6g.xlarge+). Не работает с MariaDB. Scheduler may bottleneck на dataset cascades параллельно.

deferrable=True — production-ready

MetricValue
Worker pods needed0 (только peak когда event triggered)
Triggerer pods1 (handles 5k triggers easily)
Triggerer CPU5-10%
Triggerer memory1-2GB
DB ops/sec0 (после initial registration)
Latency5-10s after key appears
Cost per hour~$0.10

Verdict: clear winner. 20-30x cheaper than poke, 5x cheaper than reschedule. Scales horizontally.


Latency comparison

«Сколько времени между actual event и task ready to continue»:

Latency three modes
poke: tight loopWorker делает poke() каждые poke_interval. Если событие произошло сразу после прошлого poke — задержка до next poke (avg = poke_interval / 2). Можно настроить poke_interval=5s для sub-10s latency.
reschedule: scheduler boundПосле failed poke, TI=up_for_reschedule, scheduler tick (5s) подбирает, queues, worker startup. Total: poke_interval + scheduler_tick (5s) + worker_startup (5-20s on K8s).
deferrable: triggerer + schedulerTrigger ловит async event, emits TriggerEvent. Triggerer записывает в DB (1s). Scheduler tick видит TI=scheduled (5s). Worker starts execute_complete (5-20s). Polling interval не влияет на post-event latency.

Поэтому:

  • Sub-second latency required: poke (но cost огромный)
  • Reasonable latency (5-30s): deferrable — sweet spot
  • Не критично, but low cost: deferrable

Когда deferrable — mandatory

В production есть жёсткие правила:

Rule 1: timeout > 1 hour → deferrable

Если sensor может ждать >1 часа, poke/reschedule не масштабируются:

  • poke блокирует slot — даже 50 одновременных «съедают» worker pool.
  • reschedule создаёт hundreds of DB rows — DB перегружена.

Deferrable — единственный viable option.

Rule 2: > 100 concurrent sensors → deferrable

Independent от timeout, scale matters. 100 concurrent в poke mode = 100 worker slots. Это size for hundreds workers — irresponsible.

Rule 3: External API polling → deferrable

API polling часто имеет rate limits + slow responses. Async polling с aiohttp одного triggerer-а более эффективен, чем десятки workers, делающих sync HTTP.

Rule 4: Real-time event waits → deferrable + AssetWatcher (3.x)

Waits for external event (SQS message, Kafka offset, S3 EventBridge). В 3.x AssetWatcher делает это natively. В 2.x — custom deferrable trigger с async client library.


Когда poke остаётся valid

Не всегда deferrable нужен. Cases где poke ОК:

  1. Quick existence check (< 1 минута total). Overhead defer/triggerer/wakeup может быть больше чем sync poke loop.
  2. Single sensor on tiny DAG. Если DAG имеет один sensor с timeout=10 минут — не стоит deploy-ить triggerer ради него.
  3. Development/testing — proще без triggerer setup.
  4. Sensors с heavy CPU work in poke (unusual). Если poke() делает heavy compute, async pattern не помогает — нужен sync compute на worker.

Reschedule remains valid?

reschedule всё ещё имеет use case, но он narrow:

  • Legacy infrastructure без deployed triggerer.
  • Operators без deferrable implementation (rare в 2.10+).
  • Specific scenarios где worker hooks несовместимы с asyncio.

В новых deployments — выбирайте deferrable.


Migration path: poke → deferrable

Для существующего DAG с mode="poke":

Step 1: Audit sensors

# Найти все sensors с poke mode
# Поиск по DAGs:
grep -r "mode='poke'" dags/
grep -r "mode=\"poke\"" dags/

# Или через DB:
# SELECT task_id, dag_id FROM serialized_dag
# WHERE data::text LIKE '%mode%poke%';

Step 2: Заменить на deferrable

# Before
S3KeySensor(
    task_id="wait"
    bucket_key="..."
    mode="poke"
    poke_interval=60,
    timeout=14400,
)

# After
S3KeySensor(
    task_id="wait"
    bucket_key="..."
    deferrable=True,  # ← добавлено
    poke_interval=60,
    timeout=14400,
    # mode не нужен, deferrable перекрывает
)

Step 3: Deploy triggerer

В Kubernetes Helm chart:

triggerer:
  enabled: true
  replicas: 2
  resources:
    requests:
      cpu: 500m
      memory: 1Gi
    limits:
      cpu: 2
      memory: 2Gi

Step 4: Verify

Проверить:

  • airflow.triggerer.running_triggers метрика растёт.
  • TI переходят в state=deferred (visible в UI).
  • Trigger table accumulates rows: SELECT COUNT(*) FROM trigger.

Step 5: Cleanup poke-mode workers

Если у вас были dedicated workers для long-running sensors — можно уменьшить worker count.


Comparison table — definitive

Aspectpokerescheduledeferrable
Worker slot during waitOccupiedFree between pokesNever occupied
DB ops during wait~0High (per poke)~0
Latency to detectionpoke_interval/2poke_interval + 15-30s5-15s after event
Scaling ceiling~50 (per 50 workers)~500 (DB bound)5000+ per triggerer
Infra requirementWorkersWorkers + healthy DBTriggerer required
Code complexityTrivialTrivialModerate (custom trigger)
Best fitQuick (<60s) waitsLegacy without triggererProduction at scale
Failure modeWorker OOMDB lock contentionTriggerer crash → re-adopt
Resource cost (1000 sensors)$$$$$$$

Production gotchas

  1. Deferrable требует поддержки operator-а. Не все operators имеют deferrable=True parameter. Audit перед migration: provider docs или inspect.signature(MyOperator.__init__).

  2. AIRFLOW__OPERATORS__DEFAULT_DEFERRABLE=true — глобальный switch. Включается deferrable для всех supporting operators. Прежде чем включать — убедитесь triggerer deployed + проверены custom operators.

  3. Long-running execute() в operator-е. Иногда «sensor-like» работа сделана в обычном operator с while loop. Конвертация требует написания custom trigger — больше работы, чем для built-in sensors.

  4. Polling interval matters. poke_interval=1s в deferrable создаёт 60 polls/min на triggerer. Для high-frequency polls — увеличьте interval, или используйте push-based mechanism (Kafka consumer trigger вместо poll loop).

  5. DateTimeSensor vs DateTimeSensorAsync — оба существуют в standard provider. DateTimeSensorAsync — native deferrable implementation. В 2.10+ обычный DateTimeSensor имеет deferrable=True flag — predeferable, та же async реализация.

  6. AsyncSensor in старых providers. Например, S3KeySensorAsync (deprecated в providers-amazon 8.x) был стандалоновый класс. Современная convention — S3KeySensor(deferrable=True). При migration убедитесь, какой class у вас.

  7. Backwards compatibility. Если рantage deploy deferrable=True operator в инфраструктуре без triggerer — task в state=deferred forever, UI показывает «running», но фактически stuck. Always deploy triggerer первым.

  8. Connection pooling matters. Triggerer открывает много DB connections + external (S3, Postgres) per concurrent trigger. Production: pool size = expected concurrent × 1.5 + buffer.


Проверка знанийKnowledge check
DAG имеет 500 sensors, каждый ждёт S3 key с timeout=8 часов, poke_interval=30 секунд. Сейчас mode='reschedule'. Какие production metrics ожидать и почему deferrable critical?
ОтветAnswer
Reschedule mode на этом scale: (1) task_reschedule inserts: 500 sensors × (3600/30) = 60,000 inserts/hour. DB write IOPS bottleneck — small RDS не выдержит, нужен db.r6g.xlarge+. (2) Scheduler overhead: каждый tick scheduler scans task_reschedule, JOINs ti, makes reschedule decisions. Scheduler loop duration растёт с 100ms до 5s+. (3) Worker startup: каждый poke = new TI run = K8s pod startup (15-30s overhead). На 500 sensors с 30s intervals — постоянная нагрузка на K8s API. (4) Latency: события detected с avg задержкой 30s + 5s tick + 15s startup = 50s, при том что real ETL stages могут wait 5-10 минут. С deferrable: (1) ZERO DB inserts во время wait — 500 rows in trigger table, всё. (2) Один triggerer process handles все 500 через asyncio (~10% CPU). (3) Latency reduced до 5-15s после event. (4) Worker pool can shrink на 70%+ — те же 500 sensors требуют 0 worker slots. Cost reduction: 80-90%. Migration: добавить deferrable=True per sensor, deploy triggerer (2 replicas для HA), monitor airflow.triggerer.running_triggers metric. Это classic case 'mandatory deferrable'.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. 1000 одновременных sensors с timeout=4h, poke_interval=60s. Какие DB ops/час для mode='reschedule'?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6