Learning Platform
Глоссарий Troubleshooting
Урок 14.06 · 26 мин
Продвинутый
Python ClientMonitoringBackupService AccountOperations

Programmatic access: Python client, batch operations, backup scripts

После знакомства с REST API endpoints (урок 02), auth (03), CI/CD patterns (04) и CLI (05) — этот завершающий урок модуля показывает, как собрать всё в production operations toolkit: переиспользуемый Python client, набор monitoring scripts, backup automation, и правильная организация service accounts.

Эти инструменты должны жить в отдельном репозитории airflow-operations рядом с infrastructure-as-code. Используйте паттерны ниже как стартовый набор для своей платформы.


Архитектура operations toolkit

Operations toolkit вокруг Airflow
airflow-operations repoОтдельный git repo, не в dags folder. Содержит: client lib, monitoring scripts, backup tools, runbook scripts. Версионируется отдельно от DAGs.
импортирует client
AirflowClient (custom)Тонкая обёртка над requests.Session с auth, retry policy, structured errors. Не использует airflow-client-python — слишком тяжёлый dep.
Service accountFAB user 'ops_automation' с минимальным role: read DagRuns/TI, trigger DAGs, manage Connections (только automated ones). НЕ Admin.
HTTPS
Airflow REST APIПрод webserver. Authentication через JWT (или basic_auth для simple setups). Все запросы логируются в access.log.
cron + alerting
OutputsSlack alerts через webhook, Prometheus metrics через push gateway, S3 backups, dashboard для on-call.

AirflowClient — production wrapper

Не используйте apache-airflow-client PyPI package — он тяжёлый (300+ Python files, тянет marshmallow, jsonschema). Для большинства задач достаточно тонкого wrapper над requests:

# airflow_ops/client.py
import os
import time
from typing import Any
from urllib.parse import urljoin

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class AirflowError(Exception):
    """Структурированная ошибка из RFC 7807 Problem Details."""

    def __init__(self, status: int, title: str, detail: str | None = None):
        self.status = status
        self.title = title
        self.detail = detail
        super().__init__(f"{status} {title}: {detail or ''}")


