Sensors — три режима исполнения
Sensor — это operator, который ждёт событие: файл в S3, запись в Postgres, успешное завершение upstream DagRun. Снаружи это выглядит как «задача, которая стоит до тех пор, пока условие не выполнено». Но внутри Airflow есть три фундаментально разных способа реализовать это ожидание, и выбор между ними определяет, выдержит ли ваш Airflow тысячу одновременных sensors или ляжет на десятом.
Этот урок — самый production-критичный в модуле. Неправильный mode для sensor — частая причина того, что dev cluster работает, а prod, где появилась тысяча long-wait sensors, начинает захлёбываться. Сценарий: 100 S3KeySensor в режиме poke с 24-часовым timeout — это 100 заблокированных worker slots на сутки. Перевод на deferrable=True — те же 100 sensors работают на одном triggerer без worker slots вообще.
Базовый класс — BaseSensorOperator
Все sensors наследуются от BaseSensorOperator, который сам наследник BaseOperator. Главный отличительный метод — poke(context):
class BaseSensorOperator(BaseOperator, SkipMixin):
valid_modes: Iterable[str] = ("poke", "reschedule")
def __init__(
self,
*,
poke_interval: float = 60,
timeout: float = 60 * 60 * 24 * 7, # 7 days default
soft_fail: bool = False,
mode: str = "poke",
exponential_backoff: bool = False,
max_wait: timedelta | float | None = None,
silent_fail: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
if mode not in self.valid_modes:
raise AirflowException(f"Mode must be one of {self.valid_modes}")
self.poke_interval = poke_interval
self.timeout = timeout
self.soft_fail = soft_fail
self.mode = mode
# ...
def poke(self, context: Context) -> bool | PokeReturnValue:
raise AirflowException("Override me.")
def execute(self, context: Context) -> Any:
# см. ниже — отличается по mode
...
Контракт poke():
- Возвращает
True— условие выполнено, sensor success. - Возвращает
False— условие не выполнено, продолжаем ждать. - Возвращает
PokeReturnValue(is_done=True, xcom_value=...)— done + пушит value в XCom. - Raises
AirflowSkipException— task получает stateskipped. - Любой другой
Exception— sensor failed.
Параметры:
mode—"poke"или"reschedule". Управляет тем, как execute() ждёт между poke().poke_interval— секунд между poke (default 60s).timeout— максимальное время ожидания (default 7 дней).soft_fail=True— при timeout sensor станетskipped, а неfailed.
Третий режим — deferrable=True — отдельный механизм, не через mode. Подробнее ниже.
Mode 1: poke — блокирующий
Самый простой режим. execute() — это цикл с time.sleep:
# Упрощённая логика BaseSensorOperator.execute() в mode='poke'
def execute(self, context):
started_at = time.monotonic()
while True:
try:
result = self.poke(context)
except AirflowSkipException:
raise
if result:
return result.xcom_value if isinstance(result, PokeReturnValue) else None
if (time.monotonic() - started_at) > self.timeout:
if self.soft_fail:
raise AirflowSkipException("Sensor timeout, soft_fail=True")
raise AirflowSensorTimeout(f"Sensor timed out after {self.timeout}s")
time.sleep(self.poke_interval)
Цена: worker slot занят всё время ожидания. Если у вас parallelism=32 и 32 sensor в poke mode — все остальные tasks стоят в queued, потому что нет свободного slot.
Это самая частая ошибка новичков с sensors. На dev кажется, что всё ок — 5 sensors, 32 slots, есть запас. В prod после roll-out на 100 DAGs появляются 100 sensors с 24h timeout, и кластер встаёт. Симптомы: tasks подвешены в queued, lag растёт, scheduler logs показывают что executor full. Lookup select count(*) from task_instance where state='running' and operator like '%Sensor%' — если число близко к parallelism, это диагноз.
Когда poke всё-таки оправдан:
- Очень короткое ожидание (< 5 минут) — overhead reschedule бессмысленен.
- Условие должно выполниться немедленно или провалиться (например, инвариант проверки в начале pipeline).
Mode 2: reschedule — освобождает slot
В mode='reschedule' execute() не делает sleep — он вызывает poke() один раз и сразу выходит с особым результатом:
# Упрощённая логика mode='reschedule'
def execute(self, context):
if not self.poke(context):
# poke вернул False — освобождаем slot
self._handle_reschedule(context)
raise AirflowRescheduleException(reschedule_date=now() + timedelta(seconds=self.poke_interval))
return None
Что делает scheduler при AirflowRescheduleException:
- Создаёт запись в таблице
task_rescheduleсreschedule_date = now() + poke_interval. - Переводит TI обратно в state
up_for_reschedule(новое состояние из 14 возможных). - Освобождает worker slot — другие tasks могут запускаться.
- Через
poke_intervalsecunds scheduler видит, чтоtask_reschedule.reschedule_date <= now(), переводит TI снова вscheduled. - Worker подбирает task, вызывает execute(), poke() снова.
Цена reschedule:
- +1 task_instance scheduling cycle per poke — scheduler делает работу каждый раз, когда вытаскивает TI обратно в scheduled.
- +1 запись в task_reschedule на каждый цикл — со временем таблица растёт (cleanup на retention policy обязателен).
- Latency = poke_interval — между моментом, когда условие стало True, и тем, когда вы это заметите, может пройти до
poke_intervalсекунд.
Когда reschedule — правильный выбор:
- Ожидание от 10 минут до нескольких часов.
- Нет триггера (deferrable trigger) для этого условия.
poke_interval >= 60s(меньше — слишком много нагрузки на scheduler).
Mode 3: deferrable — asyncio через Triggerer
Самый эффективный режим, появился в Airflow 2.2 (AIP-40). Sensor не блокирует ни worker slot, ни scheduler tick. Ожидание выполняется в отдельном процессе — Triggerer, который запускает asyncio event loop.
Активация через deferrable=True:
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
wait_s3 = S3KeySensor(
task_id="wait_for_data"
bucket_name="data-prod"
bucket_key="orders/{{ ds }}.parquet"
aws_conn_id="aws_default"
deferrable=True, # ← magic
poke_interval=60,
timeout=24 * 3600,
)
Lifecycle deferrable sensor:
Цена deferrable:
- Один процесс Triggerer на весь Airflow (или несколько для HA). Память — десятки MB на тысячи triggers (asyncio коэффициент).
- Trigger code должен быть async. Если для вашего сервиса нет async client (например, какой-нибудь legacy SOAP API), deferrable не сделать.
- CPU triggerer’а растёт линейно с числом active triggers. Но в practice один Triggerer тянет 5-10k активных triggers на средней машине.
Decision matrix — какой mode выбрать
| Wait duration | Условие | Mode |
|---|---|---|
| < 5 min, async-able | например, S3 key за минуту | deferrable=True |
| < 5 min, no async | например, legacy XML-RPC | poke (приемлемая блокировка) |
| 5 min — 6 h, async-able | Snowflake query running, S3 за час | deferrable=True |
| 5 min — 6 h, no async | legacy provider | reschedule |
| 6 h — 7 d, async-able | ожидание внешнего batch job | deferrable=True обязательно |
| 6 h — 7 d, no async | редко | reschedule + low poke_interval (10min) |
Правило простое: если есть deferrable вариант — используйте его. Если нет — reschedule (если poke_interval ≥ 60s). poke остаётся только для коротких ожиданий или legacy кода без альтернатив.
Custom sensor — пример
Sensor, который проверяет наличие записи в Postgres:
from airflow.sensors.base import BaseSensorOperator, PokeReturnValue
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.context import Context
class PostgresRowSensor(BaseSensorOperator):
template_fields = ("sql",)
def __init__(self, *, conn_id: str, sql: str, **kwargs):
super().__init__(**kwargs)
self.conn_id = conn_id
self.sql = sql
def poke(self, context: Context) -> PokeReturnValue:
hook = PostgresHook(postgres_conn_id=self.conn_id)
rows = hook.get_records(self.sql)
count = rows[0][0] if rows else 0
self.log.info("Found %d rows matching %s", count, self.sql)
if count > 0:
return PokeReturnValue(is_done=True, xcom_value=count)
return PokeReturnValue(is_done=False)
Используется как любой sensor:
wait_data = PostgresRowSensor(
task_id="wait_for_orders"
conn_id="warehouse"
sql="SELECT COUNT(*) FROM orders WHERE order_date = '{{ ds }}'"
mode="reschedule"
poke_interval=120,
timeout=6 * 3600,
)
Deferrable версия
Чтобы тот же sensor сделать deferrable, нужно:
- Реализовать
Triggerкласс с asyncrun(). - Переопределить
execute()— вместо синхронного цикла поднятьTaskDeferred. - Реализовать
execute_complete()— callback после события.
import asyncio
from airflow.triggers.base import BaseTrigger, TriggerEvent
class PostgresRowTrigger(BaseTrigger):
def __init__(self, conn_id: str, sql: str, poke_interval: float):
super().__init__()
self.conn_id = conn_id
self.sql = sql
self.poke_interval = poke_interval
def serialize(self):
return (
"myplugin.triggers.PostgresRowTrigger",
{"conn_id": self.conn_id, "sql": self.sql, "poke_interval": self.poke_interval},
)
async def run(self):
# Псевдокод — реальный код использовал бы asyncpg или aiopg
while True:
count = await self._async_count()
if count > 0:
yield TriggerEvent({"status": "success", "count": count})
return
await asyncio.sleep(self.poke_interval)
async def _async_count(self) -> int:
# connect via asyncpg, execute self.sql, return count
...
class PostgresRowSensorAsync(BaseSensorOperator):
template_fields = ("sql",)
def __init__(self, *, conn_id, sql, deferrable: bool = True, **kwargs):
super().__init__(**kwargs)
self.conn_id = conn_id
self.sql = sql
self.deferrable = deferrable
def execute(self, context):
if not self.deferrable:
# fallback на синхронный poke
return super().execute(context)
self.defer(
trigger=PostgresRowTrigger(self.conn_id, self.sql, self.poke_interval),
method_name="execute_complete",
)
def execute_complete(self, context, event=None):
if event and event.get("status") == "success":
return event["count"]
raise AirflowException(f"Sensor failed: {event}")
В Module 09 (Triggerer) препарируем asyncio loop triggerer’а до уровня schedule/poll patterns.
Conversion guide — poke → deferrable в production
Реальный сценарий: у вас 100 DAGs с S3KeySensor(mode='poke'), кластер захлёбывается. План миграции:
Шаг 1 — audit. Найти все sensors и их режимы:
SELECT dag_id, task_id, operator, json_extract(args, '$.mode') as mode
FROM serialized_dag, json_each(json_extract(data, '$.tasks'))
WHERE operator LIKE '%Sensor%';
Или через Web UI: фильтр по operator name.
Шаг 2 — Triggerer up. Запустить процесс triggerer. В docker-compose:
triggerer:
image: apache/airflow:2.10.5
command: triggerer
depends_on: [postgres]
Для HA — 2-3 реплики, distribution через triggerer_id.
Шаг 3 — incremental migration. По одному DAG переключать. Если sensor поддерживает deferrable — просто добавить deferrable=True:
# Было
S3KeySensor(
task_id="wait"
bucket_name="b"
bucket_key="k"
mode="poke",
)
# Стало
S3KeySensor(
task_id="wait"
bucket_name="b"
bucket_key="k"
deferrable=True,
)
Или глобально через config:
AIRFLOW__OPERATORS__DEFAULT_DEFERRABLE=true
Этот флаг (2.7+) превращает все sensors с поддержкой deferrable в deferrable, не трогая DAG код.
Шаг 4 — мониторинг. Метрики triggerer’а:
triggerer.running_triggers— должно быть < max_capacity (1000 default).triggerer.heartbeat— alive проверка.task_instance.durationдля deferred TI — теперь время вdeferred, не блокирует worker.
Что в Airflow 3.x
В 3.x deferrable стал first-class — большинство официальных sensors имеют deferrable как default. Также:
- Edge Executor + Triggerer интеграция для геораспределённых deploy.
- Trigger groups — приоритеты для разных типов triggers (важные business sensors vs technical health checks).
Но всё это — на горизонте upgrade. В 2.10/2.11 LTS deferrable работает отлично и есть для всех major providers (AWS, GCP, Azure, Snowflake, Databricks, K8s).
Production gotchas
-
poke_interval=10sна reschedule mode. Каждый poke — это отдельный TI cycle: scheduler → queued → worker → poke → reschedule. Если interval 10s и у вас 100 sensors — это 600 scheduling cycles в минуту только на sensors. Минимумpoke_interval=60sдля reschedule, лучше 5-10 минут на long-wait. -
mode='poke'забыли указать → default mode=‘poke’. Default для BaseSensorOperator —poke. Если ваш sensor длинно-ждущий, обязательно явно ставьтеmode='reschedule'илиdeferrable=True. Default — это самый прожорливый режим. -
soft_fail=Trueмолча скрывает баги. Sensor timeout → state=skipped → downstream skipped (по умолчанию). DAG в UI показывает success (skipped = не failed), но данных нет. Если sensor реально критичен —soft_fail=False, чтобы DAG явно падал. -
Deferrable без Triggerer запущенного. Если в
airflow.cfgили k8s deployment забыли запуститьtriggererservice, TI попадают в statedeferredи никогда не возвращаются. Симптом: tasks висят вdeferredбесконечно. Диагноз:airflow triggerer listилиselect count(*) from trigger— растёт без обработки. -
Trigger non-serializable parameters.
Trigger.serialize()должен возвращать только JSON-сериализуемые типы. Если передаёте сложный объект (например, готовый boto3 client) — упадёт при reload trigger’а после restart triggerer’а. Сохраняйте только primitives, инстанциируйте clients внутриrun(). -
task_reschedule retention. Каждый reschedule создаёт строку. По умолчанию они хранятся вечно. Через год у вас могут быть миллионы записей и
SELECT ... FROM task_rescheduleстановится медленным. Включитеdb_cleanupjob или ручную чистку:DELETE FROM task_reschedule WHERE reschedule_date < now() - interval '30 days'. -
exponential_backoff=True+ слишком маленькийmax_wait. Backoff удваивает интервал на каждом неудачном poke: 60, 120, 240, 480… Безmax_waitдостигает 60 * 2^N — через 10 циклов это 17 часов между poke. Условие выполнится, sensor проспит. Всегдаmax_waitесли используете exponential.