Lesson 18.06 · 24 min
Advanced
Tags: Sensor · Human-in-the-Loop · Approval Workflow · HITL · External Trigger

Approval workflow: sensor pattern in 2.x, comparison with HITL in 3.x

Approval workflows are pipelines that need a human decision in the middle: "Manager approves loan application before disbursement", "Compliance approves data export", "Security approves production deploy". In Airflow 2.x this is implemented with the sensor pattern: a task waits for an external signal from an approval system. Airflow 3.1+ adds a dedicated HITL (Human-in-the-Loop) operator for this.

This lesson builds a production-grade approval workflow on 2.x: designing the approval table, implementing the sensor, and integrating with an external approval UI (Slack/web app). It then compares the result with 3.x HITL and outlines a migration path.



Use cases for approval workflows

| Scenario | Why approval |
|---|---|
| Loan disbursement | Compliance: a manager reviews high amounts |
| GDPR data export | Compliance: a security officer approves |
| Production database backup restore | Change management: the DBA team approves |
| ML model promotion to prod | Data scientist sign-off after validation metrics |
| Tax filings submission | Legal review before filing |
| Mass email campaign | Marketing director approval |

Common pattern: the DAG runs a computation → waits for human approval → executes a destructive or important action.

Sensor-based approval flow: poke → approval table → resume

  1. compute_loan_details task. This upstream computation task calculates amount, fees and terms, then returns a dict via XCom. The result is the input for the approval because the manager needs to see the amount to decide. The task must finish successfully before the sensor starts.
     → the approval task starts
  2. wait_for_approval sensor (first poke). This is a @task.sensor with poke_interval=300 (5 min), timeout=86400 (24h), mode='reschedule' and retries=0. Reschedule mode is critical: it frees the worker between pokes, whereas poke mode holds a worker for the full 24h. retries=0 because the approval either exists or it doesn't. On the first poke the row does not exist yet, so the sensor INSERTs an approval row with status='pending' and notifies the approver via Slack/email.
     → INSERT ... ON CONFLICT DO NOTHING
  3. approvals table: status='pending'. This is an external Postgres table with a unique constraint on (dag_id, run_id, task_id), which makes request creation idempotent. Fields: approval_id, status, decided_by, decision_reason, expires_at. Notify only on the first poke, when the row is newly created, to avoid Slack spam on subsequent pokes.
     → the approver opens Slack or the UI
  4. Approver clicks Approve in Slack. The Slack button sends an interaction request to the FastAPI backend, which handles it like POST /api/approvals/{id}/approve. The backend executes UPDATE ... SET status='approved', decided_by=user, decided_at=now() WHERE approval_id=X AND status='pending'. This is idempotent: if the request was already decided, rowcount=0.
     → the sensor's next poke (up to 5 min later)
  5. Sensor reads the approval row. With poke_interval=300s the sensor runs SELECT status FROM approvals WHERE approval_id=X every 5 minutes, and the worker is free between pokes (mode='reschedule'). Five minutes is a sweet spot: it does not spam the DB and keeps feedback latency after the approval reasonable.
     → status='approved' → is_done=True
  6. PokeReturnValue(is_done=True, xcom_value). For status='approved' the sensor returns PokeReturnValue with xcom_value={'approved_by': decided_by, 'reason': reason} and exits successfully, and the downstream task receives the approval result via XCom. status='rejected' → AirflowFailException (no retry). status='expired' → AirflowFailException. status='pending' → is_done=False (keep poking).
     → downstream resumes
  7. disburse task executes. The destructive or important action runs only after approval. It pulls approval_result from the sensor via XCom and INSERTs INTO disbursements with approved_by/approved_at as an audit trail. For compliance, every disbursement is traceable back to a specific manager decision and timestamp.

Design: external approval table

Approval state lives outside Airflow, in a dedicated table:

CREATE TABLE approvals (
    approval_id VARCHAR PRIMARY KEY,
    dag_id VARCHAR NOT NULL,
    run_id VARCHAR NOT NULL,
    task_id VARCHAR NOT NULL,
    requested_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    requested_by VARCHAR NOT NULL,
    request_payload JSONB,
    -- approval state
    status VARCHAR NOT NULL DEFAULT 'pending', -- pending / approved / rejected / expired
    decided_at TIMESTAMP,
    decided_by VARCHAR,
    decision_reason TEXT,
    -- expiration
    expires_at TIMESTAMP,
    -- metadata
    metadata JSONB,
    UNIQUE (dag_id, run_id, task_id)
);

CREATE INDEX idx_approvals_status_dag ON approvals(status, dag_id);
CREATE INDEX idx_approvals_expires ON approvals(expires_at) WHERE status = 'pending';

You also need an external UI that lets approvers see pending approvals and record decisions. This can be a custom web app, a Slack bot, or an integration with PagerDuty/Opsgenie.
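The design relies on two idempotency guarantees. First, a request is created only once per run. Second, a decision can be recorded only once. Both can be checked locally. The minimal sketch below uses Python's built-in sqlite3 as a stand-in for Postgres, with a trimmed-down approvals table. The helper names `request_approval` and `decide` are hypothetical and are not part of any library.

```python
import sqlite3

# Trimmed-down approvals table; SQLite stands in for Postgres in this sketch.
SCHEMA = """
CREATE TABLE approvals (
    approval_id TEXT PRIMARY KEY,
    dag_id      TEXT NOT NULL,
    run_id      TEXT NOT NULL,
    task_id     TEXT NOT NULL,
    status      TEXT NOT NULL DEFAULT 'pending',
    decided_by  TEXT,
    UNIQUE (dag_id, run_id, task_id)
)
"""


def request_approval(conn, dag_id, run_id, task_id, approval_type):
    """Create the request once; return True only if this call created the row."""
    approval_id = f"{dag_id}__{run_id}__{approval_type}"
    cur = conn.execute(
        "INSERT INTO approvals (approval_id, dag_id, run_id, task_id) "
        "VALUES (?, ?, ?, ?) ON CONFLICT DO NOTHING",
        (approval_id, dag_id, run_id, task_id),
    )
    conn.commit()
    return cur.rowcount == 1  # 0 when the row already existed


def decide(conn, approval_id, status, user):
    """Record a decision only if the request is still pending."""
    cur = conn.execute(
        "UPDATE approvals SET status = ?, decided_by = ? "
        "WHERE approval_id = ? AND status = 'pending'",
        (status, user, approval_id),
    )
    conn.commit()
    return cur.rowcount == 1  # 0 when already decided


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
first = request_approval(conn, "loans", "run_1", "wait_for_approval", "loan_disbursement")
again = request_approval(conn, "loans", "run_1", "wait_for_approval", "loan_disbursement")
aid = "loans__run_1__loan_disbursement"
ok = decide(conn, aid, "approved", "alice")
late = decide(conn, aid, "rejected", "bob")
print(first, again, ok, late)  # True False True False
```

The rowcount of the INSERT tells the caller whether to send a notification. The `status = 'pending'` guard in the UPDATE makes a second click (or a late reject) a no-op.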


Implementation: sensor pattern

# plugins/sensors/approval_sensor.py
import json
import logging
from typing import Optional

import requests
from airflow.decorators import task
from airflow.exceptions import AirflowFailException
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.sensors.base import PokeReturnValue

log = logging.getLogger(__name__)


@task.sensor(
    poke_interval=300,    # check every 5 minutes
    timeout=86400,        # wait at most 24 hours (matches expires_at below)
    mode="reschedule",    # frees the worker slot between pokes
    retries=0,            # no retries: the approval either exists or it doesn't
)
def wait_for_approval(approval_type: str, request_payload: Optional[dict] = None, **context) -> PokeReturnValue:
    """Wait for an external approval signal."""
    dag_run = context["dag_run"]
    task_instance = context["task_instance"]

    # One approval per DagRun and approval type; every reschedule reuses the same row
    approval_id = f"{dag_run.dag_id}__{dag_run.run_id}__{approval_type}"

    hook = PostgresHook(postgres_conn_id="approvals_db")
    # Bound parameters (%s) instead of f-strings: run_id and payload are not trusted SQL
    row = hook.get_first(
        """
        SELECT status, decided_by, decision_reason,
               expires_at < CURRENT_TIMESTAMP AS is_expired
        FROM approvals
        WHERE approval_id = %s
        """,
        parameters=(approval_id,),
    )

    if row is None:
        # First poke: create the approval request.
        # run() commits; RETURNING yields a row only if this INSERT created it,
        # so the approver is notified exactly once.
        created = hook.run(
            """
            INSERT INTO approvals
                (approval_id, dag_id, run_id, task_id, requested_by,
                 request_payload, status, expires_at)
            VALUES (%s, %s, %s, %s, 'airflow', %s::jsonb, 'pending',
                    CURRENT_TIMESTAMP + INTERVAL '24 hours')
            ON CONFLICT DO NOTHING
            RETURNING approval_id
            """,
            parameters=(approval_id, dag_run.dag_id, dag_run.run_id,
                        task_instance.task_id, json.dumps(request_payload or {})),
            handler=lambda cur: cur.fetchone(),
        )
        if created:
            notify_approver(approval_id, approval_type, context)
        return PokeReturnValue(is_done=False)

    status, decided_by, reason, is_expired = row

    if status == "approved":
        return PokeReturnValue(
            is_done=True,
            xcom_value={"approved_by": decided_by, "reason": reason},
        )

    if status == "rejected":
        # Explicit rejection: AirflowFailException fails the task without retrying
        raise AirflowFailException(
            f"Approval {approval_id} rejected by {decided_by}: {reason}"
        )

    if status == "expired" or is_expired:
        raise AirflowFailException(f"Approval {approval_id} expired")

    # Still pending: poke again after poke_interval
    return PokeReturnValue(is_done=False)


def notify_approver(approval_id: str, approval_type: str, context) -> None:
    """Send a Slack notification through an incoming webhook."""
    approval_url = f"https://approvals.example.com/approve/{approval_id}"
    msg = (
        f"🔔 *Approval needed*: {approval_type}\n"
        f"DAG: {context['dag_run'].dag_id}\n"
        f"Run: {context['dag_run'].run_id}\n\n"
        f"<{approval_url}|Approve / Reject>"  # Slack link syntax, not Markdown [text](url)
    )
    try:
        # Incoming webhooks post to the channel chosen when the webhook was created;
        # keep the real URL in a Connection or Variable rather than in code.
        resp = requests.post(
            "https://hooks.slack.com/services/...",
            json={"text": msg},
            timeout=10,
        )
        resp.raise_for_status()
    except requests.RequestException:
        # Don't fail the sensor (retries=0) over Slack: the request is still visible in the approvals UI
        log.warning("Slack notification failed for %s", approval_id, exc_info=True)
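The status branches in the sensor are the part most worth unit testing. One option is to move them into a pure function with no Airflow or database dependency. The sketch below assumes you refactor the sensor to call such a function, and that the row carries an `is_expired` flag (`expires_at < now`) rather than the raw timestamp. `evaluate_approval` and `ApprovalFailed` are hypothetical names. Inside the real sensor you would map `ApprovalFailed` to `AirflowFailException` and the returned tuple to `PokeReturnValue`.

```python
from typing import Optional, Tuple


class ApprovalFailed(Exception):
    """Stand-in for AirflowFailException so the logic is testable without Airflow."""


def evaluate_approval(approval_id: str, row: Optional[tuple]) -> Tuple[bool, Optional[dict]]:
    """Map an approvals row to (is_done, xcom_value), mirroring the sensor's branches.

    row is None (request not created yet) or (status, decided_by, reason, is_expired).
    """
    if row is None:
        return False, None  # first poke: the caller inserts the row and notifies
    status, decided_by, reason, is_expired = row
    if status == "approved":
        return True, {"approved_by": decided_by, "reason": reason}
    if status == "rejected":
        raise ApprovalFailed(f"Approval {approval_id} rejected by {decided_by}: {reason}")
    if status == "expired" or is_expired:
        raise ApprovalFailed(f"Approval {approval_id} expired")
    return False, None  # still pending: keep poking
```

With the branches isolated this way, each status (including a pending row that has passed its deadline) becomes a one-line test case, with no mocked database needed.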

Usage in a DAG

from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Airflow puts plugins/ on sys.path, so the sensor module can be imported like this
from sensors.approval_sensor import wait_for_approval


@dag(schedule="@daily", start_date=..., catchup=False)
def loan_disbursement_pipeline():
    @task
    def compute_loan_details() -> dict:
        # Compute amount, fees, terms
        return {"amount": 50000, "applicant_id": "X123"}

    @task
    def disburse(loan_details: dict, approval_result: dict):
        """Runs only after the approval sensor has succeeded."""
        hook = PostgresHook(postgres_conn_id="bank_core")
        # approved_by ties every disbursement to a specific approval decision (audit trail)
        hook.run(
            """
            INSERT INTO disbursements (applicant_id, amount, approved_by, approved_at)
            VALUES (%s, %s, %s, now())
            """,
            parameters=(loan_details["applicant_id"], loan_details["amount"],
                        approval_result["approved_by"]),
        )

    details = compute_loan_details()
    approval_result = wait_for_approval(approval_type="loan_disbursement")
    # Explicit dependency: request approval only after the computation has succeeded
    details >> approval_result
    disburse(details, approval_result)


loan_disbursement_pipeline()

Key design choices

Choice 1: mode="reschedule" vs mode="poke"

| Mode | Worker behavior | Suitable for | Cost |
|---|---|---|---|
| poke | Worker is occupied for the sensor's entire lifetime | Short timeouts (< 10 min) | Wastes a worker slot |
| reschedule | Worker is freed between pokes; the TI is rescheduled | Long timeouts (hours, days) | Slight overhead per poke |

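For a 24-hour approval, only reschedule makes sense. In poke mode, 16 concurrent approvals would hold 16 worker slots for up to 24 hours each, which is enough to saturate a worker running with 16 slots.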

Choice 2: poke_interval

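  • Too short (10s): fast feedback, but extra DB load and more scheduler churn, since in reschedule mode every poke goes back through the scheduler
  • Too long (1h): slow response, with up to an hour between the approval and the DAG continuing
  • Sweet spot: 5 minutes for approval workflows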
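This trade-off is easy to quantify. With a fixed timeout, poke_interval determines how many pokes a single approval can cost. Each poke is a DB query plus, in reschedule mode, a round trip through the scheduler. The same interval also sets the worst-case delay between a decision and the sensor noticing it. Below is a small calculation sketch. `poke_budget` is a hypothetical helper, and the pokes figure is an approximate upper bound.

```python
def poke_budget(poke_interval_s: int, timeout_s: int) -> dict:
    """Rough cost and latency figures for one approval sensor."""
    return {
        # approximate upper bound, reached only if nobody ever decides
        "max_pokes": timeout_s // poke_interval_s,
        # decision lands just after a poke and waits a full interval
        "worst_case_latency_s": poke_interval_s,
        # decision time uniformly distributed within an interval
        "avg_latency_s": poke_interval_s / 2,
    }


for interval in (10, 300, 3600):
    print(interval, poke_budget(interval, 86_400))
```

At a 24h timeout, 10s means thousands of pokes per approval for a few seconds of latency gain. One hour means only 24 pokes, but approvers may wait up to an hour. Five minutes keeps both figures moderate.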

Choice 3: timeout

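An approval must have a timeout; otherwise the DagRun can stay stuck forever. Typical values:

  • 24 hours for standard approvals
  • 7 days for compliance reviews
  • 1 hour for urgent operational approvals

When the timeout is reached, the sensor fails, then the DagRun fails, and you can retry or intervene manually. Keep the sensor timeout and the row's expires_at in sync so that both describe the same deadline.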
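The sensor's `timeout` and the table's `expires_at` describe the same deadline, so it helps to derive both from one value. The minimal sketch below assumes the three tiers listed above. `APPROVAL_SLAS` and `approval_deadlines` are hypothetical names. You would use the seconds value in the `@task.sensor` decorator and the timestamp (or the same interval) in the INSERT.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

# One source of truth per approval tier (values from the list above)
APPROVAL_SLAS = {
    "standard": timedelta(hours=24),
    "compliance": timedelta(days=7),
    "urgent": timedelta(hours=1),
}


def approval_deadlines(tier: str, requested_at: datetime) -> Tuple[int, datetime]:
    """Return (sensor timeout in seconds, expires_at) for an approval tier."""
    sla = APPROVAL_SLAS[tier]
    return int(sla.total_seconds()), requested_at + sla


timeout_s, expires_at = approval_deadlines("standard", datetime.now(timezone.utc))
```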

Choice 4: retries=0

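An approval sensor should not retry. The request ends up approved, rejected or expired, and a retry changes none of that, so retries=0 makes this behavior explicit. AirflowFailException and sensor timeouts already bypass retries on their own. The practical effect of retries=0 is therefore on unexpected errors: if the approvals DB is briefly unreachable, for example, the task fails instead of being retried.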


Approval UI integration

The approval table is a state store, not a UI. The UI is a separate service:

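# approvals_app/main.py (FastAPI example)
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy import func, select, update
from sqlalchemy.orm import Session

# Assumed to exist in your app (hypothetical module): an ORM model mapped to the
# approvals table and a get_db() dependency that yields a Session
from approvals_app.db import Approval, get_db

app = FastAPI()


@app.get("/api/approvals/pending")
def list_pending(db: Session = Depends(get_db)):
    """List pending approvals that have not expired yet."""
    # Per-approver filtering (roles, teams) belongs in your auth layer
    rows = db.execute(
        select(
            Approval.approval_id, Approval.dag_id, Approval.run_id,
            Approval.request_payload, Approval.expires_at,
        ).where(
            Approval.status == "pending",
            Approval.expires_at > func.now(),
        )
    ).mappings().all()
    return [dict(r) for r in rows]


def _decide(db: Session, approval_id: str, status: str, user_id: str, reason: str) -> dict:
    """Move a pending, unexpired approval to a final status. Idempotent."""
    result = db.execute(
        update(Approval)
        .where(
            Approval.approval_id == approval_id,
            Approval.status == "pending",       # never overwrite an existing decision
            Approval.expires_at > func.now(),   # no decisions after expiry
        )
        .values(status=status, decided_by=user_id,
                decided_at=func.now(), decision_reason=reason)
        .execution_options(synchronize_session=False)
    )
    if result.rowcount == 0:
        raise HTTPException(404, "Approval not found, expired, or already decided")
    db.commit()
    # No call to Airflow needed: the sensor sees the new status on its next poke
    return {"status": status}


@app.post("/api/approvals/{approval_id}/approve")
def approve(approval_id: str, user_id: str, reason: str = "", db: Session = Depends(get_db)):
    """Approve the request."""
    # In production, take user_id from the authenticated session, not a query parameter
    return _decide(db, approval_id, "approved", user_id, reason)


@app.post("/api/approvals/{approval_id}/reject")
def reject(approval_id: str, user_id: str, reason: str = "", db: Session = Depends(get_db)):
    """Reject the request: the sensor then raises AirflowFailException and the DagRun fails."""
    return _decide(db, approval_id, "rejected", user_id, reason)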

The frontend is a simple React/Vue app that polls /api/approvals/pending. The approver clicks Approve → POST /api/approvals/{id}/approve → the Airflow sensor sees status='approved' on its next poke.


Slack integration via slack-sdk

Approval through a Slack message is the most common UX:

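def notify_approver_slack(approval_id, approval_type, context):
    from airflow.models import Variable
    from slack_sdk import WebClient

    client = WebClient(token=Variable.get("slack_bot_token"))
    # Direct link to the approval system, so the request does not depend on Slack retention
    approval_url = f"https://approvals.example.com/approve/{approval_id}"

    blocks = [
        {"type": "section", "text": {"type": "mrkdwn", "text": f"*Approval needed*: {approval_type}"}},
        {"type": "section", "fields": [
            {"type": "mrkdwn", "text": f"*DAG*: {context['dag_run'].dag_id}"},
            {"type": "mrkdwn", "text": f"*Run*: `{context['dag_run'].run_id}`"},
        ]},
        {"type": "section", "text": {"type": "mrkdwn", "text": f"<{approval_url}|Open in approval system>"}},
        {"type": "actions", "elements": [
            # value carries "<decision>:<approval_id>"; the backend parses it on click
            {"type": "button", "text": {"type": "plain_text", "text": "Approve"},
             "style": "primary", "value": f"approve:{approval_id}", "action_id": "approve"},
            {"type": "button", "text": {"type": "plain_text", "text": "Reject"},
             "style": "danger", "value": f"reject:{approval_id}", "action_id": "reject"},
        ]},
    ]
    # text is the fallback shown in notifications and by clients that cannot render blocks
    client.chat_postMessage(channel="#approvals", text=f"Approval needed: {approval_type}", blocks=blocks)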

The Slack buttons do not call your API directly. Slack sends each click to your app's interactivity Request URL, and that backend endpoint maps it to /approve or /reject.
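When a button is clicked, Slack POSTs the interaction to that Request URL as a form field named `payload` containing JSON. The request is signed with your app's signing secret, and the backend must verify that signature before trusting the click. Below is a stdlib-only sketch of the check (Slack's `v0` HMAC-SHA256 scheme over `v0:{timestamp}:{body}`, compared against the `X-Slack-Signature` header). It also parses the `approve:<approval_id>` / `reject:<approval_id>` values set on the buttons above. `verify_slack_signature` and `parse_slack_action` are hypothetical helper names; wiring them to the approve/reject logic is left to your backend.

```python
import hashlib
import hmac
import json
import time
from urllib.parse import parse_qs


def verify_slack_signature(signing_secret: str, body: bytes, timestamp: str,
                           signature: str, max_age_s: int = 300) -> bool:
    """Check X-Slack-Signature against HMAC-SHA256 of 'v0:{timestamp}:{body}'."""
    if abs(time.time() - int(timestamp)) > max_age_s:
        return False  # stale request: protects against replays
    basestring = b"v0:" + timestamp.encode() + b":" + body
    expected = "v0=" + hmac.new(signing_secret.encode(), basestring, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)


def parse_slack_action(body: bytes) -> tuple:
    """Extract (decision, approval_id, slack_user_id) from a block_actions request body."""
    payload = json.loads(parse_qs(body.decode())["payload"][0])
    action = payload["actions"][0]
    # Split on the first colon only: run_ids can contain colons
    decision, approval_id = action["value"].split(":", 1)
    return decision, approval_id, payload["user"]["id"]
```

Map the Slack user ID to an internal approver identity before calling the approve/reject logic. Do not trust any user field you did not verify.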


HITL operator in Airflow 3.1+

Airflow 3.1 (released in 2025) introduced the AIP-90 Human-in-the-Loop (HITL) operators as a built-in solution:

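# Airflow 3.1+ syntax
from datetime import timedelta

from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task

@dag(...)
def loan_pipeline():
    # compute_loan_details() and disburse_task() are @task functions, as in the 2.x example
    compute = compute_loan_details()

    # Built-in HITL operator: the run pauses until someone responds in the Airflow UI.
    # Parameter names follow the 3.1 standard provider; verify them against your installed
    # version (including how to restrict which users may respond).
    approval = ApprovalOperator(
        task_id="manager_approval",
        subject="Approve loan disbursement?",
        execution_timeout=timedelta(hours=24),
        # notifiers=[...],  # e.g. a Slack notifier that alerts #approvals
    )

    disburse = disburse_task()

    compute >> approval >> disburse

loan_pipeline()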

The HITL operators in 3.1+ provide:

  • A built-in UI in Airflow where approvers respond (no external app needed)
  • Role-based authorization through Airflow's auth manager (FAB roles or another 3.x auth manager)
  • Notifications through pluggable notifiers (for example the Slack provider's notifier)
  • Timeout handling
  • An audit trail in the metadata DB

Comparison: 2.x sensor vs 3.x HITL

| Feature | 2.x sensor pattern | 3.x HITL operator |
|---|---|---|
| Approval UI | External app (build it yourself) | Built-in Airflow UI |
| Authorization | External (Slack OAuth, app auth) | Airflow auth manager (e.g. FAB roles) |
| Notifications | Custom (Slack SDK) | Pluggable notifiers |
| Audit trail | External approval table | Airflow metadata DB |
| Timeout handling | Manual sensor timeout | Operator timeout |
| Code complexity | High (sensor + table + UI + notifications) | Low (one operator) |
| Production ready | Yes (with discipline) | Yes (3.1+) |

Migration path 2.x → 3.x for approval workflows

If you run an approval sensor on 2.x and plan to migrate to 3.x:

  1. Keep the 2.x sensor pattern until the full migration to 3.1+.
  2. Keep the approval table: even with the HITL operator you can continue writing to it.
  3. To keep the legacy table in sync, write the HITL decision into it from a callback (for example on_success_callback) or from a downstream task.
  4. Postpone migrating the approval logic until the core 3.x migration is complete.

Migration scenario:

# 2.x: wait_for_approval sensor with an external table
# Migrating to the 3.x ApprovalOperator
# Mapping:
#   poke_interval     → no longer needed (the operator waits on its own)
#   timeout           → execution_timeout
#   notify_approver   → notifiers
#   approval table    → Airflow metadata DB (or the legacy table via a callback)

Production gotchas

Approval timeout does not cancel external work. If the approval expires, the sensor fails, but any external resource that was already started keeps running (for example, an ML training job launched upstream). You need an explicit cleanup task downstream of the approval sensor with trigger_rule='one_failed'.

Slack messages are not a durable record. Message history can be limited by your Slack plan or retention settings, and channels get archived. The approval link must point directly to the approval system instead of relying on the Slack message content.

Multi-approver workflows. If you need 2-of-3 approvers, the single-row sensor pattern is not enough. Build custom logic: record each approver's decision separately, and have the sensor check the aggregate (SELECT COUNT(*) ... WHERE decision = 'approve').
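The 2-of-3 idea can be sketched as follows, again with sqlite3 standing in for Postgres. Each approver's decision is its own row, and a unique constraint allows one vote per approver. The sensor then checks the aggregate. The `approval_decisions` table and `quorum_status` helper are hypothetical. Treating any single rejection as a veto is an assumption, so adapt that rule to your policy.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
CREATE TABLE approval_decisions (
    approval_id TEXT NOT NULL,
    approver    TEXT NOT NULL,
    decision    TEXT NOT NULL CHECK (decision IN ('approve', 'reject')),
    UNIQUE (approval_id, approver)           -- one vote per approver
)
""")


def quorum_status(conn: sqlite3.Connection, approval_id: str, required: int = 2) -> str:
    """Return 'approved', 'rejected' or 'pending' for a quorum approval."""
    approves, rejects = conn.execute(
        "SELECT COALESCE(SUM(decision = 'approve'), 0), "
        "       COALESCE(SUM(decision = 'reject'), 0) "
        "FROM approval_decisions WHERE approval_id = ?",
        (approval_id,),
    ).fetchone()
    if rejects > 0:
        return "rejected"  # assumption: any rejection vetoes the request
    if approves >= required:
        return "approved"
    return "pending"
```

In Postgres, the same aggregate reads naturally as `COUNT(*) FILTER (WHERE decision = 'approve')`. The sensor would map `quorum_status` onto the same approved/rejected/pending branches as before.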

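Approval re-triggering on re-runs. If the approval sensor runs again for the same DagRun (for example, it is cleared after a downstream failure), it must NOT request approval again. Use an approval_id that is unique per DagRun, not per task try: the first sensor call inserts the row and later attempts reuse it. A reused row keeps its earlier decision, so clearing a rejected approval fails again immediately.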

Notification fatigue. A sensor pokes every 5 minutes, so calling notify_approver on every poke floods the Slack channel. Notify only on the first poke, when INSERT ... ON CONFLICT DO NOTHING actually created the row.

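Cancellation. If the DagRun is cancelled in the UI while the sensor is waiting (for example, by marking it failed), the sensor stops, and its task instance ends up skipped or failed depending on its state at that moment. The approval row stays 'pending'. That is harmless because it expires, but clean such rows up with a nightly job.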
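The nightly cleanup can be a single idempotent UPDATE that moves overdue pending rows to `expired`. The sketch below uses sqlite3 in place of Postgres. `expire_stale_approvals` is a hypothetical helper. Timestamps are stored as ISO-8601 UTC strings only because SQLite has no native timestamp type.

```python
import sqlite3
from datetime import datetime, timezone


def expire_stale_approvals(conn: sqlite3.Connection, now: datetime) -> int:
    """Mark pending approvals past expires_at as expired; return how many rows changed."""
    cur = conn.execute(
        "UPDATE approvals SET status = 'expired' "
        "WHERE status = 'pending' AND expires_at < ?",
        # ISO-8601 strings in the same UTC format compare correctly as text
        (now.astimezone(timezone.utc).isoformat(),),
    )
    conn.commit()
    return cur.rowcount
```

Running it twice is safe, because the second run finds nothing to change. In Postgres, the condition becomes `status = 'pending' AND expires_at < CURRENT_TIMESTAMP`, which is exactly what the partial index idx_approvals_expires covers.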

Audit logging is mandatory for compliance. Approval decisions must be logged for compliance (GDPR, SOX). Keep the approval table plus an immutable audit trail, either a separate append-only table or logs shipped to a SIEM.


Knowledge check
A production team wants to add an approval step to an existing DAG for a production data migration (a one-time operation, run once or twice a year), with a 24-hour timeout. What is a production-grade approach in Airflow 2.x, and what must the design include?
Answer
A production-grade approval workflow in 2.x, as a full checklist:
(1) **External approval table** in Postgres: `approvals (approval_id, dag_id, run_id, task_id, status, decided_by, decided_at, decision_reason, expires_at, metadata jsonb)` with a unique constraint on (dag_id, run_id, task_id) for idempotency.
(2) **Sensor with the right parameters**: `@task.sensor(poke_interval=300, timeout=86400, mode='reschedule', retries=0)`. A 5-minute poke balances DB load against feedback speed. The timeout is 24h. Reschedule mode is critical for long timeouts because it frees the worker between pokes, whereas poke mode keeps a worker busy for 24h. Use retries=0 because the approval either exists or it doesn't, so a retry makes no sense.
(3) **Idempotent approval creation**: `INSERT INTO approvals ... ON CONFLICT DO NOTHING`. The first poke creates the row and later pokes only query its status, which prevents duplicate approval requests on retries.
(4) **External UI for approvers**: a simple FastAPI/React app where GET /api/approvals/pending lists requests and POST /api/approvals/{id}/approve sets status='approved'. Add a Slack bot for convenience: a message with Approve/Reject buttons → Slack interaction request → backend → the same approve/reject logic.
(5) **Notify the approver only on the first poke**: with `INSERT ... ON CONFLICT DO NOTHING RETURNING approval_id`, a returned row means this poke created the request, so send the Slack notification. Otherwise skip it to avoid spam.
(6) **Status handling in the sensor**: status='approved' → PokeReturnValue(is_done=True, xcom_value={...}); status='rejected' → AirflowFailException (fast fail, no retry); status='expired' → AirflowFailException; status='pending' → is_done=False.
(7) **Audit logging**: send all approval decisions to a SIEM (Splunk/Datadog) for compliance.
(8) **Multi-approver support if needed**: for a 2-of-3 quorum, for example, check `SELECT COUNT(*) FROM approval_decisions WHERE approval_id = X AND decision = 'approve'` >= 2.
(9) **Cancellation cleanup**: a nightly DAG that expires and archives stale approvals.
(10) **Tests**: unit tests for the sensor logic (a mocked DB returning each status) and an integration test via `airflow tasks test` that inserts an approval row and verifies that the sensor exits correctly.
**Not suitable**: poke mode (it holds a worker slot for up to 24 hours), a timeout too short for a human to respond, creating the approval row in top-level DAG code (it runs on every DAG file parse, not once per run), and notifying on every poke (spam). After migrating to 3.1+, replace this with ApprovalOperator. Until then, use the battle-tested sensor pattern.