class AirflowClient:
    """
    Тонкий клиент для Airflow REST API v1.
    Обрабатывает retry, structured errors, pagination iteration.
    """

    def __init__(
        self,
        base_url: str | None = None,
        token: str | None = None,
        timeout: float = 30.0,
    ):
        self.base_url = (base_url or os.environ["AIRFLOW_URL"]).rstrip("/")
        self.timeout = timeout

        self.session = requests.Session()
        token = token or os.environ.get("AIRFLOW_API_TOKEN")
        if token:
            self.session.headers["Authorization"] = f"Bearer {token}"

        # Retry на transient errors
        retry = Retry(
            total=5,
            backoff_factor=0.5,  # 0.5, 1, 2, 4, 8 seconds
            status_forcelist=[502, 503, 504],
            allowed_methods=["GET", "POST", "PATCH", "DELETE"],
            respect_retry_after_header=True,
        )
        adapter = HTTPAdapter(max_retries=retry, pool_maxsize=20)
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)

    def _request(self, method: str, path: str, **kwargs) -> dict[str, Any]:
        url = urljoin(self.base_url + "/", path.lstrip("/"))
        r = self.session.request(method, url, timeout=self.timeout, **kwargs)

        if r.status_code == 204:
            return {}

        try:
            body = r.json()
        except ValueError:
            body = {"detail": r.text}

        if r.status_code >= 400:
            raise AirflowError(
                status=r.status_code,
                title=body.get("title", "HTTP error"),
                detail=body.get("detail"),
            )
        return body

    # DAG Run operations

    def trigger_dag(
        self,
        dag_id: str,
        run_id: str | None = None,
        conf: dict[str, Any] | None = None,
        logical_date: str | None = None,
        note: str | None = None,
    ) -> dict[str, Any]:
        """Trigger DAG. Возвращает {dag_run_id, state, ...}. 409 → fetch existing."""
        body: dict[str, Any] = {}
        if run_id:
            body["dag_run_id"] = run_id
        if conf:
            body["conf"] = conf
        if logical_date:
            body["logical_date"] = logical_date
        if note:
            body["note"] = note

        try:
            return self._request("POST", f"/api/v1/dags/{dag_id}/dagRuns", json=body)
        except AirflowError as e:
            if e.status == 409 and run_id:
                return self.get_dag_run(dag_id, run_id)
            raise

    def get_dag_run(self, dag_id: str, run_id: str) -> dict[str, Any]:
        return self._request("GET", f"/api/v1/dags/{dag_id}/dagRuns/{run_id}")

    def wait_for_dag_run(
        self,
        dag_id: str,
        run_id: str,
        max_wait: float = 3600,
        log_progress: bool = True,
    ) -> dict[str, Any]:
        """Polling с exponential backoff."""
        delays = [2, 5, 10, 30, 60, 120]
        elapsed = 0.0
        i = 0
        while elapsed < max_wait:
            delay = delays[min(i, len(delays) - 1)]
            time.sleep(delay)
            elapsed += delay
            i += 1
            try:
                run = self.get_dag_run(dag_id, run_id)
            except AirflowError as e:
                if e.status == 404 and elapsed < 30:
                    continue  # race condition после POST
                raise
            if log_progress:
                print(f"[t={elapsed:.0f}s] {dag_id}/{run_id}{run['state']}")
            if run["state"] in ("success", "failed"):
                return run
        raise TimeoutError(f"DAG run {dag_id}/{run_id} did not finish in {max_wait}s")

    # Pagination iterator

    def paginate(
        self,
        path: str,
        items_key: str,
        page_size: int = 100,
        params: dict | None = None,
    ):
        """Iterate over all items, handling pagination."""
        params = dict(params or {})
        offset = 0
        params["limit"] = page_size
        while True:
            params["offset"] = offset
            resp = self._request("GET", path, params=params)
            items = resp.get(items_key, [])
            for item in items:
                yield item
            total = resp.get("total_entries", 0)
            offset += page_size
            if offset >= total or not items:
                break

    # Task instances

    def clear_task_instances(
        self,
        dag_id: str,
        task_ids: list[str],
        run_id: str,
        downstream: bool = True,
        dry_run: bool = False,
    ) -> dict[str, Any]:
        body = {
            "task_ids": task_ids,
            "dag_run_id": run_id,
            "include_downstream": downstream,
            "include_upstream": False,
            "reset_dag_runs": True,
            "dry_run": dry_run,
        }
        return self._request(
            "POST", f"/api/v1/dags/{dag_id}/clearTaskInstances", json=body
        )

Использование

from airflow_ops.client import AirflowClient

af = AirflowClient()  # читает AIRFLOW_URL, AIRFLOW_API_TOKEN из env

# Trigger + wait
run = af.trigger_dag(
    "etl_orders"
    run_id="ops-runbook-2026-05-12"
    conf={"source": "manual"}
)
final = af.wait_for_dag_run("etl_orders", run["dag_run_id"])

# Iterate all failed TI за день
for ti in af.paginate(
    "/api/v1/dags/~/dagRuns/~/taskInstances"
    items_key="task_instances"
    params={"state": "failed", "start_date_gte": "2026-05-12T00:00:00Z"}
):
    print(ti["dag_id"], ti["task_id"])

Script 1: Find stuck queued tasks

Tasks которые в state queued дольше 10 минут — обычно signal проблем с executor или message broker.

# scripts/find_stuck_queued.py
from datetime import datetime, timedelta, timezone
from airflow_ops.client import AirflowClient

af = AirflowClient()
threshold_min = 10

stuck = []
since = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat()

