Learning Platform
Глоссарий Troubleshooting
Урок 11.05 · 28 мин
Продвинутый
MigrationSavepointState V2 APIProductionRollback PlanReal-World Story

Миграция с RocksDB на ForstDB — реальный production flow

Миграция state backend — одна из самых рискованных операций для stateful Flink job. State содержит часы или месяцы аккумулированной информации. Любая ошибка может стоить downtime, data loss, или просто месяца восстановления state с источника.

В этом уроке мы пройдём полный flow миграции с RocksDB на ForstDB на основе реального production case

Восстановление и rescaling из savepoint: streaming аналитика для ecommerce, 800 GB state, 50 TaskManagers, recovery SLA 5 минут — критично для real-time alerting. Включая что пошло не так и как мы fixed.

Pre-migration: оценка

Прежде чем мигрировать, нужно понять — стоит ли:

Чек-лист стоит ли мигрировать:

[ ] State size > 50 GB на TaskManager
[ ] Recovery time проблема (текущий > 5 min)
[ ] Rescaling нужен (autoscaling, capacity changes)
[ ] Кластер sizing определяется state size, не CPU
[ ] Cost оптимизация цель (EBS снизить)
[ ] Multi-region/DR в roadmap

Если 3+ чекмарки: имеет смысл рассмотреть
Если < 2: остаться на RocksDB

Кейс из текста:
  State size: 800 GB across 50 TM (16 GB avg per TM)
  Recovery time: 22 минуты (S3 download + RocksDB restore)
  Rescaling: required для weekly traffic spikes
  Cost: $8K/мес на EBS gp3 disks
  DR: manual recovery 4-6 hours

  Все 6 чекмарков -> migration appropriate

Этап 1: Подготовка (1-2 недели)

1.1 Apgrade Flink до 2.x в dev/staging
    - Flink 1.20 (LTS) -> Flink 2.2
    - Без изменений config: backend остаётся RocksDB
    - Все API compatible
    - Validate jobs работают без regressions

1.2 Setup S3 bucket для disaggregated state
    - Создать bucket в same region
    - IAM permissions (read/write/list/delete на префиксе)
    - Lifecycle policies для cleanup obsolete files
    - Versioning OFF (ForstDB управляет sам)
    - VPC endpoint для cross-AZ traffic avoid

1.3 Setup мониторинга
    - Метрики ForstDB: cache hit rate, S3 ops, compaction
    - Алерты на cache miss > 50%, S3 errors
    - Dashboards для recovery time tracking

1.4 Documentation pipeline
    - Compiled plan baseline на RocksDB
    - Document текущий state schema
    - Backup всех savepoints

В нашем кейсе: 10 дней prep, 0 production changes

Этап 2: Staging migration

2.1 Clone production job в staging cluster
    - Read same Kafka topic (consumer group разделен)
    - Same SQL/code
    - Same parallelism

2.2 Включить ForstDB в staging
    state.backend.type: forst
    state.backend.forst.local-cache.size: 20gb
    state.backend.forst.block-cache.size: 1gb
    table.exec.async-state.enabled: true
    state.checkpoints.dir: s3://staging-flink-state/checkpoints

2.3 Restore с production savepoint
    - savepoint совместим между sync RocksDB и ForstDB
    - state schema не меняется
    - state encoding compatible (с few exceptions)
    - Job starts в minutes

2.4 Validation period (2-4 недели)
    - Compare outputs с production (sample)
    - Monitor metrics: throughput, latency, state size, cache hit rate
    - Test recovery scenarios:
      * Kill TaskManager — measure recovery time
      * Rescale parallelism — measure downtime
      * Network glitch к S3 — observe behavior
    - Load tests: peak traffic simulation

В нашем кейсе:
  Day 1-7: setup, начальный run, fixes (cache size too small initially)
  Day 8-21: stabilization, baseline metrics
  Day 22-28: chaos testing

Initial results (cache 20 GB):
  Throughput: 60% от production
  Cache hit rate: 65%
  Recovery time: 45 sec (vs 22 min на RocksDB)

