Learning Platform
Глоссарий Troubleshooting
Урок 13.04 · 32 мин
Продвинутый
Listener APIpluggyhookimplEventsOpenLineage

Listener API — pluggy hooks для lifecycle events

С Airflow 2.8 (Q4 2023) появился Listener API — formal mechanism для слушания всех lifecycle events системы. До этого варианты были фрагментарные: on_failure_callback per task, sla_miss_callback per DAG, мониторинг через StatsD metrics. Listener API даёт единое decoupled место, видящее все task и DagRun events. Это правильный способ для generic alerting, audit logging, lineage emission. OpenLineage provider использует именно Listener API для emit lineage events.

Этот урок разбирает архитектуру через pluggy (тот же plugin framework что и pytest), available hooks в 2.10/2.11, full example listener в Slack/PagerDuty, и почему Listener API лучше callbacks для systemic monitoring.


Pluggy — что это и почему

Listener API построен на библиотеке pluggy (pluggy.readthedocs.io) — тот же hook framework, что использует pytest.

with-statement: __enter__ и __exit__ — context manager protocol Это даёт:

  • Decoupling: код Airflow вызывает hook on_task_instance_running(...) без знания кто его слушает.
  • Multiple subscribers: на один event могут реагировать N listeners — все будут вызваны.
  • No mandatory interface: listener реализует только те hooks, что нужны.
  • Spec/impl разделение: Airflow определяет spec (signatures), listener реализует impl.

Псевдокод pluggy:

import pluggy

# Spec — определяет hook signatures
hookspec = pluggy.HookspecMarker("airflow")
hookimpl = pluggy.HookimplMarker("airflow")

class AirflowSpec:
    @hookspec
    def on_task_instance_running(self, previous_state, task_instance, session):
        """Called when a task transitions to running."""

class AirflowPluginManager(pluggy.PluginManager):
    def __init__(self):
        super().__init__("airflow")
        self.add_hookspecs(AirflowSpec)

# В коде scheduler / TaskInstance lifecycle:
plugin_manager = get_listener_manager()
plugin_manager.hook.on_task_instance_running(
    previous_state=State.QUEUED,
    task_instance=ti,
    session=session,
)
# ← вызывает все registered listeners

Available hooks в Airflow 2.10/2.11

HookКогда вызываетсяArgs
on_starting(component)Scheduler / triggerer / worker startscomponent: scheduler/triggerer/etc.
before_stopping(component)Scheduler / triggerer / worker shutdowncomponent
on_task_instance_running(previous_state, task_instance, session)TI transitions to runningTI object, session
on_task_instance_success(previous_state, task_instance, session)TI succeedsTI + state
on_task_instance_failed(previous_state, task_instance, error, session)TI fails (после retries)TI + error
on_dag_run_running(dag_run, msg)DagRun transitions to runningDagRun
on_dag_run_success(dag_run, msg)DagRun succeedsDagRun
on_dag_run_failed(dag_run, msg)DagRun failsDagRun
on_dataset_created(dataset)Dataset registeredDataset object (2.4+)
on_dataset_changed(dataset)Dataset event emittedDataset object

Note: Dataset в 2.10 — будет Asset в 3.x (terminology migration, AIP-74). В 2.x обратная совместимость.

Все hooks вызываются в scheduler/worker context — у вас есть SQLAlchemy session, full task/dag info. Listener должен быть быстрым — он блокирует main loop scheduler.


Full example: Slack/PagerDuty listener

Real-world listener, emit task/dag events в Slack и эскалирует criticality в PagerDuty.

# my_org_provider/listeners/events.py
from __future__ import annotations
import logging
from typing import TYPE_CHECKING

from airflow.listeners import hookimpl

if TYPE_CHECKING:
    from airflow.models.taskinstance import TaskInstance
    from airflow.models.dagrun import DagRun

log = logging.getLogger(__name__)


def _send_slack(channel: str, text: str, attachments: list | None = None):
    """Helper to post to Slack via webhook."""
    import requests, os
    webhook = os.environ.get("SLACK_WEBHOOK_URL")
    if not webhook:
        log.warning("SLACK_WEBHOOK_URL not set, skipping Slack notification")
        return
    requests.post(
        webhook,
        json={"channel": channel, "text": text, "attachments": attachments or []},
        timeout=5,
    )


def _trigger_pagerduty(severity: str, summary: str, source: str, custom_details: dict):
    """Helper to send PagerDuty event API."""
    import requests, os
    routing_key = os.environ.get("PAGERDUTY_ROUTING_KEY")
    if not routing_key:
        log.warning("PAGERDUTY_ROUTING_KEY not set, skipping PD")
        return
    requests.post(
        "https://events.pagerduty.com/v2/enqueue"
        json={
            "routing_key": routing_key,
            "event_action": "trigger",
            "payload": {
                "summary": summary,
                "severity": severity,  # critical, error, warning, info
                "source": source,
                "custom_details": custom_details,
            },
        },
        timeout=5,
    )


