Learning Platform
Глоссарий Troubleshooting
Урок 18.08 · 24 мин
Продвинутый
Error HandlingRetriesDead Letter QueueAlertingCallbacksListeners

Error handling — exponential backoff, dead letter queues, alerting strategies

Production Airflow — это не «happy path» execution, это правильная обработка отказов. Серверы падают, API timeouts случаются, DB connections sometimes flake. Хороший DAG ожидает failures и обрабатывает их через layered defenses: smart retries с exponential backoff, dead letter queues для permanent failures, multi-tier alerting (informational → critical), proper callback chains для observability.

Этот урок — конкретные patterns для error handling в Airflow 2.x: retry strategies, alerting через on_failure_callback vs Listener API (модуль 12), dead letter pattern, sentinel pattern.


Retry strategy hierarchy

Не все retries равны. Three categories:

TypeRetry strategyExamples
TransientFast retry (5-15s) с max 3-5 attemptsDB connection drop, S3 5xx, transient network blip
RecoverableExponential backoff 1-60 minAPI rate limit, external service maintenance
TerminalNO retry — fail fastInvalid input data, schema mismatch, unauthorized

Airflow defaults: retries=0, retry_delay=300s. Production требует thinking about retries per task.

Error handling strategies: retries → DLQ → alerts → callbacks
Task execute() raises exceptionTask failed. Тип exception определяет path: AirflowFailException → no retry (terminal — auth failed, invalid input); AirflowSkipException → SKIPPED state (data quality, late data); standard Exception → retry pipeline активируется (если retries > 0). Smart code raises правильный тип.
if retries > 0 left
Retry с exponential backoffdelay_n = min(retry_delay × 2^(n-1), max_retry_delay). Attempt 2: 1 min, 3: 2 min, 4: 4 min, 5: 8 min, capped к max_retry_delay=1h. Total ~31 min для 5 retries. Plus jitter (random 0-30s) против thundering herd когда downstream DB outage и все tasks retry в same minute.
on_retry_callback firesCallback на каждом retry — informational level alert. Production rule: не page SRE на retries (alert fatigue), только log/Slack #warnings. Если try_number > 3 — может Slack #warnings с 'investigate?'. PagerDuty reserved для final failure.
if all retries exhausted
Task state = FAILEDFinal state. Все retries exhausted. on_failure_callback triggered (если defined). Listener API @hookimpl on_task_instance_failed fires (если plugin registered). state persistence в metadata DB. Downstream tasks check trigger_rule — обычно skipped если all_success default.
parallel paths from FAILED state
on_failure_callback (per-DAG)Function defined per-DAG. Runs synchronously в scheduler — keep fast (<2s timeout!). Wraps requests.post в try/except — alerting failure не должна сама raise. List of callbacks supported в 2.6+. Code duplication issue: 50 DAGs × own callback. Solution → Listener API.
Listener API (global plugin)@hookimpl on_task_instance_failed в plugin. Single source of truth для alerting — нет duplication. Read DAG tags для tiered severity: 'critical' → PagerDuty; 'production' → Slack; staging → log. Plus on_scheduler_zombie_detected (2.6+) для zombie events. Production standard.
DLQ insertВнутри task except: INSERT INTO event_dlq (event_id, error_message, dag_id, run_id, failed_at) ON CONFLICT DO UPDATE. Persists failed item для manual review. Daily DLQ-review DAG (9am) alerts если >0 unresolved >24h old. Allows replay/cleanup без losing failed events.
downstream behavior
Downstream tasks: trigger_rule checkDefault 'all_success' → downstream SKIPPED если upstream failed. 'all_done' → runs regardless. 'one_failed' → runs если any upstream failed (useful для cleanup tasks). Setup/Teardown semantic — teardown auto-runs after success/failure. Choose trigger_rule mindfully для cleanup tasks.
Tiered alerting routingAlertmanager / custom gateway routes based on: time of day (night → PagerDuty, day → Slack), DAG owner (alert correct team), severity (critical → page; production → notify; dev → log only). Reduces alert fatigue 80%. Test alerts quarterly через chaos drill.

Retry и backoff — как переживать временные сбои

Pattern 1: Exponential backoff

from datetime import timedelta

@dag(
    default_args={
        "retries": 5,
        "retry_delay": timedelta(minutes=1),
        "retry_exponential_backoff": True,
        "max_retry_delay": timedelta(hours=1),
    },
)
def my_pipeline():
    @task(retries=5, retry_exponential_backoff=True)
    def call_flaky_api():
        ...