После tuning cache 100 GB:
  Throughput: 110% от production
  Cache hit rate: 87%
  Recovery time: 30 sec
WARNING

Сюрприз #1 в нашем кейсе: default ForstDB local cache 0 GB. Мы запустили staging и получили 60% throughput — все state reads шли в S3 (~10 ms каждый). Cache hit rate 0% потому что cache disabled. Урок: ВСЕГДА явно set local-cache.size до production.

Этап 3: Operator migration на async state API

3.1 Identify operators которые надо переписать
    Для нашего job:
    - 3 SQL Aggregate операторы (StreamPhysicalGroupAggregate) — async-ready YES
    - 2 Join (StreamPhysicalJoin) — async-ready YES
    - 1 Custom UDF с MapState — нужно переписать NO
    - 1 MATCH_RECOGNIZE для fraud — НЕ async-ready NO

3.2 Custom UDF migration
    Before (V1 sync):
    public class EnrichmentFunction extends KeyedProcessFunction<...> {
        private MapState<String, UserProfile> profiles;
        
        public void processElement(Event e, Context ctx, Collector<Result> out) throws Exception {
            UserProfile p = profiles.get(e.userId);   // SYNC
            if (p == null) { /* lookup external */ }
            // ... business logic
        }
    }

    After (V2 async):
    public class EnrichmentFunctionV2 extends KeyedProcessFunctionV2<...> {
        private MapState<String, UserProfile> profiles;
        
        public void processElement(Event e, Context ctx, Collector<Result> out) {
            profiles.asyncGet(e.userId).thenAccept(p -> {
                if (p == null) { /* lookup external */ }
                // ... business logic
                out.collect(result);
            });
        }
    }

3.3 MATCH_RECOGNIZE — нет async option
    Decision matrix:
    A) Оставить sync, accept 30% throughput на этом operator
    B) Переписать на DataStream API + custom CEP-like logic с V2 API
    C) Отделить fraud detection в separate job на RocksDB

    Выбрали C: fraud detection — отдельный critical job 
    Main analytics job -> ForstDB
    Fraud job -> RocksDB (smaller state, latency critical)

3.4 SQL changes
    Включить async для SQL operators:
    SET table.exec.async-state.enabled = true;
    
    For custom UDF: re-register с V2 wrapper
    CREATE FUNCTION enrich AS 'com.company.EnrichmentFunctionV2';

Этап 4: Compile plan freeze

4.1 Final SQL код подготовлен, async включен
4.2 COMPILE PLAN на staging:
    
    COMPILE PLAN '/staging-plans/analytics_job_v2.json' FOR
    SELECT
      window_start, user_id, COUNT(*) AS event_cnt,
      enrich(user_id) AS profile, ...
    FROM events ...

4.3 Validate plan:
    - StreamExecGroupAggregate с async=true
    - StreamExecJoin с async=true  
    - StreamExecCalc с custom function (async wrapper)
    - StreamExecSink (KafkaSink с async)

4.4 Commit JSON в git
    - Plan freeze для production
    - Любой будущий апгрейд Flink -> план тот же
    - Защита от plan drift через cost model changes

Этап 5: Production migration (carefully)

5.1 Pre-migration safety net
    - Final savepoint на RocksDB version: s3://prod/savepoints/pre-migration
    - Stand by RocksDB cluster на ready (для emergency rollback)
    - On-call rotation extended
    - Communication: stakeholders notified

5.2 Migration window (off-peak hours)
    Step 1: Stop production job (savepoint trigger)
        bin/flink stop --savepointPath s3://prod/savepoints/migration-checkpoint <job-id>
        Wait для completion: ~5 min для 800 GB

    Step 2: Update Flink config
        state.backend.type: forst
        state.backend.forst.local-cache.size: 100gb
        state.backend.forst.block-cache.size: 2gb
        table.exec.async-state.enabled: true

    Step 3: Submit job via compiled plan
        bin/flink run \
          --fromSavepoint s3://prod/savepoints/migration-checkpoint \
          --planFile /opt/flink/plans/analytics_job_v2.json

    Step 4: Verify
        - Job starts (~2 min)
        - Watermark recovery (~5 min)
        - Output to Kafka begins
        - Metrics в dashboard

