Learning Platform
Глоссарий Troubleshooting
Урок 14.04 · 30 мин
Продвинутый
CI/CDGitHub ActionsAutomationPollingIdempotency

Automation patterns: CI/CD integration через REST API

Это самый практически важный урок модуля. Здесь мы превращаем теорию endpoints и authentication в production-ready patterns для интеграции Airflow с CI/CD, мониторинг-системами и внешними оркестраторами.

Ниже — пять каноничных сценариев с полным кодом, объяснением каждого подводного камня и production-ready error handling. Используйте их как шаблоны для своих интеграций.


Архитектура: где Airflow в external orchestration

External orchestration с Airflow
External trigger sourceGitHub Actions при merge в main, Step Functions/AWS EventBridge по schedule, Tableau Dashboard refresh request, Slack slash command, Argo Events от Kafka.
HTTPS POST
Airflow webserverПринимает POST /api/v1/dags/{id}/dagRuns. Создаёт DagRun в state queued, возвращает 201 с dag_run_id. Scheduler в свой следующий tick подберёт его.
scheduler picks up
Scheduler + WorkersDagRun движется queued → running. Workers выполняют tasks. State доступен через polling GET /dagRuns/{id}.
GET poll state
External pollingCI скрипт каждые N секунд проверяет state. При терминальном (success/failed) выходит. Альтернатива: webhook через Listener API (см. модуль 12).

Ключевая мысль: Airflow остаётся orchestrator, а external system — controller. CI знает только «запусти X, дождись завершения». Внутреннюю логику задачи Airflow скрывает.


Pattern 1: Trigger DAG из GitHub Actions

Use case

После merge в main ветку нужно прогнать integration test DAG, который проверяет, что новый код dbt models работает на staging.

Решение

# .github/workflows/dbt-integration-test.yml
name: dbt Integration Test

on:
  push:
    branches: [main]

jobs:
  trigger-airflow:
    runs-on: ubuntu-latest
    steps:
      - name: Trigger Airflow DAG
        id: trigger
        env:
          AIRFLOW_URL: https://airflow.example.com
          AIRFLOW_TOKEN: ${{ secrets.AIRFLOW_API_TOKEN }}
        run: |
          DAG_RUN_ID="ci__${{ github.run_id }}__${{ github.sha }}"

          response=$(curl -fsS -w "\n%{http_code}" \
            -X POST "$AIRFLOW_URL/api/v1/dags/dbt_integration_test/dagRuns" \
            -H "Authorization: Bearer $AIRFLOW_TOKEN" \
            -H "Content-Type: application/json" \
            -d "{
              \"dag_run_id\": \"$DAG_RUN_ID\",
              \"conf\": {
                \"commit_sha\": \"${{ github.sha }}\",
                \"branch\": \"${{ github.ref_name }}\",
                \"github_run_url\": \"${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}\"
              },
              \"note\": \"Triggered by GitHub Actions run ${{ github.run_id }}\"
            }")

          http_code=$(echo "$response" | tail -1)
          body=$(echo "$response" | head -n -1)

          if [ "$http_code" = "409" ]; then
            echo "DAG run already exists (CI retry), continuing"
          elif [ "$http_code" != "200" ] && [ "$http_code" != "201" ]; then
            echo "Trigger failed: HTTP $http_code"
            echo "$body"
            exit 1
          fi

          echo "dag_run_id=$DAG_RUN_ID" >> $GITHUB_OUTPUT

      - name: Wait for DAG completion
        env:
          AIRFLOW_URL: https://airflow.example.com
          AIRFLOW_TOKEN: ${{ secrets.AIRFLOW_API_TOKEN }}
          DAG_RUN_ID: ${{ steps.trigger.outputs.dag_run_id }}
        run: |
          python3 .github/scripts/wait_for_dag.py

Python script с правильным polling

# .github/scripts/wait_for_dag.py
import os
import sys
import time
import requests

AIRFLOW_URL = os.environ["AIRFLOW_URL"]
AIRFLOW_TOKEN = os.environ["AIRFLOW_TOKEN"]
DAG_RUN_ID = os.environ["DAG_RUN_ID"]
DAG_ID = "dbt_integration_test"
MAX_WAIT_SEC = 3600  # 1 час

