Programmatic access: Python client, batch operations, backup scripts
После знакомства с REST API endpoints (урок 02), auth (03), CI/CD patterns (04) и CLI (05) — этот завершающий урок модуля показывает, как собрать всё в production operations toolkit: переиспользуемый Python client, набор monitoring scripts, backup automation, и правильная организация service accounts.
Эти инструменты должны жить в отдельном репозитории airflow-operations рядом с infrastructure-as-code. Используйте паттерны ниже как стартовый набор для своей платформы.
Архитектура operations toolkit
AirflowClient — production wrapper
Не используйте apache-airflow-client PyPI package — он тяжёлый (300+ Python files, тянет marshmallow, jsonschema). Для большинства задач достаточно тонкого wrapper над requests:
# airflow_ops/client.py
import os
import time
from typing import Any
from urllib.parse import urljoin
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class AirflowError(Exception):
"""Структурированная ошибка из RFC 7807 Problem Details."""
def __init__(self, status: int, title: str, detail: str | None = None):
self.status = status
self.title = title
self.detail = detail
super().__init__(f"{status} {title}: {detail or ''}")
class AirflowClient:
"""
Тонкий клиент для Airflow REST API v1.
Обрабатывает retry, structured errors, pagination iteration.
"""
def __init__(
self,
base_url: str | None = None,
token: str | None = None,
timeout: float = 30.0,
):
self.base_url = (base_url or os.environ["AIRFLOW_URL"]).rstrip("/")
self.timeout = timeout
self.session = requests.Session()
token = token or os.environ.get("AIRFLOW_API_TOKEN")
if token:
self.session.headers["Authorization"] = f"Bearer {token}"
# Retry на transient errors
retry = Retry(
total=5,
backoff_factor=0.5, # 0.5, 1, 2, 4, 8 seconds
status_forcelist=[502, 503, 504],
allowed_methods=["GET", "POST", "PATCH", "DELETE"],
respect_retry_after_header=True,
)
adapter = HTTPAdapter(max_retries=retry, pool_maxsize=20)
self.session.mount("https://", adapter)
self.session.mount("http://", adapter)
def _request(self, method: str, path: str, **kwargs) -> dict[str, Any]:
url = urljoin(self.base_url + "/", path.lstrip("/"))
r = self.session.request(method, url, timeout=self.timeout, **kwargs)
if r.status_code == 204:
return {}
try:
body = r.json()
except ValueError:
body = {"detail": r.text}
if r.status_code >= 400:
raise AirflowError(
status=r.status_code,
title=body.get("title", "HTTP error"),
detail=body.get("detail"),
)
return body
# DAG Run operations
def trigger_dag(
self,
dag_id: str,
run_id: str | None = None,
conf: dict[str, Any] | None = None,
logical_date: str | None = None,
note: str | None = None,
) -> dict[str, Any]:
"""Trigger DAG. Возвращает {dag_run_id, state, ...}. 409 → fetch existing."""
body: dict[str, Any] = {}
if run_id:
body["dag_run_id"] = run_id
if conf:
body["conf"] = conf
if logical_date:
body["logical_date"] = logical_date
if note:
body["note"] = note
try:
return self._request("POST", f"/api/v1/dags/{dag_id}/dagRuns", json=body)
except AirflowError as e:
if e.status == 409 and run_id:
return self.get_dag_run(dag_id, run_id)
raise
def get_dag_run(self, dag_id: str, run_id: str) -> dict[str, Any]:
return self._request("GET", f"/api/v1/dags/{dag_id}/dagRuns/{run_id}")
def wait_for_dag_run(
self,
dag_id: str,
run_id: str,
max_wait: float = 3600,
log_progress: bool = True,
) -> dict[str, Any]:
"""Polling с exponential backoff."""
delays = [2, 5, 10, 30, 60, 120]
elapsed = 0.0
i = 0
while elapsed < max_wait:
delay = delays[min(i, len(delays) - 1)]
time.sleep(delay)
elapsed += delay
i += 1
try:
run = self.get_dag_run(dag_id, run_id)
except AirflowError as e:
if e.status == 404 and elapsed < 30:
continue # race condition после POST
raise
if log_progress:
print(f"[t={elapsed:.0f}s] {dag_id}/{run_id} → {run['state']}")
if run["state"] in ("success", "failed"):
return run
raise TimeoutError(f"DAG run {dag_id}/{run_id} did not finish in {max_wait}s")
# Pagination iterator
def paginate(
self,
path: str,
items_key: str,
page_size: int = 100,
params: dict | None = None,
):
"""Iterate over all items, handling pagination."""
params = dict(params or {})
offset = 0
params["limit"] = page_size
while True:
params["offset"] = offset
resp = self._request("GET", path, params=params)
items = resp.get(items_key, [])
for item in items:
yield item
total = resp.get("total_entries", 0)
offset += page_size
if offset >= total or not items:
break
# Task instances
def clear_task_instances(
self,
dag_id: str,
task_ids: list[str],
run_id: str,
downstream: bool = True,
dry_run: bool = False,
) -> dict[str, Any]:
body = {
"task_ids": task_ids,
"dag_run_id": run_id,
"include_downstream": downstream,
"include_upstream": False,
"reset_dag_runs": True,
"dry_run": dry_run,
}
return self._request(
"POST", f"/api/v1/dags/{dag_id}/clearTaskInstances", json=body
)
Использование
from airflow_ops.client import AirflowClient
af = AirflowClient()  # reads AIRFLOW_URL and AIRFLOW_API_TOKEN from the environment

