Listener API — pluggy hooks для lifecycle events
С Airflow 2.8 (Q4 2023) появился Listener API — formal mechanism для слушания всех lifecycle events системы. До этого варианты были фрагментарные: on_failure_callback per task, sla_miss_callback per DAG, мониторинг через StatsD metrics. Listener API даёт единое decoupled место, видящее все task и DagRun events. Это правильный способ для generic alerting, audit logging, lineage emission. OpenLineage provider использует именно Listener API для emit lineage events.
Этот урок разбирает архитектуру через pluggy (тот же plugin framework что и pytest), available hooks в 2.10/2.11, full example listener в Slack/PagerDuty, и почему Listener API лучше callbacks для systemic monitoring.
Pluggy — что это и почему
Listener API построен на библиотеке pluggy (pluggy.readthedocs.io) — тот же hook framework, что использует pytest.
with-statement: __enter__ и __exit__ — context manager protocol Это даёт:
- Decoupling: код Airflow вызывает hook
on_task_instance_running(...)без знания кто его слушает. - Multiple subscribers: на один event могут реагировать N listeners — все будут вызваны.
- No mandatory interface: listener реализует только те hooks, что нужны.
- Spec/impl разделение: Airflow определяет spec (signatures), listener реализует impl.
Псевдокод pluggy:
import pluggy
# Spec — определяет hook signatures
hookspec = pluggy.HookspecMarker("airflow")
hookimpl = pluggy.HookimplMarker("airflow")
class AirflowSpec:
@hookspec
def on_task_instance_running(self, previous_state, task_instance, session):
"""Called when a task transitions to running."""
class AirflowPluginManager(pluggy.PluginManager):
def __init__(self):
super().__init__("airflow")
self.add_hookspecs(AirflowSpec)
# В коде scheduler / TaskInstance lifecycle:
plugin_manager = get_listener_manager()
plugin_manager.hook.on_task_instance_running(
previous_state=State.QUEUED,
task_instance=ti,
session=session,
)
# ← вызывает все registered listeners
Available hooks в Airflow 2.10/2.11
| Hook | Когда вызывается | Args |
|---|---|---|
on_starting(component) | Scheduler / triggerer / worker starts | component: scheduler/triggerer/etc. |
before_stopping(component) | Scheduler / triggerer / worker shutdown | component |
on_task_instance_running(previous_state, task_instance, session) | TI transitions to running | TI object, session |
on_task_instance_success(previous_state, task_instance, session) | TI succeeds | TI + state |
on_task_instance_failed(previous_state, task_instance, error, session) | TI fails (после retries) | TI + error |
on_dag_run_running(dag_run, msg) | DagRun transitions to running | DagRun |
on_dag_run_success(dag_run, msg) | DagRun succeeds | DagRun |
on_dag_run_failed(dag_run, msg) | DagRun fails | DagRun |
on_dataset_created(dataset) | Dataset registered | Dataset object (2.4+) |
on_dataset_changed(dataset) | Dataset event emitted | Dataset object |
Note: Dataset в 2.10 — будет Asset в 3.x (terminology migration, AIP-74). В 2.x обратная совместимость.
Все hooks вызываются в scheduler/worker context — у вас есть SQLAlchemy session, full task/dag info. Listener должен быть быстрым — он блокирует main loop scheduler.
Full example: Slack/PagerDuty listener
Real-world listener, emit task/dag events в Slack и эскалирует criticality в PagerDuty.
# my_org_provider/listeners/events.py
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from airflow.listeners import hookimpl
if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance
from airflow.models.dagrun import DagRun
log = logging.getLogger(__name__)
def _send_slack(channel: str, text: str, attachments: list | None = None):
"""Helper to post to Slack via webhook."""
import requests, os
webhook = os.environ.get("SLACK_WEBHOOK_URL")
if not webhook:
log.warning("SLACK_WEBHOOK_URL not set, skipping Slack notification")
return
requests.post(
webhook,
json={"channel": channel, "text": text, "attachments": attachments or []},
timeout=5,
)
def _trigger_pagerduty(severity: str, summary: str, source: str, custom_details: dict):
"""Helper to send PagerDuty event API."""
import requests, os
routing_key = os.environ.get("PAGERDUTY_ROUTING_KEY")
if not routing_key:
log.warning("PAGERDUTY_ROUTING_KEY not set, skipping PD")
return
requests.post(
"https://events.pagerduty.com/v2/enqueue"
json={
"routing_key": routing_key,
"event_action": "trigger",
"payload": {
"summary": summary,
"severity": severity, # critical, error, warning, info
"source": source,
"custom_details": custom_details,
},
},
timeout=5,
)
# Список SLA-critical DAG-ов, для которых эскалируем в PagerDuty
CRITICAL_DAGS = {"orders_etl", "billing_pipeline", "fraud_detection"}
class AirflowEventListener:
"""Listener для task/dagrun events: send to Slack, escalate critical to PagerDuty."""
@hookimpl
def on_task_instance_running(self, previous_state, task_instance: "TaskInstance", session):
# Lightweight log on running — useful for slow-task detection
log.info(
"Task started: dag=%s task=%s run=%s try=%d host=%s",
task_instance.dag_id,
task_instance.task_id,
task_instance.run_id,
task_instance.try_number,
task_instance.hostname,
)
@hookimpl
def on_task_instance_failed(self, previous_state, task_instance: "TaskInstance", error, session):
dag_id = task_instance.dag_id
task_id = task_instance.task_id
run_id = task_instance.run_id
try_number = task_instance.try_number
# Slack notification всегда
_send_slack(
channel="#airflow-failures"
text=f"Task FAILED: `{dag_id}.{task_id}` (run={run_id}, try {try_number})"
attachments=[{
"color": "#cc0000",
"fields": [
{"title": "Error", "value": str(error)[:1000], "short": False},
{"title": "Logs", "value": task_instance.log_url, "short": True},
],
}],
)
# PagerDuty только для critical DAG-ов
if dag_id in CRITICAL_DAGS:
_trigger_pagerduty(
severity="critical"
summary=f"Airflow task failed: {dag_id}.{task_id}"
source="airflow-listener"
custom_details={
"dag_id": dag_id,
"task_id": task_id,
"run_id": run_id,
"try_number": try_number,
"error": str(error)[:500],
"log_url": task_instance.log_url,
},
)
@hookimpl
def on_dag_run_success(self, dag_run: "DagRun", msg: str | None = None):
# Notification для completed runs critical DAGs
if dag_run.dag_id in CRITICAL_DAGS:
_send_slack(
channel="#airflow-success"
text=f"DAG run SUCCESS: `{dag_run.dag_id}` ({dag_run.run_id})",
)
@hookimpl
def on_dag_run_failed(self, dag_run: "DagRun", msg: str | None = None):
_send_slack(
channel="#airflow-failures"
text=f"DAG RUN FAILED: `{dag_run.dag_id}` ({dag_run.run_id})"
attachments=[{
"color": "#cc0000",
"fields": [
{"title": "Msg", "value": msg or "n/a", "short": False},
],
}],
)
if dag_run.dag_id in CRITICAL_DAGS:
_trigger_pagerduty(
severity="critical"
summary=f"Airflow DAG run failed: {dag_run.dag_id}"
source="airflow-listener"
custom_details={
"dag_id": dag_run.dag_id,
"run_id": dag_run.run_id,
"execution_date": str(dag_run.execution_date),
},
)
@hookimpl
def on_starting(self, component):
log.info("Airflow component starting: %s", component)
_send_slack(
channel="#airflow-ops"
text=f"Airflow `{component}` is starting up.",
)
@hookimpl
def before_stopping(self, component):
log.info("Airflow component stopping: %s", component)
_send_slack(
channel="#airflow-ops"
text=f"Airflow `{component}` is shutting down.",
)
# Module-level instance — pluggy ищет это
event_listener = AirflowEventListener()
Registration
File-based plugin
# $AIRFLOW_HOME/plugins/event_listener_plugin.py
from airflow.plugins_manager import AirflowPlugin
class EventListenerPlugin(AirflowPlugin):
name = "event_listener_plugin"
# ВАЖНО: listeners — список MODULE PATHS (strings), не class refs
listeners = ["my_org_provider.listeners.events"]
Provider package
# my_org_provider/__init__.py
def get_provider_info():
return {
...
"listener-modules": ["my_org_provider.listeners.events"],
}
При load Airflow импортирует module и регистрирует все объекты с @hookimpl декорированными методами.
Lifecycle: где вызываются listeners
Ключевые detail:
on_task_instance_failedвызывается на финальной попытке, после всех retries исчерпаны. НЕ на каждой intermediate failure.- DagRun listeners вызываются scheduler при aggregate state transition.
- Listener invocation synchronous — блокирует scheduler/worker. Долгий listener → bottleneck. Делайте listener fast (≤100ms), вынесите heavy work в queue (Slack webhook OK, но database INSERT 100ms — рискованно).
Comparison: Listener API vs on_failure_callback
В 2.x старый способ — per-task callback:
def alert_failure(context):
ti = context["ti"]
send_slack(f"Task {ti.task_id} failed")
@task(on_failure_callback=alert_failure)
def my_task(): ...
| Aspect | on_failure_callback | Listener API |
|---|---|---|
| Scope | Per task | All tasks system-wide |
| Setup | Add to every task / default_args | Один registered listener |
| Visibility for DevOps | Spread across DAGs | Central |
| Maintenance | Update каждый DAG при изменении | Один module |
| Performance | Inline в task process | Inline в worker/scheduler — но central |
| Event types | failure / success / retry / sla_miss | All lifecycle events including DAG-level |
| Best for | Per-task custom logic (e.g., specific data cleanup) | Generic alerting, audit, lineage |
| Production scale | Hard to maintain at 1000+ DAGs | Idiomatic |
Production guidance: per-task callback для task-specific действий (отправить custom payload, cleanup specific resources). Listener для generic действий (alerting, lineage, metrics) — то, что должно работать для всех tasks.
OpenLineage uses Listener API
apache-airflow-providers-openlineage — official provider, который emit-ит data lineage events. Внутри он implementation Listener API:
# Simplified внутренности providers/openlineage/plugins/listener.py
class OpenLineageListener:
@hookimpl
def on_task_instance_running(self, previous_state, task_instance, session):
# Emit START event
lineage_event = build_start_event(task_instance)
self.adapter.emit(lineage_event)
@hookimpl
def on_task_instance_success(self, previous_state, task_instance, session):
lineage_event = build_complete_event(task_instance)
self.adapter.emit(lineage_event)
@hookimpl
def on_task_instance_failed(self, previous_state, task_instance, error, session):
lineage_event = build_fail_event(task_instance, error)
self.adapter.emit(lineage_event)
Без отдельной конфигурации DAG-ов — просто установка provider включает lineage. Это killer use case Listener API: декларативное, system-wide observability.
Production gotchas
1. Listener синхронный — блокирует scheduler/worker. Если ваш listener делает HTTP request на 2s — это 2s блок scheduler loop. Обязательно: timeout на requests (≤5s), fast-fail на errors, не делайте DB writes напрямую — async queue.
2. Exception в listener не падает Airflow. pluggy isolation: exception в одном listener logged, но не пропадает в Airflow. Это хорошо для безопасности — но может скрыть проблемы. Monitor listener exceptions через logs.
3. Listener выполняется в каждом worker. В Celery setup: 4 workers × множество TI = 4 process с listener. Webhook spam — реальная проблема. Используйте Slack rate-limiting headers, или дедупликацию (run_id-based).
4. session параметр — read-only мощно. SQLAlchemy session активен, можно делать query. Не делайте UPDATE / writes — это меняет state Airflow и может разломать scheduler.
5. task_instance.log_url нужен [webserver] base_url правильно настроенный. Если base_url default http://localhost:8080 — Slack пользователи не смогут открыть log. Поставить base_url=https://airflow.your-corp.com.
6. Listener в 2.x при load — module imported один раз. Если меняете listener code — restart всех процессов (scheduler, workers, triggerer). Прежний код продолжает работать до restart.