session = requests.Session()
session.headers["Authorization"] = f"Bearer {AIRFLOW_TOKEN}"

# Exponential backoff: 2, 5, 10, 30, 60, 120 (затем держится на 120)
delays = [2, 5, 10, 30, 60, 120]
elapsed = 0
i = 0

while elapsed < MAX_WAIT_SEC:
    delay = delays[min(i, len(delays) - 1)]
    time.sleep(delay)
    elapsed += delay
    i += 1

    try:
        r = session.get(
            f"{AIRFLOW_URL}/api/v1/dags/{DAG_ID}/dagRuns/{DAG_RUN_ID}"
            timeout=10
        )
        r.raise_for_status()
    except requests.RequestException as e:
        print(f"Polling error (will retry): {e}")
        continue

    state = r.json()["state"]
    print(f"[t={elapsed}s] state={state}")

    if state == "success":
        sys.exit(0)
    if state == "failed":
        # Получим failed tasks для логов
        ti_resp = session.get(
            f"{AIRFLOW_URL}/api/v1/dags/{DAG_ID}/dagRuns/{DAG_RUN_ID}/taskInstances"
            params={"state": "failed"}
        ).json()
        for ti in ti_resp.get("task_instances", []):
            print(f"  FAILED task: {ti['task_id']} (try {ti['try_number']})")
            print(f"    log: {AIRFLOW_URL}/dags/{DAG_ID}/grid?dag_run_id={DAG_RUN_ID}&task_id={ti['task_id']}")
        sys.exit(1)

print(f"Timeout: DAG did not finish in {MAX_WAIT_SEC}s")
sys.exit(2)

Что здесь принципиально важно

  1. Идемпотентный dag_run_id — детерминированно построен из github.run_id + sha. Retry CI step не создаст дубль.
  2. 409 трактуется как success — это означает «уже триггерили, OK».
  3. Exponential backoff polling — не убивает webserver, но реагирует быстро.
  4. При failure печатаем URL логов — GitHub Action UI содержит deep link к Airflow UI.
  5. conf несёт CI context — DAG может использовать commit_sha для checkout правильного кода.

Pattern 2: Clear failed tasks programmatically

Use case

Каждое утро в 7:00 мониторинг видит failed tasks из ночного batch. Команда хочет автоматический скрипт, который retry-ит только идемпотентные tasks (по списку), но не трогает тех, что требуют ручного review.

Решение

# scripts/auto_retry_idempotent.py
import os
import requests
from datetime import datetime, timedelta, timezone

AIRFLOW = "https://airflow.example.com"
session = requests.Session()
session.headers["Authorization"] = f"Bearer {os.environ['AIRFLOW_TOKEN']}"

# Whitelist task_ids, которые безопасно retry
IDEMPOTENT_TASKS = {
    "etl_orders": ["extract_orders", "transform_orders"],
    "etl_users": ["sync_users_full"],
}

# Какие НЕ retry — требуют ручного review
SENSITIVE = {
    "etl_orders": ["load_to_warehouse"],  # idempotency не гарантирована
}

since = (datetime.now(timezone.utc) - timedelta(hours=12)).isoformat()

# Найдём все failed TI за последние 12 часов
r = session.get(
    f"{AIRFLOW}/api/v1/dags/~/dagRuns/~/taskInstances"
    params={
        "state": "failed",
        "start_date_gte": since,
        "limit": 500
    },
    timeout=30
)
r.raise_for_status()

failed_tis = r.json()["task_instances"]
print(f"Found {len(failed_tis)} failed TI since {since}")

to_retry: dict[tuple[str, str], list[str]] = {}  # (dag_id, run_id) -> [task_ids]

for ti in failed_tis:
    dag_id = ti["dag_id"]
    task_id = ti["task_id"]
    run_id = ti["dag_run_id"]

    if task_id in IDEMPOTENT_TASKS.get(dag_id, []):
        to_retry.setdefault((dag_id, run_id), []).append(task_id)
    elif task_id in SENSITIVE.get(dag_id, []):
        print(f"SENSITIVE failed (manual review): {dag_id}.{task_id} run={run_id}")
    else:
        print(f"UNKNOWN failed (alerting): {dag_id}.{task_id}")

