Migration 1.20 -> 2.2 + disaggregated state
Финальный урок capstone — реальный сценарий, с которым сталкивается каждая команда, поддерживающая Flink в production: миграция legacy job со старой версии на новую, с попутным переходом на новые возможности (в нашем случае — disaggregated state через ForstDB).
Возьмём конкретный пример: fraud-detection job, написанный на Flink 1.20, runs два года в production, 2TB RocksDB state, checkpoint duration постоянно растёт (12 минут к концу), regular crashes из-за RocksDB compaction storms. Цель миграции — Flink 2.2 + ForstDB для lightweight checkpoints (секунды), fast recovery (минуты вместо часа), снижение TaskManager memory footprint.
Flink 2.x: что нового и breaking changesPre-migration assessment
Перед миграцией — полный assessment текущего состояния. Без него migration превращается в lottery.
Шаг 1: Inventory текущего job.
# Job graph
curl -s http://jm:8081/jobs/$JOB_ID | jq '.plan'
# Текущая parallelism per vertex
curl -s http://jm:8081/jobs/$JOB_ID/vertices | jq '.vertices[] | {id, name, parallelism}'
# State backend config
curl -s http://jm:8081/jobs/$JOB_ID/config | jq '.["execution-config"]'
# Current state size
aws s3 ls s3://flink-checkpoints/$JOB_ID/ --recursive --summarize \
| tail -2
# Checkpoint metrics
curl -s http://jm:8081/jobs/$JOB_ID/checkpoints | \
jq '.history[] | {timestamp, duration, size, type, status}' | head -20
# Recent crashes / restarts
kubectl logs -l app=$JOB_NAME --previous --tail=100 | grep -i error
Шаг 2: Identify breaking changes.
Flink 2.x breaking changes vs 1.20 (release notes):
- Removed Scala API. Если job использует Scala API (
org.apache.flink.streaming.api.scala.*), переписать на Java DataStream API. - Removed legacy SourceFunction/SinkFunction. Migrate to Source V2 / Sink V2 (FLIP-27, FLIP-143).
- TypeSerializer changes. Custom serializers могут потерять backward compatibility — нужны snapshot upgraders.
- Configuration option renames. Many
taskmanager.*configs переименованы в Flink 2.x.
Шаг 3: Test on staging.
Никогда не мигрируйте production без полного rehearsal на staging с production-like data volume.
Step 1: Code migration (1.20 -> 2.2)
Update pom.xml:
<!-- pom.xml -->
<properties>
<flink.version>2.2.0</flink.version>
<java.version>17</java.version> <!-- Flink 2.x требует Java 17+ -->
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- ForstDB state backend -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-forst</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
Migrate legacy Kafka source:
// БЫЛО (Flink 1.20, deprecated):
FlinkKafkaConsumer<Transaction> kafkaSource = new FlinkKafkaConsumer<>(
"transactions",
new TransactionDeserializer(),
kafkaProps
);
DataStream<Transaction> source = env.addSource(kafkaSource);
// СТАЛО (Flink 2.2, FLIP-27):
KafkaSource<Transaction> kafkaSource = KafkaSource.<Transaction>builder()
.setBootstrapServers("kafka:9092")
.setTopics("transactions")
.setGroupId("fraud-detection")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new TransactionDeserializer())
.build();
DataStream<Transaction> source = env.fromSource(
kafkaSource,
WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((t, ts) -> t.timestamp),
"transactions"
);
Migrate legacy sink:
// БЫЛО:
FlinkKafkaProducer<Alert> alertSink = new FlinkKafkaProducer<>(
"fraud-alerts",
new AlertSerializer(),
kafkaProps,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);
processed.addSink(alertSink);
// СТАЛО (Sink V2):
KafkaSink<Alert> alertSink = KafkaSink.<Alert>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.<Alert>builder()
.setTopic("fraud-alerts")
.setValueSerializationSchema(new AlertSerializer())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("fraud-detection-")
.build();
processed.sinkTo(alertSink);
После code migration — unit tests:
mvn test -Dtest=FraudDetectionJobTest
Integration test с MiniCluster:
@Test
public void testEndToEndPipeline() throws Exception {
MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(2)
.build()
);
cluster.before();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ... build job graph identical to production
env.execute("test-job");
// Assert на output
// ...
cluster.after();
}
Step 2: Staging rehearsal с RocksDB
Развернуть migrated job в staging environment с production state. Загрузить production savepoint в staging:
# Copy production savepoint to staging
aws s3 sync \
s3://flink-savepoints-prod/fraud-detection/savepoint-abc/ \
s3://flink-savepoints-staging/fraud-detection/savepoint-abc/
# Deploy migrated job в staging
kubectl apply -f flink-deployment-staging.yaml -n fraud-staging
# spec.job.initialSavepointPath = s3://flink-savepoints-staging/fraud-detection/savepoint-abc
Monitoring в staging:
# Checkpoint duration
histogram_quantile(0.99,
flink_jobmanager_job_lastCheckpointDuration{job="fraud-detection"}
)
# Throughput
rate(flink_taskmanager_job_task_numRecordsIn{job="fraud-detection"}[5m])
# Backpressure
flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job="fraud-detection"}
# State size growth rate
deriv(flink_taskmanager_job_task_operator_rocksdb_total_sst_files_size[1h])
Baseline сравнение: производительность migrated job должна быть >= 1.20. Если новая медленнее — debugging.
Не переходите к ForstDB пока не валидировали что Flink 2.2 + RocksDB работает корректно с production savepoint. Сначала зафиксируйте migration к 2.2, потом отдельно migration к ForstDB. Два big change одновременно — рецепт катастрофы.
Step 3: Migration к ForstDB (disaggregated state)
После того как Flink 2.2 + RocksDB работает в staging, можно переходить на ForstDB.
ForstDB config:
spec:
flinkConfiguration:
# State backend
state.backend.type: forst
state.backend.forst.remote-dir: s3://flink-state-fraud/forst/
state.backend.forst.local-dir: /tmp/flink-forst-local
# Disaggregated state requires async state API
table.exec.async-state.enabled: "true"
# ForstDB tuning
state.backend.forst.cache.size: 4gb
state.backend.forst.compression: snappy
# Checkpoint config — lightweight для ForstDB
state.checkpoints.dir: s3://flink-checkpoints-fraud/
execution.checkpointing.interval: 30s # можно чаще с ForstDB
execution.checkpointing.timeout: 5min # обычно секунды
Migration к ForstDB требует savepoint, не checkpoint — state backend change не работает через incremental restore. Process:
# 1. Trigger savepoint на 1.20 + RocksDB
SAVEPOINT_URI=$(trigger-savepoint.sh fraud-detection)
# 2. Stop job, ничего не меняем (savepoint exists)
kubectl delete flinkdeployment fraud-detection -n fraud-team
# 3. Deploy с новой config (2.2 + ForstDB) и initialSavepointPath
kubectl apply -f - <<EOF
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: fraud-detection
namespace: fraud-team
spec:
image: flink:2.2.0
flinkVersion: v2_2
flinkConfiguration:
state.backend.type: forst
state.backend.forst.remote-dir: s3://flink-state-fraud/forst/
table.exec.async-state.enabled: "true"
...
job:
jarURI: s3://flink-jobs/fraud-detection-2.2.jar
parallelism: 16
initialSavepointPath: $SAVEPOINT_URI
upgradeMode: savepoint
EOF
# 4. Monitor restore
# ForstDB рестарт должен быть быстрым — secondary download SST files в background
# Job становится RUNNING почти сразу
Ожидаемые изменения после ForstDB migration:
| Метрика | RocksDB (before) | ForstDB (after) |
|---|---|---|
| Checkpoint duration (p99) | 12 min | 5-30 sec |
| Checkpoint size (incremental) | 50-200MB | 10-50MB (только metadata) |
| Recovery time | 30+ min | 2-5 min |
| TaskManager memory | 16Gb | 8Gb |
| RocksDB compaction issues | Regular | Минимальны |
Step 4: Async state API migration
ForstDB работает асинхронно — все state operations возвращают Future вместо blocking calls. Это требует переписывания stateful logic:
// БЫЛО (RocksDB sync API):
public void processElement(Transaction t, Context ctx, Collector<Alert> out) {
Long lastSeen = lastSeenState.value(); // blocking read
if (lastSeen != null && Math.abs(t.timestamp - lastSeen) < 1000) {
out.collect(new Alert("velocity-fraud", t));
}
lastSeenState.update(t.timestamp); // blocking write
}
// СТАЛО (ForstDB async API):
public void processElement(Transaction t, Context ctx, Collector<Alert> out) {
lastSeenState.asyncValue().thenAccept(lastSeen -> {
if (lastSeen != null && Math.abs(t.timestamp - lastSeen) < 1000) {
out.collect(new Alert("velocity-fraud", t));
}
lastSeenState.asyncUpdate(t.timestamp);
});
}
Альтернатива — keep sync API через ForstDB local cache (state.backend.forst.cache.size), но это дает worse performance. Production-grade ForstDB deployments всегда используют async API для всех hot paths.
Migration complexity: каждый stateful operator нужно переписать. Это main reason почему ForstDB migration занимает 2+ недель — не сам state backend swap, а refactoring всей логики на async.
Если у вас сотни stateful operators и async migration слишком costly, ForstDB можно использовать в hybrid mode: sync API + большой local cache (state.backend.forst.cache.size=16gb). Это даёт checkpoint benefits, но без full latency benefits async API. Это compromise для legacy code.
Step 5: Production cutover с canary
Production cutover — самый рискованный момент. Strategy: shadow region + gradual canary:
# 1. Создать новый deployment в shadow namespace
kubectl apply -n fraud-shadow -f flink-deployment-2.2-forst.yaml
# 2. Shadow читает из same Kafka topic, пишет в shadow output topic
# Compare outputs из production vs shadow:
python compare-outputs.py \
--prod-topic fraud-alerts \
--shadow-topic fraud-alerts-shadow \
--duration 1h
# 3. Если outputs match (или delta acceptable), proceed to canary
# Canary 10% — split Kafka топик через consumer groups:
# Old job consumes 90% (group=fraud-detection)
# New job consumes 10% (group=fraud-detection-canary с manual partition assignment)
# 4. Monitor canary 24 часа. Метрики:
# - error rate
# - alert quality (precision/recall vs old job)
# - latency p99
# - resource usage
# 5. Если canary OK — ramp to 50%, потом 100%
# 6. Decommission old job (Flink 1.20)
kubectl delete flinkdeployment fraud-detection-legacy -n fraud-team
Rollback plan на случай canary failure:
# Quick rollback: switch consumer groups back
# Old job обрабатывает 100%, новый stopped
kubectl scale flinkdeployment fraud-detection-canary -n fraud-team --replicas=0
kubectl scale flinkdeployment fraud-detection-legacy -n fraud-team --replicas=16
# Investigate root cause в shadow environment
Не decommission old job до тех пор, пока new job не работает full traffic минимум 7 дней.
Validation: что измерять после migration
Post-migration checklist (за 2 недели мониторинга):
Performance:
- Throughput >= baseline (1.20)
- Latency p99
<= baseline+ 10% - Checkpoint duration
< 1 min (благодаря ForstDB) - Recovery time
< 5 min (test через chaos engineering)
Reliability:
- Zero data loss vs baseline (compare outputs)
- No new error patterns в logs
- Checkpoint success rate > 99.9%
Resource:
- TaskManager memory дропнул (благодаря disaggregated state)
- S3 cost увеличился (новые ForstDB SST files в remote)
- Total cost (compute + storage)
<= 1.20baseline
Operational:
- Savepoint automation working
- Rollback playbook tested (на staging)
- On-call team trained on 2.2 + ForstDB specifics
Итоги: ключевые learnings
Migration legacy Flink job на новую версию + новый state backend — это 3-4 месяца работы. Главные принципы:
1. Two changes at once — рецепт катастрофы. Сначала migration к Flink 2.2 (на RocksDB), потом отдельная migration к ForstDB. Каждое изменение валидируется независимо.
2. Staging rehearsal с production state — обязательно. Без него вы узнаете о breaking changes в production. Production savepoint должен restoreить-ся в staging без потерь.
3. Async state API — main complexity for ForstDB migration. Все stateful operators нужно переписать. Estimate effort accordingly.
4. Canary + gradual rollout, не big bang. Shadow region для validation, 10% canary 24h, 50% week, 100% после.
5. Rollback plan на каждом этапе. Savepoint backwards-compatible: можно вернуть с 2.2 на 1.20 (через State Processor API), но это сложно. Лучше keep old job running до full validation new.
Capstone модуль завершён. Если вы прошли все 21 модуль курса — вы понимаете Flink internals глубже, чем 99% команд, и готовы строить production-grade streaming platforms. Поздравляю!