# Trigger + wait
run = af.trigger_dag(
    "etl_orders",
    run_id="ops-runbook-2026-05-12",
    conf={"source": "manual"},
)
final = af.wait_for_dag_run("etl_orders", run["dag_run_id"])

# Iterate over every task instance that failed since the start of the day
for ti in af.paginate(
    "/api/v1/dags/~/dagRuns/~/taskInstances",
    items_key="task_instances",
    params={"state": "failed", "start_date_gte": "2026-05-12T00:00:00Z"},
):
    print(ti["dag_id"], ti["task_id"])
Script 1: Find stuck queued tasks
Tasks которые в state queued дольше 10 минут — обычно signal проблем с executor или message broker.
# scripts/find_stuck_queued.py
from datetime import datetime, timedelta, timezone
from airflow_ops.client import AirflowClient
af = AirflowClient()

# A task queued for longer than this many minutes is reported as stuck.
threshold_min = 10
stuck = []

# Only look at task instances queued within the last 24 hours.
since = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat()

for ti in af.paginate(
    "/api/v1/dags/~/dagRuns/~/taskInstances",
    items_key="task_instances",
    params={
        "state": "queued",
        # Marked in the original as available from 2.10+;
        # NOTE(review): confirm this filter against your Airflow version.
        "queued_at_gte": since,
    },
):
    queued_dttm = ti.get("queued_when")
    if not queued_dttm:
        continue
    # fromisoformat() on older Pythons rejects a trailing "Z".
    queued_at = datetime.fromisoformat(queued_dttm.replace("Z", "+00:00"))
    age_min = (datetime.now(timezone.utc) - queued_at).total_seconds() / 60
    if age_min > threshold_min:
        stuck.append({
            "dag": ti["dag_id"],
            "task": ti["task_id"],
            "run": ti["dag_run_id"],
            "queued_min": round(age_min, 1),
            "pool": ti.get("pool"),
            "priority": ti.get("priority_weight"),
        })

if stuck:
    import json

    print(f"WARN: {len(stuck)} stuck queued tasks:")
    # Print at most the first 20 entries to keep the output readable.
    print(json.dumps(stuck[:20], indent=2))
    # Alert via webhook
