Learning Platform
Глоссарий Troubleshooting
Урок 14.05 · 24 мин
Продвинутый
CLIairflow dagsairflow tasksairflow dbDebug

CLI deep dive: dags, tasks, db, pools, connections, users

airflow CLI — это второй (после REST API) основной интерфейс для управления кластером. Но у CLI есть уникальные возможности, которых нет в API: запуск task in-place (tasks test), управление базой данных (db upgrade/clean/check), backfill в форвграунде, тестовый запуск DAG без scheduler.

Этот урок — структурированный обзор CLI с фокусом на production workflows: что и когда использовать, какие флаги критичны, где CLI лучше API и наоборот.


Click и argparse — как устроены CLI-инструменты Python

Архитектура CLI

CLI — это обычный Python CLI tool (Click + argparse под капотом), который импортирует Airflow модули и работает прямо с metadata DB.

Где живёт CLI
airflow commandТочка входа из setup.py: airflow.__main__:main. Парсит args, диспатчит к subcommand. Доступен после pip install apache-airflow.
loads AIRFLOW_HOME/airflow.cfg
Airflow Python runtimeИмпортирует airflow.models, airflow.settings. Читает config (env vars override cfg). Создаёт DB session.
SQLAlchemy → Metadata DBБольшинство команд — это DB query / mutation. Например `airflow dags list` → SELECT FROM dag. `airflow connections add` → INSERT в connection с Fernet encryption.
DagBag (для tasks test)Команды `tasks test/run`, `dags test` загружают .py файлы из dags_folder, инстанцируют DAG/Task в текущем процессе. Прямой in-process execute без scheduler/executor.

Из этого следуют критичные ограничения:

  • CLI должна быть запущена в контейнере с правильной средой: тот же Python, те же провайдеры, доступ к metadata DB. Обычно — внутри scheduler/worker pod через kubectl exec.
  • Доступ к DB обязателен — без него падает на первой команде. Для diagnostics из вне используйте REST API.
  • Fernet key обязателен для команд с credentials (connections add, variables set).

Карта sub-commands

airflow
├── dags          # DAG operations: list, pause, trigger, test, backfill, delete
├── tasks         # Task operations: list, test, run, states-for-dag-run
├── db            # DB operations: init, upgrade, clean, check, reset
├── pools         # Pool CRUD
├── connections   # Connection CRUD + import/export
├── variables     # Variable CRUD + import/export
├── users         # FAB user management
├── roles         # FAB role management
├── providers     # Installed providers info
├── jobs          # Background jobs (scheduler/triggerer) info
├── config        # Print config values
├── info          # System diagnostic
├── plugins       # Loaded plugins
├── dag-processor # Standalone DAG Processor (для distributed setup)
├── scheduler     # Run scheduler in foreground
├── webserver     # Run webserver in foreground
├── triggerer     # Run triggerer in foreground
├── celery        # Celery commands (если используется CeleryExecutor)
└── kubernetes    # K8s pod cleanup

Ниже разбираем самые полезные subcommands и их флаги.


airflow dags — DAG operations

dags list

airflow dags list                          # все DAGs
airflow dags list -o yaml                  # output format
airflow dags list --tags etl,critical      # фильтр по tags

Производительность: для 1000+ DAGs lists с paginated UI быстрее. CLI делает full SELECT.

dags list-import-errors

airflow dags list-import-errors

Показывает DAGs которые не парсятся. Первая команда при «DAG не появился в UI». Эквивалент SELECT * FROM import_error.

dags pause / dags unpause

airflow dags pause my_dag
airflow dags unpause my_dag

Эквивалент UI toggle. Mass-pause через xargs:

airflow dags list -o plain | awk '{print $1}' | xargs -I{} airflow dags pause {}

dags trigger

airflow dags trigger my_dag \
  --run-id manual__test_run \
  --conf '{"source":"s3://test/"}' \
  --no-replace-microseconds

Чем отличается от REST API: CLI делает прямой INSERT в metadata DB, без webserver. Это быстрее (нет HTTP overhead), но требует доступ к DB.

dags test — изолированный test run

