Conversion sync sensor → deferrable operator
Большинство operators in Airflow ecosystem написаны в classic sync style — execute() блокирует worker, делает работу, returns. Для long-running waits (sensors, polling APIs, monitoring tasks) это безответственно расходует ресурсы. AIP-40 даёт механизм конвертации существующих sync operators в deferrable — pattern, который применим к практически любому sensor или watch-task.
Этот урок — practical guide по конвертации. Возьмём S3KeySensor из apache-airflow-providers-amazon и проследим, как из его sync версии получается async equivalent. Применяемые техники переносятся на любой sensor: GCS, Azure Blob, HTTP, SQL row presence, Kafka offset, etc.
Anatomy sync sensor — что мы конвертируем
Стандартный sync S3KeySensor примерно такой:
from airflow.sensors.base import BaseSensorOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
class S3KeySensor(BaseSensorOperator):
"""Sync version: блокирует worker slot до появления key в S3."""
template_fields = ("bucket_key", "bucket_name")
def __init__(
self,
*,
bucket_key: str,
bucket_name: str | None = None,
aws_conn_id: str = "aws_default",
**kwargs,
):
super().__init__(**kwargs)
self.bucket_key = bucket_key
self.bucket_name = bucket_name
self.aws_conn_id = aws_conn_id
def poke(self, context) -> bool:
"""Called каждые poke_interval секунд. Returns True если ready."""
hook = S3Hook(aws_conn_id=self.aws_conn_id)
return hook.check_for_key(self.bucket_key, self.bucket_name)
Это работает, но если timeout=86400 (24 часа) — worker slot занят день. Конвертируем.
Strategy: разделение execute от poll
Async pattern для sensor:
- Operator.execute() — entry point. Делает первоначальный check (cheap quick win если key уже exists). Если не готово —
self.defer(). - Trigger.run() — async loop, который держит ожидание на triggerer-е.
- Operator.execute_complete() — после yield TriggerEvent. Финальный validation, returns.
Полная архитектура:
asyncio — event loop overview
Step 1: Custom trigger для S3
Создаём S3KeyTrigger — он держит async polling:
from datetime import datetime, timezone
import asyncio
from typing import Any, AsyncIterator
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
class S3KeyTrigger(BaseTrigger):
"""Async polling для S3 key existence."""
def __init__(
self,
bucket_key: str,
bucket_name: str,
aws_conn_id: str,
poke_interval: float = 60.0,
timeout: float = 86400.0,
):
super().__init__()
self.bucket_key = bucket_key
self.bucket_name = bucket_name
self.aws_conn_id = aws_conn_id
self.poke_interval = poke_interval
# Absolute deadline для resumability
self.deadline_ts = (
datetime.now(tz=timezone.utc).timestamp() + timeout
)
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
"my_package.triggers.S3KeyTrigger",
{
"bucket_key": self.bucket_key,
"bucket_name": self.bucket_name,
"aws_conn_id": self.aws_conn_id,
"poke_interval": self.poke_interval,
"timeout": (
self.deadline_ts -
datetime.now(tz=timezone.utc).timestamp()
),
},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
# Получаем async hook через factory
hook = S3Hook(aws_conn_id=self.aws_conn_id)
async_session = await hook.get_async_session()
async with async_session.client("s3") as client:
while True:
# Check timeout
if datetime.now(tz=timezone.utc).timestamp() > self.deadline_ts:
yield TriggerEvent({
"status": "timeout",
"message": f"Key {self.bucket_key} not found within timeout",
})
return
# Async check
try:
await client.head_object(
Bucket=self.bucket_name,
Key=self.bucket_key,
)
# Key exists
yield TriggerEvent({
"status": "success",
"bucket_key": self.bucket_key,
})
return
except client.exceptions.NoSuchKey:
pass
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
return
# Sleep until next poll
await asyncio.sleep(self.poke_interval)
Препарируем ключевые части:
Resumability через absolute deadline
self.deadline_ts = datetime.now(tz=timezone.utc).timestamp() + timeout
В __init__ фиксируем absolute moment, не relative countdown. При triggerer restart re-adopt получит remaining timeout через serialize().
Serialize remaining timeout
"timeout": self.deadline_ts - datetime.now(tz=timezone.utc).timestamp()
Сериализуем remaining timeout, не original. При adopt новый instance создастся с правильным remaining. Если 80% original timeout уже истекло — recovered triggerer не начнёт с full timeout заново.
Async S3 client
hook = S3Hook(aws_conn_id=self.aws_conn_id)
async_session = await hook.get_async_session()
async with async_session.client("s3") as client:
S3Hook.get_async_session() (в providers-amazon 8.x+) возвращает aiobotocore session. Никогда не используем sync boto3 — это блокирует event loop. Все providers с deferrable support имеют async hooks: RedshiftDataHook.async_conn, EmrHook.async_conn, etc.
Error handling в async generator
try:
await client.head_object(...)
yield TriggerEvent({"status": "success", ...})
return
except client.exceptions.NoSuchKey:
pass # Key еще не появился, continue polling
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
return
- NoSuchKey — expected condition, continue loop.
- Unexpected exceptions — yield error event, terminate. Не raise — это убивает trigger без notification operator-у.
- После yield —
return— иначе trigger продолжит loop (но subsequent yields игнорируются в 2.x).
Step 2: Operator с self.defer()
Теперь operator:
from airflow.models import BaseOperator
from airflow.utils.context import Context
from airflow.exceptions import AirflowException
class S3KeySensorAsync(BaseOperator):
"""Deferrable S3KeySensor."""
template_fields = ("bucket_key", "bucket_name")
def __init__(
self,
*,
bucket_key: str,
bucket_name: str,
aws_conn_id: str = "aws_default",
poke_interval: float = 60.0,
timeout: float = 86400.0,
**kwargs,
):
super().__init__(**kwargs)
self.bucket_key = bucket_key
self.bucket_name = bucket_name
self.aws_conn_id = aws_conn_id
self.poke_interval = poke_interval
self.timeout = timeout
def execute(self, context: Context):
"""Initial check + defer."""
# Quick initial check (cheap synchronous attempt)
hook = S3Hook(aws_conn_id=self.aws_conn_id)
if hook.check_for_key(self.bucket_key, self.bucket_name):
self.log.info(f"Key {self.bucket_key} already exists")
return {"bucket_key": self.bucket_key, "found_immediately": True}
# Defer для long wait
self.log.info(f"Deferring to wait for {self.bucket_key}")
self.defer(
trigger=S3KeyTrigger(
bucket_key=self.bucket_key,
bucket_name=self.bucket_name,
aws_conn_id=self.aws_conn_id,
poke_interval=self.poke_interval,
timeout=self.timeout,
),
method_name="execute_complete",
)
def execute_complete(self, context: Context, event: dict | None = None):
"""Called after trigger yields TriggerEvent."""
if event is None:
raise AirflowException("No event from trigger")
status = event.get("status")
if status == "success":
self.log.info(f"Key found: {event['bucket_key']}")
return {
"bucket_key": event["bucket_key"],
"found_immediately": False,
}
elif status == "timeout":
raise AirflowException(f"Timeout: {event['message']}")
elif status == "error":
raise AirflowException(f"Error: {event['message']}")
else:
raise AirflowException(f"Unknown status: {status}")
Препарируем:
Initial check в execute()
if hook.check_for_key(...):
return {"found_immediately": True}
Optimization: если key уже есть — никакого defer. Sync check, return immediately. Это экономит triggerer adoption overhead для common case (data уже там).
self.defer() API
self.defer(
trigger=S3KeyTrigger(...),
method_name="execute_complete"
timeout=None, # Optional: hard timeout на defer
)
- trigger — instance вашего BaseTrigger subclass
- method_name — имя метода operator-а, который будет called после event
- timeout — optional hard limit на defer duration (отдельно от trigger logic)
self.defer() raises TaskDeferred exception — это flow control, не error. Worker ловит специально, делает serialization + state update.
execute_complete signature
def execute_complete(self, context: Context, event: dict | None = None):
- context — обычный Airflow context (ds, ti, etc.)
- event — payload от
yield TriggerEvent(payload)в trigger.run()
Метод должен:
- Validate event status
- Either return value (success) или raise AirflowException (failure)
- Return value становится XCom (как execute() в обычном operator-е)
Step 3: Использование
from datetime import datetime
from airflow.decorators import dag
@dag(
schedule="@daily"
start_date=datetime(2026, 1, 1),
catchup=False,
)
def my_pipeline():
wait_for_data = S3KeySensorAsync(
task_id="wait_for_data"
bucket_key="data/{{ ds }}/extract.parquet"
bucket_name="my-warehouse"
aws_conn_id="aws_default"
poke_interval=60,
timeout=14400, # 4 hours max
)
@task
def process_data(s3_info: dict):
bucket_key = s3_info["bucket_key"]
# process the file
pass
process_data(wait_for_data.output)
my_pipeline()
Поведение: execute() → quick S3 check → если есть, return immediately; иначе defer (worker slot free, triggerer polls async); когда key появится → execute_complete → XCom; process_data получает XCom через taskflow.
Critical advantage: 100 одновременных waits = 0 worker slots блокировано.
Conversion checklist
Применимо к любому sync sensor:
- Identify async-friendly library (aiohttp, aiobotocore, asyncpg) для I/O.
- Создать
XxxTrigger(BaseTrigger)с__init__,serialize(), asyncrun(). - Resumability: absolute deadline в
__init__, serialize remaining time. - Error handling: yield TriggerEvent с status field, return после yield.
- Создать
XxxSensorAsync(BaseOperator)с execute() + execute_complete(). - Initial sync check в execute() — optimization для common case.
self.defer(trigger=..., method_name="execute_complete").- execute_complete: validate event.status, raise или return XCom value.
- Тесты: async unit-tests для trigger, integration test для operator.
Built-in deferrable equivalents
Многие popular sensors уже имеют built-in async версии — не нужно писать с нуля:
| Sync | Async equivalent | Provider |
|---|---|---|
S3KeySensor | S3KeySensor(deferrable=True) | apache-airflow-providers-amazon |
DateTimeSensor | DateTimeSensorAsync или deferrable=True | standard |
ExternalTaskSensor | ExternalTaskSensor(deferrable=True) | apache-airflow |
HttpSensor | HttpSensor(deferrable=True) | providers-http |
BigQueryInsertJobOperator | BigQueryInsertJobOperator(deferrable=True) | |
KubernetesPodOperator | KubernetesPodOperator(deferrable=True) | cncf-kubernetes |
В 2.10+ почти все sensors из popular providers имеют deferrable=True parameter — просто добавить его, и operator автоматически работает через triggerer.
# Просто добавить deferrable=True
ExternalTaskSensor(
task_id="wait_upstream"
external_dag_id="upstream_dag"
external_task_id="final_task"
deferrable=True, # ← теперь не блокирует worker slot
)
Кастомные triggers пишем только когда нет built-in (специфичные APIs, in-house systems).
Production gotchas
deferrable=Trueбез deployed triggerer = zombie tasks. TI зависают в state=deferred forever. Sanity check:kubectl get pods -l component=triggerer.AIRFLOW__OPERATORS__DEFAULT_DEFERRABLE=trueглобальный flag — explicitdeferrable=Trueper operator безопаснее.- Async hook compatibility. Не все provider hooks имеют async equivalents — проверяйте
get_async_session()/async_connметоды. - Logs split.
self.logв execute() → worker log; в trigger.run() → triggerer log. Debugging — оба files. - File descriptor limits. 5000 triggers с aiohttp persistent connections могут exhaust ulimit. Production:
ulimit -n 65536. - Idempotency execute_complete. При rare race event может быть emitted twice (2.7.x bug). Делайте идемпотентным.
- Templated fields работают.
bucket_key="data/{{ ds }}/file.parquet"рендерится перед execute(), resolved strings сериализуются.