Automation patterns: CI/CD integration через REST API
Это самый практически важный урок модуля. Здесь мы превращаем теорию endpoints и authentication в production-ready patterns для интеграции Airflow с CI/CD, мониторинг-системами и внешними оркестраторами.
Ниже — пять каноничных сценариев с полным кодом, объяснением каждого подводного камня и production-ready error handling. Используйте их как шаблоны для своих интеграций.
Архитектура: где Airflow в external orchestration
Ключевая мысль: Airflow остаётся orchestrator, а external system — controller. CI знает только «запусти X, дождись завершения». Внутреннюю логику задачи Airflow скрывает.
Pattern 1: Trigger DAG из GitHub Actions
Use case
После merge в main ветку нужно прогнать integration test DAG, который проверяет, что новый код dbt models работает на staging.
Решение
# .github/workflows/dbt-integration-test.yml
name: dbt Integration Test
on:
push:
branches: [main]
jobs:
trigger-airflow:
runs-on: ubuntu-latest
steps:
- name: Trigger Airflow DAG
id: trigger
env:
AIRFLOW_URL: https://airflow.example.com
AIRFLOW_TOKEN: ${{ secrets.AIRFLOW_API_TOKEN }}
run: |
DAG_RUN_ID="ci__${{ github.run_id }}__${{ github.sha }}"
response=$(curl -fsS -w "\n%{http_code}" \
-X POST "$AIRFLOW_URL/api/v1/dags/dbt_integration_test/dagRuns" \
-H "Authorization: Bearer $AIRFLOW_TOKEN" \
-H "Content-Type: application/json" \
-d "{
\"dag_run_id\": \"$DAG_RUN_ID\",
\"conf\": {
\"commit_sha\": \"${{ github.sha }}\",
\"branch\": \"${{ github.ref_name }}\",
\"github_run_url\": \"${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}\"
},
\"note\": \"Triggered by GitHub Actions run ${{ github.run_id }}\"
}")
http_code=$(echo "$response" | tail -1)
body=$(echo "$response" | head -n -1)
if [ "$http_code" = "409" ]; then
echo "DAG run already exists (CI retry), continuing"
elif [ "$http_code" != "200" ] && [ "$http_code" != "201" ]; then
echo "Trigger failed: HTTP $http_code"
echo "$body"
exit 1
fi
echo "dag_run_id=$DAG_RUN_ID" >> $GITHUB_OUTPUT
- name: Wait for DAG completion
env:
AIRFLOW_URL: https://airflow.example.com
AIRFLOW_TOKEN: ${{ secrets.AIRFLOW_API_TOKEN }}
DAG_RUN_ID: ${{ steps.trigger.outputs.dag_run_id }}
run: |
python3 .github/scripts/wait_for_dag.py
Python script с правильным polling
# .github/scripts/wait_for_dag.py
import os
import sys
import time
import requests
AIRFLOW_URL = os.environ["AIRFLOW_URL"]
AIRFLOW_TOKEN = os.environ["AIRFLOW_TOKEN"]
DAG_RUN_ID = os.environ["DAG_RUN_ID"]
DAG_ID = "dbt_integration_test"
MAX_WAIT_SEC = 3600 # 1 час
session = requests.Session()
session.headers["Authorization"] = f"Bearer {AIRFLOW_TOKEN}"
# Exponential backoff: 2, 5, 10, 30, 60, 120 (затем держится на 120)
delays = [2, 5, 10, 30, 60, 120]
elapsed = 0
i = 0
while elapsed < MAX_WAIT_SEC:
delay = delays[min(i, len(delays) - 1)]
time.sleep(delay)
elapsed += delay
i += 1
try:
r = session.get(
f"{AIRFLOW_URL}/api/v1/dags/{DAG_ID}/dagRuns/{DAG_RUN_ID}"
timeout=10
)
r.raise_for_status()
except requests.RequestException as e:
print(f"Polling error (will retry): {e}")
continue
state = r.json()["state"]
print(f"[t={elapsed}s] state={state}")
if state == "success":
sys.exit(0)
if state == "failed":
# Получим failed tasks для логов
ti_resp = session.get(
f"{AIRFLOW_URL}/api/v1/dags/{DAG_ID}/dagRuns/{DAG_RUN_ID}/taskInstances"
params={"state": "failed"}
).json()
for ti in ti_resp.get("task_instances", []):
print(f" FAILED task: {ti['task_id']} (try {ti['try_number']})")
print(f" log: {AIRFLOW_URL}/dags/{DAG_ID}/grid?dag_run_id={DAG_RUN_ID}&task_id={ti['task_id']}")
sys.exit(1)
print(f"Timeout: DAG did not finish in {MAX_WAIT_SEC}s")
sys.exit(2)
Что здесь принципиально важно
- Идемпотентный
dag_run_id— детерминированно построен изgithub.run_id + sha. Retry CI step не создаст дубль. - 409 трактуется как success — это означает «уже триггерили, OK».
- Exponential backoff polling — не убивает webserver, но реагирует быстро.
- При failure печатаем URL логов — GitHub Action UI содержит deep link к Airflow UI.
confнесёт CI context — DAG может использоватьcommit_shaдля checkout правильного кода.
Pattern 2: Clear failed tasks programmatically
Use case
Каждое утро в 7:00 мониторинг видит failed tasks из ночного batch. Команда хочет автоматический скрипт, который retry-ит только идемпотентные tasks (по списку), но не трогает тех, что требуют ручного review.
Решение
# scripts/auto_retry_idempotent.py
import os
import requests
from datetime import datetime, timedelta, timezone
AIRFLOW = "https://airflow.example.com"
session = requests.Session()
session.headers["Authorization"] = f"Bearer {os.environ['AIRFLOW_TOKEN']}"
# Whitelist task_ids, которые безопасно retry
IDEMPOTENT_TASKS = {
"etl_orders": ["extract_orders", "transform_orders"],
"etl_users": ["sync_users_full"],
}
# Какие НЕ retry — требуют ручного review
SENSITIVE = {
"etl_orders": ["load_to_warehouse"], # idempotency не гарантирована
}
since = (datetime.now(timezone.utc) - timedelta(hours=12)).isoformat()
# Найдём все failed TI за последние 12 часов
r = session.get(
f"{AIRFLOW}/api/v1/dags/~/dagRuns/~/taskInstances"
params={
"state": "failed",
"start_date_gte": since,
"limit": 500
},
timeout=30
)
r.raise_for_status()
failed_tis = r.json()["task_instances"]
print(f"Found {len(failed_tis)} failed TI since {since}")
to_retry: dict[tuple[str, str], list[str]] = {} # (dag_id, run_id) -> [task_ids]
for ti in failed_tis:
dag_id = ti["dag_id"]
task_id = ti["task_id"]
run_id = ti["dag_run_id"]
if task_id in IDEMPOTENT_TASKS.get(dag_id, []):
to_retry.setdefault((dag_id, run_id), []).append(task_id)
elif task_id in SENSITIVE.get(dag_id, []):
print(f"SENSITIVE failed (manual review): {dag_id}.{task_id} run={run_id}")
else:
print(f"UNKNOWN failed (alerting): {dag_id}.{task_id}")
# Сначала dry-run для каждой группы
for (dag_id, run_id), task_ids in to_retry.items():
payload = {
"task_ids": task_ids,
"dag_run_id": run_id,
"include_upstream": False,
"include_downstream": True, # retry downstream тоже — они скорее всего тоже failed/skipped
"reset_dag_runs": True,
"dry_run": True
}
dry = session.post(
f"{AIRFLOW}/api/v1/dags/{dag_id}/clearTaskInstances"
json=payload, timeout=30
).json()
print(f"{dag_id} run={run_id}: would clear {len(dry['task_instances'])} TI")
# Реальный clear
payload["dry_run"] = False
real = session.post(
f"{AIRFLOW}/api/v1/dags/{dag_id}/clearTaskInstances"
json=payload, timeout=30
)
real.raise_for_status()
print(f" cleared. Scheduler will reschedule.")
Расширение: alerting через Slack при unknown failures
import json
def slack_alert(message: str):
requests.post(
os.environ["SLACK_WEBHOOK_URL"],
json={"text": message},
timeout=5
)
# При нахождении UNKNOWN failed — alert
if dag_id not in IDEMPOTENT_TASKS and dag_id not in SENSITIVE:
slack_alert(
f"Unknown DAG with failed task: `{dag_id}.{task_id}`\n"
f"Run: {run_id}\n"
f"URL: {AIRFLOW}/dags/{dag_id}/grid?dag_run_id={run_id}"
)
Pattern 3: Programmatic Connection management
Use case
Terraform управляет инфраструктурой. Каждое окружение (dev/staging/prod) имеет свой Snowflake account, IAM role, S3 bucket. Хочется чтобы Terraform после создания resources записывал connection в Airflow — без ручной кликалки в UI.
Решение
# scripts/sync_connections_from_tf.py
"""
Reads Terraform outputs, syncs to Airflow connections.
Идемпотентно: PUT (PATCH) для существующих, POST для новых.
"""
import json
import os
import requests
import sys
AIRFLOW = os.environ["AIRFLOW_URL"]
session = requests.Session()
session.headers["Authorization"] = f"Bearer {os.environ['AIRFLOW_TOKEN']}"
with open(sys.argv[1]) as f: # terraform output JSON
tf_outputs = json.load(f)
connections = [
{
"connection_id": "warehouse_snowflake",
"conn_type": "snowflake",
"host": tf_outputs["snowflake_account"]["value"],
"login": tf_outputs["airflow_svc_user"]["value"],
"password": tf_outputs["airflow_svc_password"]["value"],
"schema": tf_outputs["snowflake_default_schema"]["value"],
"extra": json.dumps({
"warehouse": tf_outputs["snowflake_warehouse"]["value"],
"role": tf_outputs["snowflake_role"]["value"],
"region": tf_outputs["aws_region"]["value"]
})
},
{
"connection_id": "data_lake_s3",
"conn_type": "aws",
"extra": json.dumps({
"role_arn": tf_outputs["airflow_iam_role_arn"]["value"],
"region_name": tf_outputs["aws_region"]["value"]
})
}
]
for conn in connections:
conn_id = conn["connection_id"]
# Check existence
r = session.get(f"{AIRFLOW}/api/v1/connections/{conn_id}", timeout=10)
if r.status_code == 404:
# Create
cr = session.post(f"{AIRFLOW}/api/v1/connections", json=conn, timeout=10)
cr.raise_for_status()
print(f"CREATED {conn_id}")
elif r.status_code == 200:
# Update — PATCH без connection_id в body
body = {k: v for k, v in conn.items() if k != "connection_id"}
ur = session.patch(
f"{AIRFLOW}/api/v1/connections/{conn_id}"
json=body, timeout=10
)
ur.raise_for_status()
print(f"UPDATED {conn_id}")
else:
r.raise_for_status()
Альтернатива: использовать airflow-provider в Terraform
# Terraform не имеет официального Airflow provider в стандартной library,
# но есть community: hashicorp/airflow или ZWMP/airflow.
# Для production обычно проще написать скрипт выше — даёт полный контроль.
resource "airflow_connection" "warehouse" {
connection_id = "warehouse_snowflake"
conn_type = "snowflake"
host = snowflake_account.main.url
login = snowflake_user.airflow.name
password = random_password.airflow_user.result
schema = "ANALYTICS"
extra = jsonencode({
warehouse = snowflake_warehouse.compute.name
role = snowflake_role.etl.name
})
}
Лучшая практика: НЕ хранить credentials в Airflow connections напрямую. Использовать secrets backend (Vault, AWS Secrets Manager, GCP Secret Manager). Тогда Airflow API нужен только для bootstrap или для conn_type, host, port — не для passwords. Это обсуждается в модуле 10 (Connections & Secrets).
Pattern 4: Monitor long-running DAGs
Use case
ML training DAG обычно занимает 4-6 часов. Иногда зависает на 12+ часов из-за data quality issues. Нужен monitoring скрипт, который alert-ит если DAG не завершился за expected SLA.
Решение
# scripts/sla_monitor.py
import os
import requests
from datetime import datetime, timedelta, timezone
AIRFLOW = os.environ["AIRFLOW_URL"]
session = requests.Session()
session.headers["Authorization"] = f"Bearer {os.environ['AIRFLOW_TOKEN']}"
# SLA per DAG в часах
SLA = {
"ml_training_daily": 6,
"etl_orders_hourly": 1,
"etl_users_daily": 3,
}
now = datetime.now(timezone.utc)
violations = []
for dag_id, sla_hours in SLA.items():
threshold = (now - timedelta(hours=sla_hours)).isoformat()
r = session.get(
f"{AIRFLOW}/api/v1/dags/{dag_id}/dagRuns"
params={
"state": "running",
"limit": 10,
"order_by": "-start_date"
},
timeout=10
)
r.raise_for_status()
for run in r.json()["dag_runs"]:
start = datetime.fromisoformat(run["start_date"].replace("Z", "+00:00"))
running_hours = (now - start).total_seconds() / 3600
if running_hours > sla_hours:
violations.append({
"dag_id": dag_id,
"run_id": run["dag_run_id"],
"running_hours": round(running_hours, 1),
"sla_hours": sla_hours,
"url": f"{AIRFLOW}/dags/{dag_id}/grid?dag_run_id={run['dag_run_id']}"
})
if violations:
body = "**Airflow SLA violations:**\n\n"
for v in violations:
body += f"- `{v['dag_id']}` running {v['running_hours']}h (SLA {v['sla_hours']}h)\n → {v['url']}\n"
requests.post(
os.environ["SLACK_WEBHOOK_URL"],
json={"text": body, "channel": "#data-platform-alerts"},
timeout=5
)
Запускать через cron каждые 15 минут. Это decoupled SLA — независимый от Airflow process, что важно для случаев, когда сам scheduler заболел.
Pattern 5: Backfill orchestration
Use case
После добавления нового источника в DAG, нужно прогнать его за прошлый месяц (28 runs за разные даты). Делать это через UI кликами — больно.
# scripts/backfill.py
import os
import time
import requests
from datetime import datetime, timedelta, timezone
AIRFLOW = os.environ["AIRFLOW_URL"]
DAG_ID = "etl_orders"
session = requests.Session()
session.headers["Authorization"] = f"Bearer {os.environ['AIRFLOW_TOKEN']}"
start = datetime(2026, 4, 1, tzinfo=timezone.utc)
end = datetime(2026, 4, 28, tzinfo=timezone.utc)
date = start
# Параллелизм: max 3 одновременно
MAX_CONCURRENT = 3
triggered = []
while date <= end:
# Wait if too many already running
while True:
r = session.get(
f"{AIRFLOW}/api/v1/dags/{DAG_ID}/dagRuns"
params={"state": "running,queued", "limit": 100}
).json()
active = len(r["dag_runs"])
if active < MAX_CONCURRENT:
break
print(f"{active} active runs, waiting 60s")
time.sleep(60)
run_id = f"backfill__{date.strftime('%Y-%m-%d')}"
resp = session.post(
f"{AIRFLOW}/api/v1/dags/{DAG_ID}/dagRuns"
json={
"dag_run_id": run_id,
"logical_date": date.isoformat(),
"data_interval_start": date.isoformat(),
"data_interval_end": (date + timedelta(days=1)).isoformat(),
"note": "Backfill after adding new orders source"
}
)
if resp.status_code == 409:
print(f" exists: {run_id}")
else:
resp.raise_for_status()
print(f" triggered: {run_id}")
triggered.append((date, run_id))
date += timedelta(days=1)
print(f"\nTriggered {len(triggered)} backfill runs.")
Альтернатива: airflow dags backfill CLI команда. Но она работает внутри scheduler container и блокирует терминал на часы. REST API approach позволяет запускать асинхронно, c контролем параллельности, из любого hosts.
Production gotchas
1. Token storage
Никогда не храните API tokens в plain env variables в CI. Используйте secrets:
- GitHub Actions:
${{ secrets.AIRFLOW_API_TOKEN }} - GitLab CI: protected variables
- Jenkins: credentials plugin
Token rotation — раз в 90 дней минимум. Service account с минимальным permission scope (только нужные DAGs).
2. Webserver behind ALB/NGINX
Production Airflow обычно за HTTPS proxy. Это меняет несколько вещей:
location /api/v1/ {
proxy_pass http://airflow-webserver:8080;
proxy_set_header X-Forwarded-Proto https;
proxy_set_header X-Forwarded-For $remote_addr;
proxy_set_header Host $host;
proxy_read_timeout 300s; # для длинных list queries
client_max_body_size 10M; # conf в trigger может быть большой
}
Airflow config:
[webserver]
enable_proxy_fix = True
proxy_fix_x_for = 1
proxy_fix_x_proto = 1
proxy_fix_x_host = 1
3. Race condition: 201 → 404
После POST /dagRuns API возвращает 201, но immediate GET того же run может вернуть 404. Это потому что Airflow использует optimistic concurrency: POST успешен только когда commit состоялся, но replication lag может задержать видимость.
Fix: при первом 404 после успешного POST — wait 1-2 секунды и retry.
4. Не использовать API для streaming logs
Endpoint GET /dags/{}/dagRuns/{}/taskInstances/{}/logs/{try} существует. Но это не stream — он возвращает полный log файл. На больших logs (10+ MB) blocks webserver на секунды. Используйте S3/GCS prefetch для real-time tailing, не API.
5. limit=1000 это max — для больших scans нужна pagination
Если ваш скрипт собирает данные за месяц и есть 50k TI — нельзя сделать limit=50000. Нужна pagination с order_by стабильного поля (start_date) и offset. Deep offset (>10k) тормозит — лучше split по start_date_gte/lte диапазонам.