airflow dags test my_dag 2026-05-12

Это уникальная возможность CLI (в API нет аналога). Запускает весь DAG в текущем процессе, без scheduler, без executor, без metadata DB persistence. Идеально для:

  • Локальная разработка DAG
  • Smoke test перед deploy
  • Debug сложных deps
# С breakpoint поддержкой (с 2.7+)
airflow dags test my_dag 2026-05-12 \
  --post-mortem  # дропает в pdb при exception
NOTE

dags test НЕ персистит state в DB. Это даёт быстрый цикл «измени → запусти → посмотри». Но Connection/Variable lookups идут реальные — поэтому удобно для interactive debug в localhost окружении с copy production secrets.

dags backfill — историческое recompute

airflow dags backfill my_dag \
  --start-date 2026-04-01 \
  --end-date 2026-04-30 \
  --reset-dagruns \
  --pool backfill_pool \
  --rerun-failed-tasks

Ключевые флаги:

  • --reset-dagruns — если DagRuns уже существуют, сбросить их state перед запуском.
  • --rerun-failed-tasks — для уже failed TI делать retry, а не skip.
  • --pool backfill_pool — изолировать backfill от production pool (важно для production health).
  • --max-active-runs N (2.7+) — ограничить параллельность backfill.

Backfill через CLI запускает scheduler в foreground режиме в текущем процессе. Это значит:

  • Если SSH сессия упала — backfill умер.
  • Terminal blocked на часы.

Альтернатива — Pattern 5 из урока 04 (через REST API в loop), даёт async backfill.

dags delete

airflow dags delete my_dag

Удаляет все metadata о DAG: DagRuns, TaskInstances, XCom, logs, dataset events, references. Файл .py не трогает. Если файл остаётся — DAG появится снова при следующем parse.

Production pattern: сначала удалить файл из git → wait scheduler удалит is_active=False → потом airflow dags delete.


airflow tasks — Task operations

tasks list

airflow tasks list my_dag --tree

Показывает структуру DAG. --tree рисует ASCII-tree деревом — удобно для документирования.

tasks test — запуск task без scheduler

airflow tasks test my_dag transform_data 2026-05-12

Запускает одну task в текущем процессе на конкретный logical date. Не пишет в metadata DB. Идеально для:

  • Локальный debug task логики
  • Quick test «работает ли connection после fix»
  • Reproduce production failure локально

С 2.10+ принимает --env-vars '{"key":"val"}' для override:

airflow tasks test my_dag fetch_api 2026-05-12 \
  --env-vars '{"API_URL":"http://localhost:8000"}'

tasks run — запуск как worker

airflow tasks run my_dag transform_data 2026-05-12 --local

Это то, что executor запускает на worker. --local — стандартный LocalTaskJob. CLI команда обычно НЕ вызывается вручную — её использует executor.

tasks states-for-dag-run

airflow tasks states-for-dag-run my_dag manual__2026-05-12T10:00:00

Print состояние всех TI в run. Альтернатива GET /taskInstances через API.

tasks clear

airflow tasks clear my_dag \
  --task-regex 'transform_.*' \
  --start-date 2026-05-01 \
  --end-date 2026-05-12 \
  --only-failed \
  --yes  # без интерактивного confirm

Mass-clear по regex + date range. Очень мощно — но опасно. Всегда сначала без --yes, посмотреть list, потом запускать с --yes.


airflow db — Metadata DB management

Самые важные production команды.

db upgrade

airflow db upgrade

Применяет Alembic migrations к metadata DB. Обязательно перед запуском новой версии Airflow.

В Helm chart запускается в airflow-run-migrations Job перед deploy. В docker-compose — через init container.

db downgrade (2.7+)

airflow db downgrade --to-version 2.8.0

Откат миграций. Используется при rollback to previous version. Опасно — некоторые миграции необратимы (drop column).

db check — connectivity test

airflow db check

Проверяет, что Airflow может подключиться к DB. Возвращает 0/1 — удобно для readiness probe.

# Kubernetes readiness
readinessProbe:
  exec:
    command: ["airflow", "db", "check"]
  initialDelaySeconds: 30
  periodSeconds: 30

