Lesson 19.09 · 28 min
Advanced
Migration Playbook · Step-by-Step · Rollback · Production

Step-by-step migration playbook — 2.10/2.11 → 3.0/3.2

This is the final practical playbook. If you run Airflow 2.10/2.11 in production and need to migrate to 3.x, this is a step-by-step guide with exact commands, decision points and a rollback procedure: a battle-tested approach built from conservative steps.

Total timeline: 6-12 months of preparation plus one weekend for the actual migration. Don't rush: a major-version upgrade happens only once every few years, so do it properly.

Migration playbook timeline: Saturday morning execution
Pre-flight: upgrade to 2.11 LTS. Do not migrate from 2.10 directly. Upgrade to 2.11 first: same architecture, but it includes migration helpers, pairs with the ruff AIR301/AIR302 rules, and supports newer providers. Low risk, 1-2 weeks. By migration day there should be 0 ruff violations.
ruff violations → 0, staging soak test
Day -1: Backup. Manual RDS snapshot (a fixed restore point), pg_dump (offline backup), Helm values backup. Keep multiple recovery points: back up before EACH migration step, not just once before the migration. Record the current Airflow version.
Saturday 09:00 UTC start
09:05: Pause all DAGs. Run airflow dags pause for every DAG in parallel via xargs. Verify PAUSED == TOTAL. Post "migration started" in Slack. Critical: this prevents new TaskInstance writes to the DB during the migration.
09:15: Wait for in-flight tasks. Wait for running/queued tasks to complete (max 30 min). Query: SELECT count(*) FROM task_instance WHERE state IN ('running','queued'). On timeout, proceed anyway with a warning. Goal: minimize partial state.
DB writes stopped
09:45: Helm upgrade. helm upgrade airflow --version 2.0.0 --values values-3x.yaml with migrateDatabaseJob.enabled=false, because we run the migration manually. The new 3.x pods will not become ready while the DB schema is still 2.x; that is expected until db migrate has run.
10:15: airflow db migrate. Run the migration manually (controlled). Alembic applies the 2.x → 3.x schema changes. Large DB: 15-30 min, sometimes 1-2 hours (e.g. ab_user_role with millions of rows). Monitor pg_stat_activity and do not cancel: that can leave the DB in an inconsistent state.
schema 3.x ready
10:45: Wait for healthy components. kubectl wait until scheduler, apiServer, dagProcessor and triggerer are ready. Verify that /api/v2/version returns 3.0.x. External Secrets sync may take 1-2 min for the new K8s Secrets.
11:00: Smoke testing. API /api/v2/monitor/health, UI loads, DAG count matches, no import errors (airflow dags list-import-errors), sample DAG test run (airflow dags test capstone_orders_etl_realtime).
incremental unpause
11:30: Unpause critical DAGs. Critical DAGs first (capstone_orders_etl, fraud_detection_pipeline), 10 s apart. Monitor for 30 min. If clean, unpause the normal-priority DAGs. Temporarily lower max_active_runs to limit the blast radius.
2-hour soak observation
13:00-15:00: Soak + communicate success. Watch scheduler health (kubectl get pods), failed task instances, and Grafana metrics (scheduler_loop_duration, executor.queued_tasks). 16:00 UTC: migration window closes (roll back if not done).


Pre-requisites: what must be ready

Before migration day, check:

✓ Pre-requisites checklist

□ Airflow 2.11 LTS running in production (NOT 2.10)
□ Staging environment identical to production
□ DAGs and airflow.cfg pass the 3.0 upgrade checks (ruff AIR rules, config lint): 0 critical findings
□ Ruff AIR301/AIR302 violations = 0 (or accepted exceptions)
□ All API consumers updated to v2 (or have a fallback)
□ Helm chart 2.x ready (official chart 2.0+ released)
□ Backup procedure tested
□ Rollback procedure tested on staging
□ Team trained on 3.x changes (FastAPI, Task SDK, Assets)
□ Communication plan (stakeholders, users)
□ Maintenance window approved
□ External Secrets backend tested on 3.x staging
□ Custom plugins (if any) compatible or have an alternative
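A sketch of a pre-flight gate for the first checklist item, assuming `airflow version` prints only the version string. `check_airflow_211` is a hypothetical helper, not part of Airflow; it fails fast if the cluster is still on 2.10 (or on anything other than 2.11.x).

```bash
# Hypothetical pre-flight gate: succeed only if the running Airflow is 2.11.x.
# Argument: the output of `airflow version` (assumed to be just "2.11.x").
check_airflow_211() {
  local version="${1//[[:space:]]/}"   # drop newlines/padding from kubectl output
  case "$version" in
    2.11.*) echo "OK: Airflow $version" ;;
    *)
      echo "STOP: running Airflow '$version', expected 2.11.x" >&2
      return 1
      ;;
  esac
}

# Usage (pod name as in the Day -1 backup step):
#   check_airflow_211 "$(kubectl exec -n airflow airflow-scheduler-0 -- airflow version)" || exit 1
```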

Phase 1: Final preparation (week before)

Day -7 — Final staging soak test

# Staging should already have been running 3.x for 2-3 weeks
# Final verification:

# 1. Compare metrics 2.x prod vs 3.x staging
# - scheduler.scheduler_loop_duration: < 20% regression OK
# - dag_processing.total_parse_time: < 20% regression OK
# - executor.queued_tasks: should normalize

# 2. End-to-end DAG runs
for dag in critical_dag_1 critical_dag_2 critical_dag_3; do
    airflow dags test "$dag" 2026-05-12  # should pass on staging 3.x
done

# 3. API consumer tests
python tests/api_consumer_smoke_test.py --target=staging

# 4. UI walkthrough: do all standard operations work?
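The "< 20% regression OK" rule in step 1 can be made mechanical. A minimal sketch, assuming you read the 2.x production baseline and the 3.x staging value for the same metric and time window (for example the average scheduler.scheduler_loop_duration) from Grafana by hand; `within_regression` is a hypothetical helper.

```bash
# Hypothetical helper: is a 3.x staging metric within the allowed regression
# versus the 2.x production baseline? Default limit 20%, as in step 1 above.
# Values are plain numbers read from Grafana for the same metric and window.
within_regression() {
  local baseline="$1" candidate="$2" max_pct="${3:-20}"
  awk -v b="$baseline" -v c="$candidate" -v m="$max_pct" 'BEGIN {
    if (b <= 0) { print "baseline must be > 0" > "/dev/stderr"; exit 2 }
    pct = (c - b) / b * 100            # positive = slower/larger on 3.x
    printf "regression: %.1f%% (limit %s%%)\n", pct, m
    exit (pct <= m + 0) ? 0 : 1
  }'
}

# Usage: within_regression 1.8 2.0 || echo "scheduler_loop_duration regressed too much"
```

Returning a non-zero exit code (rather than only printing) lets the check sit in a script that stops the go/no-go review automatically.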

Day -3 — Communicate

To: [email protected]
Subject: Airflow 3.x Migration — This Saturday 9am UTC

Hi team,

We're migrating Airflow to 3.x this Saturday.

Timeline:
  09:00 UTC — Start migration
  09:15 UTC — All DAGs paused
  10:30 UTC — Helm upgrade complete
  11:00 UTC — db migrate complete
  11:30 UTC — Smoke testing
  12:30 UTC — Unpause DAGs gradually
  14:00 UTC — Full operations resumed (estimate)
  16:00 UTC — Migration window closes (rollback if not done)

What's changing:
  - UI looks different (React-based)
  - REST API v1 is removed in 3.x: clients must use /api/v2 (known consumers were updated in advance)
  - Some new features available (DAG Versioning, etc)

What stays same:
  - All DAGs functional
  - SLAs / data freshness
  - Slack alerting

Issues during migration: page DataEng-OnCall
After-hours support: Saturday + Sunday all day

Thanks,
Data Platform Team

Day -1 — Pre-flight backup

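# 1. Manual RDS snapshot (a fixed restore point; creation is not instant on a large DB)
aws rds create-db-snapshot \
  --db-instance-identifier airflow-prod \
  --db-snapshot-identifier airflow-pre-3x-migration-20260515
aws rds wait db-snapshot-available --db-snapshot-identifier airflow-pre-3x-migration-20260515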

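# 2. pg_dump backup (offline copy for extra safety).
#    -j (parallel jobs) only works with the directory format (-F d), so it is not used with -F custom.
#    If pg_dump fails through PgBouncer (e.g. unsupported startup parameters), connect directly to Postgres.
kubectl exec -n airflow airflow-scheduler-0 -- bash -c "
  pg_dump --no-owner --no-acl -F custom \
    postgresql://airflow_admin:\$PASSWORD@pgbouncer:6432/airflow > /tmp/airflow-2x.dump
"
kubectl cp -n airflow airflow-scheduler-0:/tmp/airflow-2x.dump ./airflow-2x.dump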

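# 3. Fernet key: the 3.x deployment must use the SAME Fernet key as 2.x (force-refresh the secret synced
#    from Vault if needed). Do not rotate the key during the migration, or encrypted connections/variables become unreadable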

# 4. Helm values backup
helm get values airflow -n airflow > values-2x.yaml.backup

# 5. Document current Airflow version
kubectl exec -n airflow airflow-scheduler-0 -- airflow version > airflow-version-pre.txt
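Before Phase 2 it is worth confirming that the dump is real and not a zero-byte file left behind by a failed pg_dump. A minimal sketch; `check_dump_file` is hypothetical and the 1 MiB floor is an arbitrary assumption to adjust to your DB size. `pg_restore --list` then confirms that the archive's table of contents is readable.

```bash
# Hypothetical helper: fail if the dump is missing, empty or suspiciously small.
# The 1 MiB default floor is an arbitrary assumption; base it on your DB size.
check_dump_file() {
  local path="$1" min_bytes="${2:-1048576}" size
  if [ ! -s "$path" ]; then
    echo "missing or empty dump: $path" >&2
    return 1
  fi
  size=$(wc -c < "$path" | tr -d '[:space:]')
  if [ "$size" -lt "$min_bytes" ]; then
    echo "dump too small: $size bytes < $min_bytes" >&2
    return 1
  fi
  echo "dump OK: $size bytes"
}

# Usage:
#   check_dump_file ./airflow-2x.dump && pg_restore --list ./airflow-2x.dump > /dev/null && echo "archive readable"
```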

Phase 2: Migration day execution

Step 1 (09:00 UTC) — Initiate maintenance mode

# Communicate "migration started"
curl -X POST -H 'Content-type: application/json' "$SLACK_WEBHOOK" -d '{"text":"🔧 Airflow migration started: DAGs will be paused in 5 min"}'

# Verify on-call available
echo "On-call SRE: $ON_CALL_NAME"
echo "Communications open in #airflow-migration channel"
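Every step posts to Slack with hand-written JSON, which breaks as soon as a message contains a double quote. A small sketch of a payload builder plus a `notify` wrapper; both names are hypothetical, SLACK_WEBHOOK is assumed to hold an incoming-webhook URL, and only backslashes, double quotes and newlines are escaped (other control characters are not).

```bash
# Hypothetical helper: build {"text": "..."} with the message safely escaped.
slack_payload() {
  local s="$1" bs='\' dq='"'
  s=${s//"$bs"/"$bs$bs"}     # backslash -> \\   (must run first)
  s=${s//"$dq"/"$bs$dq"}     # double quote -> \"
  s=${s//$'\n'/"${bs}n"}     # newline -> \n
  printf '{"text":"%s"}' "$s"
}

# Hypothetical wrapper around the incoming webhook used throughout the playbook.
notify() {
  curl -sS -X POST -H 'Content-type: application/json' \
    --data "$(slack_payload "$1")" "$SLACK_WEBHOOK"
}

# Usage: notify "🔧 Airflow migration started: DAGs will be paused in 5 min"
```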

Step 2 (09:05) — Pause all DAGs

# Pause via CLI (parallel)
kubectl exec -n airflow airflow-webserver-0 -- bash -c '
  airflow dags list -o json | jq -r ".[].dag_id" | \
    xargs -P 10 -I{} airflow dags pause {}
'

# Verify all paused
PAUSED=$(kubectl exec -n airflow airflow-webserver-0 -- bash -c '
  airflow dags list -o json | jq -r ".[] | select(.paused == \"True\") | .dag_id" | wc -l
')
TOTAL=$(kubectl exec -n airflow airflow-webserver-0 -- bash -c '
  airflow dags list -o json | jq -r ".[].dag_id" | wc -l
')
echo "Paused: $PAUSED / $TOTAL"
# Expected: $PAUSED == $TOTAL
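"Expected: $PAUSED == $TOTAL" is easy to skim past at 09:05. A sketch that turns it into a hard gate; `assert_all_paused` is a hypothetical helper, and it also strips the padding some wc implementations add and refuses to treat an empty count as success.

```bash
# Hypothetical gate: 0 = all paused, 1 = some DAGs still active, 2 = counts unreadable.
assert_all_paused() {
  local paused="${1//[[:space:]]/}" total="${2//[[:space:]]/}"
  if ! [[ "$paused" =~ ^[0-9]+$ && "$total" =~ ^[0-9]+$ ]] || [ "$total" -eq 0 ]; then
    echo "could not read DAG counts (paused='$paused', total='$total')" >&2
    return 2
  fi
  if [ "$paused" -ne "$total" ]; then
    echo "only $paused of $total DAGs paused: do not proceed" >&2
    return 1
  fi
  echo "all $total DAGs paused"
}

# Usage: assert_all_paused "$PAUSED" "$TOTAL" || exit 1
```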

Step 3 (09:15) — Wait for in-flight tasks

# Wait for running tasks to complete (max 30 minutes)
WAIT_START=$(date +%s)
MAX_WAIT=1800  # 30 minutes

while true; do
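    # The SQL goes in as a single argument, so no nested quoting is needed.
    # Assumes psql is in the image and PG* connection variables are set in the pod.
    RUNNING=$(kubectl exec -n airflow airflow-webserver-0 -- \
        psql -t -A -c "SELECT count(*) FROM task_instance WHERE state IN ('running', 'queued') AND queued_dttm > now() - interval '1 hour';" \
        | tr -d '[:space:]')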

    if [ "$RUNNING" -eq "0" ]; then
        echo "All tasks completed"
        break
    fi

    NOW=$(date +%s)
    if [ $((NOW - WAIT_START)) -gt $MAX_WAIT ]; then
        echo "WARNING: $RUNNING tasks still running, proceeding anyway"
        break
    fi

    echo "Waiting for $RUNNING tasks ($(($NOW - $WAIT_START))s elapsed)..."
    sleep 30
done
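The same loop as a reusable function, sketched under the assumption that the counting command (for example the kubectl/psql pipeline above, wrapped in a function) prints a single integer. `wait_until_zero` is hypothetical; it distinguishes "drained" (0), "timed out" (1) and "could not read the count" (2), so an empty result from a failed psql call is not mistaken for zero running tasks.

```bash
# Hypothetical helper: poll a counting command until it prints 0.
#   $1 = command/function that prints one integer
#   $2 = max wait in seconds, $3 = poll interval in seconds
# Returns 0 when drained, 1 on timeout, 2 if the output is not a number.
wait_until_zero() {
  local count_cmd="$1" max_wait="${2:-1800}" interval="${3:-30}"
  local start count
  start=$(date +%s)
  while true; do
    count=$("$count_cmd" | tr -d '[:space:]')
    if ! [[ "$count" =~ ^[0-9]+$ ]]; then
      echo "could not read count: '$count'" >&2
      return 2
    fi
    if [ "$count" -eq 0 ]; then
      echo "drained"
      return 0
    fi
    if [ $(( $(date +%s) - start )) -ge "$max_wait" ]; then
      echo "WARNING: $count still in flight after ${max_wait}s" >&2
      return 1
    fi
    echo "waiting for $count ($(( $(date +%s) - start ))s elapsed)..."
    sleep "$interval"
  done
}

# Usage:
#   count_inflight() { kubectl exec -n airflow airflow-webserver-0 -- psql -t -A -c "SELECT count(*) ..."; }
#   wait_until_zero count_inflight 1800 30 || echo "proceeding with tasks still in flight"
```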

Step 4 (09:45) — Helm upgrade

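# Apply the 3.x chart with the migrate job disabled (we run the migration manually).
# No --wait: the new pods cannot become ready until the schema is migrated (Step 5),
# so --wait would block until the timeout and mark the release as failed.
helm upgrade airflow apache-airflow/airflow \
  --version 2.0.0 \
  --namespace airflow \
  --values values-3x.yaml \
  --set migrateDatabaseJob.enabled=false \
  --set apiServer.defaultUser.enabled=false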

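Helm creates the new pods (API server, 3.x scheduler, etc.), but they will not become ready while the DB schema is still 2.x. That is expected: the next step migrates it. If the scheduler container is not running at all (stuck in an init container or in CrashLoopBackOff), kubectl exec in the next step cannot reach it; in that case run airflow db migrate from a one-off pod using the 3.x image instead.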
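The rollback procedure later needs `<previous-revision>`. A sketch for capturing it before running the upgrade, assuming the default table output of `helm history`, where the first column is REVISION and the last row is the latest revision; `last_helm_revision` is a hypothetical helper.

```bash
# Hypothetical helper: print the REVISION of the last row of `helm history`
# table output (header row skipped). Exits 1 if there is no revision row.
last_helm_revision() {
  awk 'NR > 1 && $1 ~ /^[0-9]+$/ { rev = $1 } END { if (rev == "") exit 1; print rev }'
}

# Usage, run BEFORE Step 4 so the value refers to the 2.x release:
#   PREV_REVISION=$(helm history airflow -n airflow --max 1 | last_helm_revision)
#   echo "$PREV_REVISION" > helm-revision-pre.txt   # later: helm rollback airflow "$PREV_REVISION" -n airflow
```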

Step 5 (10:15) — DB migration

# Run migration manually (controlled)
kubectl exec -n airflow $(kubectl get pods -n airflow -l component=scheduler -o jsonpath='{.items[0].metadata.name}') -- \
  airflow db migrate

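# Monitor: the migration may take 15-30 min on a large DB. Its output streams to the kubectl exec session above
# (not to the pod logs), so run that session inside tmux/screen; a dropped connection may kill the command.
# From a second session, watch DB activity:
psql ... -c "SELECT pid, state, wait_event_type, wait_event, now() - query_start AS runtime, left(query, 80) FROM pg_stat_activity WHERE state != 'idle' ORDER BY query_start;"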

# Verify migration complete
kubectl exec -n airflow $(kubectl get pods -n airflow -l component=scheduler -o jsonpath='{.items[0].metadata.name}') -- \
  airflow db check-migrations
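For long migrations the playbook's rule is: never cancel before the 2-hour mark, and after it treat 30+ minutes without visible progress (log output, IOPS, WAL position) as stuck. The sketch below only encodes those two thresholds; `migrate_decision` is hypothetical, and how you measure "minutes since progress" is up to you.

```bash
# Hypothetical helper encoding the playbook thresholds for a slow db migrate:
#   elapsed >= 120 min AND no progress for >= 30 min -> consider-rollback
#   anything else                                     -> wait (never cancel early)
migrate_decision() {
  local elapsed_min="$1" minutes_since_progress="$2"
  if [ "$elapsed_min" -ge 120 ] && [ "$minutes_since_progress" -ge 30 ]; then
    echo "consider-rollback"
  else
    echo "wait"
  fi
}

# Usage: migrate_decision 135 40   # then decide as a team; the helper does not act
```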

Step 6 (10:45) — Wait for healthy components

# All scheduler pods ready
kubectl wait --for=condition=ready pod -n airflow -l component=scheduler --timeout=10m

# API server ready
kubectl wait --for=condition=ready pod -n airflow -l component=api-server --timeout=10m

# DAG Processor running
kubectl wait --for=condition=ready pod -n airflow -l component=dag-processor --timeout=5m

# Triggerer running
kubectl wait --for=condition=ready pod -n airflow -l component=triggerer --timeout=5m

# Verify API responds
curl -s https://airflow.example.com/api/v2/version | jq .
# Expected: {"version": "3.0.x"}
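A sketch that checks the /api/v2/version response without jq (handy on a bastion host that lacks it). It assumes the JSON carries a top-level version field as in the expected output above; `api_reports_v3` is a hypothetical helper.

```bash
# Hypothetical check: does the /api/v2/version body report a 3.x release?
api_reports_v3() {
  local body="$1"
  # "version" must be preceded by a quote, so "git_version" does not match.
  local re='"version"[[:space:]]*:[[:space:]]*"(3\.[0-9]+[^"]*)"'
  if [[ $body =~ $re ]]; then
    echo "API version ${BASH_REMATCH[1]}"
    return 0
  fi
  echo "unexpected /api/v2/version response: $body" >&2
  return 1
}

# Usage: api_reports_v3 "$(curl -s https://airflow.example.com/api/v2/version)" || exit 1
```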

Step 7 (11:00) — Smoke testing

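# 1. API server responds (in 3.x the health endpoint lives under /api/v2/monitor/)
curl -s https://airflow.example.com/api/v2/monitor/health | jq .
# Expected: "status": "healthy" for metadatabase, scheduler, triggerer and dag_processor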

# 2. UI loads
curl -s https://airflow.example.com/ | grep -c "Airflow"
# Expected: > 0

# 3. Count DAGs via the API (use total_entries: the dags list itself is paginated)
curl -s -H "Authorization: Bearer $TOKEN" \
  https://airflow.example.com/api/v2/dags | jq '.total_entries'
# Expected: equal to the pre-migration count

# 4. No import errors
kubectl exec -n airflow $(kubectl get pods -n airflow -l component=scheduler -o jsonpath='{.items[0].metadata.name}') -- \
  airflow dags list-import-errors
# Expected: empty

# 5. Sample DAG test run (airflow dags test really executes the tasks; pick a DAG that is safe to re-run)
kubectl exec -n airflow $(kubectl get pods -n airflow -l component=scheduler -o jsonpath='{.items[0].metadata.name}') -- \
  airflow dags test capstone_orders_etl_realtime 2026-05-15
# Expected: success
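The "Expected:" comments above can be turned into a pass/fail gate before any DAG is unpaused. A sketch: `smoke` (hypothetical) runs any command, prints PASS/FAIL, and counts failures in SMOKE_FAILED, so every check runs and the gate is evaluated once at the end.

```bash
# Hypothetical smoke-test runner: run every check, then gate on the failure count.
SMOKE_FAILED=0
smoke() {
  local name="$1"; shift
  if "$@" >/dev/null 2>&1; then
    echo "PASS  $name"
  else
    echo "FAIL  $name"
    SMOKE_FAILED=$((SMOKE_FAILED + 1))
  fi
}

# Usage:
#   smoke "api health" curl -sf https://airflow.example.com/api/v2/monitor/health
#   smoke "ui loads"   curl -sf https://airflow.example.com/
#   [ "$SMOKE_FAILED" -eq 0 ] || { echo "smoke tests failed: do not unpause"; exit 1; }
```

Running all checks before gating gives the on-call a full picture in one pass instead of fixing failures one at a time.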

Step 8 (11:30) — Unpause critical DAGs first

# Define unpause order (critical first, low-priority later)
CRITICAL_DAGS=(
    "capstone_orders_etl_realtime"
    "daily_revenue_aggregation"
    "fraud_detection_pipeline"
)

NORMAL_DAGS=(
    "weekly_metrics_dashboard"
    "monthly_billing_report"
    "ad_hoc_data_quality_check"
)

# Unpause critical first
for dag in "${CRITICAL_DAGS[@]}"; do
    echo "Unpausing $dag..."
    kubectl exec -n airflow $(kubectl get pods -n airflow -l component=scheduler -o jsonpath='{.items[0].metadata.name}') -- \
      airflow dags unpause "$dag"
    sleep 10  # 10 s between DAGs
done

# Watch for issues 30 minutes
echo "Monitoring critical DAGs for 30 min..."
sleep 1800

# If clean, unpause rest
for dag in "${NORMAL_DAGS[@]}"; do
    kubectl exec -n airflow $(kubectl get pods -n airflow -l component=scheduler -o jsonpath='{.items[0].metadata.name}') -- \
      airflow dags unpause $dag
    sleep 5
done
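The two unpause waves can share one function. A sketch; `unpause_wave` and `unpause_dag` are hypothetical helpers. Setting UNPAUSE_CMD=echo prints the order without touching the cluster, which is a cheap way to review the wave lists the day before, and a failed unpause stops the wave instead of silently continuing.

```bash
# Hypothetical helper: unpause DAGs one by one, pausing $1 seconds between them.
unpause_wave() {
  local pause_s="$1"; shift
  local dag
  for dag in "$@"; do
    echo "Unpausing $dag..."
    ${UNPAUSE_CMD:-unpause_dag} "$dag" || { echo "failed to unpause $dag" >&2; return 1; }
    sleep "$pause_s"
  done
}

# Hypothetical wrapper; adjust namespace/label to your install.
unpause_dag() {
  local pod
  pod=$(kubectl get pods -n airflow -l component=scheduler -o jsonpath='{.items[0].metadata.name}')
  kubectl exec -n airflow "$pod" -- airflow dags unpause "$1"
}

# Usage:
#   UNPAUSE_CMD=echo unpause_wave 0 "${CRITICAL_DAGS[@]}"        # dry run
#   unpause_wave 10 "${CRITICAL_DAGS[@]}" && sleep 1800 && unpause_wave 5 "${NORMAL_DAGS[@]}"
```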

Step 9 (12:30) — All DAGs unpaused

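# Unpause all remaining (anything missed). Airflow 3.x has no webserver pod, so exec into the scheduler.
# Caution: this also unpauses DAGs that were already paused before the migration; record and exclude those.
SCHEDULER_POD=$(kubectl get pods -n airflow -l component=scheduler -o jsonpath='{.items[0].metadata.name}')
kubectl exec -n airflow "$SCHEDULER_POD" -- bash -c '
  airflow dags list -o json | jq -r ".[] | select(.paused == \"True\") | .dag_id" | \
    xargs -P 10 -I{} airflow dags unpause {}
'

# Final verification (check the paused field name in `airflow dags list -o json` on your 3.x version)
echo "=== Final state ==="
kubectl exec -n airflow "$SCHEDULER_POD" -- bash -c '
  DAGS_JSON=$(airflow dags list -o json)
  echo "Total DAGs: $(echo "$DAGS_JSON" | jq "length")"
  echo "Paused: $(echo "$DAGS_JSON" | jq "[.[] | select(.paused == \"True\")] | length")"
  echo "Active: $(echo "$DAGS_JSON" | jq "[.[] | select(.paused == \"False\")] | length")"
'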

Step 10 (13:00-15:00) — Soak observation period

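# Monitor for 2 hours (run each watch in its own terminal)
# 1. Scheduler health
watch -n 10 "kubectl get pods -n airflow"

# 2. Task failures in the last hour. There is no "airflow tasks list-failed" command, so query task_instance
#    directly (assumes psql and PG* connection variables in the pod, as in Step 3)
SCHEDULER_POD=$(kubectl get pods -n airflow -l component=scheduler -o jsonpath='{.items[0].metadata.name}')
FAILED_SQL="SELECT dag_id, task_id, end_date FROM task_instance WHERE state = 'failed' AND end_date > now() - interval '1 hour' ORDER BY end_date DESC;"
watch -n 30 "kubectl exec -n airflow $SCHEDULER_POD -- psql -c \"$FAILED_SQL\""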

# 3. Metrics dashboard
# Grafana: scheduler.scheduler_loop_duration, executor.queued_tasks, executor.open_slots

# 4. Slack channel #airflow-migration — monitor user reports

Step 11 (15:00) — Communicate success

curl -X POST -H 'Content-type: application/json' "$SLACK_WEBHOOK" -d '{"text":"✅ Airflow 3.x migration complete and stable. All DAGs running. Issues? Page DataEng-OnCall."}'

Phase 3: Post-migration (week after)

Day +1 — Monitor

- Continuous monitoring 24h
- Daily standup: any new issues?
- Performance regression analysis (compare metrics 2.x baseline vs 3.x first day)
- API consumer logs — anyone hitting v1 endpoints?
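To answer "anyone hitting v1 endpoints?", count /api/v1/ requests per client in an access log. A sketch assuming a log format where the client address is the first whitespace-separated field (as in nginx's default combined format); `v1_callers` is hypothetical, and where the log comes from (ingress, load balancer) is yours to choose.

```bash
# Hypothetical helper: read an access log on stdin and print
# "<request count> <client>" for every client that still calls /api/v1/,
# busiest first.
v1_callers() {
  awk 'index($0, "/api/v1/") { hits[$1]++ } END { for (c in hits) print hits[c], c }' | sort -rn
}

# Usage: v1_callers < access.log
```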

Day +7 — Final review

- Did all critical DAGs run successfully over the week?
- Performance OK?
- User feedback?
- Decommission the 2.x backup snapshots (keep the last 35 days for now)
- Update documentation
- Postmortem report (lessons learned, time taken, issues)

Rollback procedure

If the migration fails, roll back to 2.x. Time matters: it is better to roll back early than to try fixing forward in the middle of a production crisis.

Rollback within 1 hour (minimal data loss)

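# 1. Pause all DAGs in 3.x (prevent further DB writes). There is no "pause-all" command; reuse the Step 2 loop
kubectl exec -n airflow ... -- bash -c '
  airflow dags list -o json | jq -r ".[].dag_id" | xargs -P 10 -I{} airflow dags pause {}
'

# 2. Wait for in-flight tasks (max 10 min)
sleep 600

# 3. Restore the DB from the latest pre-migration snapshot (ideally the one taken right before airflow db migrate).
#    This creates a NEW instance; wait until it is available (can take a while on a large DB)
aws rds restore-db-instance-from-db-snapshot \
  --db-instance-identifier airflow-prod-rollback \
  --db-snapshot-identifier airflow-pre-3x-migration-20260515
aws rds wait db-instance-available --db-instance-identifier airflow-prod-rollback

# 4. Point the Airflow metadata connection (secret / PgBouncer target) at the restored RDS instance.
#    Do this before 2.x pods start: 2.x cannot run against the 3.x schema

# 5. Helm rollback to 2.x, then restart so every pod picks up the new connection
helm rollback airflow <previous-revision> -n airflow --wait --timeout 30m
kubectl rollout restart deployment -n airflow

# 6. Smoke test 2.x
curl -s https://airflow.example.com/health
# Expected: 2.x health check

# 7. Unpause the DAGs paused during the migration (no "unpause-all" either; skip DAGs paused before it)
... airflow dags unpause <dag_id>

# 8. Communicate rollback
curl -X POST -H 'Content-type: application/json' "$SLACK_WEBHOOK" -d '{"text":"⚠️ Airflow rolled back to 2.11. Investigating 3.x issue. ETA migration retry: tbd."}'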

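Data loss on rollback: everything written after the restored snapshot is lost, i.e. DagRuns, XCom and log entries created on 3.x plus any 2.x activity after the snapshot was taken. With a snapshot taken right before airflow db migrate and an early rollback, that is typically 1-2 hours of activity.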

Rollback > 24 hours later

Difficult — DB diverged significantly. Options:

  • Forward fix 3.x bug — better path
  • Manual data reconciliation (export specific DagRuns 3.x → restore base → re-import)
  • Accept data loss + restore snapshot

Best practice: make the rollback decision within the first 2-4 hours. Beyond that, commit to fixing forward.
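The 2-4 hour rule is easier to follow under pressure if a script computes it. A sketch, assuming you record the epoch second at which DAGs were unpaused on 3.x (for example with date +%s at Step 8); `rollback_window` is hypothetical and uses the 4-hour upper bound by default.

```bash
# Hypothetical helper: $1 = epoch second of the 3.x unpause,
# $2 = "now" (defaults to the current time), $3 = window in hours (default 4).
rollback_window() {
  local unpaused_at="$1" now="${2:-$(date +%s)}" max_hours="${3:-4}"
  local elapsed_min=$(( (now - unpaused_at) / 60 ))
  if [ "$elapsed_min" -lt $(( max_hours * 60 )) ]; then
    echo "rollback-ok (${elapsed_min} min since unpause)"
  else
    echo "fix-forward (${elapsed_min} min since unpause)"
  fi
}

# Usage: rollback_window "$(cat unpause-epoch.txt)"
```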


Common issues and fixes

Issue: airflow db migrate fails

Symptom: migration stuck or errors

Fix:

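# Check whether all migrations are applied
kubectl exec ... airflow db check-migrations

# Show current revision
psql ... -c "SELECT version_num FROM alembic_version;"

# Do NOT "reset" by deleting from alembic_version: that does not undo any schema change,
# it only makes Alembic lose track of where the schema actually is.
# PostgreSQL supports transactional DDL, so a failed revision is typically rolled back;
# fix the cause (locks, timeouts, disk space) and re-run:
kubectl exec ... airflow db migrate

# Worst case: restore from backup, reproduce and fix the migration issue on staging first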

Issue: DAGs not parsing after migration

Symptom: airflow dags list-import-errors shows errors

Fix:

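# Common cause: old imports not migrated
# Run the ruff fix in the DAG repository and redeploy; edits made inside a pod are overwritten by the next bundle sync
# (add --preview if your ruff version still marks the AIR rules as preview)
ruff check --fix --select AIR301,AIR302 <path-to-dag-repo>

# Check the DAG bundle config (new in 3.x)
kubectl exec ... airflow config get-value dag_processor dag_bundle_config_list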

Issue: API consumers fail

Symptom: monitoring tools / scripts failing

Fix:

  • The v1 endpoints are removed in 3.x, so calls to /api/v1 fail; there is no grace period to rely on
  • Update consumers to the v2 endpoints
  • Use the apache-airflow-client Python package

Issue: UI shows wrong DAGs

Symptom: missing DAGs in the UI

Fix:

  • Check DAG Bundles syncing properly
  • Verify that gitSync has been replaced with dagBundles in the Helm values
  • Check DAG Processor logs

Production gotchas on migration day

Don't attempt it on a Friday evening. If anything goes wrong, your team loses the weekend. Saturday morning: the team is rested and has the full weekend for recovery if needed.

Temporarily lower max_active_runs. Set max_active_runs=2 on the capstone DAGs for the first 24 hours to limit the blast radius if the migration introduced subtle bugs.

Watch scheduler loop duration carefully. In 3.x the DAG Processor always runs as a separate component, so scheduler ticks should be no slower than on 2.x. If they are slower, something is misconfigured.

Provider versions matter. Some providers have breaking changes between their 2.x-compatible and 3.x-compatible releases. Pin everything in requirements.txt.

External Secrets sync timing. After the Helm upgrade, the External Secrets Operator may take 1-2 min to sync the new Kubernetes Secrets. Pods may start before the secrets are ready; wait for Pending → Running.

Back up before EACH step. Not just one pre-migration backup: back up before the Helm upgrade and again before db migrate, so you have multiple recovery points.

Communicate constantly. Post Slack updates every 30 min. Stakeholders are nervous about migrations; visibility is critical.


Knowledge check
Migration day, step 5: airflow db migrate was launched on Saturday at 10:00 UTC. After 45 minutes it still has not finished; the log shows 'Adding column user_id to ab_user_role' running. What do you do?
Answer
Long-running DB migrations are a common scenario for large Airflow databases. **DO NOT cancel**: that can leave the DB in an inconsistent state that requires a restore. Steps: (1) **Verify progress, not a deadlock**: connect to Postgres with `psql ...` and run `SELECT state, query, query_start, wait_event_type, wait_event FROM pg_stat_activity WHERE state != 'idle' ORDER BY query_start;`. If the query is active rather than waiting, it is just slow. If wait_event_type = 'Lock' and it stays that way, there is a problem. (2) **Estimate completion time**: ALTER TABLE on large tables takes a long time (here ab_user_role, likely millions of rows of FAB history). Check the table size: `SELECT pg_size_pretty(pg_total_relation_size('ab_user_role'));`. An ALTER on a 100 GB table can take 1-2 hours. (3) **Investigate if it is actually stuck**: `SELECT * FROM pg_locks WHERE NOT granted;` (are other queries holding locks?), `SELECT * FROM pg_stat_replication;` (replication lag?), and `SELECT pg_current_wal_lsn();` sampled twice a few minutes apart (is WAL still being written?). (4) **Communicate a timeline update** in Slack: 'DB migration is taking longer than expected, ETA extended by 2 hours. No action needed.' Set a new completion target. (5) **Monitor I/O**: AWS CloudWatch DB IOPS / write throughput. If the instance is reading and writing aggressively, work is happening, just slowly. If IOPS = 0, it is stuck. (6) **Verify it is not being killed by the RDS maintenance window**: RDS may interrupt long-running queries during maintenance. Also check the parameter group for statement_timeout (if set globally, it can kill the migration). (7) **Decision point at 2 hours**: (a) if it is still progressing (IOPS active, log shows movement), keep waiting; (b) if it is completely stuck (no progress for 30 min), consider rollback (Helm rollback + restore the DB snapshot, accepting data loss); (c) **AVOID** kill -9 on the migration process: it leaves the Alembic version inconsistent and recovery is hard. (8) **Long-term lesson**: pre-migration testing against a clone of the production DB should expose slow migrations like this. Run the migration against a 100 GB staging copy in advance, measure the time, and plan a maintenance window long enough to cover it.
**Specific to the ab_user_role migration**: legacy FAB tables can accumulate a lot of stale data. Shrink the metadata DB before migrating: `airflow db clean --clean-before-timestamp <date>` purges old rows from core tables such as task_instance, dag_run, log and xcom (try it with `--dry-run` on staging first). FAB tables are not covered by `airflow db clean`, so any cleanup there is manual and should only be done after checking their actual schema. **Recovery if the migration actually fails (rare)**: `SELECT version_num FROM alembic_version;` shows the current revision. Do not delete from alembic_version to 'reset' it: that does not undo any schema change, it only hides where the schema really is. Fix the cause and re-run `airflow db migrate`; if the schema is inconsistent, restore from the snapshot, fix the issue and retry. **Best practice**: a pre-migration `pg_dump` was taken (we did this), so a restore option is always available. Don't panic: communicate, monitor, and decide at the 2-hour mark.
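Point (3) asks whether WAL writes are still flowing. One way to check, sketched here: sample `SELECT pg_current_wal_lsn();` twice a few minutes apart and compare. An LSN prints as two hex numbers high/low (for example 16/B374D848); the helpers below (hypothetical names) convert it to one integer. With all DAGs paused most WAL traffic should come from the migration, but other writers (autovacuum, monitoring) also advance it, so treat this as one signal alongside IOPS and pg_stat_activity.

```bash
# Hypothetical helper: convert a PostgreSQL LSN "HI/LO" (hex) to one integer.
lsn_to_int() {
  local hi="${1%%/*}" lo="${1##*/}"
  echo $(( (16#$hi << 32) + 16#$lo ))
}

# Hypothetical helper: succeed if the second LSN is ahead of the first.
wal_advanced() {
  local before after
  before=$(lsn_to_int "$1")
  after=$(lsn_to_int "$2")
  [ "$after" -gt "$before" ]
}

# Usage (values from psql -At -c "SELECT pg_current_wal_lsn();" taken ~5 min apart):
#   wal_advanced "$LSN_T0" "$LSN_T5" && echo "WAL moving: migration is working" || echo "no WAL progress"
```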