for ti in af.paginate(
    "/api/v1/dags/~/dagRuns/~/taskInstances"
    items_key="task_instances"
    params={
        "state": "queued",
        "queued_at_gte": since,  # с 2.10+
    }
):
    queued_dttm = ti.get("queued_when")
    if not queued_dttm:
        continue
    queued_at = datetime.fromisoformat(queued_dttm.replace("Z", "+00:00"))
    age_min = (datetime.now(timezone.utc) - queued_at).total_seconds() / 60
    if age_min > threshold_min:
        stuck.append({
            "dag": ti["dag_id"],
            "task": ti["task_id"],
            "run": ti["dag_run_id"],
            "queued_min": round(age_min, 1),
            "pool": ti.get("pool"),
            "priority": ti.get("priority_weight"),
        })

if stuck:
    import json
    print(f"WARN: {len(stuck)} stuck queued tasks:")
    print(json.dumps(stuck[:20], indent=2))
    # Alert через webhook

Возможные причины stuck queued:

ПричинаПризнакРешение
Pool exhaustedSame pool у всех stuckУвеличить slot_pool.slots
Worker deadTasks в queued, executor.open_slots = 0Restart workers
Broker mute (Celery)Redis/Rabbit недоступенCheck broker, scheduler logs
Scheduler critical section contentionНизкий enqueue rate в metricsScale scheduler / DB tuning

Script 2: Alert на long-running DAGs

Расширенная версия SLA monitor из урока 04 — с expected_duration из исторических данных:

# scripts/long_running_alert.py
import statistics
from datetime import datetime, timezone

from airflow_ops.client import AirflowClient

af = AirflowClient()

def expected_duration(dag_id: str, percentile: float = 0.95) -> float:
    """p95 длительности последних 30 успешных runs."""
    durations = []
    for run in af.paginate(
        f"/api/v1/dags/{dag_id}/dagRuns"
        items_key="dag_runs"
        params={"state": "success", "order_by": "-end_date"},
        page_size=30
    ):
        if not run.get("end_date") or not run.get("start_date"):
            continue
        start = datetime.fromisoformat(run["start_date"].replace("Z", "+00:00"))
        end = datetime.fromisoformat(run["end_date"].replace("Z", "+00:00"))
        durations.append((end - start).total_seconds())
        if len(durations) >= 30:
            break
    if not durations:
        return 0
    return statistics.quantiles(durations, n=20)[18]  # ~p95

# Все running DAGs
running = list(af.paginate(
    "/api/v1/dags/~/dagRuns/~/taskInstances"
    items_key="task_instances"
    params={"state": "running"}
))

alerts = []
for ti in running:
    dag_id = ti["dag_id"]
    run_id = ti["dag_run_id"]
    start_str = ti.get("start_date")
    if not start_str:
        continue
    start = datetime.fromisoformat(start_str.replace("Z", "+00:00"))
    elapsed = (datetime.now(timezone.utc) - start).total_seconds()
    expected = expected_duration(dag_id)
    if expected > 0 and elapsed > expected * 2:
        alerts.append({
            "dag": dag_id,
            "run": run_id,
            "elapsed_min": round(elapsed / 60),
            "expected_p95_min": round(expected / 60),
            "ratio": round(elapsed / expected, 1),
        })

if alerts:
    print(f"WARN: {len(alerts)} DAGs running 2x longer than p95")

Script 3: Backup metadata

Backup критичных таблиц через SQL dump + connection/variable export через CLI:

#!/bin/bash
# scripts/backup_metadata.sh
set -euo pipefail

BACKUP_DIR="/backup/airflow/$(date +%Y-%m-%d)"
mkdir -p "$BACKUP_DIR"

# 1. PostgreSQL dump только metadata tables (без log/xcom — слишком большие)
PGPASSWORD="$PGPASSWORD" pg_dump \
  -h "$PG_HOST" -U "$PG_USER" -d airflow \
  --schema=public \
  --table=dag --table=dag_run --table=task_instance \
  --table=connection --table=variable --table=slot_pool \
  --table=dataset --table=dataset_event \
  --table=ab_user --table=ab_role --table=ab_permission_view_role \
  --format=custom --compress=9 \
  -f "$BACKUP_DIR/airflow_metadata.dump"

