Migrating from RocksDB to ForstDB: a real production flow
Migrating a state backend is one of the riskiest operations for a stateful Flink job. State holds hours or months of accumulated information. Any mistake can cost downtime, data loss, or simply months of rebuilding state from the source.
In this lesson we walk through the full flow of migrating from RocksDB to ForstDB, based on a real production case: streaming analytics for e-commerce, 800 GB of state, 50 TaskManagers, and a 5-minute recovery SLA that is critical for real-time alerting. We also cover what went wrong and how we fixed it.
Pre-migration: assessment
Before migrating, decide whether it is worth doing at all.
Checklist: is migration worthwhile?
[ ] State size > 50 GB per TaskManager
[ ] Recovery time is a problem (currently > 5 min)
[ ] Rescaling is needed (autoscaling, capacity changes)
[ ] Cluster sizing is driven by state size, not CPU
[ ] Cost optimization is a goal (reduce EBS spend)
[ ] Multi-region/DR is on the roadmap
If 3 or more boxes are checked: migration is worth considering
If fewer than 2: stay on RocksDB
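The decision rule can be written down as a small helper. This is only a sketch: the class name, the enum, and the BORDERLINE outcome for exactly two checked boxes are assumptions, since the checklist does not say what to do in that case.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Flink): applies the checklist thresholds.
final class MigrationChecklist {
    enum Verdict { CONSIDER_MIGRATION, BORDERLINE, STAY_ON_ROCKSDB }

    // Each boolean is one checklist item, in the order the items are listed.
    static Verdict evaluate(List<Boolean> checks) {
        long checked = checks.stream().filter(Boolean::booleanValue).count();
        if (checked >= 3) return Verdict.CONSIDER_MIGRATION;
        if (checked < 2) return Verdict.STAY_ON_ROCKSDB;
        // Exactly 2 is not covered by the checklist; treated as a judgment call (assumption).
        return Verdict.BORDERLINE;
    }

    public static void main(String[] args) {
        List<Boolean> checks = Arrays.asList(true, true, true, false, false, false);
        System.out.println(evaluate(checks));
    }
}
```

Keeping the items as plain booleans makes it easy to record the answers next to the job in a runbook and re-evaluate them when the workload changes.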
The case from this lesson:
State size: 800 GB across 50 TMs (16 GB average per TM)
Recovery time: 22 minutes (S3 download + RocksDB restore)
Rescaling: required for weekly traffic spikes
Cost: $8K/month for EBS gp3 disks
DR: manual recovery, 4-6 hours
All 6 boxes checked -> migration is appropriate
Stage 1: Preparation (1-2 weeks)
1.1 Upgrade Flink to 2.x in dev/staging
- Flink 1.20 (LTS) -> Flink 2.2
- No config changes: the backend stays RocksDB
- All APIs compatible
- Validate that jobs run without regressions
1.2 Set up an S3 bucket for disaggregated state
- Create the bucket in the same region
- IAM permissions (read/write/list/delete on the prefix)
- Lifecycle policies to clean up obsolete files
- Versioning OFF (ForstDB manages this itself)
- VPC endpoint to avoid cross-AZ traffic
1.3 Set up monitoring
- ForstDB metrics: cache hit rate, S3 ops, compaction
- Alerts on cache miss > 50% and on S3 errors
- Dashboards for recovery time tracking
1.4 Documentation pipeline
- Compiled plan baseline on RocksDB
- Document the current state schema
- Back up all savepoints
In our case: 10 days of prep, 0 production changes
Stage 2: Staging migration
2.1 Clone the production job into the staging cluster
- Reads the same Kafka topic (with a separate consumer group)
- Same SQL/code
- Same parallelism
2.2 Enable ForstDB in staging
state.backend.type: forst
state.backend.forst.local-cache.size: 20gb
state.backend.forst.block-cache.size: 1gb
table.exec.async-state.enabled: true
state.checkpoints.dir: s3://staging-flink-state/checkpoints
2.3 Restore from a production savepoint
- The savepoint is compatible between sync RocksDB and ForstDB
- The state schema does not change
- State encoding is compatible (with a few exceptions)
- The job starts within minutes
2.4 Validation period (2-4 weeks)
- Compare outputs with production (sampled)
- Monitor metrics: throughput, latency, state size, cache hit rate
- Test recovery scenarios:
* Kill a TaskManager: measure recovery time
* Rescale parallelism: measure downtime
* Network glitch to S3: observe behavior
- Load tests: peak traffic simulation
In our case:
Day 1-7: setup, initial run, fixes (cache size too small initially)
Day 8-21: stabilization, baseline metrics
Day 22-28: chaos testing
Initial results (cache 20 GB):
Throughput: 60% of production
Cache hit rate: 65%
Recovery time: 45 sec (vs 22 min on RocksDB)
After tuning the cache to 100 GB:
Throughput: 110% of production
Cache hit rate: 87%
Recovery time: 30 sec
Surprise #1 in our case: the default ForstDB local cache size is 0 GB. We started staging and got 60% throughput because all state reads went to S3 (~10 ms each). The cache hit rate was 0% because the cache was disabled. Lesson: ALWAYS set local-cache.size explicitly before going to production.
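To see why the hit rate dominates throughput, a simple weighted average of local and remote read latency is enough. The sketch below is a back-of-the-envelope model, not a ForstDB measurement: the 10 ms remote figure comes from the text, while the 0.05 ms local read latency and all names are assumptions.

```java
// Hypothetical model: expected latency of a single state read for a given cache hit rate.
final class StateReadLatency {
    // Hits are served from the local cache, misses go to remote storage (S3).
    static double expectedMillis(double hitRate, double localMillis, double remoteMillis) {
        if (hitRate < 0.0 || hitRate > 1.0) {
            throw new IllegalArgumentException("hitRate must be within [0, 1]");
        }
        return hitRate * localMillis + (1.0 - hitRate) * remoteMillis;
    }

    public static void main(String[] args) {
        double local = 0.05;  // assumed local cache read latency in ms (not from the lesson)
        double remote = 10.0; // S3 read latency in ms, the ~10 ms figure quoted in the text
        for (double hitRate : new double[] {0.0, 0.65, 0.87}) {
            System.out.printf("hit rate %.2f -> %.2f ms per read%n",
                    hitRate, expectedMillis(hitRate, local, remote));
        }
    }
}
```

Under these assumptions the miss term dominates: every percentage point of hit rate removes roughly 0.1 ms from the average read, which is why the cache size is tuned first.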
Stage 3: Operator migration to the async state API
3.1 Identify operators that need to be rewritten
For our job:
- 3 SQL Aggregate operators (StreamPhysicalGroupAggregate): async-ready, YES
- 2 Joins (StreamPhysicalJoin): async-ready, YES
- 1 custom UDF with MapState: needs a rewrite, NO
- 1 MATCH_RECOGNIZE for fraud: not async-ready, NO
3.2 Custom UDF migration
Before (V1 sync):
public class EnrichmentFunction extends KeyedProcessFunction<...> {
    private MapState<String, UserProfile> profiles;

    public void processElement(Event e, Context ctx, Collector<Result> out) throws Exception {
        UserProfile p = profiles.get(e.userId); // SYNC
        if (p == null) { /* lookup external */ }
        // ... business logic
    }
}
After (V2 async):
public class EnrichmentFunctionV2 extends KeyedProcessFunctionV2<...> {
    private MapState<String, UserProfile> profiles;

    public void processElement(Event e, Context ctx, Collector<Result> out) {
        profiles.asyncGet(e.userId).thenAccept(p -> {
            if (p == null) { /* lookup external */ }
            // ... business logic
            out.collect(result);
        });
    }
}
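The shift from a blocking read to a callback can be tried outside Flink with plain JDK futures. In the sketch below, CompletableFuture and a ConcurrentHashMap are stand-ins for Flink's StateFuture and keyed state; the class and method names are hypothetical, and this pattern is for illustration only, not something to copy into an operator.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for keyed map state; NOT a Flink API.
final class AsyncStateSketch {
    private final Map<String, String> profiles = new ConcurrentHashMap<>();

    void put(String userId, String profile) {
        profiles.put(userId, profile);
    }

    // Blocking style: the caller waits for the value before continuing.
    String syncGet(String userId) {
        return profiles.get(userId);
    }

    // Callback style: the read returns immediately with a future.
    CompletableFuture<String> asyncGet(String userId) {
        return CompletableFuture.supplyAsync(() -> profiles.get(userId));
    }

    // Business logic moves into the continuation instead of following the read.
    CompletableFuture<String> enrich(String userId) {
        return asyncGet(userId)
                .thenApply(p -> p == null ? "unknown:" + userId : p + ":" + userId);
    }

    public static void main(String[] args) {
        AsyncStateSketch state = new AsyncStateSketch();
        state.put("u1", "gold");
        System.out.println(state.enrich("u1").join());
        System.out.println(state.enrich("u2").join());
    }
}
```

The point of the rewrite is visible in `enrich`: nothing after the read may assume the value is already available, so every step that depends on it has to live inside the callback chain.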
3.3 MATCH_RECOGNIZE: no async option
Decision matrix:
A) Keep it sync and accept 30% throughput on this operator
B) Rewrite on the DataStream API with custom CEP-like logic on the V2 API
C) Split fraud detection into a separate job on RocksDB
We chose C: fraud detection becomes a separate critical job
Main analytics job -> ForstDB
Fraud job -> RocksDB (smaller state, latency critical)
3.4 SQL changes
Enable async for SQL operators:
SET 'table.exec.async-state.enabled' = 'true';
For the custom UDF: re-register it with the V2 wrapper
CREATE FUNCTION enrich AS 'com.company.EnrichmentFunctionV2';
Stage 4: Compile plan freeze
4.1 The final SQL code is ready, async is enabled
4.2 COMPILE PLAN на staging:
COMPILE PLAN '/staging-plans/analytics_job_v2.json' FOR
INSERT INTO ... -- COMPILE PLAN takes an INSERT statement; the sink table is elided here
SELECT
  window_start, user_id, COUNT(*) AS event_cnt,
  enrich(user_id) AS profile, ...
FROM events ...
4.3 Validate plan:
- StreamExecGroupAggregate с async=true
- StreamExecJoin с async=true
- StreamExecCalc с custom function (async wrapper)
- StreamExecSink (KafkaSink с async)
4.4 Commit the JSON to git
- Plan freeze for production
- Any future Flink upgrade -> same plan
- Protection against plan drift caused by cost model changes
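One cheap way to notice drift is to fingerprint the committed plan and compare it with a freshly compiled one in CI. The sketch below is an assumption about how such a check could look, not part of Flink: the class and method names are hypothetical, and a byte-level hash is deliberately crude, so a real check would likely need to ignore fields that legitimately change between Flink versions.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical CI helper: compares the committed plan JSON with a freshly compiled one.
final class PlanFingerprint {
    // SHA-256 of the plan text, rendered as lowercase hex.
    static String sha256Hex(String planJson) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(planJson.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is expected on every Java platform", e);
        }
    }

    // True when the two plan texts differ byte for byte.
    static boolean hasDrifted(String committedPlanJson, String freshPlanJson) {
        return !sha256Hex(committedPlanJson).equals(sha256Hex(freshPlanJson));
    }

    public static void main(String[] args) {
        String committed = "{\"nodes\":[\"agg\",\"join\"]}"; // toy stand-in for a real plan file
        String fresh = "{\"nodes\":[\"join\",\"agg\"]}";
        System.out.println("drifted: " + hasDrifted(committed, fresh));
    }
}
```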
Stage 5: Production migration (carefully)
5.1 Pre-migration safety net
- Final savepoint on the RocksDB version: s3://prod/savepoints/pre-migration
- RocksDB cluster on standby (for emergency rollback)
- On-call rotation extended
- Communication: stakeholders notified
5.2 Migration window (off-peak hours)
Step 1: Stop the production job (triggers a savepoint)
bin/flink stop --savepointPath s3://prod/savepoints/migration-checkpoint <job-id>
Wait for completion: ~5 min for 800 GB
Step 2: Update Flink config
state.backend.type: forst
state.backend.forst.local-cache.size: 100gb
state.backend.forst.block-cache.size: 2gb
table.exec.async-state.enabled: true
Step 3: Submit job via compiled plan
bin/flink run \
--fromSavepoint s3://prod/savepoints/migration-checkpoint \
--planFile /opt/flink/plans/analytics_job_v2.json
Step 4: Verify
- Job starts (~2 min)
- Watermark recovery (~5 min)
- Output to Kafka begins
- Metrics appear in the dashboard
In our case:
Total downtime: 12 min (target was < 30 min)
No data loss (exactly-once + Kafka offsets)
Throughput recovered in 8 min (cache warm-up)
Stage 6: Post-migration validation
6.1 First 24 hours: intensive monitoring
Metrics to watch:
- Cache hit rate: target > 80% (initially 70%, grew to 90% within 2 hours)
- S3 errors: target 0 (had 3 transient retries, OK)
- Latency p99: target < 5x baseline (achieved 1.2x)
- Checkpoint duration: target < 1 min (achieved a constant 30 sec)
- End-to-end latency: target < baseline (achieved -30%)
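The numeric targets lend themselves to an automated check during the first 24 hours. The following sketch encodes three of them; the class name, method signature, and message strings are assumptions, and in practice the inputs would come from whatever metrics system the cluster already uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical post-migration check: returns the targets that are currently missed.
final class PostMigrationTargets {
    static List<String> violations(double cacheHitRate, double p99VsBaseline, double checkpointSeconds) {
        List<String> missed = new ArrayList<>();
        if (cacheHitRate <= 0.80) missed.add("cache hit rate not above 80%");
        if (p99VsBaseline >= 5.0) missed.add("p99 latency not below 5x baseline");
        if (checkpointSeconds >= 60.0) missed.add("checkpoint duration not below 1 min");
        return missed;
    }

    public static void main(String[] args) {
        // Right after cutover: cold cache at 70% hit rate.
        System.out.println(violations(0.70, 1.2, 30.0));
        // Two hours later: cache warmed up to 90%.
        System.out.println(violations(0.90, 1.2, 30.0));
    }
}
```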
6.2 First week: comparison with baseline
Metric               | Before (RocksDB)   | After (ForstDB)        | Delta
State size           | 800 GB on disks    | 800 GB in S3           | moved
Cluster nodes        | 50 TM              | 50 TM (smaller nodes)  | -
Local disk per node  | 20 GB              | 5 GB (cache only)      | -75%
EBS cost/month       | $8000              | $2000                  | -75%
S3 cost/month        | $200 (checkpoint)  | $1500 (all)            | +650%
Total cost           | $8200              | $3500                  | -57%
Recovery time        | 22 min             | 30 sec                 | -98%
Checkpoint duration  | 2 min (growing)    | 30 sec (constant)      | -75%
Rescaling downtime   | 15 min             | 60 sec                 | -93%
Throughput           | 50K events/sec     | 55K events/sec         | +10%
P99 latency          | 45 ms              | 25 ms                  | -44%
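The Delta column is the relative change from the "before" value, rounded to a whole percent. A minimal sketch that recomputes it from the table's own numbers (durations converted to seconds; the helper name is hypothetical):

```java
// Recomputes the Delta column: relative change from "before" to "after", in whole percent.
final class TableDeltas {
    static long deltaPercent(double before, double after) {
        if (before == 0.0) {
            throw new IllegalArgumentException("before must be non-zero");
        }
        return Math.round((after - before) / before * 100.0);
    }

    public static void main(String[] args) {
        System.out.println("EBS cost:        " + deltaPercent(8000, 2000) + "%");
        System.out.println("S3 cost:         " + deltaPercent(200, 1500) + "%");
        System.out.println("Total cost:      " + deltaPercent(8200, 3500) + "%");
        System.out.println("Recovery time:   " + deltaPercent(22 * 60, 30) + "%");
        System.out.println("Rescaling:       " + deltaPercent(15 * 60, 60) + "%");
        System.out.println("P99 latency:     " + deltaPercent(45, 25) + "%");
    }
}
```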
6.3 First month: edge cases
- 2 rescaling events (autoscaling)
Before: required planned downtime
After: automatic, no impact
- 1 TaskManager pod eviction (K8s rolling update)
Before: 22 min downtime
After: 30 sec recovery, transparent
- Transient S3 outage (1 minute)
Before: would not have affected the job
After: job continued, requests were retried, no data loss
6.4 Stabilization
After 30 days: declare the migration successful
Decommission the emergency RocksDB cluster
Archive RocksDB savepoints to Glacier
Update runbooks
What went wrong: production stories
Story #1: Initial cache too small
Day 1 staging: throughput 60%, cache hit rate 65%
Bug: we assumed the default cache size was reasonable
Reality: the default is 0 (disabled)
Fix: explicitly set 100 GB
Impact: no production impact (caught in staging)
Story #2: Hot key (after migration)
Day 7 production: 5% throughput drop, latency spike
Investigation: one user_id with 50K events/min
The cache was evicting other keys in favor of this hot key
Fix: SplitAggregateRule for COUNT DISTINCT, salt hot keys
Time to fix: 1 day
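Salting splits one hot key into several sub-keys, aggregates each sub-key separately, and then merges the partial results. The sketch below shows the idea for a plain count using only JDK collections; it is not the Flink rule itself, all names are hypothetical, and the round-robin salt is an assumption that only works for a plain count. For COUNT DISTINCT the bucket has to be derived from the distinct value, so that equal values always land in the same bucket.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical two-stage count: partial counts per salted key, then a merge per original key.
final class HotKeySalting {
    // Spreads one key over `buckets` sub-keys, e.g. "u" -> "u#0" .. "u#3".
    static String saltedKey(String key, long sequence, int buckets) {
        long salt = Math.floorMod(sequence, (long) buckets);
        return key + "#" + salt;
    }

    static String originalKey(String salted) {
        return salted.substring(0, salted.lastIndexOf('#'));
    }

    static Map<String, Long> twoStageCount(List<String> keys, int buckets) {
        // Stage 1: count per salted key, so no single sub-key receives all hot traffic.
        Map<String, Long> partial = new HashMap<>();
        long sequence = 0;
        for (String key : keys) {
            partial.merge(saltedKey(key, sequence++, buckets), 1L, Long::sum);
        }
        // Stage 2: strip the salt and sum the partial counts.
        Map<String, Long> total = new HashMap<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            total.merge(originalKey(e.getKey()), e.getValue(), Long::sum);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(twoStageCount(Arrays.asList("hot", "hot", "hot", "cold"), 2));
    }
}
```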
Story #3: S3 PUT throttling
Day 14: S3 errors started appearing in the logs
Investigation: ~5000 PUT/sec peak load on a single prefix
S3 limit: ~3500 PUT/sec per prefix
Fix: shard ForstDB files across prefix patterns
state.backend.forst.s3.prefix-shards: 8
Time to fix: half a day, with a job restart
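The arithmetic behind the fix: spreading ~5000 PUT/sec over 8 prefixes leaves about 625 PUT/sec per prefix, well under the ~3500 limit, provided the files hash evenly. The sketch below illustrates hash-based prefix sharding in general; the key layout and names are assumptions and do not describe how ForstDB actually lays out its files.

```java
// Hypothetical illustration of prefix sharding; not ForstDB's real file layout.
final class S3PrefixSharding {
    // Picks a shard from the file name hash and places the object under that shard's prefix.
    static String shardedKey(String basePrefix, String fileName, int shards) {
        int shard = Math.floorMod(fileName.hashCode(), shards);
        return basePrefix + "/shard-" + shard + "/" + fileName;
    }

    // Per-prefix request rate, assuming requests spread evenly across shards.
    static double perPrefixRate(double totalPutsPerSec, int shards) {
        return totalPutsPerSec / shards;
    }

    public static void main(String[] args) {
        System.out.println(shardedKey("checkpoints", "000123.sst", 8));
        System.out.println(perPrefixRate(5000, 8) + " PUT/sec per prefix");
    }
}
```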
Story #4: Late-binding watermark issue with async
Day 21: watermark advancing slower than expected
Investigation: async state requests were delaying watermark emission
What happens: the watermark waits for all in-flight requests to complete per key
Fix: lower buffer-timeout (100 ms -> 30 ms)
More frequent batches, lower watermark latency
Impact: minor throughput drop (-3%), better SLA compliance
Story #5: Custom UDF leak
Day 25: heap usage growing
Investigation: the custom UDF allocated CompletableFuture instead of StateFuture
The objects were not returned to the Flink object pool, causing a memory leak
Fix: refactor to use StateFutureUtils
Time to fix: 2 days (code change, test, deploy)
Rollback plan (not needed, but in place)
If the migration fails, plan B:
Trigger conditions:
- Throughput < 50% of baseline for a sustained period
- Data loss or corruption
- Critical bug in the async runtime
- S3 cost > 3x the estimate
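Writing the trigger conditions as a single predicate removes ambiguity during an incident. This is a sketch with hypothetical names; the caller is assumed to pass a throughput ratio that is already averaged over a sustained period, since the list does not define how long "sustained" is.

```java
// Hypothetical rollback predicate built from the four trigger conditions.
final class RollbackTriggers {
    static boolean shouldRollback(double sustainedThroughputVsBaseline,
                                  boolean dataLossOrCorruption,
                                  boolean criticalAsyncRuntimeBug,
                                  double s3CostVsEstimate) {
        return sustainedThroughputVsBaseline < 0.5
                || dataLossOrCorruption
                || criticalAsyncRuntimeBug
                || s3CostVsEstimate > 3.0;
    }

    public static void main(String[] args) {
        // Healthy job: 110% throughput, no incidents, S3 cost on estimate.
        System.out.println(shouldRollback(1.10, false, false, 1.0));
        // Degraded job: throughput stuck at 40% of baseline.
        System.out.println(shouldRollback(0.40, false, false, 1.0));
    }
}
```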
Rollback procedure:
1. Stop the new job (without a savepoint, so the state is not damaged)
2. Restart the old jar/config:
bin/flink run \
--fromSavepoint s3://prod/savepoints/pre-migration \
<old jar>
3. Verify that metrics are restored
4. Post-mortem analysis
Rollback time: ~10 min downtime
Data loss: processing since the last RocksDB savepoint (a few minutes)
Cost: minor; outputs may be duplicated, so an idempotent sink is required
Lessons learned
1. Cache sizing is the most critical decision
- The default of 0 is a gotcha
- Start at 30% of the working set, then tune upward
- Monitor the cache hit rate, target > 80%
2. Plan freeze (COMPILE PLAN) is a must-have
- Without it, future Flink upgrades may change the plan
- JSON in git = audit trail + reproducibility
3. Test recovery scenarios extensively
- Killing a TaskManager in staging is mandatory
- Verify that recovery time matches the target SLA
- Multiple scenarios: pod eviction, node failure, AZ outage
4. Custom UDFs are the bottleneck
- A lot of custom stateful logic means a lot of rewriting work
- Wrappers are OK for legacy code, but true async is better long-term
- Plan for 1-2 weeks per complex operator
5. The cost mix changes: communicate it
- EBS cost goes down
- S3 cost goes up (PUT/GET/storage)
- Net cost usually goes down, but finance needs to know
- Lifecycle policies are critical for S3 storage cost
6. Monitor cache thrashing
- Hot keys evict cold keys
- It may deteriorate over time
- Plan for quarterly re-tuning
7. Async errors are silent by default
- StateFuture.thenAccept does not propagate exceptions
- Wrap in exceptionally() for logging
- Critical: enable async-state.error-handler
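The silent-failure behavior in lesson 7 is easy to reproduce with JDK futures. In the sketch below, CompletableFuture is only a stand-in for StateFuture, and the class and method names are hypothetical: an exception thrown inside the callback ends up in the returned future, and nothing is reported unless a handler such as exceptionally() is attached.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Stand-in demo with JDK futures; StateFuture itself is not used here.
final class AsyncErrorLogging {
    // Runs the callback on the read result and returns the error it raised, or null if none.
    static Throwable runAndCapture(CompletableFuture<String> stateRead, Consumer<String> callback) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        stateRead.thenAccept(callback)
                .exceptionally(t -> {
                    // Failures from an earlier stage arrive wrapped in CompletionException.
                    Throwable cause = (t instanceof CompletionException && t.getCause() != null)
                            ? t.getCause() : t;
                    seen.set(cause);
                    System.err.println("async state callback failed: " + cause);
                    return null;
                })
                .join();
        return seen.get();
    }

    public static void main(String[] args) {
        CompletableFuture<String> read = CompletableFuture.completedFuture("profile");
        // Without a handler the exception is stored in the returned future and never surfaces.
        read.thenAccept(v -> { throw new IllegalStateException("lost"); });
        // With a handler the same failure is logged and can be acted on.
        runAndCapture(read, v -> { throw new IllegalStateException("logged"); });
    }
}
```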
Migration timeline summary
Total timeline: 8 weeks
Week 1-2: Preparation (Flink upgrade, S3 setup, monitoring)
Week 3-4: Staging migration (clone job, validate)
Week 5: Operator migration (async UDF rewrite)
Week 6: Compile plan, final testing
Week 7: Production migration (carefully)
Week 8+: Validation, stabilization, decommission RocksDB
Active engineer time: ~3 weeks total
Calendar time: 8 weeks (buffer for testing, fixes)
ROI
One-time costs:
- Engineer time: 3 weeks * 2 engineers = 240 hours
- Engineering cost: ~$30K
- S3 setup and tools: ~$2K
Ongoing benefits:
- Cost saving: $4700/month ($8200 -> $3500)
- Operator productivity: rescaling 15 min -> 60 sec
- Recovery SLA improved: 22 min -> 30 sec
- 1 fewer on-call escalation per month
ROI: $4700/month savings, breakeven in ~7 months
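The breakeven figure follows from dividing the one-time costs by the monthly saving: ($30K + $2K) / $4700 is roughly 6.8 months. A minimal sketch of that arithmetic, with a hypothetical helper name and all figures taken from the lists above:

```java
// Breakeven arithmetic for the ROI section; all figures come from the lesson.
final class MigrationRoi {
    static double breakevenMonths(double oneTimeCost, double monthlySaving) {
        if (monthlySaving <= 0.0) {
            throw new IllegalArgumentException("monthlySaving must be positive");
        }
        return oneTimeCost / monthlySaving;
    }

    public static void main(String[] args) {
        double oneTimeCost = 30_000 + 2_000;  // engineering + S3 setup and tools
        double monthlySaving = 8_200 - 3_500; // total cost before minus after
        System.out.printf("monthly saving: $%.0f%n", monthlySaving);
        System.out.printf("breakeven: %.1f months%n", breakevenMonths(oneTimeCost, monthlySaving));
    }
}
```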
When NOT to migrate
Scenarios where RocksDB remains the better choice:
1. Small state (<50 GB total)
ForstDB overhead (cache management, async) is not justified
Local RocksDB is faster and simpler
2. Latency-critical jobs (<1 ms target)
Async adds ~1 ms of latency for the callback chain
Local sync RocksDB is faster on the hot path
3. Custom logic with no capacity for re-engineering
Rewriting custom UDFs is a major undertaking
Without a rewrite to V2, ForstDB does not help
4. Stable, simple workloads
No rescaling, no failover spikes, no DR
ForstDB benefits do not materialize
5. On-premise without good S3
ForstDB requires S3-compatible storage with a high SLA
HDFS works but is less convenient
ForstDB and RocksDB can coexist in one Flink cluster. Different jobs can use different backends. This allows gradual migration: critical jobs stay on RocksDB, while exploratory or large-state jobs move to ForstDB. Over time, moving the rest becomes easier.
Reading the source
Migration tools:
flink-state-processor-api/ -- for bootstrapping state
bin/flink -- CLI tools, savepoint commands
Configs reference:
flink-state-backends/flink-statebackend-forst/
src/main/java/org/apache/flink/state/forst/ForStOptions.java
-- all options are documented
Production runbooks:
Apache Flink wiki: ForstDB migration guides
Alibaba's blog posts on internal migration
AWS re:Invent talks on Flink + S3 best practices