# Сначала dry-run для каждой группы
for (dag_id, run_id), task_ids in to_retry.items():
    payload = {
        "task_ids": task_ids,
        "dag_run_id": run_id,
        "include_upstream": False,
        "include_downstream": True,  # retry downstream тоже — они скорее всего тоже failed/skipped
        "reset_dag_runs": True,
        "dry_run": True
    }
    dry = session.post(
        f"{AIRFLOW}/api/v1/dags/{dag_id}/clearTaskInstances"
        json=payload, timeout=30
    ).json()
    print(f"{dag_id} run={run_id}: would clear {len(dry['task_instances'])} TI")

    # Реальный clear
    payload["dry_run"] = False
    real = session.post(
        f"{AIRFLOW}/api/v1/dags/{dag_id}/clearTaskInstances"
        json=payload, timeout=30
    )
    real.raise_for_status()
    print(f"  cleared. Scheduler will reschedule.")

Расширение: alerting через Slack при unknown failures

import json

def slack_alert(message: str):
    requests.post(
        os.environ["SLACK_WEBHOOK_URL"],
        json={"text": message},
        timeout=5
    )

# При нахождении UNKNOWN failed — alert
if dag_id not in IDEMPOTENT_TASKS and dag_id not in SENSITIVE:
    slack_alert(
        f"Unknown DAG with failed task: `{dag_id}.{task_id}`\n"
        f"Run: {run_id}\n"
        f"URL: {AIRFLOW}/dags/{dag_id}/grid?dag_run_id={run_id}"
    )

Pattern 3: Programmatic Connection management

Use case

Terraform управляет инфраструктурой. Каждое окружение (dev/staging/prod) имеет свой Snowflake account, IAM role, S3 bucket. Хочется чтобы Terraform после создания resources записывал connection в Airflow — без ручной кликалки в UI.

Решение

# scripts/sync_connections_from_tf.py
"""
Reads Terraform outputs, syncs to Airflow connections.
Идемпотентно: PUT (PATCH) для существующих, POST для новых.
"""
import json
import os
import requests
import sys

AIRFLOW = os.environ["AIRFLOW_URL"]
session = requests.Session()
session.headers["Authorization"] = f"Bearer {os.environ['AIRFLOW_TOKEN']}"

with open(sys.argv[1]) as f:  # terraform output JSON
    tf_outputs = json.load(f)

connections = [
    {
        "connection_id": "warehouse_snowflake",
        "conn_type": "snowflake",
        "host": tf_outputs["snowflake_account"]["value"],
        "login": tf_outputs["airflow_svc_user"]["value"],
        "password": tf_outputs["airflow_svc_password"]["value"],
        "schema": tf_outputs["snowflake_default_schema"]["value"],
        "extra": json.dumps({
            "warehouse": tf_outputs["snowflake_warehouse"]["value"],
            "role": tf_outputs["snowflake_role"]["value"],
            "region": tf_outputs["aws_region"]["value"]
        })
    },
    {
        "connection_id": "data_lake_s3",
        "conn_type": "aws",
        "extra": json.dumps({
            "role_arn": tf_outputs["airflow_iam_role_arn"]["value"],
            "region_name": tf_outputs["aws_region"]["value"]
        })
    }
]

for conn in connections:
    conn_id = conn["connection_id"]

    # Check existence
    r = session.get(f"{AIRFLOW}/api/v1/connections/{conn_id}", timeout=10)
    if r.status_code == 404:
        # Create
        cr = session.post(f"{AIRFLOW}/api/v1/connections", json=conn, timeout=10)
        cr.raise_for_status()
        print(f"CREATED {conn_id}")
    elif r.status_code == 200:
        # Update — PATCH без connection_id в body
        body = {k: v for k, v in conn.items() if k != "connection_id"}
        ur = session.patch(
            f"{AIRFLOW}/api/v1/connections/{conn_id}"
            json=body, timeout=10
        )
        ur.raise_for_status()
        print(f"UPDATED {conn_id}")
    else:
        r.raise_for_status()

Альтернатива: использовать airflow-provider в Terraform

# Terraform не имеет официального Airflow provider в стандартной library,
# но есть community: hashicorp/airflow или ZWMP/airflow.
# Для production обычно проще написать скрипт выше — даёт полный контроль.