Возможные причины stuck queued:
| Причина | Признак | Решение |
|---|---|---|
| Pool exhausted | Same pool у всех stuck | Увеличить slot_pool.slots |
| Worker dead | Tasks в queued, executor.open_slots = 0 | Restart workers |
| Broker mute (Celery) | Redis/Rabbit недоступен | Check broker, scheduler logs |
| Scheduler critical section contention | Низкий enqueue rate в metrics | Scale scheduler / DB tuning |
Script 2: Alert на long-running DAGs
Расширенная версия SLA monitor из урока 04 — с expected_duration из исторических данных:
# scripts/long_running_alert.py
import statistics
from datetime import datetime, timezone
from airflow_ops.client import AirflowClient
af = AirflowClient()
def expected_duration(dag_id: str, percentile: float = 0.95) -> float:
    """Return a duration percentile over the last 30 successful runs.

    Args:
        dag_id: DAG whose history is inspected.
        percentile: Fraction in (0, 1); clamped to the 1st..99th percentile.
            The default 0.95 gives ~p95.

    Returns:
        The duration in seconds, or 0.0 when no successful run with both
        ``start_date`` and ``end_date`` exists.
    """
    durations = []
    for run in af.paginate(
        f"/api/v1/dags/{dag_id}/dagRuns",
        items_key="dag_runs",
        params={"state": "success", "order_by": "-end_date"},
        page_size=30,
    ):
        if not run.get("end_date") or not run.get("start_date"):
            continue
        start = datetime.fromisoformat(run["start_date"].replace("Z", "+00:00"))
        end = datetime.fromisoformat(run["end_date"].replace("Z", "+00:00"))
        durations.append((end - start).total_seconds())
        if len(durations) >= 30:
            break
    if not durations:
        return 0.0
    if len(durations) == 1:
        # statistics.quantiles() raises StatisticsError on a single data
        # point before Python 3.13; the only sample is the best estimate.
        return durations[0]
    # quantiles(n=100) returns 99 cut points; index k-1 is the k-th percentile.
    cut = min(max(round(percentile * 100), 1), 99)
    return statistics.quantiles(durations, n=100)[cut - 1]
# All DAG runs currently in the running state, across every DAG.
# The DAG-run endpoint is queried (not task instances) so that the elapsed
# time of the run itself is compared with the historical DAG-run duration
# and each run is reported at most once.
running = list(af.paginate(
    "/api/v1/dags/~/dagRuns",
    items_key="dag_runs",
    params={"state": "running"},
))

# Cache the expected duration per DAG: several runs of one DAG share it.
expected_by_dag: dict[str, float] = {}
alerts = []
for run in running:
    dag_id = run["dag_id"]
    run_id = run["dag_run_id"]
    start_str = run.get("start_date")
    if not start_str:
        continue
    start = datetime.fromisoformat(start_str.replace("Z", "+00:00"))
    elapsed = (datetime.now(timezone.utc) - start).total_seconds()
    if dag_id not in expected_by_dag:
        expected_by_dag[dag_id] = expected_duration(dag_id)
    expected = expected_by_dag[dag_id]
    # expected == 0 means there is no history to compare against.
    if expected > 0 and elapsed > expected * 2:
        alerts.append({
            "dag": dag_id,
            "run": run_id,
            "elapsed_min": round(elapsed / 60),
            "expected_p95_min": round(expected / 60),
            "ratio": round(elapsed / expected, 1),
        })

if alerts:
    print(f"WARN: {len(alerts)} DAGs running 2x longer than p95")
Script 3: Backup metadata
Backup критичных таблиц через SQL dump + connection/variable export через CLI:
#!/bin/bash
# scripts/backup_metadata.sh
# Dump selected Airflow metadata tables, export connections/variables/pools
# via the CLI, upload an encrypted archive to S3 and prune old local copies.
# Requires PGPASSWORD, PG_HOST and PG_USER in the environment.
set -euo pipefail

# The exports below contain plaintext credentials: keep new files private.
umask 077

BACKUP_ROOT="/backup/airflow"
# Evaluate the date once so a run that crosses midnight stays consistent.
TODAY="$(date +%Y-%m-%d)"
BACKUP_DIR="$BACKUP_ROOT/$TODAY"
mkdir -p "$BACKUP_DIR"

# 1. PostgreSQL dump of metadata tables only (log/xcom are skipped: too large)
PGPASSWORD="$PGPASSWORD" pg_dump \
  -h "$PG_HOST" -U "$PG_USER" -d airflow \
  --schema=public \
  --table=dag --table=dag_run --table=task_instance \
  --table=connection --table=variable --table=slot_pool \
  --table=dataset --table=dataset_event \
  --table=ab_user --table=ab_role --table=ab_permission_view_role \
  --format=custom --compress=9 \
  -f "$BACKUP_DIR/airflow_metadata.dump"