С exponential backoff:

  • Attempt 1: immediate
  • Attempt 2: после 1 min (retry_delay)
  • Attempt 3: после 2 min (2× delay)
  • Attempt 4: после 4 min
  • Attempt 5: после 8 min
  • Attempt 6: после 16 min (capped to max_retry_delay = 60 min)

Total wait: ~31 min для 5 retries. Это balance: enough time для transient issues to resolve, не infinite hold.

Math: delay_n = min(retry_delay × 2^(n-1), max_retry_delay) plus jitter.


Pattern 2: Smart exception handling

Не все exceptions равны. Use AirflowFailException для terminal errors:

from airflow.exceptions import AirflowFailException, AirflowSkipException, AirflowException

@task
def smart_task():
    try:
        result = call_external_api()
    except AuthenticationError:
        # Terminal — credentials invalid, retry не поможет
        raise AirflowFailException("Auth failed — check credentials")
    except RateLimitError:
        # Transient — обычно проходит через минуту
        raise  # default exception → retry
    except ValidationError as e:
        if "missing required field" in str(e):
            # Data quality issue — skip rather than retry
            raise AirflowSkipException(f"Data quality: {e}")
        raise  # other validation → retry
ExceptionEffect
AirflowFailExceptionTask fails immediately, no retries
AirflowSkipExceptionTask marked SKIPPED — downstream trigger_rule matters
Standard Exception / AirflowExceptionTriggers retry (если retries > 0 left)

Pattern 3: on_failure_callback

Callbacks — простой способ react на failure events:

def alert_on_failure(context):
    """Called when task fails (after all retries exhausted)."""
    import requests
    task = context["task"]
    dag = context["dag"]
    ti = context["task_instance"]
    exception = context.get("exception")

    msg = f"""
🚨 *Task failed*: `{dag.dag_id}.{task.task_id}`
*Try*: {ti.try_number}/{task.retries + 1}
*Error*: {exception}
*Logs*: <{ti.log_url}|View logs>
"""
    requests.post(
        "https://hooks.slack.com/services/..."
        json={"text": msg, "channel": "#airflow-alerts"},
    )

@dag(
    default_args={"on_failure_callback": alert_on_failure},
)
def my_dag():
    ...

Callbacks runs:

  • on_success_callback — task succeeds
  • on_failure_callback — task fails (после всех retries)
  • on_retry_callback — task pre-retry
  • sla_miss_callback — SLA missed
  • on_execute_callback — task starts (2.5+)

DAG-level или task-level — task overrides DAG.

Production gotchas с callbacks

Callbacks block scheduler. Они execute синхронно в scheduler process. Slow callback (5+ seconds) замедляет scheduler tick. Make callbacks fast:

def alert_async(context):
    """Quick callback — fire-and-forget."""
    import requests
    # POST non-blocking timeout
    try:
        requests.post(WEBHOOK, json={...}, timeout=2)
    except Exception:
        # Не raise — alerting failure не должна fail task
        pass

Multiple callbacks через list (2.6+):

@dag(
    default_args={
        "on_failure_callback": [alert_slack, alert_pagerduty, log_to_sentry],
    },
)

В 2.5- — wrap в single function:

def combined(context):
    alert_slack(context)
    alert_pagerduty(context)

Pattern 4: Listener API (модуль 12)

Listener API — newer, more powerful чем callbacks. Listener получает events для всех DAGs в Airflow instance — не нужно declare per-DAG.

# plugins/listeners/alerting.py
from airflow.listeners import hookimpl
from airflow.models.taskinstance import TaskInstance, TaskInstanceState

@hookimpl
def on_task_instance_failed(previous_state, task_instance: TaskInstance, error=None, session=None):
    """Called когда any task in any DAG fails."""
    dag_id = task_instance.dag_id
    task_id = task_instance.task_id

    # Different alerts для different criticality
    if "critical" in (task_instance.task.dag.tags or []):
        send_pagerduty(task_instance, error)
    if "production" in (task_instance.task.dag.tags or []):
        send_slack(task_instance, error)

    # Always log в SIEM
    send_to_splunk(task_instance, error)

Register listener:

# plugins/__init__.py
from airflow.plugins_manager import AirflowPlugin
from plugins.listeners import alerting

class AlertingPlugin(AirflowPlugin):
    name = "alerting"
    listeners = [alerting]
Featureon_failure_callbackListener API
ScopePer-DAG / per-taskGlobal (все DAGs)
EventsOnly task lifecycleTask + DAG + scheduler events
PerformanceRuns sync in schedulerAsync-friendly
DeclarationDAG/task codePlugin
Best forDAG-specific logicCross-cutting (alerting, audit)