resource "airflow_connection" "warehouse" {
  connection_id = "warehouse_snowflake"
  conn_type     = "snowflake"
  host          = snowflake_account.main.url
  login         = snowflake_user.airflow.name
  password      = random_password.airflow_user.result
  schema        = "ANALYTICS"
  extra = jsonencode({
    warehouse = snowflake_warehouse.compute.name
    role      = snowflake_role.etl.name
  })
}
NOTE

Лучшая практика: НЕ хранить credentials в Airflow connections напрямую. Использовать secrets backend (Vault, AWS Secrets Manager, GCP Secret Manager). Тогда Airflow API нужен только для bootstrap или для conn_type, host, port — не для passwords. Это обсуждается в модуле 10 (Connections & Secrets).


Pattern 4: Monitor long-running DAGs

Use case

ML training DAG обычно занимает 4-6 часов. Иногда зависает на 12+ часов из-за data quality issues. Нужен monitoring скрипт, который alert-ит если DAG не завершился за expected SLA.

Решение

# scripts/sla_monitor.py
import os
import requests
from datetime import datetime, timedelta, timezone

AIRFLOW = os.environ["AIRFLOW_URL"]
session = requests.Session()
session.headers["Authorization"] = f"Bearer {os.environ['AIRFLOW_TOKEN']}"

# SLA per DAG в часах
SLA = {
    "ml_training_daily": 6,
    "etl_orders_hourly": 1,
    "etl_users_daily": 3,
}

now = datetime.now(timezone.utc)
violations = []

for dag_id, sla_hours in SLA.items():
    threshold = (now - timedelta(hours=sla_hours)).isoformat()

    r = session.get(
        f"{AIRFLOW}/api/v1/dags/{dag_id}/dagRuns"
        params={
            "state": "running",
            "limit": 10,
            "order_by": "-start_date"
        },
        timeout=10
    )
    r.raise_for_status()

    for run in r.json()["dag_runs"]:
        start = datetime.fromisoformat(run["start_date"].replace("Z", "+00:00"))
        running_hours = (now - start).total_seconds() / 3600
        if running_hours > sla_hours:
            violations.append({
                "dag_id": dag_id,
                "run_id": run["dag_run_id"],
                "running_hours": round(running_hours, 1),
                "sla_hours": sla_hours,
                "url": f"{AIRFLOW}/dags/{dag_id}/grid?dag_run_id={run['dag_run_id']}"
            })

if violations:
    body = "**Airflow SLA violations:**\n\n"
    for v in violations:
        body += f"- `{v['dag_id']}` running {v['running_hours']}h (SLA {v['sla_hours']}h)\n{v['url']}\n"

    requests.post(
        os.environ["SLACK_WEBHOOK_URL"],
        json={"text": body, "channel": "#data-platform-alerts"},
        timeout=5
    )

Запускать через cron каждые 15 минут. Это decoupled SLA — независимый от Airflow process, что важно для случаев, когда сам scheduler заболел.


Pattern 5: Backfill orchestration

Use case

После добавления нового источника в DAG, нужно прогнать его за прошлый месяц (28 runs за разные даты). Делать это через UI кликами — больно.

# scripts/backfill.py
import os
import time
import requests
from datetime import datetime, timedelta, timezone

AIRFLOW = os.environ["AIRFLOW_URL"]
DAG_ID = "etl_orders"
session = requests.Session()
session.headers["Authorization"] = f"Bearer {os.environ['AIRFLOW_TOKEN']}"

start = datetime(2026, 4, 1, tzinfo=timezone.utc)
end = datetime(2026, 4, 28, tzinfo=timezone.utc)
date = start

# Параллелизм: max 3 одновременно
MAX_CONCURRENT = 3