db check-migrations

airflow db check-migrations

Проверяет, что миграции применены. Если есть pending — exit 1.

db clean — cleanup старых данных

airflow db clean \
  --clean-before-timestamp '2026-04-12' \
  --tables 'task_instance,log,xcom,dag_run' \
  --skip-archive \
  --yes

Удаляет старые rows. Критично для production: без cleanup metadata DB растёт до 100s GB.

  • --skip-archive — не создавать backup table перед удалением (по default архивируется).
  • --clean-before-timestamp — все строки старше timestamp удаляются.

Production pattern: cron daily:

0 3 * * * airflow db clean --clean-before-timestamp $(date -u -d '30 days ago' +%Y-%m-%d) --yes --skip-archive

db shell

airflow db shell

Открывает psql/mysql shell с уже настроенным connection. Удобно для production diagnostics.


airflow connections / variables / pools

CRUD команды для метаданных.

Connections

# Create
airflow connections add warehouse_snowflake \
  --conn-type snowflake \
  --conn-host xy12345.us-east-1 \
  --conn-login AIRFLOW_SVC \
  --conn-password '...' \
  --conn-extra '{"warehouse":"COMPUTE_WH","role":"ETL_ROLE"}'

# Read
airflow connections get warehouse_snowflake --output json

# Update — нет напрямую, нужно delete + add
airflow connections delete warehouse_snowflake
airflow connections add warehouse_snowflake --conn-type ...

# Export / Import
airflow connections export connections.json --file-format json
airflow connections import connections.yaml

Variables

# Set с типом
airflow variables set retry_limit 3 --json  # сохранит как JSON int

# Get
airflow variables get retry_limit

# Export всех
airflow variables export vars.json

# Import
airflow variables import vars.json

Pools

airflow pools list
airflow pools set gpu_pool 4 "GPU workers pool" --include-deferred
airflow pools delete gpu_pool

# Import bulk
airflow pools import pools.json

--include-deferred (2.7+) считает deferred TI как занимающие slot — критично для accurate budget control.


airflow users — FAB user management

# Create admin
airflow users create \
  --username admin \
  --firstname Admin \
  --lastname User \
  --email [email protected] \
  --role Admin \
  --password '...'

# Reset password
airflow users reset-password --username admin

# Add role to user
airflow users add-role --username ci_user --role TeamA_CI

# List
airflow users list

# Delete
airflow users delete --username old_user

Альтернатива: REST API /api/v1/users — но он требует Admin role и работает только если auth_manager поддерживает write (FAB поддерживает, AWS IAM нет).


airflow jobs check — heartbeat detection

airflow jobs check \
  --job-type SchedulerJob \
  --hostname $(hostname) \
  --limit 1 \
  --allow-multiple

Проверяет, что SchedulerJob с указанным hostname активен (latest_heartbeat в пределах health threshold). Используется для liveness probe:

livenessProbe:
  exec:
    command:
      - airflow
      - jobs
      - check
      - --job-type
      - SchedulerJob
      - --hostname
      - $(hostname)
  periodSeconds: 30
  failureThreshold: 5

Production workflows

Workflow 1: Deploy new version

# 1. Tag старую DB
pg_dump airflow > airflow_pre_2_11_upgrade.sql

# 2. Bring scheduler/workers down (Helm rollout pause)
kubectl scale deploy/airflow-scheduler --replicas=0
kubectl scale deploy/airflow-worker --replicas=0

# 3. Run migrations
kubectl run airflow-migrate --image=apache/airflow:2.11.0 \
  --rm -it --restart=Never \
  --env="AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=..." \
  -- airflow db upgrade

# 4. Validate
airflow db check-migrations

# 5. Resume
kubectl scale deploy/airflow-scheduler --replicas=2
kubectl scale deploy/airflow-worker --replicas=10

Workflow 2: Disaster recovery — corrupted DagRun

# 1. Identify problematic run
airflow tasks states-for-dag-run my_dag manual__broken_run

# 2. Force clear all TI
airflow tasks clear my_dag \
  --start-date 2026-05-12 --end-date 2026-05-12 \
  --yes