# 2. CLI export: readable form for disaster recovery
# NOTE(review): confirm that every export subcommand in your Airflow version
# treats "-" as stdout (in particular `pools export`).
kubectl exec deploy/airflow-scheduler -- airflow connections export - --file-format=json > "$BACKUP_DIR/connections.json"
kubectl exec deploy/airflow-scheduler -- airflow variables export - > "$BACKUP_DIR/variables.json"
kubectl exec deploy/airflow-scheduler -- airflow pools export - > "$BACKUP_DIR/pools.json"

# 3. Encrypt and upload to S3
tar czf - -C "$BACKUP_DIR" . | gpg --encrypt --recipient [email protected] | \
  aws s3 cp - "s3://airflow-backups/$TODAY.tar.gz.gpg"

# 4. Retention: delete local daily directories older than 7 days.
# Limit the search to the immediate children of the backup root so find
# never matches the root itself or descends into a directory it is removing.
find "$BACKUP_ROOT" -mindepth 1 -maxdepth 1 -type d -mtime +7 -exec rm -rf {} +
Cron: ежедневно в 4:00.
Connection/Variable export расшифровывает значения с использованием Fernet key (поэтому требует доступ к ключу). Дампы содержат plaintext credentials. Всегда encrypt перед хранением (GPG, KMS-encrypted S3, etc).
Script 4: Service account creation
Чем меньше permissions у automation user — тем безопаснее. Скрипт для bootstrap service account:
# scripts/setup_service_account.py
"""
Create a least-privilege service account for ops automation.
Roles created via airflow-managed FAB API.
"""
import os
import secrets

from airflow_ops.client import AirflowClient

# The admin token is needed only for this one-off setup; it is read from the
# environment so it never has to be written into the script.
ADMIN_TOKEN = os.environ["AIRFLOW_ADMIN_TOKEN"]
af = AirflowClient(token=ADMIN_TOKEN)

# 1. Create a role with exactly the permissions the automation needs
role_name = "OpsAutomation"
grants = [
    # Read DAG runs and task instances for monitoring
    ("can_read", "DAG Runs"),
    ("can_read", "Task Instances"),
    # Trigger and clear DAGs
    # NOTE(review): triggering/clearing may additionally require can_edit on
    # "DAGs" depending on the Airflow version; verify before relying on it.
    ("can_create", "DAG Runs"),
    ("can_edit", "Task Instances"),
    # Read the DAG list (for discovery)
    ("can_read", "DAGs"),
    # NB: deliberately NO permission on Connections/Variables/Pools/Users;
    # sensitive data is handled through the CLI only.
]
# The REST API expects each entry as nested objects with a "name" field,
# not as bare strings.
permissions = [
    {"action": {"name": action}, "resource": {"name": resource}}
    for action, resource in grants
]

# Create the role through the API:
af._request("POST", "/api/v1/roles", json={
    "name": role_name,
    "actions": permissions,
})

# 2. Create the user
password = secrets.token_urlsafe(32)
af._request("POST", "/api/v1/users", json={
    "username": "ops_automation",
    "email": "[email protected]",
    "first_name": "Ops",
    "last_name": "Automation",
    "password": password,
    "roles": [{"name": role_name}],
})

# 3. Print the credentials so they can be stored in a secret manager.
# The password is shown once on stdout: do not run this where stdout is
# captured into shared logs.
print("Service account created.")
print("Username: ops_automation")
print(f"Password (save to Vault!): {password}")
Service account живёт в:
- Vault path: airflow/svc/ops_automation
- AWS Secrets Manager: airflow-ops-automation
- GitHub Actions secrets: AIRFLOW_OPS_TOKEN
Никогда не коммитьте credentials в git и не хардкодьте их в env.
Script 5: Audit report — кто что триггерил
# scripts/audit_triggers.py
"""
Generate audit report: who triggered which DAGs за последний месяц.
Через GET /dags/{}/dagRuns и cross-reference с note/conf.
"""
from datetime import datetime, timedelta, timezone
from collections import Counter
from airflow_ops.client import AirflowClient
af = AirflowClient()