# Список SLA-critical DAG-ов, для которых эскалируем в PagerDuty
CRITICAL_DAGS = {"orders_etl", "billing_pipeline", "fraud_detection"}


class AirflowEventListener:
    """Listener для task/dagrun events: send to Slack, escalate critical to PagerDuty."""

    @hookimpl
    def on_task_instance_running(self, previous_state, task_instance: "TaskInstance", session):
        # Lightweight log on running — useful for slow-task detection
        log.info(
            "Task started: dag=%s task=%s run=%s try=%d host=%s",
            task_instance.dag_id,
            task_instance.task_id,
            task_instance.run_id,
            task_instance.try_number,
            task_instance.hostname,
        )

    @hookimpl
    def on_task_instance_failed(self, previous_state, task_instance: "TaskInstance", error, session):
        dag_id = task_instance.dag_id
        task_id = task_instance.task_id
        run_id = task_instance.run_id
        try_number = task_instance.try_number

        # Slack notification всегда
        _send_slack(
            channel="#airflow-failures"
            text=f"Task FAILED: `{dag_id}.{task_id}` (run={run_id}, try {try_number})"
            attachments=[{
                "color": "#cc0000",
                "fields": [
                    {"title": "Error", "value": str(error)[:1000], "short": False},
                    {"title": "Logs", "value": task_instance.log_url, "short": True},
                ],
            }],
        )

        # PagerDuty только для critical DAG-ов
        if dag_id in CRITICAL_DAGS:
            _trigger_pagerduty(
                severity="critical"
                summary=f"Airflow task failed: {dag_id}.{task_id}"
                source="airflow-listener"
                custom_details={
                    "dag_id": dag_id,
                    "task_id": task_id,
                    "run_id": run_id,
                    "try_number": try_number,
                    "error": str(error)[:500],
                    "log_url": task_instance.log_url,
                },
            )

    @hookimpl
    def on_dag_run_success(self, dag_run: "DagRun", msg: str | None = None):
        # Notification для completed runs critical DAGs
        if dag_run.dag_id in CRITICAL_DAGS:
            _send_slack(
                channel="#airflow-success"
                text=f"DAG run SUCCESS: `{dag_run.dag_id}` ({dag_run.run_id})",
            )

    @hookimpl
    def on_dag_run_failed(self, dag_run: "DagRun", msg: str | None = None):
        _send_slack(
            channel="#airflow-failures"
            text=f"DAG RUN FAILED: `{dag_run.dag_id}` ({dag_run.run_id})"
            attachments=[{
                "color": "#cc0000",
                "fields": [
                    {"title": "Msg", "value": msg or "n/a", "short": False},
                ],
            }],
        )
        if dag_run.dag_id in CRITICAL_DAGS:
            _trigger_pagerduty(
                severity="critical"
                summary=f"Airflow DAG run failed: {dag_run.dag_id}"
                source="airflow-listener"
                custom_details={
                    "dag_id": dag_run.dag_id,
                    "run_id": dag_run.run_id,
                    "execution_date": str(dag_run.execution_date),
                },
            )

    @hookimpl
    def on_starting(self, component):
        log.info("Airflow component starting: %s", component)
        _send_slack(
            channel="#airflow-ops"
            text=f"Airflow `{component}` is starting up.",
        )

    @hookimpl
    def before_stopping(self, component):
        log.info("Airflow component stopping: %s", component)
        _send_slack(
            channel="#airflow-ops"
            text=f"Airflow `{component}` is shutting down.",
        )


# Module-level instance — pluggy ищет это
event_listener = AirflowEventListener()

Registration

File-based plugin

# $AIRFLOW_HOME/plugins/event_listener_plugin.py
from airflow.plugins_manager import AirflowPlugin

class EventListenerPlugin(AirflowPlugin):
    name = "event_listener_plugin"
    # ВАЖНО: listeners — список MODULE PATHS (strings), не class refs
    listeners = ["my_org_provider.listeners.events"]

Provider package

# my_org_provider/__init__.py
def get_provider_info():
    return {
        ...
        "listener-modules": ["my_org_provider.listeners.events"],
    }

При load Airflow импортирует module и регистрирует все объекты с @hookimpl декорированными методами.


Lifecycle: где вызываются listeners

Listener invocation points в task lifecycle
Scheduler tickScheduler видит scheduled TI, проходит critical section, переводит scheduled → queued. На этом этапе listeners НЕ вызываются — scheduler фокусируется на enqueueing.
executor pickup
Worker starts taskLocalTaskJob / Celery worker запускает TI. Перед actual execution: ti._run_raw_task → on_task_instance_running listener invoked. Это первый ping в listener — task начала.
user code runs
Task function executesPython code, SQL queries, K8s pod execution. Listener не дёргается во время run — он реагирует только на state transitions.
completion
Success pathTask завершилась без exception. State → success. on_task_instance_success(previous_state=running, task_instance, session) вызывается.
Failure pathTask бросила exception. После retries (если есть) — state → failed. on_task_instance_failed(previous_state=up_for_retry или running, task_instance, error, session) вызывается. ВАЖНО: вызывается на финальной попытке, не на каждой retry.
DagRun completion
DagRun terminal stateScheduler aggregates TI states. Все success → DagRun success → on_dag_run_success. Failed deps → DagRun failed → on_dag_run_failed.