# 3. Or delete DagRun completely
airflow dags delete-dag-run my_dag manual__broken_run

# 4. Re-trigger
airflow dags trigger my_dag --run-id manual__retry

Workflow 3: Stuck task adoption

# Task в state running, но scheduler/worker мёртв
# 1. List жертв
airflow tasks states-for-dag-run my_dag dag_run_id_x | grep running

# 2. Force reset
airflow db shell <<EOF
UPDATE task_instance
SET state = NULL, queued_dttm = NULL
WHERE dag_id = 'my_dag'
  AND run_id = 'dag_run_id_x'
  AND state = 'running';
EOF

# 3. Scheduler подберёт на следующем tick

CLI vs REST API — когда что использовать

СценарийЛучше CLIЛучше REST API
Local DAG development (tasks test)yes
db upgrade/migrationyes
Bulk import (connections import)yes
Deploy automation в Helmyes
Cron cleanup (db clean)yes
Trigger DAG из GitHub Actionsyes
Mass clear failed TI в productionyes
Real-time SLA monitoring из externalyes
Sync connections из Terraformyes
Programmatic UI dashboardyes

Rule of thumb: CLI — для внутренних operations (внутри кластера, обычно automation deployment-а). REST API — для внешних integrations (CI/CD, monitoring, third-party).


Production gotchas

1. CLI обязательно из контейнера с DB credentials

airflow dags list на localhost без DB connection упадёт с sqlalchemy.exc.OperationalError. Всегда:

kubectl exec -it deploy/airflow-scheduler -- airflow dags list

2. dags trigger --conf shell escaping

# Неправильно: shell съест кавычки
airflow dags trigger my_dag --conf "{"key":"val"}"

# Правильно: single-quote вокруг JSON
airflow dags trigger my_dag --conf '{"key":"val"}'

# Или через файл
echo '{"key":"val"}' > /tmp/conf.json
airflow dags trigger my_dag --conf "$(cat /tmp/conf.json)"

3. db clean --skip-archive не создаёт backup

По default db clean создаёт _airflow_deleted__* tables с архивными данными. Это удваивает место. --skip-archive — pure delete. Используйте для регулярного cleanup, но первый раз — без флага, посмотрите архив.

4. users create --password оставляет в history

Все shell history. Не делайте этого в shared host. Use:

airflow users create --use-random-password --username admin ...
# Затем reset через UI или CLI:
airflow users reset-password --username admin

Или используйте --password $(&lt; /tmp/secret) и удалите /tmp/secret сразу.

5. dags test имеет полный доступ к production secrets

tasks test и dags test загружают реальные Connections/Variables (Fernet key, secrets backend). Запуск на production host = production data flow. Будьте осторожны с side effects (UPDATE в warehouse и т.п.).


Проверка знанийKnowledge check
Production scheduler начал получать `import_error` для всех DAGs после deploy новой версии provider package. CI прошёл pre-deploy checks, но что-то в runtime сломалось. Какие CLI команды помогут диагностировать и починить?
ОтветAnswer
**Диагностика:** (1) `airflow dags list-import-errors` — точные tracebacks (или `SELECT * FROM import_error`). (2) `airflow info` — print среды: versions Python, Airflow, all installed providers — найти что обновилось неожиданно. (3) `airflow providers list` — все провайдеры с их versions. (4) `airflow plugins` — кастомные plugins (могут конфликтовать с new provider). (5) Внутри pod: `pip show apache-airflow-providers-postgres` — установленная version, deps tree. **Починка:** (1) Если конкретный provider broken — `pip install apache-airflow-providers-X==<old version>` в новом image, redeploy. (2) Если DAG зависит от удалённого symbol — quick fix через `airflow dags pause` всех affected, fix `.py`, unpause. (3) Validate fix: `airflow dags test <dag_id> $(date +%Y-%m-%d)` в pod — locally прогонит без scheduler. (4) Permanent: lock provider versions в `constraints.txt`, pin в Dockerfile.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Когда использовать `airflow dags test` vs `airflow dags trigger`?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6