Learning Platform
Глоссарий Troubleshooting
Урок 04.04 · 30 мин
Продвинутый
SensorspokerescheduledeferrableTriggererAIP-40Production

Sensors — три режима исполнения

Sensor — это operator, который ждёт событие: файл в S3, запись в Postgres, успешное завершение upstream DagRun. Снаружи это выглядит как «задача, которая стоит до тех пор, пока условие не выполнено». Но внутри Airflow есть три фундаментально разных способа реализовать это ожидание, и выбор между ними определяет, выдержит ли ваш Airflow тысячу одновременных sensors или ляжет на десятом.

Этот урок — самый production-критичный в модуле. Неправильный mode для sensor — частая причина того, что dev cluster работает, а prod, где появилась тысяча long-wait sensors, начинает захлёбываться. Сценарий: 100 S3KeySensor в режиме poke с 24-часовым timeout — это 100 заблокированных worker slots на сутки. Перевод на deferrable=True — те же 100 sensors работают на одном triggerer без worker slots вообще.


Базовый класс — BaseSensorOperator

Все sensors наследуются от BaseSensorOperator, который сам наследник BaseOperator. Главный отличительный метод — poke(context):

class BaseSensorOperator(BaseOperator, SkipMixin):
    valid_modes: Iterable[str] = ("poke", "reschedule")

    def __init__(
        self,
        *,
        poke_interval: float = 60,
        timeout: float = 60 * 60 * 24 * 7,  # 7 days default
        soft_fail: bool = False,
        mode: str = "poke",
        exponential_backoff: bool = False,
        max_wait: timedelta | float | None = None,
        silent_fail: bool = False,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        if mode not in self.valid_modes:
            raise AirflowException(f"Mode must be one of {self.valid_modes}")
        self.poke_interval = poke_interval
        self.timeout = timeout
        self.soft_fail = soft_fail
        self.mode = mode
        # ...

    def poke(self, context: Context) -> bool | PokeReturnValue:
        raise AirflowException("Override me.")

    def execute(self, context: Context) -> Any:
        # см. ниже — отличается по mode
        ...

Контракт poke():

  • Возвращает True — условие выполнено, sensor success.
  • Возвращает False — условие не выполнено, продолжаем ждать.
  • Возвращает PokeReturnValue(is_done=True, xcom_value=...) — done + пушит value в XCom.
  • Raises AirflowSkipException — task получает state skipped.
  • Любой другой Exception — sensor failed.

Параметры:

  • mode"poke" или "reschedule". Управляет тем, как execute() ждёт между poke().
  • poke_interval — секунд между poke (default 60s).
  • timeout — максимальное время ожидания (default 7 дней).
  • soft_fail=True — при timeout sensor станет skipped, а не failed.

Третий режим — deferrable=True — отдельный механизм, не через mode. Подробнее ниже.

asyncio: event loop overview (концептуальный)

Mode 1: poke — блокирующий

Самый простой режим. execute() — это цикл с time.sleep:

# Упрощённая логика BaseSensorOperator.execute() в mode='poke'
def execute(self, context):
    started_at = time.monotonic()
    while True:
        try:
            result = self.poke(context)
        except AirflowSkipException:
            raise
        if result:
            return result.xcom_value if isinstance(result, PokeReturnValue) else None

        if (time.monotonic() - started_at) > self.timeout:
            if self.soft_fail:
                raise AirflowSkipException("Sensor timeout, soft_fail=True")
            raise AirflowSensorTimeout(f"Sensor timed out after {self.timeout}s")

        time.sleep(self.poke_interval)

Цена: worker slot занят всё время ожидания. Если у вас parallelism=32 и 32 sensor в poke mode — все остальные tasks стоят в queued, потому что нет свободного slot.

WARNING

Это самая частая ошибка новичков с sensors. На dev кажется, что всё ок — 5 sensors, 32 slots, есть запас. В prod после roll-out на 100 DAGs появляются 100 sensors с 24h timeout, и кластер встаёт. Симптомы: tasks подвешены в queued, lag растёт, scheduler logs показывают что executor full. Lookup select count(*) from task_instance where state='running' and operator like '%Sensor%' — если число близко к parallelism, это диагноз.

Когда poke всё-таки оправдан:

  • Очень короткое ожидание (< 5 минут) — overhead reschedule бессмысленен.
  • Условие должно выполниться немедленно или провалиться (например, инвариант проверки в начале pipeline).

Mode 2: reschedule — освобождает slot

В mode='reschedule' execute() не делает sleep — он вызывает poke() один раз и сразу выходит с особым результатом:

# Упрощённая логика mode='reschedule'
def execute(self, context):
    if not self.poke(context):
        # poke вернул False — освобождаем slot
        self._handle_reschedule(context)
        raise AirflowRescheduleException(reschedule_date=now() + timedelta(seconds=self.poke_interval))
    return None

Что делает scheduler при AirflowRescheduleException:

  1. Создаёт запись в таблице task_reschedule с reschedule_date = now() + poke_interval.
  2. Переводит TI обратно в state up_for_reschedule (новое состояние из 14 возможных).
  3. Освобождает worker slot — другие tasks могут запускаться.
  4. Через poke_interval secunds scheduler видит, что task_reschedule.reschedule_date &lt;= now(), переводит TI снова в scheduled.
  5. Worker подбирает task, вызывает execute(), poke() снова.
Жизненный цикл sensor в mode=reschedule
t=0: scheduler → queuedScheduler видит scheduled TI, переводит в queued, отправляет в executor queue. Worker подбирает task.
t=1s: worker run execute()Worker запускает task. execute() вызывает poke() ровно один раз — HEAD-запрос к S3, COUNT(*) к Postgres и т.п. Длительность секунды.
poke == False
t=2s: AirflowRescheduleExceptionexecute() поднимает RescheduleException. Worker записывает строку в task_reschedule(reschedule_date=now()+60s), переводит TI в up_for_reschedule, освобождает slot.
60s sleep (scheduler ticks)
t=62s: scheduler пробуждает TIScheduler видит up_for_reschedule с reschedule_date <= now(), переводит в scheduled. Цикл повторяется.
...через N циклов...
poke == True → successОчередной poke вернул True. execute() возвращает результат, TI получает state success, downstream tasks могут запускаться.

Цена reschedule:

  • +1 task_instance scheduling cycle per poke — scheduler делает работу каждый раз, когда вытаскивает TI обратно в scheduled.
  • +1 запись в task_reschedule на каждый цикл — со временем таблица растёт (cleanup на retention policy обязателен).
  • Latency = poke_interval — между моментом, когда условие стало True, и тем, когда вы это заметите, может пройти до poke_interval секунд.

Когда reschedule — правильный выбор:

  • Ожидание от 10 минут до нескольких часов.
  • Нет триггера (deferrable trigger) для этого условия.
  • poke_interval >= 60s (меньше — слишком много нагрузки на scheduler).

Mode 3: deferrable — asyncio через Triggerer

Самый эффективный режим, появился в Airflow 2.2 (AIP-40). Sensor не блокирует ни worker slot, ни scheduler tick. Ожидание выполняется в отдельном процессе — Triggerer, который запускает asyncio event loop.

Активация через deferrable=True:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_s3 = S3KeySensor(
    task_id="wait_for_data"
    bucket_name="data-prod"
    bucket_key="orders/{{ ds }}.parquet"
    aws_conn_id="aws_default"
    deferrable=True,  # ← magic
    poke_interval=60,
    timeout=24 * 3600,
)

Lifecycle deferrable sensor:

Deferrable sensor lifecycle
1. execute() стартует на workerWorker подбирает task, как обычно. execute() делает первый poke() (cheap check) — вдруг условие уже выполнено и triggerer не нужен.
условие не выполнено
2. raise TaskDeferred(trigger=...)execute() поднимает TaskDeferred с инстансом Trigger — это объект с async методом run(). Worker сериализует trigger, сохраняет в DB (trigger table), переводит TI в state 'deferred', освобождает slot.
3. Triggerer process подхватывает TriggerTriggerer — отдельный процесс, может быть один или несколько HA. Постоянно polls trigger table, забирает new triggers, десериализует, запускает trigger.run() как coroutine в asyncio loop.
4. trigger.run() ждёт асинхронноВнутри trigger.run() — async код. Для S3 это `await aiobotocore.client.head_object(...)` каждые poke_interval секунд. Тысячи triggers работают параллельно в одном asyncio loop без блокировки.
условие выполнилось
5. yield TriggerEvent(payload)Trigger возвращает TriggerEvent. Triggerer записывает event в trigger table, переводит TI обратно в 'scheduled', указывает next_method='execute_complete'.
6. Worker resumes — execute_complete(context, event)Scheduler видит TI scheduled с method='execute_complete', worker подбирает её и вызывает не execute(), а execute_complete(context, event=payload). Метод обрабатывает результат, возвращает return value, sensor success.

Цена deferrable:

  • Один процесс Triggerer на весь Airflow (или несколько для HA). Память — десятки MB на тысячи triggers (asyncio коэффициент).
  • Trigger code должен быть async. Если для вашего сервиса нет async client (например, какой-нибудь legacy SOAP API), deferrable не сделать.
  • CPU triggerer’а растёт линейно с числом active triggers. Но в practice один Triggerer тянет 5-10k активных triggers на средней машине.

Decision matrix — какой mode выбрать

Wait durationУсловиеMode
< 5 min, async-ableнапример, S3 key за минутуdeferrable=True
< 5 min, no asyncнапример, legacy XML-RPCpoke (приемлемая блокировка)
5 min — 6 h, async-ableSnowflake query running, S3 за часdeferrable=True
5 min — 6 h, no asynclegacy providerreschedule
6 h — 7 d, async-ableожидание внешнего batch jobdeferrable=True обязательно
6 h — 7 d, no asyncредкоreschedule + low poke_interval (10min)

Правило простое: если есть deferrable вариант — используйте его. Если нет — reschedule (если poke_interval ≥ 60s). poke остаётся только для коротких ожиданий или legacy кода без альтернатив.


Custom sensor — пример

Sensor, который проверяет наличие записи в Postgres:

from airflow.sensors.base import BaseSensorOperator, PokeReturnValue
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.context import Context

class PostgresRowSensor(BaseSensorOperator):
    template_fields = ("sql",)

    def __init__(self, *, conn_id: str, sql: str, **kwargs):
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.sql = sql

    def poke(self, context: Context) -> PokeReturnValue:
        hook = PostgresHook(postgres_conn_id=self.conn_id)
        rows = hook.get_records(self.sql)
        count = rows[0][0] if rows else 0
        self.log.info("Found %d rows matching %s", count, self.sql)
        if count > 0:
            return PokeReturnValue(is_done=True, xcom_value=count)
        return PokeReturnValue(is_done=False)

Используется как любой sensor:

wait_data = PostgresRowSensor(
    task_id="wait_for_orders"
    conn_id="warehouse"
    sql="SELECT COUNT(*) FROM orders WHERE order_date = '{{ ds }}'"
    mode="reschedule"
    poke_interval=120,
    timeout=6 * 3600,
)

Deferrable версия

Чтобы тот же sensor сделать deferrable, нужно:

  1. Реализовать Trigger класс с async run().
  2. Переопределить execute() — вместо синхронного цикла поднять TaskDeferred.
  3. Реализовать execute_complete() — callback после события.
import asyncio
from airflow.triggers.base import BaseTrigger, TriggerEvent

class PostgresRowTrigger(BaseTrigger):
    def __init__(self, conn_id: str, sql: str, poke_interval: float):
        super().__init__()
        self.conn_id = conn_id
        self.sql = sql
        self.poke_interval = poke_interval

    def serialize(self):
        return (
            "myplugin.triggers.PostgresRowTrigger",
            {"conn_id": self.conn_id, "sql": self.sql, "poke_interval": self.poke_interval},
        )

    async def run(self):
        # Псевдокод — реальный код использовал бы asyncpg или aiopg
        while True:
            count = await self._async_count()
            if count > 0:
                yield TriggerEvent({"status": "success", "count": count})
                return
            await asyncio.sleep(self.poke_interval)

    async def _async_count(self) -> int:
        # connect via asyncpg, execute self.sql, return count
        ...


class PostgresRowSensorAsync(BaseSensorOperator):
    template_fields = ("sql",)

    def __init__(self, *, conn_id, sql, deferrable: bool = True, **kwargs):
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.sql = sql
        self.deferrable = deferrable

    def execute(self, context):
        if not self.deferrable:
            # fallback на синхронный poke
            return super().execute(context)
        self.defer(
            trigger=PostgresRowTrigger(self.conn_id, self.sql, self.poke_interval),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        if event and event.get("status") == "success":
            return event["count"]
        raise AirflowException(f"Sensor failed: {event}")

В Module 09 (Triggerer) препарируем asyncio loop triggerer’а до уровня schedule/poll patterns.


Conversion guide — poke → deferrable в production

Реальный сценарий: у вас 100 DAGs с S3KeySensor(mode='poke'), кластер захлёбывается. План миграции:

Шаг 1 — audit. Найти все sensors и их режимы:

SELECT dag_id, task_id, operator, json_extract(args, '$.mode') as mode
FROM serialized_dag, json_each(json_extract(data, '$.tasks'))
WHERE operator LIKE '%Sensor%';

Или через Web UI: фильтр по operator name.

Шаг 2 — Triggerer up. Запустить процесс triggerer. В docker-compose:

triggerer:
  image: apache/airflow:2.10.5
  command: triggerer
  depends_on: [postgres]

Для HA — 2-3 реплики, distribution через triggerer_id.

Шаг 3 — incremental migration. По одному DAG переключать. Если sensor поддерживает deferrable — просто добавить deferrable=True:

# Было
S3KeySensor(
    task_id="wait"
    bucket_name="b"
    bucket_key="k"
    mode="poke",
)

# Стало
S3KeySensor(
    task_id="wait"
    bucket_name="b"
    bucket_key="k"
    deferrable=True,
)

Или глобально через config:

AIRFLOW__OPERATORS__DEFAULT_DEFERRABLE=true

Этот флаг (2.7+) превращает все sensors с поддержкой deferrable в deferrable, не трогая DAG код.

Шаг 4 — мониторинг. Метрики triggerer’а:

  • triggerer.running_triggers — должно быть < max_capacity (1000 default).
  • triggerer.heartbeat — alive проверка.
  • task_instance.duration для deferred TI — теперь время в deferred, не блокирует worker.

Что в Airflow 3.x

В 3.x deferrable стал first-class — большинство официальных sensors имеют deferrable как default. Также:

  • Edge Executor + Triggerer интеграция для геораспределённых deploy.
  • Trigger groups — приоритеты для разных типов triggers (важные business sensors vs technical health checks).

Но всё это — на горизонте upgrade. В 2.10/2.11 LTS deferrable работает отлично и есть для всех major providers (AWS, GCP, Azure, Snowflake, Databricks, K8s).


Production gotchas

  1. poke_interval=10s на reschedule mode. Каждый poke — это отдельный TI cycle: scheduler → queued → worker → poke → reschedule. Если interval 10s и у вас 100 sensors — это 600 scheduling cycles в минуту только на sensors. Минимум poke_interval=60s для reschedule, лучше 5-10 минут на long-wait.

  2. mode='poke' забыли указать → default mode=‘poke’. Default для BaseSensorOperator — poke. Если ваш sensor длинно-ждущий, обязательно явно ставьте mode='reschedule' или deferrable=True. Default — это самый прожорливый режим.

  3. soft_fail=True молча скрывает баги. Sensor timeout → state=skipped → downstream skipped (по умолчанию). DAG в UI показывает success (skipped = не failed), но данных нет. Если sensor реально критичен — soft_fail=False, чтобы DAG явно падал.

  4. Deferrable без Triggerer запущенного. Если в airflow.cfg или k8s deployment забыли запустить triggerer service, TI попадают в state deferred и никогда не возвращаются. Симптом: tasks висят в deferred бесконечно. Диагноз: airflow triggerer list или select count(*) from trigger — растёт без обработки.

  5. Trigger non-serializable parameters. Trigger.serialize() должен возвращать только JSON-сериализуемые типы. Если передаёте сложный объект (например, готовый boto3 client) — упадёт при reload trigger’а после restart triggerer’а. Сохраняйте только primitives, инстанциируйте clients внутри run().

  6. task_reschedule retention. Каждый reschedule создаёт строку. По умолчанию они хранятся вечно. Через год у вас могут быть миллионы записей и SELECT ... FROM task_reschedule становится медленным. Включите db_cleanup job или ручную чистку: DELETE FROM task_reschedule WHERE reschedule_date &lt; now() - interval '30 days'.

  7. exponential_backoff=True + слишком маленький max_wait. Backoff удваивает интервал на каждом неудачном poke: 60, 120, 240, 480… Без max_wait достигает 60 * 2^N — через 10 циклов это 17 часов между poke. Условие выполнится, sensor проспит. Всегда max_wait если используете exponential.


Проверка знанийKnowledge check
Вы видите в production кластере с parallelism=64, что 50 worker slots занято tasks в state 'running', все они — sensors одного типа (S3KeySensor), 20 других tasks висят в 'queued'. Что произошло, как починить, и почему параметр `deferrable=True` решит проблему фундаментально?
ОтветAnswer
Произошло **slot starvation** из-за sensors в `mode='poke'`. В poke режиме execute() — это цикл с sleep, который не отпускает worker slot. 50 sensors заняли 50 из 64 slots, остальные tasks стоят в queued, dag latency растёт. **Быстрое решение** (без изменения логики): для long-wait sensors переключить на `mode='reschedule'` — sensor вызывает poke() один раз, выходит с AirflowRescheduleException, освобождает slot, scheduler пробуждает TI через poke_interval. Цена — записи в task_reschedule table и scheduler cycle на каждый poke. **Фундаментальное решение**: `deferrable=True`. Sensor поднимает TaskDeferred(trigger=...) на первом execute, TI переходит в state 'deferred', slot освобождается мгновенно. Trigger подхватывается **Triggerer** — отдельным процессом с asyncio event loop, который одной машиной обслуживает тысячи concurrent triggers (async I/O, не блокирующие). Когда условие выполнено, trigger yield TriggerEvent, scheduler возвращает TI в scheduled с next_method='execute_complete', worker вызывает callback. Стоимость: один Triggerer process, десятки MB памяти на тысячи активных triggers. Что нужно: (1) запустить `airflow triggerer` (одну реплику или несколько HA); (2) поставить `deferrable=True` или включить глобально `AIRFLOW__OPERATORS__DEFAULT_DEFERRABLE=true` (2.7+). Это полностью решает slot starvation — sensors больше не конкурируют с обычными tasks за slots.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. В prod кластере с parallelism=64 заметили: 50 slots заняты sensors типа S3KeySensor в state running, 20 других tasks висят в queued. Что произошло и как фундаментально починить?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6