triggered = []
while date <= end:
    # Wait if too many already running
    while True:
        r = session.get(
            f"{AIRFLOW}/api/v1/dags/{DAG_ID}/dagRuns"
            params={"state": "running,queued", "limit": 100}
        ).json()
        active = len(r["dag_runs"])
        if active < MAX_CONCURRENT:
            break
        print(f"{active} active runs, waiting 60s")
        time.sleep(60)

    run_id = f"backfill__{date.strftime('%Y-%m-%d')}"
    resp = session.post(
        f"{AIRFLOW}/api/v1/dags/{DAG_ID}/dagRuns"
        json={
            "dag_run_id": run_id,
            "logical_date": date.isoformat(),
            "data_interval_start": date.isoformat(),
            "data_interval_end": (date + timedelta(days=1)).isoformat(),
            "note": "Backfill after adding new orders source"
        }
    )
    if resp.status_code == 409:
        print(f"  exists: {run_id}")
    else:
        resp.raise_for_status()
        print(f"  triggered: {run_id}")
    triggered.append((date, run_id))
    date += timedelta(days=1)

print(f"\nTriggered {len(triggered)} backfill runs.")
WARNING

Альтернатива: airflow dags backfill CLI команда. Но она работает внутри scheduler container и блокирует терминал на часы. REST API approach позволяет запускать асинхронно, c контролем параллельности, из любого hosts.


Production gotchas

1. Token storage

Никогда не храните API tokens в plain env variables в CI. Используйте secrets:

  • GitHub Actions: ${{ secrets.AIRFLOW_API_TOKEN }}
  • GitLab CI: protected variables
  • Jenkins: credentials plugin

Token rotation — раз в 90 дней минимум. Service account с минимальным permission scope (только нужные DAGs).

2. Webserver behind ALB/NGINX

Production Airflow обычно за HTTPS proxy. Это меняет несколько вещей:

location /api/v1/ {
    proxy_pass http://airflow-webserver:8080;
    proxy_set_header X-Forwarded-Proto https;
    proxy_set_header X-Forwarded-For $remote_addr;
    proxy_set_header Host $host;
    proxy_read_timeout 300s;  # для длинных list queries
    client_max_body_size 10M;  # conf в trigger может быть большой
}

Airflow config:

[webserver]
enable_proxy_fix = True
proxy_fix_x_for = 1
proxy_fix_x_proto = 1
proxy_fix_x_host = 1

3. Race condition: 201 → 404

После POST /dagRuns API возвращает 201, но immediate GET того же run может вернуть 404. Это потому что Airflow использует optimistic concurrency: POST успешен только когда commit состоялся, но replication lag может задержать видимость.

Fix: при первом 404 после успешного POST — wait 1-2 секунды и retry.

4. Не использовать API для streaming logs

Endpoint GET /dags/{}/dagRuns/{}/taskInstances/{}/logs/{try} существует. Но это не stream — он возвращает полный log файл. На больших logs (10+ MB) blocks webserver на секунды. Используйте S3/GCS prefetch для real-time tailing, не API.

5. limit=1000 это max — для больших scans нужна pagination

Если ваш скрипт собирает данные за месяц и есть 50k TI — нельзя сделать limit=50000. Нужна pagination с order_by стабильного поля (start_date) и offset. Deep offset (>10k) тормозит — лучше split по start_date_gte/lte диапазонам.


Проверка знанийKnowledge check
GitHub Action триггерит DAG, ждёт его завершения, и затем deploy production. Иногда между POST trigger и first GET poll возникает 404 'DAG run not found' — скрипт падает, хотя DAG в реальности запустился. Что произошло, как починить?
ОтветAnswer
**Race condition между POST commit и read replica visibility.** API возвращает 201 как только POST committed в primary DB. Если read запросы идут на replica (или Airflow использует PgBouncer transaction pooling), GET может попасть на ещё не реплицированный state и вернуть 404. Также возможно, что scheduler ещё не перевёл run из 'queued' в 'running', и hot caches webserver его не видит. **Fixes:** (1) **Retry 404 первые 5-10 секунд после POST** — простейший и надёжный. Если 404 продолжается > 30s — значит реально не создан, fail. (2) Использовать idempotent `dag_run_id` детерминированный, чтобы POST мог быть retried без 409 problems. (3) Не использовать read replica для immediate-after-write reads (если есть в архитектуре). (4) Если используется PgBouncer — режим session pooling вместо transaction. Production pattern: первый poll через 10-15 секунд после POST + retry 404 ещё 30 секунд.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. GitHub Action триггерит DAG и хочет дождаться завершения. Какая стратегия polling правильная?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6