Approval workflow: sensor pattern in 2.x, comparison with HITL in 3.x
Approval workflows are pipelines that require a human decision in the middle: "Manager approves loan application before disbursement", "Compliance approves data export", "Security approves production deploy". In Airflow 2.x this is implemented with the sensor pattern: a task waits for an external signal from an approval system. Airflow 3.1+ adds a dedicated HITL (Human-in-the-Loop) operator for this purpose.
This lesson covers a production-grade approval workflow on 2.x: approval table design, sensor implementation, integration with an external approval UI (Slack or a web app), comparison with 3.x HITL, and the migration path.
Use cases for approval workflows
| Scenario | Why approval |
|---|---|
| Loan disbursement | Compliance — manager review high amounts |
| GDPR data export | Compliance — security officer approves |
| Production database backup restore | Change management — DBA team approves |
| ML model promotion to prod | Data scientist sign-off after validation metrics |
| Tax filings submission | Legal review before filing |
| Mass email campaign | Marketing director approval |
Common pattern: the DAG runs a computation, waits for human approval, then executes the destructive or important action.
Design: external approval table
Approval state lives outside Airflow in a dedicated table:
CREATE TABLE approvals (
    approval_id      VARCHAR PRIMARY KEY,
    dag_id           VARCHAR NOT NULL,
    run_id           VARCHAR NOT NULL,
    task_id          VARCHAR NOT NULL,
    requested_at     TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    requested_by     VARCHAR NOT NULL,
    request_payload  JSONB,
    -- approval state
    status           VARCHAR NOT NULL DEFAULT 'pending',  -- pending / approved / rejected / expired
    decided_at       TIMESTAMP,
    decided_by       VARCHAR,
    decision_reason  TEXT,
    -- expiration
    expires_at       TIMESTAMP,
    -- metadata
    metadata         JSONB,
    UNIQUE (dag_id, run_id, task_id)
);
CREATE INDEX idx_approvals_status_dag ON approvals(status, dag_id);
CREATE INDEX idx_approvals_expires ON approvals(expires_at) WHERE status = 'pending';
In addition, you need an external UI (a custom web app, a Slack bot, or an integration with PagerDuty/Opsgenie) that lets the approver read pending approvals and issue decisions.
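The status column (pending / approved / rejected / expired) implies a small state machine that any external UI should respect. The sketch below is one way to encode it, assuming a decision is final once made; the names ALLOWED_TRANSITIONS and can_transition are illustrative and not part of the lesson's code.

```python
# Allowed status transitions for a row in the approvals table.
# Assumption: a decision is final, so only 'pending' has outgoing transitions.
ALLOWED_TRANSITIONS = {
    "pending": {"approved", "rejected", "expired"},
    "approved": set(),
    "rejected": set(),
    "expired": set(),
}


def can_transition(current: str, new: str) -> bool:
    """Return True if a row may move from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS.get(current, set())
```

A UI or API handler could call can_transition before writing a decision, so an already approved or expired request is never overwritten.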
Implementation: sensor pattern
# plugins/sensors/approval_sensor.py
from airflow.decorators import task
from airflow.exceptions import AirflowFailException
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.sensors.base import PokeReturnValue


@task.sensor(
    poke_interval=300,   # check every 5 minutes
    timeout=86400,       # wait at most 24 hours
    mode="reschedule",   # frees the worker slot between pokes
    retries=0,           # no retries: the approval is either there or it is not
)
def wait_for_approval(approval_type: str, **context) -> PokeReturnValue:
    """Wait for an external approval signal."""
    dag_run = context["dag_run"]
    task_instance = context["task_instance"]
    approval_id = f"{dag_run.dag_id}__{dag_run.run_id}__{approval_type}"
    hook = PostgresHook(postgres_conn_id="approvals_db")
    # Bind values as query parameters instead of f-strings to avoid SQL injection
    row = hook.get_first(
        """
        SELECT status, decided_by, decision_reason, expires_at
        FROM approvals
        WHERE approval_id = %s
        """,
        parameters=(approval_id,),
    )
    if row is None:
        # First poke: the approval request does not exist yet, so create it
        hook.run(
            """
            INSERT INTO approvals
                (approval_id, dag_id, run_id, task_id, requested_by, status, expires_at)
            VALUES
                (%s, %s, %s, %s, 'airflow', 'pending',
                 CURRENT_TIMESTAMP + INTERVAL '24 hours')
            ON CONFLICT (dag_id, run_id, task_id) DO NOTHING;
            """,
            parameters=(approval_id, dag_run.dag_id, dag_run.run_id, task_instance.task_id),
        )
        # Notify the approver through an external system (Slack, email)
        notify_approver(approval_id, approval_type, context)
        return PokeReturnValue(is_done=False)
    status, decided_by, reason, expires_at = row
    if status == "approved":
        return PokeReturnValue(
            is_done=True,
            xcom_value={"approved_by": decided_by, "reason": reason},
        )
    if status == "rejected":
        # Explicitly rejected: fail the task without retrying
        raise AirflowFailException(
            f"Approval {approval_id} rejected by {decided_by}: {reason}"
        )
    if status == "expired":
        raise AirflowFailException(f"Approval {approval_id} expired")
    # Still pending: keep waiting
    return PokeReturnValue(is_done=False)


def notify_approver(approval_id, approval_type, context):
    """Send a Slack/email notification."""
    import requests

    approval_url = f"https://approvals.example.com/approve/{approval_id}"
    # Slack mrkdwn links use <url|text>, not Markdown [text](url)
    msg = (
        f"🔔 *Approval needed*: {approval_type}\n"
        f"DAG: {context['dag_run'].dag_id}\n"
        f"Run: {context['dag_run'].run_id}\n"
        f"<{approval_url}|Approve / Reject>"
    )
    requests.post(
        "https://hooks.slack.com/services/...",
        json={"text": msg, "channel": "#approvals"},
        timeout=10,  # do not let a slow webhook block the poke
    )
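The status handling inside the sensor can be unit-tested without Airflow or a database if it is pulled out into a pure function. The following is a minimal sketch of that idea, not the lesson's implementation: evaluate_approval and ApprovalRejected are hypothetical names, and the real sensor would raise AirflowFailException and wrap the result in PokeReturnValue.

```python
from typing import Optional, Tuple


class ApprovalRejected(Exception):
    """Hypothetical stand-in for AirflowFailException in this sketch."""


def evaluate_approval(
    row: Optional[Tuple[str, Optional[str], Optional[str]]]
) -> Optional[dict]:
    """Map an approvals row (status, decided_by, reason) to a sensor outcome.

    Returns a dict when approved, None while the sensor should keep waiting,
    and raises ApprovalRejected for rejected or expired requests.
    """
    if row is None:
        return None  # request row not created yet
    status, decided_by, reason = row
    if status == "approved":
        return {"approved_by": decided_by, "reason": reason}
    if status == "rejected":
        raise ApprovalRejected(f"rejected by {decided_by}: {reason}")
    if status == "expired":
        raise ApprovalRejected("expired")
    return None  # still pending
```

Keeping this logic separate from the database access makes each branch (pending, approved, rejected, expired) easy to cover in tests.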
Usage in a DAG
from airflow.decorators import dag, task

# Assumes the plugins folder is on sys.path, so the sensor module is importable
from sensors.approval_sensor import wait_for_approval


@dag(schedule="@daily", start_date=..., catchup=False)
def loan_disbursement_pipeline():
    @task
    def compute_loan_details() -> dict:
        # Compute amount, fees, terms
        return {"amount": 50000, "applicant_id": "X123"}

    @task
    def disburse(loan_details: dict, approval_result: dict):
        """Runs only after approval."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id="bank_core")
        # Parameterized query instead of f-string interpolation
        hook.run(
            """
            INSERT INTO disbursements
                (applicant_id, amount, approved_by, approved_at)
            VALUES (%s, %s, %s, now());
            """,
            parameters=(
                loan_details["applicant_id"],
                loan_details["amount"],
                approval_result["approved_by"],
            ),
        )

    details = compute_loan_details()
    approval = wait_for_approval(approval_type="loan_disbursement")
    # Request approval only after the computation has finished
    details >> approval
    disburse(details, approval)


loan_disbursement_pipeline()
Key design choices
Choice 1: mode="reschedule" vs mode="poke"
| Mode | Worker behavior | Suitable for | Cost |
|---|---|---|---|
| poke | Worker slot is occupied for the whole sensor lifetime | Short timeout (<10 min) | Wastes a worker slot |
| reschedule | Worker slot is free between pokes; the TI is rescheduled | Long timeout (hours, days) | Slight overhead per poke |
For approvals with a 24-hour wait, use only reschedule. With poke, 16 concurrent approvals each hold a worker slot for up to 24 hours, which can exhaust the pool.
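The difference can be made concrete with a back-of-the-envelope calculation. The sketch below is only an estimate: worker_slot_hours is an illustrative helper, and the 2 seconds per poke is an assumed figure that ignores task startup overhead.

```python
def worker_slot_hours(approvals: int, wait_hours: float, poke_interval_s: float,
                      poke_duration_s: float, mode: str) -> float:
    """Rough worker-slot time consumed by waiting sensors, in hours."""
    if mode == "poke":
        # The slot is held for the whole wait.
        return approvals * wait_hours
    if mode == "reschedule":
        # The slot is held only while each poke runs.
        pokes = wait_hours * 3600 / poke_interval_s
        return approvals * pokes * poke_duration_s / 3600
    raise ValueError(f"unknown mode: {mode}")


# 16 approvals, 24 h wait, poke every 300 s, assumed 2 s per poke
poke_cost = worker_slot_hours(16, 24, 300, 2, "poke")
reschedule_cost = worker_slot_hours(16, 24, 300, 2, "reschedule")
```

Under these assumptions poke mode holds 384 slot-hours while reschedule mode needs roughly 2.6.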
Choice 2: poke_interval
- Too short (10s): fast feedback loop, but extra DB load
- Too long (1h): slow response for the user (delay between the approval and the DAG continuing)
- Sweet spot: 5 minutes for approval workflows
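The trade-off in the list above can be quantified. This is a simple sketch assuming the decision arrives at a uniformly random moment within a poke interval; poke_tradeoff is an illustrative helper name.

```python
def poke_tradeoff(poke_interval_s: int, timeout_s: int) -> dict:
    """Estimate DB load and approval-to-resume delay for a poke interval."""
    return {
        # Upper bound on status queries for one approval request
        "max_queries": timeout_s // poke_interval_s,
        # Expected delay between the decision and the next poke
        "avg_delay_s": poke_interval_s / 2,
        # Worst case: the decision lands right after a poke
        "max_delay_s": poke_interval_s,
    }


short = poke_tradeoff(10, 86400)     # 8640 queries, 5 s average delay
default = poke_tradeoff(300, 86400)  # 288 queries, 150 s average delay
long = poke_tradeoff(3600, 86400)    # 24 queries, 1800 s average delay
```

With a 24-hour timeout, the 5-minute interval keeps the query count in the hundreds while the approver waits a couple of minutes on average.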
Choice 3: timeout
An approval must have a timeout; otherwise the DagRun can stay stuck forever. Typical values:
- 24 hours for standard approvals
- 7 days for compliance reviews
- 1 hour for urgent operational approvals
After the timeout the sensor fails and the DagRun fails; you can then re-run it or intervene manually.
Choice 4: retries=0
An approval sensor should not retry: the request is approved, rejected, or expired, and a retry does not change that. Setting retries=0 makes this behavior explicit.
Approval UI integration
The approval table is a state store, not a UI. The UI is a separate service:
# approvals_app/main.py (FastAPI example)
from datetime import datetime

from fastapi import FastAPI, HTTPException
from sqlalchemy import select, update

# Assumed to be defined elsewhere in the app:
#   db       - a SQLAlchemy Session
#   Approval - the ORM model mapped to the approvals table

app = FastAPI()


@app.get("/api/approvals/pending")
def list_pending(user_id: str):
    """List pending approvals for the user."""
    return db.execute(
        select(Approval).where(
            Approval.status == "pending",
            Approval.expires_at > datetime.utcnow(),
        )
    ).scalars().all()  # scalars() yields Approval objects rather than row tuples


def _decide(approval_id: str, user_id: str, reason: str, status: str):
    """Record a decision; only a pending request can be decided."""
    result = db.execute(
        update(Approval)
        .where(Approval.approval_id == approval_id, Approval.status == "pending")
        .values(status=status, decided_by=user_id,
                decided_at=datetime.utcnow(), decision_reason=reason)
    )
    if result.rowcount == 0:
        raise HTTPException(404, "Approval not found or already decided")
    db.commit()
    return {"status": status}


@app.post("/api/approvals/{approval_id}/approve")
def approve(approval_id: str, user_id: str, reason: str):
    """Approve the request."""
    # Optional: notify Airflow here (the sensor also polls on its own every poke_interval)
    return _decide(approval_id, user_id, reason, "approved")


@app.post("/api/approvals/{approval_id}/reject")
def reject(approval_id: str, user_id: str, reason: str):
    """Reject the request; the sensor then fails the DagRun."""
    return _decide(approval_id, user_id, reason, "rejected")
The frontend is a simple React/Vue app that polls /api/approvals/pending. The approver clicks Approve, the app sends POST /approve, and the Airflow sensor sees status=approved on its next poke.
Slack integration via slack-sdk
Approval through a Slack message is the most common UX:
def notify_approver_slack(approval_id, approval_type, context):
    from airflow.models import Variable
    from slack_sdk import WebClient

    client = WebClient(token=Variable.get("slack_bot_token"))
    blocks = [
        {"type": "section", "text": {"type": "mrkdwn", "text": f"*Approval needed*: {approval_type}"}},
        {"type": "section", "fields": [
            {"type": "mrkdwn", "text": f"*DAG*: {context['dag_run'].dag_id}"},
            {"type": "mrkdwn", "text": f"*Run*: `{context['dag_run'].run_id}`"},
        ]},
        {"type": "actions", "elements": [
            {"type": "button", "text": {"type": "plain_text", "text": "Approve"},
             "style": "primary", "value": f"approve:{approval_id}", "action_id": "approve"},
            {"type": "button", "text": {"type": "plain_text", "text": "Reject"},
             "style": "danger", "value": f"reject:{approval_id}", "action_id": "reject"},
        ]},
    ]
    client.chat_postMessage(
        channel="#approvals",
        text=f"Approval needed: {approval_type}",  # fallback text for notifications
        blocks=blocks,
    )
When a button is clicked, Slack sends the action as an HTTP request to your backend, which then calls /approve or /reject.
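On the backend side, the button's value ("approve:<approval_id>" or "reject:<approval_id>") has to be unpacked from the request. The sketch below assumes the usual shape of a Slack block_actions request (a form-encoded body whose payload field holds JSON); parse_slack_action is a hypothetical helper, and request signature verification is deliberately left out.

```python
import json
from urllib.parse import parse_qs, urlencode


def parse_slack_action(raw_body: str) -> dict:
    """Extract decision, approval_id and Slack user from a block_actions body."""
    form = parse_qs(raw_body)
    payload = json.loads(form["payload"][0])
    action = payload["actions"][0]
    # Split on the first ':' only: run_id inside approval_id may contain colons
    decision, _, approval_id = action["value"].partition(":")
    if decision not in {"approve", "reject"} or not approval_id:
        raise ValueError(f"unexpected action value: {action['value']!r}")
    return {
        "decision": decision,
        "approval_id": approval_id,
        "user_id": payload["user"]["id"],
    }


# Example body shaped like a Slack interactivity request (values are made up)
sample_body = urlencode({"payload": json.dumps({
    "type": "block_actions",
    "user": {"id": "U123"},
    "actions": [{
        "action_id": "approve",
        "value": "approve:loan__scheduled__2024-01-01T00:00:00+00:00__loan_disbursement",
    }],
})})
result = parse_slack_action(sample_body)
```

The returned decision selects the /approve or /reject endpoint, and user_id can be stored as decided_by after mapping it to an internal identity.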
HITL operator in Airflow 3.1+
Airflow 3.1 (released in September 2025) introduced a Human-in-the-Loop (HITL) operator (AIP-90) as a built-in solution:
# Airflow 3.1+ syntax
from datetime import timedelta

from airflow.sdk import dag, task
from airflow.providers.standard.operators.hitl import ApprovalOperator


@dag(...)
def loan_pipeline():
    # compute_loan_details and disburse_task are assumed to be @task functions defined elsewhere
    compute = compute_loan_details()
    # Built-in HITL operator
    approval = ApprovalOperator(
        task_id="manager_approval",
        subject="Approve loan disbursement?",
        execution_timeout=timedelta(hours=24),
        # Notifications (Slack/Teams) and approver restrictions are configured
        # through further operator arguments such as notifiers=[...]; the exact
        # names depend on the provider version, so check the HITL docs you run.
    )
    disburse = disburse_task()
    compute >> approval >> disburse
The HITL operator in 3.1+ provides:
- A built-in UI in Airflow for approvers (no need for an external app)
- Role-based authorization (via FAB roles or 3.x auth providers)
- Slack/Teams notifications out of the box
- Timeout handling
- An audit trail in the metadata DB
Comparison: 2.x sensor vs 3.x HITL
| Feature | 2.x sensor pattern | 3.x HITL operator |
|---|---|---|
| Approval UI | External app (build yourself) | Built-in Airflow UI |
| Authorization | External (Slack OAuth, app auth) | FAB roles / RBAC |
| Notifications | Custom (Slack SDK) | Built-in (Slack/Teams) |
| Audit trail | External approval table | Airflow metadata DB |
| Timeout handling | Manual sensor timeout | Operator timeout |
| Code complexity | High (sensor + table + UI + notifications) | Low (one operator) |
| Production ready | Yes (with discipline) | Yes (2026, 3.1+ stable) |
Migration path 2.x → 3.x for approval workflows
If you have an approval sensor on 2.x and plan to migrate to 3.x:
- Keep the 2.x sensor pattern until the full migration to 3.1+
- The approval table stays: even with the HITL operator you can integrate with the existing table
- The HITL operator can proxy to the external table through custom callbacks (Airflow 3.x HITL allows hooks)
- Postpone migrating the approval logic until the core 3.x migration is complete
Migration scenario:
# 2.x: wait_for_approval sensor with an external table
# Migrate to the 3.x ApprovalOperator
# Mapping:
# poke_interval → no longer needed (built-in)
# timeout → operator timeout parameter
# notify_approver → operator notification settings
# approval table → Airflow metadata (or the legacy table through a callback)
Production gotchas
Approval timeout does not cancel external work. If the approval expires, the sensor fails, but an external resource (for example, ML training that has already started) keeps running. Add an explicit cleanup task with trigger_rule='one_failed' downstream of the approval.
Slack messages expire. After 30 days Slack may archive the channel. The notification must include a direct URL to the approval system; do not rely on the Slack message content.
Multi-approver workflows. If you need 2-of-3 approvers, the single-sensor pattern is not enough. Build custom logic: the approval table tracks individual decisions and the sensor checks the aggregate (SELECT COUNT(*) WHERE decision='approve').
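A minimal sketch of the aggregate check, using an in-memory SQLite database so it runs standalone. The approval_decisions table and the quorum_reached helper are assumptions made for illustration; in production the same query would run against the approvals database from inside the sensor.

```python
import sqlite3

# Hypothetical table: one row per approver decision for an approval request.
SCHEMA = """
CREATE TABLE approval_decisions (
    approval_id TEXT NOT NULL,
    approver    TEXT NOT NULL,
    decision    TEXT NOT NULL CHECK (decision IN ('approve', 'reject')),
    PRIMARY KEY (approval_id, approver)  -- one vote per approver
)
"""


def quorum_reached(conn: sqlite3.Connection, approval_id: str, required: int = 2) -> bool:
    """Return True once at least `required` approvers have approved."""
    (count,) = conn.execute(
        "SELECT COUNT(*) FROM approval_decisions "
        "WHERE approval_id = ? AND decision = 'approve'",
        (approval_id,),
    ).fetchone()
    return count >= required


# Usage: two of three approvers approve, one rejects.
conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
conn.execute("INSERT INTO approval_decisions VALUES ('run1', 'alice', 'approve')")
before = quorum_reached(conn, "run1")  # only one approval so far
conn.execute("INSERT INTO approval_decisions VALUES ('run1', 'bob', 'reject')")
conn.execute("INSERT INTO approval_decisions VALUES ('run1', 'carol', 'approve')")
after = quorum_reached(conn, "run1")
```

The composite primary key prevents one approver from voting twice, which matters when the quorum is counted with COUNT(*).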
Approval re-trigger on retry. If a task downstream of the approval fails and retries, approval must NOT be requested again. Use a unique approval_id per dag_run (not per task try): the first sensor call inserts the row and retries reuse it.
Notification fatigue. A sensor pokes every 5 minutes, so calling notify_approver on every poke spams the Slack channel. Notify only on the first poke (see the INSERT ON CONFLICT DO NOTHING logic).
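Both points above come down to making the request idempotent: insert once, notify once. The sketch below illustrates this with SQLite, which accepts the same ON CONFLICT ... DO NOTHING clause (SQLite 3.24+); request_approval and the simplified two-column table are assumptions for the example.

```python
import sqlite3
from typing import Callable


def request_approval(conn: sqlite3.Connection, approval_id: str,
                     notify: Callable[[str], None]) -> bool:
    """Insert the approval row if missing; notify only when it was created."""
    cur = conn.execute(
        "INSERT INTO approvals (approval_id, status) VALUES (?, 'pending') "
        "ON CONFLICT (approval_id) DO NOTHING",
        (approval_id,),
    )
    conn.commit()
    created = cur.rowcount == 1  # 0 when the row already existed
    if created:
        notify(approval_id)
    return created


# Usage: the second call (a later poke or a retry) neither inserts nor notifies.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE approvals (approval_id TEXT PRIMARY KEY, status TEXT NOT NULL)")
sent = []
first = request_approval(conn, "loan__run1__loan_disbursement", sent.append)
second = request_approval(conn, "loan__run1__loan_disbursement", sent.append)
```

Tying the notification to the insert result, rather than to a separate existence check, also avoids duplicate messages if two processes race on the first poke.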
Cancellation pattern. If the DAG run is cancelled through the UI while waiting, the sensor stops polling, but the approval table row stays pending. That is acceptable because the request expires, but clean such rows up with a nightly job.
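One possible shape for that nightly job is a single UPDATE that flips stale pending rows to expired, which is also what lets the sensor's "expired" branch fire. This is a sketch under assumptions: expire_stale is a hypothetical helper, and the SQLite table here is a simplified version of the approvals table with ISO-formatted timestamps.

```python
import sqlite3
from datetime import datetime


def expire_stale(conn: sqlite3.Connection, now: datetime) -> int:
    """Mark pending approvals past their expires_at as 'expired'; return the row count."""
    cur = conn.execute(
        "UPDATE approvals SET status = 'expired' "
        "WHERE status = 'pending' AND expires_at < ?",
        (now.isoformat(timespec="seconds"),),
    )
    conn.commit()
    return cur.rowcount


# Usage with a simplified version of the approvals table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE approvals (approval_id TEXT PRIMARY KEY, status TEXT, expires_at TEXT)"
)
conn.executemany(
    "INSERT INTO approvals VALUES (?, ?, ?)",
    [
        ("a", "pending", "2024-01-01T00:00:00"),   # stale
        ("b", "pending", "2024-01-03T00:00:00"),   # still valid
        ("c", "approved", "2024-01-01T00:00:00"),  # already decided
    ],
)
expired = expire_stale(conn, datetime(2024, 1, 2))
statuses = dict(conn.execute("SELECT approval_id, status FROM approvals"))
```

Only rows that are still pending are touched, so decisions that were already recorded remain intact for the audit trail.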
An audit log is mandatory for compliance. Approval decisions must be logged for compliance (GDPR, SOX): use the approval table plus an immutable audit record (a separate table or a log shipped to a SIEM).