Listener — production-стандарт для observability. Callbacks для DAG-specific business logic.


Pattern 5: Dead letter queue

Когда retries exhausted и task fundamentally failed — записать «failed item» в DLQ для manual review.

@task
def process_event(event_id: str):
    """Process single event."""
    try:
        result = call_external_service(event_id)
        return result
    except Exception as e:
        # Final failure — record в DLQ
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id="dlq_db")
        hook.run(f"""
            INSERT INTO event_dlq (event_id, error_message, error_type, failed_at, dag_id, run_id)
            VALUES ('{event_id}', '{str(e).replace("'", "''")}', '{type(e).__name__}', now(),
                    '{{{{ dag.dag_id }}}}', '{{{{ run_id }}}}')
            ON CONFLICT (event_id) DO UPDATE SET
                error_message = EXCLUDED.error_message,
                failed_at = now();
        """)
        raise  # Still raise — task fails, but record persisted

DLQ table:

CREATE TABLE event_dlq (
    event_id VARCHAR PRIMARY KEY,
    error_message TEXT,
    error_type VARCHAR,
    failed_at TIMESTAMP,
    dag_id VARCHAR,
    run_id VARCHAR,
    retry_count INT DEFAULT 0,
    resolved_at TIMESTAMP,
    resolved_by VARCHAR
);

CREATE INDEX idx_dlq_unresolved ON event_dlq(failed_at) WHERE resolved_at IS NULL;

Daily DLQ review:

@dag(schedule="0 9 * * *", ...)  # Каждое утро 9am
def review_dlq():
    @task
    def alert_on_unresolved_dlq():
        hook = PostgresHook(postgres_conn_id="dlq_db")
        count = hook.get_first(
            "SELECT count(*) FROM event_dlq WHERE resolved_at IS NULL AND failed_at < now() - interval '1 day'"
        )[0]
        if count > 0:
            send_slack(f"⚠️ {count} unresolved DLQ items >24h old")

Pattern 6: Sentinel task / canary

Sentinel task проверяет health системы PRIOR работе:

@dag(...)
def critical_pipeline():
    @task(retries=2, retry_delay=timedelta(seconds=30))
    def health_check():
        """Verify all dependencies healthy before main work."""
        checks = [
            ("Postgres", lambda: PostgresHook().get_first("SELECT 1")),
            ("S3", lambda: S3Hook().head_object("my-bucket", "sentinel")),
            ("API", lambda: requests.get("https://api.example.com/health", timeout=5)),
        ]

        failures = []
        for name, check in checks:
            try:
                check()
            except Exception as e:
                failures.append(f"{name}: {e}")

        if failures:
            raise AirflowFailException(f"Health checks failed: {failures}")

    @task
    def heavy_etl():
        ...

    health_check() >> heavy_etl()

If health_check fails, heavy_etl скиплено (через trigger_rule='all_success' default). Saves resources — no point starting Spark cluster if S3 down.


Pattern 7: Tiered alerting

Не все failures are equal:

@dag(
    tags=["critical", "production"],
    default_args={
        "on_failure_callback": tiered_alert,
        "on_retry_callback": retry_alert_informational,
    },
)
def critical_pipeline():
    ...

def tiered_alert(context):
    """Alert уровень depends на task criticality."""
    task = context["task"]

    if task.priority_weight >= 100:
        # Critical task — PagerDuty
        send_pagerduty(context, severity="critical")
        send_slack(context, channel="#oncall")
    elif "production" in context["dag"].tags:
        # Production but not critical — Slack only
        send_slack(context, channel="#airflow-alerts")
    else:
        # Dev/staging — log only
        log_to_loki(context)

def retry_alert_informational(context):
    """Retry — только log, не page."""
    ti = context["task_instance"]
    if ti.try_number > 3:
        # Многократные retries — possibly something wrong
        send_slack(context, channel="#airflow-warnings"
                   message=f"Task retry #{ti.try_number} — investigate?")

Pager fatigue — anti-pattern. Если SRE wakes up для every failed DAG — они burn out. Tiered approach:

  • PagerDuty только for: critical_path DAG failed, production DB outage, data SLA missed
  • Slack для: any production DAG failure
  • Email/log для: dev/staging, retries, info events

Pattern 8: SLA monitoring

Airflow sla parameter на task:

@task(sla=timedelta(hours=1))
def daily_aggregation():
    ...

