Learning Platform
Глоссарий Troubleshooting
Урок 10.04 · 30 мин
Продвинутый
DeferrableSensor Conversionself.deferexecute_completeaiohttpAsync I/O

Conversion sync sensor → deferrable operator

Большинство operators in Airflow ecosystem написаны в classic sync style — execute() блокирует worker, делает работу, returns. Для long-running waits (sensors, polling APIs, monitoring tasks) это безответственно расходует ресурсы. AIP-40 даёт механизм конвертации существующих sync operators в deferrable — pattern, который применим к практически любому sensor или watch-task.

Этот урок — practical guide по конвертации. Возьмём S3KeySensor из apache-airflow-providers-amazon и проследим, как из его sync версии получается async equivalent. Применяемые техники переносятся на любой sensor: GCS, Azure Blob, HTTP, SQL row presence, Kafka offset, etc.


Anatomy sync sensor — что мы конвертируем

Стандартный sync S3KeySensor примерно такой:

from airflow.sensors.base import BaseSensorOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3KeySensor(BaseSensorOperator):
    """Sync version: блокирует worker slot до появления key в S3."""

    template_fields = ("bucket_key", "bucket_name")

    def __init__(
        self,
        *,
        bucket_key: str,
        bucket_name: str | None = None,
        aws_conn_id: str = "aws_default",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.bucket_key = bucket_key
        self.bucket_name = bucket_name
        self.aws_conn_id = aws_conn_id

    def poke(self, context) -> bool:
        """Called каждые poke_interval секунд. Returns True если ready."""
        hook = S3Hook(aws_conn_id=self.aws_conn_id)
        return hook.check_for_key(self.bucket_key, self.bucket_name)

Это работает, но если timeout=86400 (24 часа) — worker slot занят день. Конвертируем.


Strategy: разделение execute от poll

Async pattern для sensor:

  1. Operator.execute() — entry point. Делает первоначальный check (cheap quick win если key уже exists). Если не готово — self.defer().
  2. Trigger.run() — async loop, который держит ожидание на triggerer-е.
  3. Operator.execute_complete() — после yield TriggerEvent. Финальный validation, returns.

Полная архитектура:

Sync vs Async sensor architecture
Sync S3KeySensor (poke mode)Worker slot занят весь timeout. poke() вызывается worker-ом каждые poke_interval. Blocking I/O — sync boto3 API call. Если 100 sensors с 1h timeout — 100 worker slots впустую.
convert
Async S3KeySensorAsyncWorker запускает execute(), быстрый initial check. Если не ready — self.defer(trigger=S3KeyTrigger(...)). Worker slot освобождается мгновенно.
S3KeyTrigger в triggererTrigger в asyncio loop. await asyncio.sleep(poke_interval), затем async check через aiobotocore. Тысячи таких triggers крутятся в одном triggerer process. CPU usage ~1% на 1000 polls/min.
execute_complete on eventКогда trigger yields TriggerEvent({'status': 'success'}) — TI=scheduled, worker picks up, вызывает execute_complete(context, event). Финальный validate, returns. Logs из обеих фаз linked в один task log.

asyncio — event loop overview

Step 1: Custom trigger для S3

Создаём S3KeyTrigger — он держит async polling:

from datetime import datetime, timezone
import asyncio
from typing import Any, AsyncIterator

from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3KeyTrigger(BaseTrigger):
    """Async polling для S3 key existence."""

    def __init__(
        self,
        bucket_key: str,
        bucket_name: str,
        aws_conn_id: str,
        poke_interval: float = 60.0,
        timeout: float = 86400.0,
    ):
        super().__init__()
        self.bucket_key = bucket_key
        self.bucket_name = bucket_name
        self.aws_conn_id = aws_conn_id
        self.poke_interval = poke_interval
        # Absolute deadline для resumability
        self.deadline_ts = (
            datetime.now(tz=timezone.utc).timestamp() + timeout
        )

    def serialize(self) -> tuple[str, dict[str, Any]]:
        return (
            "my_package.triggers.S3KeyTrigger",
            {
                "bucket_key": self.bucket_key,
                "bucket_name": self.bucket_name,
                "aws_conn_id": self.aws_conn_id,
                "poke_interval": self.poke_interval,
                "timeout": (
                    self.deadline_ts -
                    datetime.now(tz=timezone.utc).timestamp()
                ),
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        # Получаем async hook через factory
        hook = S3Hook(aws_conn_id=self.aws_conn_id)
        async_session = await hook.get_async_session()
        async with async_session.client("s3") as client:
            while True:
                # Check timeout
                if datetime.now(tz=timezone.utc).timestamp() > self.deadline_ts:
                    yield TriggerEvent({
                        "status": "timeout",
                        "message": f"Key {self.bucket_key} not found within timeout",
                    })
                    return

                # Async check
                try:
                    await client.head_object(
                        Bucket=self.bucket_name,
                        Key=self.bucket_key,
                    )
                    # Key exists
                    yield TriggerEvent({
                        "status": "success",
                        "bucket_key": self.bucket_key,
                    })
                    return
                except client.exceptions.NoSuchKey:
                    pass
                except Exception as e:
                    yield TriggerEvent({"status": "error", "message": str(e)})
                    return

                # Sleep until next poll
                await asyncio.sleep(self.poke_interval)

Препарируем ключевые части:

Resumability через absolute deadline

self.deadline_ts = datetime.now(tz=timezone.utc).timestamp() + timeout

В __init__ фиксируем absolute moment, не relative countdown. При triggerer restart re-adopt получит remaining timeout через serialize().

Serialize remaining timeout

"timeout": self.deadline_ts - datetime.now(tz=timezone.utc).timestamp()

Сериализуем remaining timeout, не original. При adopt новый instance создастся с правильным remaining. Если 80% original timeout уже истекло — recovered triggerer не начнёт с full timeout заново.

Async S3 client

hook = S3Hook(aws_conn_id=self.aws_conn_id)
async_session = await hook.get_async_session()
async with async_session.client("s3") as client:

S3Hook.get_async_session() (в providers-amazon 8.x+) возвращает aiobotocore session. Никогда не используем sync boto3 — это блокирует event loop. Все providers с deferrable support имеют async hooks: RedshiftDataHook.async_conn, EmrHook.async_conn, etc.

Error handling в async generator

try:
    await client.head_object(...)
    yield TriggerEvent({"status": "success", ...})
    return
except client.exceptions.NoSuchKey:
    pass  # Key еще не появился, continue polling
except Exception as e:
    yield TriggerEvent({"status": "error", "message": str(e)})
    return
  • NoSuchKey — expected condition, continue loop.
  • Unexpected exceptions — yield error event, terminate. Не raise — это убивает trigger без notification operator-у.
  • После yield — return — иначе trigger продолжит loop (но subsequent yields игнорируются в 2.x).

Step 2: Operator с self.defer()

Теперь operator:

from airflow.models import BaseOperator
from airflow.utils.context import Context
from airflow.exceptions import AirflowException


class S3KeySensorAsync(BaseOperator):
    """Deferrable S3KeySensor."""

    template_fields = ("bucket_key", "bucket_name")

    def __init__(
        self,
        *,
        bucket_key: str,
        bucket_name: str,
        aws_conn_id: str = "aws_default",
        poke_interval: float = 60.0,
        timeout: float = 86400.0,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.bucket_key = bucket_key
        self.bucket_name = bucket_name
        self.aws_conn_id = aws_conn_id
        self.poke_interval = poke_interval
        self.timeout = timeout

    def execute(self, context: Context):
        """Initial check + defer."""

        # Quick initial check (cheap synchronous attempt)
        hook = S3Hook(aws_conn_id=self.aws_conn_id)
        if hook.check_for_key(self.bucket_key, self.bucket_name):
            self.log.info(f"Key {self.bucket_key} already exists")
            return {"bucket_key": self.bucket_key, "found_immediately": True}

        # Defer для long wait
        self.log.info(f"Deferring to wait for {self.bucket_key}")
        self.defer(
            trigger=S3KeyTrigger(
                bucket_key=self.bucket_key,
                bucket_name=self.bucket_name,
                aws_conn_id=self.aws_conn_id,
                poke_interval=self.poke_interval,
                timeout=self.timeout,
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context: Context, event: dict | None = None):
        """Called after trigger yields TriggerEvent."""
        if event is None:
            raise AirflowException("No event from trigger")

        status = event.get("status")
        if status == "success":
            self.log.info(f"Key found: {event['bucket_key']}")
            return {
                "bucket_key": event["bucket_key"],
                "found_immediately": False,
            }
        elif status == "timeout":
            raise AirflowException(f"Timeout: {event['message']}")
        elif status == "error":
            raise AirflowException(f"Error: {event['message']}")
        else:
            raise AirflowException(f"Unknown status: {status}")

Препарируем:

Initial check в execute()

if hook.check_for_key(...):
    return {"found_immediately": True}

Optimization: если key уже есть — никакого defer. Sync check, return immediately. Это экономит triggerer adoption overhead для common case (data уже там).

self.defer() API

self.defer(
    trigger=S3KeyTrigger(...),
    method_name="execute_complete"
    timeout=None,  # Optional: hard timeout на defer
)
  • trigger — instance вашего BaseTrigger subclass
  • method_name — имя метода operator-а, который будет called после event
  • timeout — optional hard limit на defer duration (отдельно от trigger logic)

self.defer() raises TaskDeferred exception — это flow control, не error. Worker ловит специально, делает serialization + state update.

execute_complete signature

def execute_complete(self, context: Context, event: dict | None = None):
  • context — обычный Airflow context (ds, ti, etc.)
  • event — payload от yield TriggerEvent(payload) в trigger.run()

Метод должен:

  • Validate event status
  • Either return value (success) или raise AirflowException (failure)
  • Return value становится XCom (как execute() в обычном operator-е)

Step 3: Использование

from datetime import datetime
from airflow.decorators import dag

@dag(
    schedule="@daily"
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def my_pipeline():
    wait_for_data = S3KeySensorAsync(
        task_id="wait_for_data"
        bucket_key="data/{{ ds }}/extract.parquet"
        bucket_name="my-warehouse"
        aws_conn_id="aws_default"
        poke_interval=60,
        timeout=14400,  # 4 hours max
    )

    @task
    def process_data(s3_info: dict):
        bucket_key = s3_info["bucket_key"]
        # process the file
        pass

    process_data(wait_for_data.output)

my_pipeline()

Поведение: execute() → quick S3 check → если есть, return immediately; иначе defer (worker slot free, triggerer polls async); когда key появится → execute_complete → XCom; process_data получает XCom через taskflow.

Critical advantage: 100 одновременных waits = 0 worker slots блокировано.


Conversion checklist

Применимо к любому sync sensor:

  1. Identify async-friendly library (aiohttp, aiobotocore, asyncpg) для I/O.
  2. Создать XxxTrigger(BaseTrigger) с __init__, serialize(), async run().
  3. Resumability: absolute deadline в __init__, serialize remaining time.
  4. Error handling: yield TriggerEvent с status field, return после yield.
  5. Создать XxxSensorAsync(BaseOperator) с execute() + execute_complete().
  6. Initial sync check в execute() — optimization для common case.
  7. self.defer(trigger=..., method_name="execute_complete").
  8. execute_complete: validate event.status, raise или return XCom value.
  9. Тесты: async unit-tests для trigger, integration test для operator.

Built-in deferrable equivalents

Многие popular sensors уже имеют built-in async версии — не нужно писать с нуля:

SyncAsync equivalentProvider
S3KeySensorS3KeySensor(deferrable=True)apache-airflow-providers-amazon
DateTimeSensorDateTimeSensorAsync или deferrable=Truestandard
ExternalTaskSensorExternalTaskSensor(deferrable=True)apache-airflow
HttpSensorHttpSensor(deferrable=True)providers-http
BigQueryInsertJobOperatorBigQueryInsertJobOperator(deferrable=True)google
KubernetesPodOperatorKubernetesPodOperator(deferrable=True)cncf-kubernetes

В 2.10+ почти все sensors из popular providers имеют deferrable=True parameter — просто добавить его, и operator автоматически работает через triggerer.

# Просто добавить deferrable=True
ExternalTaskSensor(
    task_id="wait_upstream"
    external_dag_id="upstream_dag"
    external_task_id="final_task"
    deferrable=True,  # ← теперь не блокирует worker slot
)

Кастомные triggers пишем только когда нет built-in (специфичные APIs, in-house systems).


Production gotchas

  1. deferrable=True без deployed triggerer = zombie tasks. TI зависают в state=deferred forever. Sanity check: kubectl get pods -l component=triggerer.
  2. AIRFLOW__OPERATORS__DEFAULT_DEFERRABLE=true глобальный flag — explicit deferrable=True per operator безопаснее.
  3. Async hook compatibility. Не все provider hooks имеют async equivalents — проверяйте get_async_session() / async_conn методы.
  4. Logs split. self.log в execute() → worker log; в trigger.run() → triggerer log. Debugging — оба files.
  5. File descriptor limits. 5000 triggers с aiohttp persistent connections могут exhaust ulimit. Production: ulimit -n 65536.
  6. Idempotency execute_complete. При rare race event может быть emitted twice (2.7.x bug). Делайте идемпотентным.
  7. Templated fields работают. bucket_key="data/{{ ds }}/file.parquet" рендерится перед execute(), resolved strings сериализуются.

Проверка знанийKnowledge check
Команда конвертирует custom DatabasePollingSensor (waits for row condition) с sync psycopg2 в deferrable. Какие 3 ключевых изменения нужны?
ОтветAnswer
(1) Hook replacement: psycopg2 — sync. Нужно использовать asyncpg (или aiopg) в trigger.run(). Заменить SyncPostgresHook на async-compatible: PostgresHook.get_async_conn() (если provider поддерживает) или прямой asyncpg connection через connection params из Airflow Connection. (2) Trigger class: создать DatabasePollingTrigger(BaseTrigger) с serialize/run. В run(): async with asyncpg.connect(...) as conn: while not satisfied: rows = await conn.fetch(query); если condition met — yield TriggerEvent({'rows': rows}); else: await asyncio.sleep(poll_interval). (3) Operator restructure: execute() делает initial quick check (sync) — если уже ready, return immediately. Иначе self.defer(trigger=DatabasePollingTrigger(...), method_name='execute_complete'). execute_complete(context, event): validate event status, return rows как XCom. Bonus: resumability — absolute deadline в __init__, serialize remaining timeout. И проверить что asyncpg installed в triggerer Docker image (requirements.txt).

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. self.defer(trigger=..., method_name=...) — что точно происходит?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6