Error handling — exponential backoff, dead letter queues, alerting strategies
Production Airflow is not about happy-path execution; it is about handling failures correctly. Servers go down, APIs time out, and DB connections occasionally flake. A good DAG expects failures and handles them through layered defenses: smart retries with exponential backoff, dead letter queues for permanent failures, multi-tier alerting (informational → critical), and proper callback chains for observability.
This lesson covers concrete patterns for error handling in Airflow 2.x: retry strategies, alerting through on_failure_callback vs the Listener API (module 12), the dead letter pattern, and the sentinel pattern.
Retry strategy hierarchy
Not all retries are equal. There are three categories:
| Type | Retry strategy | Examples |
|---|---|---|
| Transient | Fast retry (5-15s) with at most 3-5 attempts | DB connection drop, S3 5xx, transient network blip |
| Recoverable | Exponential backoff 1-60 min | API rate limit, external service maintenance |
| Terminal | NO retry — fail fast | Invalid input data, schema mismatch, unauthorized |
Airflow defaults: retries=0, retry_delay=300s. In production, retry settings deserve a deliberate decision for each task.
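The three categories above can be written down as a small lookup, so that each task picks its retry settings deliberately. This is only an illustrative sketch: `RetryPolicy`, `POLICIES`, and `classify_http_status` are hypothetical names, and the status-code mapping is an assumption rather than anything Airflow provides.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass(frozen=True)
class RetryPolicy:
    retries: int
    retry_delay: Optional[timedelta]
    retry_exponential_backoff: bool


# Hypothetical policies mirroring the three categories in the table.
POLICIES = {
    "transient": RetryPolicy(3, timedelta(seconds=10), False),
    "recoverable": RetryPolicy(5, timedelta(minutes=1), True),
    "terminal": RetryPolicy(0, None, False),
}


def classify_http_status(status: int) -> str:
    """Map an HTTP error status to a failure category (assumed mapping)."""
    if status == 429:
        return "recoverable"  # rate limit: back off and try again later
    if 500 <= status < 600:
        return "transient"    # server-side blip: quick retry
    if 400 <= status < 500:
        return "terminal"     # bad input or credentials: retrying will not help
    raise ValueError(f"{status} is not an error status")
```

The fields of a policy correspond to the `retries`, `retry_delay`, and `retry_exponential_backoff` task arguments.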
Retry and backoff: how to survive transient failures
Pattern 1: Exponential backoff
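from datetime import timedelta

from airflow.decorators import dag, task


# schedule and start_date are omitted here for brevity.
@dag(
    default_args={
        "retries": 5,
        "retry_delay": timedelta(minutes=1),
        "retry_exponential_backoff": True,
        "max_retry_delay": timedelta(hours=1),
    },
)
def my_pipeline():
    # Task-level arguments override default_args.
    @task(retries=5, retry_exponential_backoff=True)
    def call_flaky_api():
        ...

    call_flaky_api()


my_pipeline()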
With exponential backoff and retries=5 (six attempts in total), the nominal schedule is:
- Attempt 1: immediate
- Attempt 2: after 1 min (retry_delay)
- Attempt 3: after 2 min (2× delay)
- Attempt 4: after 4 min
- Attempt 5: after 8 min
- Attempt 6: after 16 min (still below the max_retry_delay cap of 60 min)
Total wait: ~31 min for 5 retries. This is a balance: enough time for transient issues to resolve, without holding the task indefinitely.
Math: delay_n = min(retry_delay × 2^(n-1), max_retry_delay), where n is the retry number. Airflow also applies jitter, so the actual delays deviate from these nominal values.
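The nominal schedule can be reproduced in a few lines of Python. This is a sketch of the formula as stated here, not Airflow's internal implementation, and it deliberately ignores jitter; `nominal_backoff_schedule` is a hypothetical helper name.

```python
from datetime import timedelta


def nominal_backoff_schedule(retry_delay, retries, max_retry_delay=None):
    """Return the nominal wait before each retry, ignoring jitter."""
    delays = []
    for n in range(1, retries + 1):  # n is the retry number
        delay = retry_delay * 2 ** (n - 1)
        if max_retry_delay is not None:
            delay = min(delay, max_retry_delay)
        delays.append(delay)
    return delays


schedule = nominal_backoff_schedule(
    retry_delay=timedelta(minutes=1),
    retries=5,
    max_retry_delay=timedelta(hours=1),
)
print([int(d.total_seconds() // 60) for d in schedule])  # [1, 2, 4, 8, 16]
print(sum(schedule, timedelta()))  # 0:31:00
```

With these settings the cap only starts to matter from the seventh retry, where the uncapped delay would be 64 minutes.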
Pattern 2: Smart exception handling
Not all exceptions are equal. Use AirflowFailException for terminal errors:
from airflow.decorators import task
from airflow.exceptions import AirflowFailException, AirflowSkipException, AirflowException

# call_external_api and the AuthenticationError, RateLimitError, and
# ValidationError classes are expected to come from your API client library.


@task
def smart_task():
    try:
        result = call_external_api()
    except AuthenticationError:
        # Terminal: credentials are invalid, a retry will not help
        raise AirflowFailException("Auth failed: check credentials")
    except RateLimitError:
        # Transient: usually clears within a minute
        raise  # ordinary exception → retry
    except ValidationError as e:
        if "missing required field" in str(e):
            # Data quality issue: skip rather than retry
            raise AirflowSkipException(f"Data quality: {e}")
        raise  # other validation errors → retry
    return result
| Exception | Effect |
|---|---|
| AirflowFailException | Task fails immediately, no retries |
| AirflowSkipException | Task marked SKIPPED; what happens downstream depends on trigger_rule |
| Standard Exception / AirflowException | Triggers a retry (if any retries remain) |
Pattern 3: on_failure_callback
Callbacks are a simple way to react to failure events:
import requests


def alert_on_failure(context):
    """Called when a task fails (after all retries are exhausted)."""
    task = context["task"]
    dag = context["dag"]
    ti = context["task_instance"]
    exception = context.get("exception")
    msg = f"""
🚨 *Task failed*: `{dag.dag_id}.{task.task_id}`
*Try*: {ti.try_number}/{task.retries + 1}
*Error*: {exception}
*Logs*: <{ti.log_url}|View logs>
"""
    requests.post(
        "https://hooks.slack.com/services/...",
        json={"text": msg, "channel": "#airflow-alerts"},
        timeout=5,
    )


@dag(
    default_args={"on_failure_callback": alert_on_failure},
)
def my_dag():
    ...
Available callbacks:
- on_success_callback: task succeeds
- on_failure_callback: task fails (after all retries)
- on_retry_callback: task is about to be retried
- sla_miss_callback: SLA missed
- on_execute_callback: task starts (2.5+)
Callbacks can be set at DAG level or task level; a task-level setting overrides the DAG-level one.
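Production gotchas with callbacks
Callbacks block the process that runs them. They execute synchronously: task-level callbacks normally run in the worker process, while DAG-level and SLA callbacks run in the DAG processor on the scheduler side. A slow callback (5+ seconds) holds that process up, so keep callbacks fast: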
import requests


def alert_async(context):
    """Quick callback: fire-and-forget."""
    # Short timeout so a slow webhook cannot hold the callback up
    try:
        requests.post(WEBHOOK, json={...}, timeout=2)
    except Exception:
        # Do not raise: an alerting failure must not fail the task
        pass
Multiple callbacks as a list (2.6+):
@dag(
    default_args={
        "on_failure_callback": [alert_slack, alert_pagerduty, log_to_sentry],
    },
)
On 2.5 and earlier, wrap them in a single function:
def combined(context):
    alert_slack(context)
    alert_pagerduty(context)
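A plain wrapper like this stops at the first callback that raises, so a Slack outage would also suppress the PagerDuty alert. One possible way around that, sketched below with a hypothetical `combine_callbacks` helper (not an Airflow API), is to isolate each callback in its own try/except.

```python
import logging

log = logging.getLogger(__name__)


def combine_callbacks(*callbacks):
    """Build one callback that runs every given callback in order.

    An exception in one callback is logged and does not stop the others.
    """
    def combined(context):
        for callback in callbacks:
            try:
                callback(context)
            except Exception:
                name = getattr(callback, "__name__", repr(callback))
                log.exception("Callback %s failed", name)

    return combined
```

It would be used as `"on_failure_callback": combine_callbacks(alert_slack, alert_pagerduty)` in `default_args`.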
Pattern 4: Listener API (module 12)
The Listener API is newer and more powerful than callbacks. A listener receives events for all DAGs in the Airflow instance, so nothing has to be declared per DAG.
# plugins/listeners/alerting.py
from airflow.listeners import hookimpl
from airflow.models.taskinstance import TaskInstance


# pluggy only passes arguments that are declared without default values.
# The `error` argument exists only in newer 2.x releases; drop it on older ones.
@hookimpl
def on_task_instance_failed(previous_state, task_instance: TaskInstance, error, session):
    """Called when any task in any DAG fails."""
    tags = task_instance.task.dag.tags or []
    # Different alerts for different criticality
    if "critical" in tags:
        send_pagerduty(task_instance, error)
    if "production" in tags:
        send_slack(task_instance, error)
    # Always log to the SIEM
    send_to_splunk(task_instance, error)
Register listener:
# plugins/__init__.py
from airflow.plugins_manager import AirflowPlugin
from plugins.listeners import alerting


class AlertingPlugin(AirflowPlugin):
    name = "alerting"
    listeners = [alerting]
| Feature | on_failure_callback | Listener API |
|---|---|---|
| Scope | Per-DAG / per-task | Global (все DAGs) |
| Events | Only task lifecycle | Task + DAG + scheduler events |
| Performance | Runs synchronously in the process that handles the task | Also synchronous; hooks must stay fast |
| Declaration | DAG/task code | Plugin |
| Best for | DAG-specific logic | Cross-cutting (alerting, audit) |
Listeners are the production standard for observability. Callbacks remain the right tool for DAG-specific business logic.
Pattern 5: Dead letter queue
When retries are exhausted and the task has fundamentally failed, write the failed item to a DLQ for manual review.
from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def process_event(event_id: str):
    """Process a single event."""
    try:
        return call_external_service(event_id)
    except Exception as e:
        # Runs on every failed attempt; the upsert keeps one row per event.
        context = get_current_context()
        hook = PostgresHook(postgres_conn_id="dlq_db")
        # Bound parameters instead of string interpolation: no SQL injection
        # and no manual quote escaping. Jinja templates are not rendered
        # inside a Python task, so dag_id and run_id come from the context.
        hook.run(
            """
            INSERT INTO event_dlq (event_id, error_message, error_type, failed_at, dag_id, run_id)
            VALUES (%s, %s, %s, now(), %s, %s)
            ON CONFLICT (event_id) DO UPDATE SET
                error_message = EXCLUDED.error_message,
                failed_at = now();
            """,
            parameters=(
                event_id,
                str(e),
                type(e).__name__,
                context["dag"].dag_id,
                context["run_id"],
            ),
        )
        raise  # Still raise: the task fails, but the record is persisted
DLQ table:
CREATE TABLE event_dlq (
    event_id VARCHAR PRIMARY KEY,
    error_message TEXT,
    error_type VARCHAR,
    failed_at TIMESTAMP,
    dag_id VARCHAR,
    run_id VARCHAR,
    retry_count INT DEFAULT 0,
    resolved_at TIMESTAMP,
    resolved_by VARCHAR
);
CREATE INDEX idx_dlq_unresolved ON event_dlq(failed_at) WHERE resolved_at IS NULL;
Daily DLQ review:
@dag(schedule="0 9 * * *", ...)  # Every morning at 9am
def review_dlq():
    @task
    def alert_on_unresolved_dlq():
        hook = PostgresHook(postgres_conn_id="dlq_db")
        count = hook.get_first(
            "SELECT count(*) FROM event_dlq WHERE resolved_at IS NULL AND failed_at < now() - interval '1 day'"
        )[0]
        if count > 0:
            send_slack(f"⚠️ {count} unresolved DLQ items >24h old")

    alert_on_unresolved_dlq()
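The upsert and the stale-item query can be tried locally without a Postgres instance. The sketch below uses the standard-library `sqlite3` module as a stand-in, with a reduced table and ISO-8601 strings for timestamps; `record_failure` and `count_stale_unresolved` are hypothetical helper names, and the SQL dialect differs slightly from Postgres (`?` placeholders, no `now()`).

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE event_dlq (
        event_id TEXT PRIMARY KEY,
        error_message TEXT,
        error_type TEXT,
        failed_at TEXT,
        resolved_at TEXT
    )
    """
)


def record_failure(event_id, exc, failed_at):
    """Upsert one failed event; values are bound, never interpolated."""
    conn.execute(
        """
        INSERT INTO event_dlq (event_id, error_message, error_type, failed_at)
        VALUES (?, ?, ?, ?)
        ON CONFLICT (event_id) DO UPDATE SET
            error_message = excluded.error_message,
            error_type = excluded.error_type,
            failed_at = excluded.failed_at
        """,
        (event_id, str(exc), type(exc).__name__, failed_at.isoformat()),
    )


def count_stale_unresolved(now, max_age=timedelta(days=1)):
    """Count unresolved items that failed more than max_age ago."""
    cutoff = (now - max_age).isoformat()
    row = conn.execute(
        "SELECT count(*) FROM event_dlq WHERE resolved_at IS NULL AND failed_at < ?",
        (cutoff,),
    ).fetchone()
    return row[0]


now = datetime(2024, 1, 3, 9, 0, tzinfo=timezone.utc)
record_failure("evt-1", ValueError("it's broken"), now - timedelta(days=2))
record_failure("evt-1", TimeoutError("second attempt"), now - timedelta(days=2))
record_failure("evt-2", KeyError("fresh"), now - timedelta(hours=1))
print(count_stale_unresolved(now))  # 1
```

The second write for `evt-1` updates the existing row instead of failing on the primary key, which is what makes it safe to record every failed attempt.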
Pattern 6: Sentinel task / canary
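A sentinel task checks the health of the system before the main work starts:
from datetime import timedelta

import requests
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook


@dag(...)
def critical_pipeline():
    @task(retries=2, retry_delay=timedelta(seconds=30))
    def health_check():
        """Verify all dependencies are healthy before the main work."""
        checks = [
            ("Postgres", lambda: PostgresHook().get_first("SELECT 1")),
            # S3Hook.head_object takes the key first, then the bucket name
            ("S3", lambda: S3Hook().head_object(key="sentinel", bucket_name="my-bucket")),
            # raise_for_status turns 4xx/5xx responses into exceptions
            ("API", lambda: requests.get("https://api.example.com/health", timeout=5).raise_for_status()),
        ]
        failures = []
        for name, check in checks:
            try:
                check()
            except Exception as e:
                failures.append(f"{name}: {e}")
        if failures:
            # AirflowException rather than AirflowFailException, so retries=2 applies
            raise AirflowException(f"Health checks failed: {failures}")

    @task
    def heavy_etl():
        ...

    health_check() >> heavy_etl()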
If health_check fails, heavy_etl does not run: under the default trigger_rule='all_success' it is marked upstream_failed. This saves resources, since there is no point in starting a Spark cluster if S3 is down.
Pattern 7: Tiered alerting
Not all failures are equal:
def tiered_alert(context):
    """Alert level depends on task criticality."""
    task = context["task"]
    if task.priority_weight >= 100:
        # Critical task: PagerDuty
        send_pagerduty(context, severity="critical")
        send_slack(context, channel="#oncall")
    elif "production" in context["dag"].tags:
        # Production but not critical: Slack only
        send_slack(context, channel="#airflow-alerts")
    else:
        # Dev/staging: log only
        log_to_loki(context)


def retry_alert_informational(context):
    """Retry: log only, do not page."""
    ti = context["task_instance"]
    if ti.try_number > 3:
        # Repeated retries: possibly something is wrong
        send_slack(
            context,
            channel="#airflow-warnings",
            message=f"Task retry #{ti.try_number}: investigate?",
        )


# The callbacks are defined first so the names exist when @dag is evaluated.
@dag(
    tags=["critical", "production"],
    default_args={
        "on_failure_callback": tiered_alert,
        "on_retry_callback": retry_alert_informational,
    },
)
def critical_pipeline():
    ...
Pager fatigue is an anti-pattern. If an SRE is woken up for every failed DAG, they burn out. A tiered approach:
- PagerDuty only for: critical-path DAG failure, production DB outage, missed data SLA
- Slack for: any production DAG failure
- Email/log for: dev/staging, retries, info events
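Keeping the routing decision in a pure function makes the tiers easy to unit-test without Airflow. A minimal sketch, assuming the same priority_weight threshold of 100 and "production" tag used in this pattern; `choose_alert_targets` and the target strings are hypothetical.

```python
def choose_alert_targets(tags, priority_weight, is_retry=False):
    """Return the alert targets for an event, most severe first."""
    if is_retry:
        return ["log"]  # retries are informational only
    if priority_weight >= 100:
        return ["pagerduty", "slack:#oncall"]
    if "production" in tags:
        return ["slack:#airflow-alerts"]
    return ["log"]  # dev/staging


print(choose_alert_targets(["production"], priority_weight=1))
```

A callback would then only look up the tags and priority in the context and dispatch to whatever the function returns.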
Pattern 8: SLA monitoring
Airflow's sla parameter on a task:
@task(sla=timedelta(hours=1))
def daily_aggregation():
    ...
If the task has not completed within the SLA, measured from the scheduled start of the DAG run rather than from the moment the task itself started, sla_miss_callback is triggered. Use this for data freshness alerting.
def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Called when an SLA is missed."""
    send_slack(f"📅 SLA missed: {[s.task_id for s in slas]}")


@dag(sla_miss_callback=sla_miss_alert, ...)
def my_dag():
    ...
In 3.x the SLA feature was removed and is being replaced by Deadline Alerts (AIP-86), but in 2.x it works as described.
Production gotchas
retry_delay without jitter causes a thundering herd. If all tasks fail at the same time (say, a downstream DB outage) and all retry after the same delay, they hit the DB in the same minute and overload it again. Add jitter:
import random
@task(retry_delay=timedelta(seconds=60 + random.randint(0, 30)))
However, retry_delay is evaluated when the task is defined (parse time), not at run time, so the jitter is random but stays the same for all retries of a given task. The implementation in Airflow 3.x is improved.
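If a fresh jitter value per attempt matters, one option is to draw it at run time, for example in a retry loop inside the task body instead of in the task definition. The sketch below shows only the delay calculation, using a "full jitter" scheme; this is a swapped-in technique rather than what Airflow does internally, and `jittered_delay` is a hypothetical helper.

```python
import random


def jittered_delay(base_seconds, attempt, cap_seconds, rng=random):
    """Full jitter: uniform in [0, min(cap, base * 2**(attempt - 1))]."""
    ceiling = min(cap_seconds, base_seconds * 2 ** (attempt - 1))
    return rng.uniform(0, ceiling)


# Each call draws a new value, so simultaneous failures spread out.
for attempt in range(1, 4):
    print(attempt, round(jittered_delay(60, attempt, 3600), 1))
```

Passing a seeded `random.Random` instance as `rng` makes the delays reproducible in tests.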
Exceptions raised inside a callback need handling. If the callback itself throws, Airflow only logs an ERROR. Wrap the callback body in try/except.
on_failure_callback may not run if the task is killed by zombie cleanup. If the worker dies before reporting state, the task is marked as a zombie and failed, but the dead worker cannot run the callback. Use the Listener API for zombie events.
The external alerting service may be down. If the Slack/PagerDuty webhook returns 5xx, any retry must fit inside the callback's timeout. Do not let the callback fail the task: wrap it in try/except and only log:
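import logging

log = logging.getLogger(__name__)


def alert(context):
    try:
        # raise_for_status turns a 5xx response into an exception
        requests.post(WEBHOOK, json={...}, timeout=2).raise_for_status()
    except Exception as e:
        log.warning("Alerting failed: %s", e)
        # Do not raise: the task is already done, an alerting failure must not change its state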
An audit log is mandatory. Production must log all failures to the SIEM. Use the Listener API plus structured logging:
@hookimpl
def on_task_instance_failed(previous_state, task_instance, error, session):
    log.error("task_failed", extra={
        "dag_id": task_instance.dag_id,
        "task_id": task_instance.task_id,
        "run_id": task_instance.run_id,
        "try_number": task_instance.try_number,
        "error": str(error) if error else None,
    })
A Splunk/Datadog SIEM ingests structured logs, which enables powerful queries for incident postmortems.
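Fields passed through `extra` only reach the log output if the formatter emits them; the default formatter drops them. A minimal standard-library sketch of a JSON formatter follows, assuming the field names from the listener in this section; `JsonFormatter` and the `alerting_demo` logger name are hypothetical.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render the message plus selected `extra` fields as one JSON object."""

    FIELDS = ("dag_id", "task_id", "run_id", "try_number", "error")

    def format(self, record):
        payload = {"level": record.levelname, "event": record.getMessage()}
        for field in self.FIELDS:
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload)


# Write to an in-memory stream here; production would use a real handler.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
log = logging.getLogger("alerting_demo")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False

log.error("task_failed", extra={"dag_id": "etl", "task_id": "load", "try_number": 2})
print(stream.getvalue().strip())
```

Each log line is then a single JSON object that a log shipper can forward without extra parsing rules.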