If task не complete within SLA от start of execution period → sla_miss_callback triggered. Use для data freshness alerting.

def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Called when SLA missed."""
    send_slack(f"📅 SLA missed: {[s.task_id for s in slas]}")

@dag(sla_miss_callback=sla_miss_alert, ...)
def my_dag():
    ...

В 3.x SLA полностью переработан через AIP-89 — но в 2.x работает as-is.


Production gotchas

retry_delay без jitter — thundering herd. Если все tasks fail at same time (downstream DB outage) и retry в same minute — все retry в same minute → DB overload снова. Add jitter:

import random
@task(retry_delay=timedelta(seconds=60 + random.randint(0, 30)))

Однако: retry_delay evaluated при task definition (parse time), не run time — jitter random но same для всех retries one task. Implementation в Airflow 3.x improved.

AirflowFailException от scheduler — handle. If exception thrown в callback итself — scheduler logs ERROR. Wrap callback в try/except.

on_failure_callback doesn’t run если task killed by zombie cleanup. Если worker dies before reporting state — task marked zombie/failed, но callback не run (worker мёртвый). Use Listener API для zombie events.

External alerting service down. If Slack/PagerDuty webhook возвращает 5xx — callback retries inside the timeout. Don’t make callback fail the task — wrap in try/except и log only:

def alert(context):
    try:
        requests.post(WEBHOOK, json={...}, timeout=2)
    except Exception as e:
        log.warning(f"Alerting failed: {e}")
        # Не raise — task already done, alerting failure doesn't matter

Audit log mandatory. Production must log all failures в SIEM. Use Listener API + structured logging:

@hookimpl
def on_task_instance_failed(...):
    log.error("task_failed", extra={
        "dag_id": task_instance.dag_id,
        "task_id": task_instance.task_id,
        "run_id": task_instance.run_id,
        "try_number": task_instance.try_number,
        "error": str(error) if error else None,
    })

Splunk/Datadog SIEM ingests structured logs — powerful queries для incident postmortem.


Проверка знанийKnowledge check
Production team имеет 50 critical DAGs. Текущее alerting: on_failure_callback в каждом DAG отправляет Slack. Проблемы: (1) каждый DAG имеет own callback code — duplicated; (2) Slack channel flooded с retry events — alert fatigue, real failures missed; (3) когда worker pod killed (zombie), no alert. Как redesign?
ОтветAnswer
Redesign к Listener API + tiered alerting + zombie detection: (1) **Migrate from callbacks к Listener API** — single plugin `plugins/alerting_listener.py` с @hookimpl на on_task_instance_failed. Single source of truth для alerting logic. Removes duplication across 50 DAGs. Удалить `on_failure_callback` от DAGs (или leave для DAG-specific custom logic). (2) **Tiered severity** в listener: read DAG tags ('critical', 'production', 'staging') — different actions для different tiers. Critical → PagerDuty + Slack #oncall. Production → Slack #airflow-alerts only. Staging → log only. **Никаких alerts на retries** — only на final failure (try_number == retries + 1) или на explicit AirflowFailException. (3) **Zombie detection** через Listener — @hookimpl на on_scheduler_zombie_detected (2.6+) — log zombie events, alert если > 5 per hour (indicates worker fleet issues). (4) **Structured logging mandatory** — log.error('task_failed', extra={'dag_id':..., 'try':...}) — Splunk/Datadog для full audit trail beyond Slack. (5) **De-duplication** — listener checks recent alerts table (last 5 min): if same DAG failed multiple times in row, group в one alert vs spam channel. (6) **Slack threading** — first failure → new Slack thread, subsequent retries → reply в thread. Channel остаётся clean. (7) **Dashboards instead alerts** — Grafana dashboard 'Airflow Health' с current failures, retry counts, recovery rate. SRE check dashboard hourly, real alerts reserved для critical. (8) **Alert routing rules** — alerting gateway (Alertmanager) routes based on: time of day (night → PagerDuty, day → Slack), DAG owner (alert correct team), severity (page vs notify). (9) **Test alerts quarterly** — chaos drill: kill worker pod, verify alert reaches on-call. Test PagerDuty flow end-to-end. **Result**: alert fatigue reduced 80%, real incidents caught faster, code maintenance easier (1 listener vs 50 callbacks), zombie detection added. Migration over 2-3 sprints — first Listener в parallel с callbacks, validate, then remove callbacks.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Exponential backoff retries с 5 attempts, retry_delay=1min, max_retry_delay=1h. Какое максимальное total wait?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8