Ключевые detail:

  • on_task_instance_failed вызывается на финальной попытке, после всех retries исчерпаны. НЕ на каждой intermediate failure.
  • DagRun listeners вызываются scheduler при aggregate state transition.
  • Listener invocation synchronous — блокирует scheduler/worker. Долгий listener → bottleneck. Делайте listener fast (≤100ms), вынесите heavy work в queue (Slack webhook OK, но database INSERT 100ms — рискованно).

Comparison: Listener API vs on_failure_callback

В 2.x старый способ — per-task callback:

def alert_failure(context):
    ti = context["ti"]
    send_slack(f"Task {ti.task_id} failed")

@task(on_failure_callback=alert_failure)
def my_task(): ...
Aspecton_failure_callbackListener API
ScopePer taskAll tasks system-wide
SetupAdd to every task / default_argsОдин registered listener
Visibility for DevOpsSpread across DAGsCentral
MaintenanceUpdate каждый DAG при измененииОдин module
PerformanceInline в task processInline в worker/scheduler — но central
Event typesfailure / success / retry / sla_missAll lifecycle events including DAG-level
Best forPer-task custom logic (e.g., specific data cleanup)Generic alerting, audit, lineage
Production scaleHard to maintain at 1000+ DAGsIdiomatic

Production guidance: per-task callback для task-specific действий (отправить custom payload, cleanup specific resources). Listener для generic действий (alerting, lineage, metrics) — то, что должно работать для всех tasks.


OpenLineage uses Listener API

apache-airflow-providers-openlineage — official provider, который emit-ит data lineage events. Внутри он implementation Listener API:

# Simplified внутренности providers/openlineage/plugins/listener.py
class OpenLineageListener:
    @hookimpl
    def on_task_instance_running(self, previous_state, task_instance, session):
        # Emit START event
        lineage_event = build_start_event(task_instance)
        self.adapter.emit(lineage_event)

    @hookimpl
    def on_task_instance_success(self, previous_state, task_instance, session):
        lineage_event = build_complete_event(task_instance)
        self.adapter.emit(lineage_event)

    @hookimpl
    def on_task_instance_failed(self, previous_state, task_instance, error, session):
        lineage_event = build_fail_event(task_instance, error)
        self.adapter.emit(lineage_event)

Без отдельной конфигурации DAG-ов — просто установка provider включает lineage. Это killer use case Listener API: декларативное, system-wide observability.


Production gotchas

1. Listener синхронный — блокирует scheduler/worker. Если ваш listener делает HTTP request на 2s — это 2s блок scheduler loop. Обязательно: timeout на requests (≤5s), fast-fail на errors, не делайте DB writes напрямую — async queue.

2. Exception в listener не падает Airflow. pluggy isolation: exception в одном listener logged, но не пропадает в Airflow. Это хорошо для безопасности — но может скрыть проблемы. Monitor listener exceptions через logs.

3. Listener выполняется в каждом worker. В Celery setup: 4 workers × множество TI = 4 process с listener. Webhook spam — реальная проблема. Используйте Slack rate-limiting headers, или дедупликацию (run_id-based).

4. session параметр — read-only мощно. SQLAlchemy session активен, можно делать query. Не делайте UPDATE / writes — это меняет state Airflow и может разломать scheduler.

5. task_instance.log_url нужен [webserver] base_url правильно настроенный. Если base_url default http://localhost:8080 — Slack пользователи не смогут открыть log. Поставить base_url=https://airflow.your-corp.com.

6. Listener в 2.x при load — module imported один раз. Если меняете listener code — restart всех процессов (scheduler, workers, triggerer). Прежний код продолжает работать до restart.


Проверка знанийKnowledge check
Listener API vs on_failure_callback — почему для system-wide alerting в production с 500 DAG-ами Listener — правильный выбор?
ОтветAnswer
Главные причины: (1) Centralization — один listener module, vs callback в каждом DAG (или хотя бы в default_args, который часто забывают). (2) Coverage guarantee — listener видит ВСЕ tasks автоматически, callback можно забыть добавить на новую task. (3) Maintenance — изменить alerting logic = одна правка кода + restart, vs обход 500 DAG-ов. (4) Event types — listener видит task running/success/failed + dagrun running/success/failed + dataset created/changed + scheduler/worker lifecycle (on_starting/before_stopping). callback покрывает только task-level failure/success/retry/sla_miss. (5) DevOps visibility — listener код живёт рядом с monitoring infrastructure (single repo), callback размазан по DAG code. (6) Industry standard — OpenLineage, Datadog Airflow integration, Astronomer's lineage — все используют Listener. Когда callback всё-таки лучше: per-task custom logic, например cleanup specific resources в случае failure конкретного task. Combine: Listener для generic alerting + per-task callback для DAG-specific cleanup. Это правильная dual layer.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Listener API в Airflow доступен с какой версии и на каком framework построен?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6