# 2. CLI export — readable form для disaster recovery
kubectl exec deploy/airflow-scheduler -- airflow connections export - --file-format=json > "$BACKUP_DIR/connections.json"
kubectl exec deploy/airflow-scheduler -- airflow variables export - > "$BACKUP_DIR/variables.json"
kubectl exec deploy/airflow-scheduler -- airflow pools export - > "$BACKUP_DIR/pools.json"

# 3. Encrypt и upload в S3
tar czf - -C "$BACKUP_DIR" . | gpg --encrypt --recipient [email protected] | \
  aws s3 cp - "s3://airflow-backups/$(date +%Y-%m-%d).tar.gz.gpg"

# 4. Retention — удалить локальные старше 7 дней
find /backup/airflow -mtime +7 -type d -exec rm -rf {} +

Cron: ежедневно в 4:00.

WARNING

Connection/Variable export расшифровывает значения с использованием Fernet key (поэтому требует доступ к ключу). Дампы содержат plaintext credentials. Всегда encrypt перед хранением (GPG, KMS-encrypted S3, etc).


Script 4: Service account creation

Чем меньше permissions у automation user — тем безопаснее. Скрипт для bootstrap service account:

# scripts/setup_service_account.py
"""
Create a least-privilege service account for ops automation.
Roles created via airflow-managed FAB API.
"""
import secrets
from airflow_ops.client import AirflowClient

af = AirflowClient(token=ADMIN_TOKEN)  # admin token нужен только для setup

# 1. Create role с точными permissions
role_name = "OpsAutomation"
permissions = [
    # Read DagRuns, TI для monitoring
    {"action": "can_read", "resource": "DAG Runs"},
    {"action": "can_read", "resource": "Task Instances"},
    # Trigger and clear DAGs
    {"action": "can_create", "resource": "DAG Runs"},
    {"action": "can_edit", "resource": "Task Instances"},
    # Read DAG list (для discovery)
    {"action": "can_read", "resource": "DAGs"},
    # NB: НЕТ permission на Connections/Variables/Pools/Users
    # Это сделано намеренно — sensitive data только через CLI
]

# Создание role через API:
af._request("POST", "/api/v1/roles", json={
    "name": role_name,
    "actions": permissions,
})

# 2. Create user
password = secrets.token_urlsafe(32)
af._request("POST", "/api/v1/users", json={
    "username": "ops_automation",
    "email": "[email protected]",
    "first_name": "Ops",
    "last_name": "Automation",
    "password": password,
    "roles": [{"name": role_name}],
})

# 3. Print credentials для secret manager
print(f"Service account created.")
print(f"Username: ops_automation")
print(f"Password (save to Vault!): {password}")

Service account живёт в:

  • Vault path airflow/svc/ops_automation
  • AWS Secrets Manager airflow-ops-automation
  • GitHub Actions secrets AIRFLOW_OPS_TOKEN

Никогда не commits в git, не в env.


Script 5: Audit report — кто что триггерил

# scripts/audit_triggers.py
"""
Generate audit report: who triggered which DAGs за последний месяц.
Через GET /dags/{}/dagRuns и cross-reference с note/conf.
"""
from datetime import datetime, timedelta, timezone
from collections import Counter
from airflow_ops.client import AirflowClient

af = AirflowClient()
since = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()

triggers = Counter()
for ti in af.paginate(
    "/api/v1/dags/~/dagRuns"
    items_key="dag_runs"
    params={
        "start_date_gte": since,
        "run_type": "manual",
    }
):
    actor = "unknown"
    note = ti.get("note", "")
    conf = ti.get("conf") or {}

    # Парсим actor из note/conf
    if "github_run_url" in conf:
        actor = "github-ci"
    elif "triggered_by" in conf:
        actor = conf["triggered_by"]
    elif note and note.startswith("Triggered by"):
        actor = note.split("by", 1)[1].strip()
    triggers[(ti["dag_id"], actor)] += 1

# Sort и print top 50
for (dag, actor), count in triggers.most_common(50):
    print(f"{count:5d}  {actor:30s}  {dag}")

