Learning Platform
Глоссарий Troubleshooting
Урок 14.02 · 26 мин
Продвинутый
REST APIFlask-RESTfulOpenAPIDAG RunIdempotency

REST API v1 — endpoints, OpenAPI, идемпотентность

REST API v1 — это публичный HTTP контракт Airflow. Он стал stable в 2.0 (PEP-484-подобный момент — после долгого «experimental» периода) и с тех пор обратно совместим. В 2.10/2.11 LTS этот API остаётся основным механизмом программного управления Airflow: запуск DAG из CI/CD, мониторинг состояния, очистка task instances, управление пулом подключений и переменных.

Понимание устройства API критично, потому что в production вы будете писать клиенты, scripts и Terraform-providers, опирающиеся именно на эти endpoints. Этот урок — анатомия API: какие группы endpoints существуют, как они спроектированы, где подводные камни.


Архитектура API в Airflow 2.x

API v1 живёт в том же Flask-приложении, что и Web UI. Это важно понимать для производительности и безопасности:

Где живёт REST API v1
КлиентHTTP клиент: curl, Python requests, Terraform airflow provider, GitHub Action, Postman. Шлёт JSON по HTTPS на webserver.
HTTPS, /api/v1/*
Webserver — Flask + GunicornWebserver обрабатывает оба класса запросов: /admin/* для UI и /api/v1/* для REST API. Тот же Gunicorn pool workers — поэтому heavy API нагрузка влияет на UI latency. В production за NGINX или ALB.
Flask-AppBuilder middleware
FAB auth + permission checkКаждый request проходит через FAB: (1) auth backend identifies user; (2) permission check (например `can_read` на DagRun resource); (3) если OK — переходит к Flask-RESTful endpoint.
Flask-RESTful endpointEndpoint — Python-функция, сгенерированная из OpenAPI spec через connexion. Делает SQLAlchemy запросы к metadata DB, сериализует через marshmallow в JSON.
SQLAlchemy
Metadata DBPostgreSQL/MySQL. Большинство endpoints — чистые DB запросы. Trigger DAG — UPDATE dag_run + UPDATE dag (next_dagrun). Scheduler затем подберёт run через свой обычный loop.

Важное следствие: API не имеет отдельного процесса. Это просто часть webserver. Поэтому:

  • Cпейс между UI и API делится — high-volume polling добавляет нагрузку на UI users.
  • Перезагрузка webserver роняет API (но не scheduler/workers).
  • Connection pool — общий с UI. sql_alchemy_pool_size должен покрывать оба.

OpenAPI 3 spec

Полный контракт API описан в OpenAPI 3:

# YAML spec
GET /api/v1/openapi.yaml

# UI Swagger
GET /api/v1/ui/

Spec используется тремя способами:

  1. Генерация серверного кодаconnexion lib читает YAML и роутит запросы к Python-функциям.
  2. Генерация клиентовopenapi-generator создаёт типизированные клиенты для Python, Go, Java, TypeScript.
  3. Документация — Swagger UI на /api/v1/ui/.
NOTE

Spec не описывает абсолютно все возможности. Например, поля conf в TriggerDagRun не валидируются schema-ой (только additionalProperties: true). Сложные filtering правила (например, state IN (...)) выражены через query parameters state=success,failed, что не всегда отражено в типизированных клиентах.


Группы endpoints

API v1 разделён на ~15 ресурсов. Основные:

ГруппаКлючевые endpointsЧто делает
DAGsGET /dags, GET /dags/{id}, PATCH /dags/{id}List/get/pause/unpause
DAG RunsPOST /dags/{id}/dagRuns, GET /dags/{id}/dagRunsTrigger, мониторинг
Task InstancesGET /dags/{id}/dagRuns/{run_id}/taskInstances, POST /dags/{id}/clearTaskInstancesList, clear failed
ConnectionsGET/POST/PATCH/DELETE /connectionsCRUD подключений
VariablesGET/POST/PATCH/DELETE /variablesCRUD переменных
PoolsGET/POST/PATCH/DELETE /poolsCRUD пулов
XComGET /dags/{id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntriesRead XCom values
Datasets (2.4+)GET /datasets, GET /datasets/eventsBrowse data lineage
MonitoringGET /health, GET /configLiveness, deployment info
Permissions/UsersGET /users, GET /rolesFAB admin (только admin role)

Trigger DAG Run — самый частый endpoint

POST /api/v1/dags/etl_orders/dagRuns
Content-Type: application/json
Authorization: Basic ...

{
  "dag_run_id": "manual__github-action-12345",
  "logical_date": "2026-05-12T00:00:00Z",
  "data_interval_start": "2026-05-11T00:00:00Z",
  "data_interval_end": "2026-05-12T00:00:00Z",
  "conf": {
    "source_bucket": "s3://raw/orders/",
    "target_table": "analytics.orders_daily"
  },
  "note": "Backfill 2026-05-11 после fix bug в transform"
}

Несколько тонкостей:

  • dag_run_id — если не указан, генерируется как manual__<timestamp>. Custom id даёт идемпотентность: повторный POST с тем же id вернёт 409 Conflict, не создаст дубль.
  • logical_date vs data_interval_* — в 2.x logical_date (бывший execution_date) это «точка во времени», которую представляет run. data_interval_start/end — реальные границы данных. Они часто не совпадают (cron 0 8 * * * имеет logical_date=00:00, а data_interval=[вчера 00:00, сегодня 00:00]).
  • conf — произвольный JSON, доступный в task через {{ dag_run.conf['key'] }}. Не валидируется.
  • note (2.7+) — текстовая аннотация, видна в UI.
Идемпотентность пайплайнов — повтор не ломает данные

Идемпотентность через dag_run_id

Это критично для CI/CD сценариев:

import requests
from requests.auth import HTTPBasicAuth

GITHUB_RUN_ID = "github-actions-12345"

resp = requests.post(
    "https://airflow.example.com/api/v1/dags/etl_orders/dagRuns"
    auth=HTTPBasicAuth("ci_user", "..."),
    json={
        "dag_run_id": f"ci__{GITHUB_RUN_ID}",
        "conf": {"branch": "main", "commit_sha": "abc123"}
    },
    timeout=10
)

if resp.status_code == 409:
    # Уже триггерили этот build ранее — это OK, не падаем
    print("DAG run already exists, fetching state")
    run_resp = requests.get(
        f"https://airflow.example.com/api/v1/dags/etl_orders/dagRuns/ci__{GITHUB_RUN_ID}"
        auth=HTTPBasicAuth("ci_user", "...")
    )
elif resp.status_code >= 400:
    raise RuntimeError(f"Trigger failed: {resp.status_code} {resp.text}")

Без dag_run_id идемпотентность отсутствует — retry CI шага создаст дубль DagRun.


Pagination и filtering

Все list endpoints используют offset/limit pagination:

GET /api/v1/dags?limit=100&offset=200&order_by=-last_parsed_time

Параметры:

  • limit — default 100, max 1000. Жёстко контролируется через [api] maximum_page_limit.
  • offset — для перехода к следующей странице.
  • order_by — поле сортировки, - префикс для DESC.

Ответ всегда содержит total_entries, что позволяет посчитать число страниц:

{
  "dags": [ ... ],
  "total_entries": 1847
}

Filtering — query parameters

Разные endpoints поддерживают разные фильтры. Часто используемые:

# DagRuns с фильтрами
GET /api/v1/dags/etl_orders/dagRuns
    ?state=failed,running
    &execution_date_gte=2026-05-01T00:00:00Z
    &execution_date_lte=2026-05-12T00:00:00Z
    &order_by=-execution_date

# Task instances — те же filter principles
GET /api/v1/dags/~/dagRuns/~/taskInstances
    ?state=failed
    &start_date_gte=2026-05-12T00:00:00Z
    &dag_ids=etl_orders,etl_users
NOTE

В URL pattern ~ (тильда) — это wildcard «любой dag_id или run_id». Это позволяет cross-DAG queries: GET /api/v1/dags/~/dagRuns/~/taskInstances?state=failed — все failed TI во всём кластере.


Clear task instances — мощный endpoint

POST /api/v1/dags/etl_orders/clearTaskInstances
Content-Type: application/json

{
  "task_ids": ["transform_orders", "load_orders"],
  "dag_run_id": "scheduled__2026-05-12T00:00:00",
  "include_upstream": false,
  "include_downstream": true,
  "include_future": false,
  "include_past": false,
  "reset_dag_runs": true,
  "dry_run": true
}

Это аналог кнопки Clear в UI. С dry_run: true возвращает список TI без изменений — критично для скриптов: сначала проверьте scope, потом запускайте без dry_run.

# Pattern: dry-run сначала, потом реальный clear
dry = requests.post(url, json={**payload, "dry_run": True}).json()
print(f"Will clear {len(dry['task_instances'])} TI")
input("Continue? [y/N]: ")
real = requests.post(url, json={**payload, "dry_run": False}).json()

reset_dag_runs: true (2.6+) — сбрасывает state DagRun в queued, что заставит scheduler пересчитать его. Без этого DagRun может остаться в success с failed TI внутри.


Connections и Variables CRUD

Эти endpoints позволяют управлять конфигурацией программно — alternative к UI и CLI:

# Создать connection
POST /api/v1/connections
{
  "connection_id": "warehouse_snowflake",
  "conn_type": "snowflake",
  "host": "xy12345.us-east-1",
  "login": "AIRFLOW_SVC",
  "password": "...",
  "schema": "ANALYTICS",
  "extra": "{\"warehouse\": \"COMPUTE_WH\", \"role\": \"ETL_ROLE\"}"
}

# Patch (полное обновление)
PATCH /api/v1/connections/warehouse_snowflake
{
  "host": "xy12345.eu-central-1",
  "extra": "{\"warehouse\": \"NEW_WH\"}"
}

# Все variables
GET /api/v1/variables?limit=100
WARNING

password и extra в connection хранятся Fernet-encrypted в metadata DB. При POST/PATCH webserver делает encrypt с использованием fernet_key из airflow.cfg. Если у webserver нет правильного ключа — credentials записываются с другим ключом, и workers не смогут их прочитать. Всегда используйте одинаковый Fernet key на всех компонентах.


Обработка ошибок — RFC 7807

API возвращает ошибки в формате Problem Details for HTTP APIs (RFC 7807):

{
  "type": "https://airflow.apache.org/docs/apache-airflow/2.10.5/stable-rest-api-ref.html#section/Errors/AlreadyExists",
  "title": "Object already exists",
  "status": 409,
  "detail": "DagRun with id 'ci__github-12345' already exists",
  "instance": "/api/v1/dags/etl_orders/dagRuns"
}

Ключевые статус-коды:

HTTPКогда
200OK (GET, PATCH)
201Created (POST)
204No Content (DELETE)
400Bad Request — schema validation failed
401Unauthenticated — нет/неверная auth
403Forbidden — нет permission (FAB role не пускает)
404Not Found — DAG/run/task_id не существует
409Conflict — попытка создать существующий объект
422Unprocessable — semantic invalid (например, попытка trigger paused DAG)
500Internal — bug или DB connection error

Хороший клиент различает 409 (acceptable, idempotent) от 5xx (retry с backoff).


Production gotchas

1. Heavy polling — webserver под нагрузкой

Самый частый антипаттерн: CI скрипт триггерит DAG и polls GET /dagRuns/{id} каждую секунду. На 20 параллельных CI build это 20 RPS на webserver, который делит pool с UI users.

Fix: poll с exponential backoff (2s → 5s → 15s → 60s, max 5 минут):

import time

def wait_for_dag_run(url, max_wait=3600):
    delays = [2, 5, 10, 30, 60, 60, 120]
    elapsed = 0
    for delay in delays + [120] * 1000:
        if elapsed > max_wait:
            raise TimeoutError()
        resp = requests.get(url, auth=auth).json()
        if resp["state"] in ("success", "failed"):
            return resp
        time.sleep(delay)
        elapsed += delay

2. next_page_token отсутствует

API v1 не поддерживает cursor-based pagination. На больших offsets (10k+) DB начинает тормозить (OFFSET 50000 в Postgres — линейный scan). Для batch операций используйте filtering по дате (execution_date_gte) вместо deep offset.

3. Schema migration ломает контракты

В minor releases (2.10 → 2.11) Airflow иногда добавляет новые поля в responses. Клиенты на pydantic с extra="forbid" падают. Всегда используйте forward-compatible parsing: ignore unknown fields.

4. Auth backend влияет на perf

basic_auth делает DB lookup на каждый request. Для high-volume integration session cookie быстрее (но не подходит для machine-to-machine). См. урок 03 про детали auth.

5. /api/v1/health — НЕ liveness probe

/health возвращает 200 даже если scheduler мёртв, лишь webserver жив. Для accurate health смотрите metadatabase, scheduler, triggerer поля в JSON и parse их статусы.


Проверка знанийKnowledge check
GitHub Action триггерит DAG `etl_users` после каждого merge в main. Иногда из-за retry CI step DAG триггерится дважды на один и тот же commit. Как обеспечить идемпотентность на API уровне?
ОтветAnswer
Использовать **`dag_run_id`**, детерминированно построенный из CI context: например, `ci__sha-abc123` (commit SHA) или `ci__github-run-12345` (GitHub run id). При повторном POST API вернёт **409 Conflict** с уже существующим run — это акcceptable, скрипт должен трактовать 409 как success (либо fetch существующий run через GET) и не падать. Без `dag_run_id` Airflow генерирует timestamp-based id и каждый POST создаёт новый run. Дополнительно: в скрипте после trigger делать `GET /dagRuns/{dag_run_id}` для подтверждения state — иначе race condition между POST 201 и сразу следующим polling может вернуть 404 (scheduler ещё не подобрал run).

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. CI/CD pipeline делает POST /api/v1/dags/etl/dagRuns без `dag_run_id`. Retry CI step происходит дважды. Что произойдёт?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6