# Report window: manual runs started during the last 30 days.
since = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()

# (dag_id, actor) -> number of manual triggers
triggers = Counter()
for run in af.paginate(
    "/api/v1/dags/~/dagRuns",
    items_key="dag_runs",
    params={
        "start_date_gte": since,
        "run_type": "manual",
    },
):
    actor = "unknown"
    note = run.get("note", "")
    conf = run.get("conf") or {}
    # Derive the actor from conf first, then from the note
    if "github_run_url" in conf:
        actor = "github-ci"
    elif "triggered_by" in conf:
        # conf is free-form JSON: coerce to str so the value can be used as
        # a report label and formatted with a string width below.
        actor = str(conf["triggered_by"])
    elif note and note.startswith("Triggered by"):
        actor = note.split("by", 1)[1].strip()
    triggers[(run["dag_id"], actor)] += 1

# Print the 50 most frequent (dag, actor) pairs, most frequent first
for (dag, actor), count in triggers.most_common(50):
    print(f"{count:5d} {actor:30s} {dag}")
Этот отчёт нужен для compliance (SOX, GDPR data lineage) и для понимания «кто заваливает production DAGs ad-hoc triggers».
Production gotchas
1. Token caching на disk
Не делайте этого:
# плохо — token утечёт через core dump или filesystem
token = open(os.path.expanduser("~/.airflow-token")).read().strip()
Лучше — через credential helper или env var из vault-agent (sidecar который автоматически refresh-ит).
2. Session pooling — обязательно
requests.get(...) без сессии создаёт новое TCP+TLS соединение на каждый запрос. На 100 запросов это +5-10 секунд только на handshake. Используйте requests.Session() (как в AirflowClient).
3. Большие списки нужно paginate в правильном порядке
order_by обязательно при pagination — иначе результаты могут дублироваться/пропадать между page если данные меняются. Используйте стабильное поле:
params={"order_by": "execution_date"} # стабильный для read
4. paginate бесконечен при API bug
Если API продолжает возвращать items, а total_entries некорректен (редкий случай) — pagination зациклится. В реальном клиенте добавьте safety limit:
max_pages = 1000
if pages_fetched > max_pages:
raise RuntimeError("Pagination safety limit exceeded")
5. Timeout vs server-side execution
client.timeout=30 означает, что client перестанет ждать ответ через 30s. Но сервер продолжит обработку. Для POST trigger это OK (DagRun уже создан). Для DELETE — может оставить partial state.
Идемпотентные DELETE: повторите запрос — это безопасно.
Toolkit organization — recommended layout
airflow-operations/
├── airflow_ops/
│ ├── __init__.py
│ ├── client.py # AirflowClient class
│ ├── slack.py # Slack alert helpers
│ └── prometheus.py # PushGateway helpers
├── scripts/
│ ├── find_stuck_queued.py
│ ├── long_running_alert.py
│ ├── auto_retry_idempotent.py
│ ├── backup_metadata.sh
│ ├── audit_triggers.py
│ └── setup_service_account.py
├── tests/
│ └── test_client.py
├── pyproject.toml
└── README.md
pyproject.toml:
# Package metadata for the operations toolkit.
[project]
name = "airflow-ops"
version = "0.1.0"
# Runtime dependencies of airflow_ops/client.py.
dependencies = [
"requests>=2.31",
"urllib3>=2.0",
]
# Console entry points, in "module.path:function" form.
# NOTE(review): these targets assume modules airflow_ops/scripts/find_stuck.py
# and airflow_ops/scripts/backup.py, each exposing main(). The layout shown
# above keeps the scripts in a top-level scripts/ directory under different
# file names (find_stuck_queued.py, backup_metadata.sh) — confirm and align.
[project.scripts]
af-find-stuck = "airflow_ops.scripts.find_stuck:main"
af-backup = "airflow_ops.scripts.backup:main"
Деплой как Docker image, запуск через Kubernetes CronJobs.