REST API v1 — endpoints, OpenAPI, идемпотентность
REST API v1 — это публичный HTTP контракт Airflow. Он стал stable в 2.0 (PEP-484-подобный момент — после долгого «experimental» периода) и с тех пор обратно совместим. В 2.10/2.11 LTS этот API остаётся основным механизмом программного управления Airflow: запуск DAG из CI/CD, мониторинг состояния, очистка task instances, управление пулом подключений и переменных.
Понимание устройства API критично, потому что в production вы будете писать клиенты, scripts и Terraform-providers, опирающиеся именно на эти endpoints. Этот урок — анатомия API: какие группы endpoints существуют, как они спроектированы, где подводные камни.
Архитектура API в Airflow 2.x
API v1 живёт в том же Flask-приложении, что и Web UI. Это важно понимать для производительности и безопасности:
Важное следствие: API не имеет отдельного процесса. Это просто часть webserver. Поэтому:
- Cпейс между UI и API делится — high-volume polling добавляет нагрузку на UI users.
- Перезагрузка webserver роняет API (но не scheduler/workers).
- Connection pool — общий с UI.
sql_alchemy_pool_sizeдолжен покрывать оба.
OpenAPI 3 spec
Полный контракт API описан в OpenAPI 3:
# YAML spec
GET /api/v1/openapi.yaml
# UI Swagger
GET /api/v1/ui/
Spec используется тремя способами:
- Генерация серверного кода —
connexionlib читает YAML и роутит запросы к Python-функциям. - Генерация клиентов —
openapi-generatorсоздаёт типизированные клиенты для Python, Go, Java, TypeScript. - Документация — Swagger UI на
/api/v1/ui/.
Spec не описывает абсолютно все возможности. Например, поля conf в TriggerDagRun не валидируются schema-ой (только additionalProperties: true). Сложные filtering правила (например, state IN (...)) выражены через query parameters state=success,failed, что не всегда отражено в типизированных клиентах.
Группы endpoints
API v1 разделён на ~15 ресурсов. Основные:
| Группа | Ключевые endpoints | Что делает |
|---|---|---|
| DAGs | GET /dags, GET /dags/{id}, PATCH /dags/{id} | List/get/pause/unpause |
| DAG Runs | POST /dags/{id}/dagRuns, GET /dags/{id}/dagRuns | Trigger, мониторинг |
| Task Instances | GET /dags/{id}/dagRuns/{run_id}/taskInstances, POST /dags/{id}/clearTaskInstances | List, clear failed |
| Connections | GET/POST/PATCH/DELETE /connections | CRUD подключений |
| Variables | GET/POST/PATCH/DELETE /variables | CRUD переменных |
| Pools | GET/POST/PATCH/DELETE /pools | CRUD пулов |
| XCom | GET /dags/{id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries | Read XCom values |
| Datasets (2.4+) | GET /datasets, GET /datasets/events | Browse data lineage |
| Monitoring | GET /health, GET /config | Liveness, deployment info |
| Permissions/Users | GET /users, GET /roles | FAB admin (только admin role) |
Trigger DAG Run — самый частый endpoint
POST /api/v1/dags/etl_orders/dagRuns
Content-Type: application/json
Authorization: Basic ...
{
"dag_run_id": "manual__github-action-12345",
"logical_date": "2026-05-12T00:00:00Z",
"data_interval_start": "2026-05-11T00:00:00Z",
"data_interval_end": "2026-05-12T00:00:00Z",
"conf": {
"source_bucket": "s3://raw/orders/",
"target_table": "analytics.orders_daily"
},
"note": "Backfill 2026-05-11 после fix bug в transform"
}
Несколько тонкостей:
dag_run_id— если не указан, генерируется какmanual__<timestamp>. Custom id даёт идемпотентность: повторный POST с тем же id вернёт409 Conflict, не создаст дубль.logical_datevsdata_interval_*— в 2.xlogical_date(бывшийexecution_date) это «точка во времени», которую представляет run.data_interval_start/end— реальные границы данных. Они часто не совпадают (cron0 8 * * *имеет logical_date=00:00, а data_interval=[вчера 00:00, сегодня 00:00]).conf— произвольный JSON, доступный в task через{{ dag_run.conf['key'] }}. Не валидируется.note(2.7+) — текстовая аннотация, видна в UI.
Идемпотентность через dag_run_id
Это критично для CI/CD сценариев:
import requests
from requests.auth import HTTPBasicAuth
GITHUB_RUN_ID = "github-actions-12345"
resp = requests.post(
"https://airflow.example.com/api/v1/dags/etl_orders/dagRuns"
auth=HTTPBasicAuth("ci_user", "..."),
json={
"dag_run_id": f"ci__{GITHUB_RUN_ID}",
"conf": {"branch": "main", "commit_sha": "abc123"}
},
timeout=10
)
if resp.status_code == 409:
# Уже триггерили этот build ранее — это OK, не падаем
print("DAG run already exists, fetching state")
run_resp = requests.get(
f"https://airflow.example.com/api/v1/dags/etl_orders/dagRuns/ci__{GITHUB_RUN_ID}"
auth=HTTPBasicAuth("ci_user", "...")
)
elif resp.status_code >= 400:
raise RuntimeError(f"Trigger failed: {resp.status_code} {resp.text}")
Без dag_run_id идемпотентность отсутствует — retry CI шага создаст дубль DagRun.
Pagination и filtering
Все list endpoints используют offset/limit pagination:
GET /api/v1/dags?limit=100&offset=200&order_by=-last_parsed_time
Параметры:
limit— default 100, max 1000. Жёстко контролируется через[api] maximum_page_limit.offset— для перехода к следующей странице.order_by— поле сортировки,-префикс для DESC.
Ответ всегда содержит total_entries, что позволяет посчитать число страниц:
{
"dags": [ ... ],
"total_entries": 1847
}
Filtering — query parameters
Разные endpoints поддерживают разные фильтры. Часто используемые:
# DagRuns с фильтрами
GET /api/v1/dags/etl_orders/dagRuns
?state=failed,running
&execution_date_gte=2026-05-01T00:00:00Z
&execution_date_lte=2026-05-12T00:00:00Z
&order_by=-execution_date
# Task instances — те же filter principles
GET /api/v1/dags/~/dagRuns/~/taskInstances
?state=failed
&start_date_gte=2026-05-12T00:00:00Z
&dag_ids=etl_orders,etl_users
В URL pattern ~ (тильда) — это wildcard «любой dag_id или run_id». Это позволяет cross-DAG queries: GET /api/v1/dags/~/dagRuns/~/taskInstances?state=failed — все failed TI во всём кластере.
Clear task instances — мощный endpoint
POST /api/v1/dags/etl_orders/clearTaskInstances
Content-Type: application/json
{
"task_ids": ["transform_orders", "load_orders"],
"dag_run_id": "scheduled__2026-05-12T00:00:00",
"include_upstream": false,
"include_downstream": true,
"include_future": false,
"include_past": false,
"reset_dag_runs": true,
"dry_run": true
}
Это аналог кнопки Clear в UI. С dry_run: true возвращает список TI без изменений — критично для скриптов: сначала проверьте scope, потом запускайте без dry_run.
# Pattern: dry-run сначала, потом реальный clear
dry = requests.post(url, json={**payload, "dry_run": True}).json()
print(f"Will clear {len(dry['task_instances'])} TI")
input("Continue? [y/N]: ")
real = requests.post(url, json={**payload, "dry_run": False}).json()
reset_dag_runs: true (2.6+) — сбрасывает state DagRun в queued, что заставит scheduler пересчитать его. Без этого DagRun может остаться в success с failed TI внутри.
Connections и Variables CRUD
Эти endpoints позволяют управлять конфигурацией программно — alternative к UI и CLI:
# Создать connection
POST /api/v1/connections
{
"connection_id": "warehouse_snowflake",
"conn_type": "snowflake",
"host": "xy12345.us-east-1",
"login": "AIRFLOW_SVC",
"password": "...",
"schema": "ANALYTICS",
"extra": "{\"warehouse\": \"COMPUTE_WH\", \"role\": \"ETL_ROLE\"}"
}
# Patch (полное обновление)
PATCH /api/v1/connections/warehouse_snowflake
{
"host": "xy12345.eu-central-1",
"extra": "{\"warehouse\": \"NEW_WH\"}"
}
# Все variables
GET /api/v1/variables?limit=100
password и extra в connection хранятся Fernet-encrypted в metadata DB. При POST/PATCH webserver делает encrypt с использованием fernet_key из airflow.cfg. Если у webserver нет правильного ключа — credentials записываются с другим ключом, и workers не смогут их прочитать. Всегда используйте одинаковый Fernet key на всех компонентах.
Обработка ошибок — RFC 7807
API возвращает ошибки в формате Problem Details for HTTP APIs (RFC 7807):
{
"type": "https://airflow.apache.org/docs/apache-airflow/2.10.5/stable-rest-api-ref.html#section/Errors/AlreadyExists",
"title": "Object already exists",
"status": 409,
"detail": "DagRun with id 'ci__github-12345' already exists",
"instance": "/api/v1/dags/etl_orders/dagRuns"
}
Ключевые статус-коды:
| HTTP | Когда |
|---|---|
| 200 | OK (GET, PATCH) |
| 201 | Created (POST) |
| 204 | No Content (DELETE) |
| 400 | Bad Request — schema validation failed |
| 401 | Unauthenticated — нет/неверная auth |
| 403 | Forbidden — нет permission (FAB role не пускает) |
| 404 | Not Found — DAG/run/task_id не существует |
| 409 | Conflict — попытка создать существующий объект |
| 422 | Unprocessable — semantic invalid (например, попытка trigger paused DAG) |
| 500 | Internal — bug или DB connection error |
Хороший клиент различает 409 (acceptable, idempotent) от 5xx (retry с backoff).
Production gotchas
1. Heavy polling — webserver под нагрузкой
Самый частый антипаттерн: CI скрипт триггерит DAG и polls GET /dagRuns/{id} каждую секунду. На 20 параллельных CI build это 20 RPS на webserver, который делит pool с UI users.
Fix: poll с exponential backoff (2s → 5s → 15s → 60s, max 5 минут):
import time
def wait_for_dag_run(url, max_wait=3600):
delays = [2, 5, 10, 30, 60, 60, 120]
elapsed = 0
for delay in delays + [120] * 1000:
if elapsed > max_wait:
raise TimeoutError()
resp = requests.get(url, auth=auth).json()
if resp["state"] in ("success", "failed"):
return resp
time.sleep(delay)
elapsed += delay
2. next_page_token отсутствует
API v1 не поддерживает cursor-based pagination. На больших offsets (10k+) DB начинает тормозить (OFFSET 50000 в Postgres — линейный scan). Для batch операций используйте filtering по дате (execution_date_gte) вместо deep offset.
3. Schema migration ломает контракты
В minor releases (2.10 → 2.11) Airflow иногда добавляет новые поля в responses. Клиенты на pydantic с extra="forbid" падают. Всегда используйте forward-compatible parsing: ignore unknown fields.
4. Auth backend влияет на perf
basic_auth делает DB lookup на каждый request. Для high-volume integration session cookie быстрее (но не подходит для machine-to-machine). См. урок 03 про детали auth.
5. /api/v1/health — НЕ liveness probe
/health возвращает 200 даже если scheduler мёртв, лишь webserver жив. Для accurate health смотрите metadatabase, scheduler, triggerer поля в JSON и parse их статусы.