CLI deep dive: dags, tasks, db, pools, connections, users
airflow CLI — это второй (после REST API) основной интерфейс для управления кластером. Но у CLI есть уникальные возможности, которых нет в API: запуск task in-place (tasks test), управление базой данных (db upgrade/clean/check), backfill в форвграунде, тестовый запуск DAG без scheduler.
Этот урок — структурированный обзор CLI с фокусом на production workflows: что и когда использовать, какие флаги критичны, где CLI лучше API и наоборот.
Click и argparse — как устроены CLI-инструменты Python
Архитектура CLI
CLI — это обычный Python CLI tool (Click + argparse под капотом), который импортирует Airflow модули и работает прямо с metadata DB.
Из этого следуют критичные ограничения:
- CLI должна быть запущена в контейнере с правильной средой: тот же Python, те же провайдеры, доступ к metadata DB. Обычно — внутри scheduler/worker pod через
kubectl exec. - Доступ к DB обязателен — без него падает на первой команде. Для diagnostics из вне используйте REST API.
- Fernet key обязателен для команд с credentials (
connections add,variables set).
Карта sub-commands
airflow
├── dags # DAG operations: list, pause, trigger, test, backfill, delete
├── tasks # Task operations: list, test, run, states-for-dag-run
├── db # DB operations: init, upgrade, clean, check, reset
├── pools # Pool CRUD
├── connections # Connection CRUD + import/export
├── variables # Variable CRUD + import/export
├── users # FAB user management
├── roles # FAB role management
├── providers # Installed providers info
├── jobs # Background jobs (scheduler/triggerer) info
├── config # Print config values
├── info # System diagnostic
├── plugins # Loaded plugins
├── dag-processor # Standalone DAG Processor (для distributed setup)
├── scheduler # Run scheduler in foreground
├── webserver # Run webserver in foreground
├── triggerer # Run triggerer in foreground
├── celery # Celery commands (если используется CeleryExecutor)
└── kubernetes # K8s pod cleanup
Ниже разбираем самые полезные subcommands и их флаги.
airflow dags — DAG operations
dags list
airflow dags list # все DAGs
airflow dags list -o yaml # output format
airflow dags list --tags etl,critical # фильтр по tags
Производительность: для 1000+ DAGs lists с paginated UI быстрее. CLI делает full SELECT.
dags list-import-errors
airflow dags list-import-errors
Показывает DAGs которые не парсятся. Первая команда при «DAG не появился в UI». Эквивалент SELECT * FROM import_error.
dags pause / dags unpause
airflow dags pause my_dag
airflow dags unpause my_dag
Эквивалент UI toggle. Mass-pause через xargs:
airflow dags list -o plain | awk '{print $1}' | xargs -I{} airflow dags pause {}
dags trigger
airflow dags trigger my_dag \
--run-id manual__test_run \
--conf '{"source":"s3://test/"}' \
--no-replace-microseconds
Чем отличается от REST API: CLI делает прямой INSERT в metadata DB, без webserver. Это быстрее (нет HTTP overhead), но требует доступ к DB.
dags test — изолированный test run
airflow dags test my_dag 2026-05-12
Это уникальная возможность CLI (в API нет аналога). Запускает весь DAG в текущем процессе, без scheduler, без executor, без metadata DB persistence. Идеально для:
- Локальная разработка DAG
- Smoke test перед deploy
- Debug сложных deps
# С breakpoint поддержкой (с 2.7+)
airflow dags test my_dag 2026-05-12 \
--post-mortem # дропает в pdb при exception
dags test НЕ персистит state в DB. Это даёт быстрый цикл «измени → запусти → посмотри». Но Connection/Variable lookups идут реальные — поэтому удобно для interactive debug в localhost окружении с copy production secrets.
dags backfill — историческое recompute
airflow dags backfill my_dag \
--start-date 2026-04-01 \
--end-date 2026-04-30 \
--reset-dagruns \
--pool backfill_pool \
--rerun-failed-tasks
Ключевые флаги:
--reset-dagruns— если DagRuns уже существуют, сбросить их state перед запуском.--rerun-failed-tasks— для уже failed TI делать retry, а не skip.--pool backfill_pool— изолировать backfill от production pool (важно для production health).--max-active-runs N(2.7+) — ограничить параллельность backfill.
Backfill через CLI запускает scheduler в foreground режиме в текущем процессе. Это значит:
- Если SSH сессия упала — backfill умер.
- Terminal blocked на часы.
Альтернатива — Pattern 5 из урока 04 (через REST API в loop), даёт async backfill.
dags delete
airflow dags delete my_dag
Удаляет все metadata о DAG: DagRuns, TaskInstances, XCom, logs, dataset events, references. Файл .py не трогает. Если файл остаётся — DAG появится снова при следующем parse.
Production pattern: сначала удалить файл из git → wait scheduler удалит is_active=False → потом airflow dags delete.
airflow tasks — Task operations
tasks list
airflow tasks list my_dag --tree
Показывает структуру DAG. --tree рисует ASCII-tree деревом — удобно для документирования.
tasks test — запуск task без scheduler
airflow tasks test my_dag transform_data 2026-05-12
Запускает одну task в текущем процессе на конкретный logical date. Не пишет в metadata DB. Идеально для:
- Локальный debug task логики
- Quick test «работает ли connection после fix»
- Reproduce production failure локально
С 2.10+ принимает --env-vars '{"key":"val"}' для override:
airflow tasks test my_dag fetch_api 2026-05-12 \
--env-vars '{"API_URL":"http://localhost:8000"}'
tasks run — запуск как worker
airflow tasks run my_dag transform_data 2026-05-12 --local
Это то, что executor запускает на worker. --local — стандартный LocalTaskJob. CLI команда обычно НЕ вызывается вручную — её использует executor.
tasks states-for-dag-run
airflow tasks states-for-dag-run my_dag manual__2026-05-12T10:00:00
Print состояние всех TI в run. Альтернатива GET /taskInstances через API.
tasks clear
airflow tasks clear my_dag \
--task-regex 'transform_.*' \
--start-date 2026-05-01 \
--end-date 2026-05-12 \
--only-failed \
--yes # без интерактивного confirm
Mass-clear по regex + date range. Очень мощно — но опасно. Всегда сначала без --yes, посмотреть list, потом запускать с --yes.
airflow db — Metadata DB management
Самые важные production команды.
db upgrade
airflow db upgrade
Применяет Alembic migrations к metadata DB. Обязательно перед запуском новой версии Airflow.
В Helm chart запускается в airflow-run-migrations Job перед deploy. В docker-compose — через init container.
db downgrade (2.7+)
airflow db downgrade --to-version 2.8.0
Откат миграций. Используется при rollback to previous version. Опасно — некоторые миграции необратимы (drop column).
db check — connectivity test
airflow db check
Проверяет, что Airflow может подключиться к DB. Возвращает 0/1 — удобно для readiness probe.
# Kubernetes readiness
readinessProbe:
exec:
command: ["airflow", "db", "check"]
initialDelaySeconds: 30
periodSeconds: 30
db check-migrations
airflow db check-migrations
Проверяет, что миграции применены. Если есть pending — exit 1.
db clean — cleanup старых данных
airflow db clean \
--clean-before-timestamp '2026-04-12' \
--tables 'task_instance,log,xcom,dag_run' \
--skip-archive \
--yes
Удаляет старые rows. Критично для production: без cleanup metadata DB растёт до 100s GB.
--skip-archive— не создавать backup table перед удалением (по default архивируется).--clean-before-timestamp— все строки старше timestamp удаляются.
Production pattern: cron daily:
0 3 * * * airflow db clean --clean-before-timestamp $(date -u -d '30 days ago' +%Y-%m-%d) --yes --skip-archive
db shell
airflow db shell
Открывает psql/mysql shell с уже настроенным connection. Удобно для production diagnostics.
airflow connections / variables / pools
CRUD команды для метаданных.
Connections
# Create
airflow connections add warehouse_snowflake \
--conn-type snowflake \
--conn-host xy12345.us-east-1 \
--conn-login AIRFLOW_SVC \
--conn-password '...' \
--conn-extra '{"warehouse":"COMPUTE_WH","role":"ETL_ROLE"}'
# Read
airflow connections get warehouse_snowflake --output json
# Update — нет напрямую, нужно delete + add
airflow connections delete warehouse_snowflake
airflow connections add warehouse_snowflake --conn-type ...
# Export / Import
airflow connections export connections.json --file-format json
airflow connections import connections.yaml
Variables
# Set с типом
airflow variables set retry_limit 3 --json # сохранит как JSON int
# Get
airflow variables get retry_limit
# Export всех
airflow variables export vars.json
# Import
airflow variables import vars.json
Pools
airflow pools list
airflow pools set gpu_pool 4 "GPU workers pool" --include-deferred
airflow pools delete gpu_pool
# Import bulk
airflow pools import pools.json
--include-deferred (2.7+) считает deferred TI как занимающие slot — критично для accurate budget control.
airflow users — FAB user management
# Create admin
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--email [email protected] \
--role Admin \
--password '...'
# Reset password
airflow users reset-password --username admin
# Add role to user
airflow users add-role --username ci_user --role TeamA_CI
# List
airflow users list
# Delete
airflow users delete --username old_user
Альтернатива: REST API /api/v1/users — но он требует Admin role и работает только если auth_manager поддерживает write (FAB поддерживает, AWS IAM нет).
airflow jobs check — heartbeat detection
airflow jobs check \
--job-type SchedulerJob \
--hostname $(hostname) \
--limit 1 \
--allow-multiple
Проверяет, что SchedulerJob с указанным hostname активен (latest_heartbeat в пределах health threshold). Используется для liveness probe:
livenessProbe:
exec:
command:
- airflow
- jobs
- check
- --job-type
- SchedulerJob
- --hostname
- $(hostname)
periodSeconds: 30
failureThreshold: 5
Production workflows
Workflow 1: Deploy new version
# 1. Tag старую DB
pg_dump airflow > airflow_pre_2_11_upgrade.sql
# 2. Bring scheduler/workers down (Helm rollout pause)
kubectl scale deploy/airflow-scheduler --replicas=0
kubectl scale deploy/airflow-worker --replicas=0
# 3. Run migrations
kubectl run airflow-migrate --image=apache/airflow:2.11.0 \
--rm -it --restart=Never \
--env="AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=..." \
-- airflow db upgrade
# 4. Validate
airflow db check-migrations
# 5. Resume
kubectl scale deploy/airflow-scheduler --replicas=2
kubectl scale deploy/airflow-worker --replicas=10
Workflow 2: Disaster recovery — corrupted DagRun
# 1. Identify problematic run
airflow tasks states-for-dag-run my_dag manual__broken_run
# 2. Force clear all TI
airflow tasks clear my_dag \
--start-date 2026-05-12 --end-date 2026-05-12 \
--yes
# 3. Or delete DagRun completely
airflow dags delete-dag-run my_dag manual__broken_run
# 4. Re-trigger
airflow dags trigger my_dag --run-id manual__retry
Workflow 3: Stuck task adoption
# Task в state running, но scheduler/worker мёртв
# 1. List жертв
airflow tasks states-for-dag-run my_dag dag_run_id_x | grep running
# 2. Force reset
airflow db shell <<EOF
UPDATE task_instance
SET state = NULL, queued_dttm = NULL
WHERE dag_id = 'my_dag'
AND run_id = 'dag_run_id_x'
AND state = 'running';
EOF
# 3. Scheduler подберёт на следующем tick
CLI vs REST API — когда что использовать
| Сценарий | Лучше CLI | Лучше REST API |
|---|---|---|
Local DAG development (tasks test) | yes | |
db upgrade/migration | yes | |
Bulk import (connections import) | yes | |
| Deploy automation в Helm | yes | |
Cron cleanup (db clean) | yes | |
| Trigger DAG из GitHub Actions | yes | |
| Mass clear failed TI в production | yes | |
| Real-time SLA monitoring из external | yes | |
| Sync connections из Terraform | yes | |
| Programmatic UI dashboard | yes |
Rule of thumb: CLI — для внутренних operations (внутри кластера, обычно automation deployment-а). REST API — для внешних integrations (CI/CD, monitoring, third-party).
Production gotchas
1. CLI обязательно из контейнера с DB credentials
airflow dags list на localhost без DB connection упадёт с sqlalchemy.exc.OperationalError. Всегда:
kubectl exec -it deploy/airflow-scheduler -- airflow dags list
2. dags trigger --conf shell escaping
# Неправильно: shell съест кавычки
airflow dags trigger my_dag --conf "{"key":"val"}"
# Правильно: single-quote вокруг JSON
airflow dags trigger my_dag --conf '{"key":"val"}'
# Или через файл
echo '{"key":"val"}' > /tmp/conf.json
airflow dags trigger my_dag --conf "$(cat /tmp/conf.json)"
3. db clean --skip-archive не создаёт backup
По default db clean создаёт _airflow_deleted__* tables с архивными данными. Это удваивает место. --skip-archive — pure delete. Используйте для регулярного cleanup, но первый раз — без флага, посмотрите архив.
4. users create --password оставляет в history
Все shell history. Не делайте этого в shared host. Use:
airflow users create --use-random-password --username admin ...
# Затем reset через UI или CLI:
airflow users reset-password --username admin
Или используйте --password $(< /tmp/secret) и удалите /tmp/secret сразу.
5. dags test имеет полный доступ к production secrets
tasks test и dags test загружают реальные Connections/Variables (Fernet key, secrets backend). Запуск на production host = production data flow. Будьте осторожны с side effects (UPDATE в warehouse и т.п.).