В нашем кейсе:
  Total downtime: 12 min (target was < 30 min)
  No data loss (exactly-once + Kafka offsets)
  Throughput recovered в 8 min (cache warm-up)

Этап 6: Post-migration validation

6.1 First 24 hours: intensive monitoring
    Metrics watching:
    - Cache hit rate: target > 80% (initial 70%, grew to 90% в 2 часа)
    - S3 errors: target 0 (had 3 transient retries, OK)
    - Latency p99: target < 5x baseline (achieved 1.2x)
    - Checkpoint duration: target < 1 min (achieved 30 sec const)
    - End-to-end latency: target < baseline (achieved -30%)

6.2 First week: comparison с baseline
    Metric                  Before (RocksDB)    After (ForstDB)    Delta
    
    State size              800 GB on disks     800 GB в S3        moved
    Cluster nodes           50 TM               50 TM (но тоньше) -
    Local disk per node     20 GB               5 GB (cache only)  -75%
    EBS cost/month          $8000               $2000              -75%
    S3 cost/month           $200 (checkpoint)   $1500 (all)        +650%
    Total cost              $8200               $3500              -57%
    Recovery time           22 min              30 sec             -98%
    Checkpoint duration     2 min (growing)     30 sec const       -75%
    Rescaling downtime      15 min              60 sec             -93%
    Throughput              50K events/sec      55K events/sec     +10%
    P99 latency             45 ms               25 ms              -44%

6.3 First month: edge cases
    - 2x rescaling events (autoscaling)
        Before: requires planned downtime
        After: автоматически, no impact
    - 1 TaskManager pod evict (K8s rolling update)
        Before: 22 min downtime
        After: 30 sec recovery, transparent
    - S3 transient outage (1 minute)
        Before: would not affect
        After: job continued, requests retried, no data loss

6.4 Stabilization
    После 30 дней — declare migration successful
    Decommission emergency RocksDB cluster
    Archive RocksDB savepoints в Glacier
    Update runbooks

Что пошло не так — production stories

Story #1: Initial cache too small
  Day 1 staging: throughput 60%, cache hit rate 65%
  Bug: assumed default cache size was reasonable
  Reality: default 0 (disabled)
  Fix: explicitly set 100GB
  Impact: no production impact (caught в staging)

Story #2: Hot key пара (after migration)
  Day 7 production: 5% throughput drop, latency spike
  Investigation: один user_id с 50K events/min
  Cache evicting other keys для этого hot key
  Fix: SplitAggregateRule for COUNT DISTINCT, salt hot keys
  Time to fix: 1 day

Story #3: S3 PUT throttling
  Day 14: S3 errors начали появляться в logs
  Investigation: ~5000 PUT/sec пиковая нагрузка на prefix
  S3 limit: ~3500 PUT/sec per prefix
  Fix: sharding ForstDB files через prefix patterns
       state.backend.forst.s3.prefix-shards: 8
  Time to fix: half day, restart job

Story #4: Late-binding watermark issue с async
  Day 21: watermark advancing slower than expected
  Investigation: async state requests delaying watermark emit
  Что happens: watermark waits для all in-flight requests complete per key
  Fix: tune buffer-timeout lower (100ms -> 30ms)
       More frequent batches, lower watermark latency
  Impact: minor throughput drop (-3%), better SLA compliance

Story #5: Custom UDF leak
  Day 25: heap usage growing
  Investigation: custom UDF allocated CompletableFuture instead of StateFuture
  Не возвращалось в Flink object pool, memory leak
  Fix: refactor для использования StateFutureUtils
  Time to fix: 2 days (code change, test, deploy)

Rollback plan (то, что не понадобилось но был)

Если миграция fails — план B:

Trigger conditions:
  - Throughput < 50% baseline продолжительно
  - Data loss или corruption
  - Critical bug в async runtime
  - S3 cost > 3x estimate

Rollback procedure:
  1. Stop new job (без savepoint, чтобы не повредить state)
  2. Restart старый jar/config:
     bin/flink run \
       --fromSavepoint s3://prod/savepoints/pre-migration \
       <old jar>
  3. Verify metrics restored
  4. Post-mortem analysis