Этот отчёт нужен для compliance (SOX, GDPR data lineage) и для понимания «кто заваливает production DAGs ad-hoc triggers».


Production gotchas

1. Token caching на disk

Не делайте этого:

# плохо — token утечёт через core dump или filesystem
token = open(os.path.expanduser("~/.airflow-token")).read().strip()

Лучше — через credential helper или env var из vault-agent (sidecar который автоматически refresh-ит).

2. Session pooling — обязательно

requests.get(...) без сессии создаёт новое TCP+TLS соединение. На 100 запросов это +5-10 секунд только handshake. Используйте requests.Session() (как в AirflowClient).

3. Большие списки нужно paginate в правильном порядке

order_by обязательно при pagination — иначе результаты могут дублироваться/пропадать между page если данные меняются. Используйте стабильное поле:

params={"order_by": "execution_date"}  # стабильный для read

4. paginate бесконечен при API bug

Если API возвращает items но total_entries глюк (rare) — pagination зациклится. В реальном клиенте добавьте safety limit:

max_pages = 1000
if pages_fetched > max_pages:
    raise RuntimeError("Pagination safety limit exceeded")

5. Timeout vs server-side execution

client.timeout=30 означает, что client откажется ждать 30s. Но сервер продолжит обработку. Для POST trigger это OK (DagRun уже создан). Для DELETE — может оставить partial state.

Идемпотентные DELETE: повторите запрос — это безопасно.


airflow-operations/
├── airflow_ops/
│   ├── __init__.py
│   ├── client.py          # AirflowClient class
│   ├── slack.py            # Slack alert helpers
│   └── prometheus.py       # PushGateway helpers
├── scripts/
│   ├── find_stuck_queued.py
│   ├── long_running_alert.py
│   ├── auto_retry_idempotent.py
│   ├── backup_metadata.sh
│   ├── audit_triggers.py
│   └── setup_service_account.py
├── tests/
│   └── test_client.py
├── pyproject.toml
└── README.md

pyproject.toml:

[project]
name = "airflow-ops"
version = "0.1.0"
dependencies = [
    "requests>=2.31",
    "urllib3>=2.0",
]

[project.scripts]
af-find-stuck = "airflow_ops.scripts.find_stuck:main"
af-backup = "airflow_ops.scripts.backup:main"

Деплой как Docker image, запуск через Kubernetes CronJobs.


Проверка знанийKnowledge check
Команда написала monitoring script, который раз в минуту делает `GET /api/v1/dags/~/dagRuns/~/taskInstances?state=running&limit=1000` чтобы показать live dashboard. После rollout webserver начал OOM-ить раз в час. Что произошло и как переделать?
ОтветAnswer
**Проблема 1: heavy SQL query на каждый poll.** `GET /taskInstances` без `dag_ids` фильтра делает full table scan task_instance + JOIN c dag_run. На production (10k+ runs/day) это **10-50MB JSON response** и **0.5-2 sec query time**. Раз в минуту = 1.5GB/час трафика + постоянная DB нагрузка. **Проблема 2: каждый response распарсивается webserver в Python objects** через marshmallow — резкий пик памяти, особенно при concurrent polls (если dashboard открыт у нескольких людей). Накопительный effect → OOM. **Fixes:** (1) **Reduce scope:** filter по `dag_ids` (только важные DAGs), `start_date_gte` (только последний час). (2) **Reduce frequency:** poll раз в 30s, не 1s. (3) **Иcпользовать webhook/Listener API** (модуль 12) для push-based updates вместо polling. (4) **Cache в Redis:** один backend service делает 1 poll/min, dashboard читает из Redis. (5) Долгосрочно — **OpenTelemetry metrics** (модуль 14): scheduler.queued_tasks, dag_run.success_rate. Графана через Prometheus намного дешевле чем REST API polling. (6) Если real-time критичен — webhook через task_instance state change event.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Pattern для production AirflowClient: HTTP retry policy. Что важно?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6