Время rollback: ~10 min downtime
Data loss: from last RocksDB savepoint (несколько minutes processing)
Cost: minor — outputs могут дублироваться, нужен idempotent sink

Lessons learned

1. Cache sizing — most critical decision
   - Default 0 — gotcha
   - Start 30% от working set, tune вверх
   - Monitor cache hit rate, target > 80%

2. Plan freeze (COMPILE PLAN) — must-have
   - Без него future Flink upgrades могут change plan
   - JSON в git = audit trail + reproducibility

3. Test recovery scenarios extensively
   - Killing TaskManager в staging обязательно
   - Verify recovery time matches target SLA
   - Multiple scenarios: pod evict, node failure, AZ outage

4. Custom UDFs — bottleneck
   - Если есть много custom logic с state -> большая работа переписать
   - Wrappers OK для legacy, но true async лучше long-term
   - Plan for 1-2 weeks per complex operator

5. Cost mix changes — communicate
   - EBS cost down
   - S3 cost up (PUT/GET/storage)
   - Net cost обычно вниз, но финансы должен знать
   - Lifecycle policies критичны для S3 storage cost

6. Monitor cache thrashing
   - Hot keys evict cold keys
   - Может deteriorate over time
   - Plan for re-tuning quarterly

7. Async errors silent by default
   - StateFuture.thenAccept не propagates exception
   - Wrap в exceptionally() для logging
   - Critical: enable async-state.error-handler

Migration timeline summary

Total timeline: 8 weeks

Week 1-2:  Preparation (Flink upgrade, S3 setup, monitoring)
Week 3-4:  Staging migration (clone job, validate)
Week 5:    Operator migration (async UDF rewrite)
Week 6:    Compile plan, final testing
Week 7:    Production migration (carefully)
Week 8+:   Validation, stabilization, decommission RocksDB

Active engineer time: ~3 weeks total
Calendar time: 8 weeks (buffer for testing, fixes)

ROI

One-time costs:
  - Engineer time: 3 weeks * 2 engineers = 240 hours
  - Engineering cost: ~$30K
  - S3 setup и tools: ~$2K

Ongoing benefits:
  - Cost saving: $4700/month ($8200 -> $3500)
  - Operator productivity: rescaling 15 min -> 60 sec
  - Recovery SLA improved: 22 min -> 30 sec
  - 1 fewer on-call escalation per month
  
ROI: $4700/month savings, breakeven в ~7 months

Когда НЕ мигрировать

Сценарии где RocksDB остаётся лучше:

1. Small state (<50 GB total)
   ForstDB overhead (cache management, async) не оправдан
   RocksDB local быстрее, простой

2. Latency-critical jobs (<1 ms target)
   Async добавляет ~1 ms latency для callback chain
   RocksDB local sync быстрее для hot path

3. Custom logic без re-engineering capacity
   Если custom UDFs — major undertaking
   Без переписывания на V2 ForstDB не помогает

4. Stable simple workloads
   No rescaling, no failover spikes, no DR
   ForstDB benefits не материализуются

5. On-premise без good S3
   ForstDB требует S3-compatible storage с high SLA
   HDFS работает, но менее convenient
TIP

ForstDB и RocksDB могут coexist в одном Flink cluster. Разные jobs могут использовать разные backends. Это позволяет мигрировать постепенно: critical jobs остаются на RocksDB, exploratory/large-state — на ForstDB. Со временем перенос становится easier.

Чтение source

Migration tools:
  flink-state-processor-api/   -- для bootstrap state
  bin/flink                     -- CLI tools, savepoint commands

Configs reference:
  flink-state-backends/flink-statebackend-forst/
    src/main/java/org/apache/flink/state/forst/ForStOptions.java
    -- все options документированы

Production runbooks:
  Apache Flink wiki: ForstDB migration guides
  Alibaba's blog posts on internal migration
  AWS re:Invent talks on Flink + S3 best practices
Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Какой самый важный этап подготовки до production миграции с RocksDB на ForstDB?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5