Learning Platform
Глоссарий Troubleshooting
Урок 22.05 · 30 мин
Продвинутый
MigrationFlink 2.2ForstDBDisaggregated stateProduction upgrade

Migration 1.20 -> 2.2 + disaggregated state

Финальный урок capstone — реальный сценарий, с которым сталкивается каждая команда, поддерживающая Flink в production: миграция legacy job со старой версии на новую, с попутным переходом на новые возможности (в нашем случае — disaggregated state через ForstDB).

Возьмём конкретный пример: fraud-detection job, написанный на Flink 1.20, runs два года в production, 2TB RocksDB state, checkpoint duration постоянно растёт (12 минут к концу), regular crashes из-за RocksDB compaction storms. Цель миграции — Flink 2.2 + ForstDB для lightweight checkpoints (секунды), fast recovery (минуты вместо часа), снижение TaskManager memory footprint.

Flink 2.x: что нового и breaking changes

Pre-migration assessment

Перед миграцией — полный assessment текущего состояния. Без него migration превращается в lottery.

Шаг 1: Inventory текущего job.

# Job graph
curl -s http://jm:8081/jobs/$JOB_ID | jq '.plan'

# Текущая parallelism per vertex
curl -s http://jm:8081/jobs/$JOB_ID/vertices | jq '.vertices[] | {id, name, parallelism}'

# State backend config
curl -s http://jm:8081/jobs/$JOB_ID/config | jq '.["execution-config"]'

# Current state size
aws s3 ls s3://flink-checkpoints/$JOB_ID/ --recursive --summarize \
  | tail -2

# Checkpoint metrics
curl -s http://jm:8081/jobs/$JOB_ID/checkpoints | \
  jq '.history[] | {timestamp, duration, size, type, status}' | head -20

# Recent crashes / restarts
kubectl logs -l app=$JOB_NAME --previous --tail=100 | grep -i error

Шаг 2: Identify breaking changes.

Flink 2.x breaking changes vs 1.20 (release notes):

  • Removed Scala API. Если job использует Scala API (org.apache.flink.streaming.api.scala.*), переписать на Java DataStream API.
  • Removed legacy SourceFunction/SinkFunction. Migrate to Source V2 / Sink V2 (FLIP-27, FLIP-143).
  • TypeSerializer changes. Custom serializers могут потерять backward compatibility — нужны snapshot upgraders.
  • Configuration option renames. Many taskmanager.* configs переименованы в Flink 2.x.

Шаг 3: Test on staging.

Никогда не мигрируйте production без полного rehearsal на staging с production-like data volume.

Migration plan timeline
Week 1-2: AssessmentWeek 1-2: Assessment. Inventory job, identify breaking changes, set up staging environment. Сразу можно понять scope изменений.
Week 3-6: CodeWeek 3-6: Code migration. Update dependencies, fix breaking changes, migrate Sources/Sinks if нужно. Тесты unit и integration.
Week 7-10: StagingWeek 7-10: Staging rehearsal. Развернуть job в staging, прогнать через production-like data. Измерить state size, checkpoint duration, throughput. Iterate on tuning.
Week 11-12: ForstDBWeek 11-12: Disaggregated state migration. Перевести job с RocksDB на ForstDB. Validate checkpoint size dropped, recovery time improved.
Week 13: CutoverWeek 13: Production cutover. Создать savepoint на 1.20, restore на 2.2 в shadow region. Canary traffic 10%, мониторинг. Постепенно ramp до 100%.
Week 14+: MonitorWeek 14+: Post-migration monitoring. Smoke tests, performance comparison vs 1.20 baseline. Rollback plan готов на случай проблем.

Step 1: Code migration (1.20 -> 2.2)

Update pom.xml:

<!-- pom.xml -->
<properties>
    <flink.version>2.2.0</flink.version>
    <java.version>17</java.version>  <!-- Flink 2.x требует Java 17+ -->
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- ForstDB state backend -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-forst</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

Migrate legacy Kafka source:

// БЫЛО (Flink 1.20, deprecated):
FlinkKafkaConsumer<Transaction> kafkaSource = new FlinkKafkaConsumer<>(
    "transactions",
    new TransactionDeserializer(),
    kafkaProps
);
DataStream<Transaction> source = env.addSource(kafkaSource);

// СТАЛО (Flink 2.2, FLIP-27):
KafkaSource<Transaction> kafkaSource = KafkaSource.<Transaction>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("transactions")
    .setGroupId("fraud-detection")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new TransactionDeserializer())
    .build();

DataStream<Transaction> source = env.fromSource(
    kafkaSource,
    WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((t, ts) -> t.timestamp),
    "transactions"
);

Migrate legacy sink:

// БЫЛО:
FlinkKafkaProducer<Alert> alertSink = new FlinkKafkaProducer<>(
    "fraud-alerts",
    new AlertSerializer(),
    kafkaProps,
    FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);
processed.addSink(alertSink);

// СТАЛО (Sink V2):
KafkaSink<Alert> alertSink = KafkaSink.<Alert>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.<Alert>builder()
        .setTopic("fraud-alerts")
        .setValueSerializationSchema(new AlertSerializer())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("fraud-detection-")
    .build();

processed.sinkTo(alertSink);

После code migration — unit tests:

mvn test -Dtest=FraudDetectionJobTest

Integration test с MiniCluster:

@Test
public void testEndToEndPipeline() throws Exception {
    MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberSlotsPerTaskManager(2)
            .setNumberTaskManagers(2)
            .build()
    );
    cluster.before();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // ... build job graph identical to production
    env.execute("test-job");

    // Assert на output
    // ...

    cluster.after();
}

Step 2: Staging rehearsal с RocksDB

Развернуть migrated job в staging environment с production state. Загрузить production savepoint в staging:

# Copy production savepoint to staging
aws s3 sync \
  s3://flink-savepoints-prod/fraud-detection/savepoint-abc/ \
  s3://flink-savepoints-staging/fraud-detection/savepoint-abc/

# Deploy migrated job в staging
kubectl apply -f flink-deployment-staging.yaml -n fraud-staging
# spec.job.initialSavepointPath = s3://flink-savepoints-staging/fraud-detection/savepoint-abc

Monitoring в staging:

# Checkpoint duration
histogram_quantile(0.99,
  flink_jobmanager_job_lastCheckpointDuration{job="fraud-detection"}
)

# Throughput
rate(flink_taskmanager_job_task_numRecordsIn{job="fraud-detection"}[5m])

# Backpressure
flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job="fraud-detection"}

# State size growth rate
deriv(flink_taskmanager_job_task_operator_rocksdb_total_sst_files_size[1h])

Baseline сравнение: производительность migrated job должна быть >= 1.20. Если новая медленнее — debugging.

WARNING

Не переходите к ForstDB пока не валидировали что Flink 2.2 + RocksDB работает корректно с production savepoint. Сначала зафиксируйте migration к 2.2, потом отдельно migration к ForstDB. Два big change одновременно — рецепт катастрофы.


Step 3: Migration к ForstDB (disaggregated state)

После того как Flink 2.2 + RocksDB работает в staging, можно переходить на ForstDB.

ForstDB config:

spec:
  flinkConfiguration:
    # State backend
    state.backend.type: forst
    state.backend.forst.remote-dir: s3://flink-state-fraud/forst/
    state.backend.forst.local-dir: /tmp/flink-forst-local

    # Disaggregated state requires async state API
    table.exec.async-state.enabled: "true"

    # ForstDB tuning
    state.backend.forst.cache.size: 4gb
    state.backend.forst.compression: snappy

    # Checkpoint config — lightweight для ForstDB
    state.checkpoints.dir: s3://flink-checkpoints-fraud/
    execution.checkpointing.interval: 30s  # можно чаще с ForstDB
    execution.checkpointing.timeout: 5min  # обычно секунды

Migration к ForstDB требует savepoint, не checkpoint — state backend change не работает через incremental restore. Process:

# 1. Trigger savepoint на 1.20 + RocksDB
SAVEPOINT_URI=$(trigger-savepoint.sh fraud-detection)

# 2. Stop job, ничего не меняем (savepoint exists)
kubectl delete flinkdeployment fraud-detection -n fraud-team

# 3. Deploy с новой config (2.2 + ForstDB) и initialSavepointPath
kubectl apply -f - <<EOF
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: fraud-detection
  namespace: fraud-team
spec:
  image: flink:2.2.0
  flinkVersion: v2_2
  flinkConfiguration:
    state.backend.type: forst
    state.backend.forst.remote-dir: s3://flink-state-fraud/forst/
    table.exec.async-state.enabled: "true"
    ...
  job:
    jarURI: s3://flink-jobs/fraud-detection-2.2.jar
    parallelism: 16
    initialSavepointPath: $SAVEPOINT_URI
    upgradeMode: savepoint
EOF

# 4. Monitor restore
# ForstDB рестарт должен быть быстрым — secondary download SST files в background
# Job становится RUNNING почти сразу

Ожидаемые изменения после ForstDB migration:

МетрикаRocksDB (before)ForstDB (after)
Checkpoint duration (p99)12 min5-30 sec
Checkpoint size (incremental)50-200MB10-50MB (только metadata)
Recovery time30+ min2-5 min
TaskManager memory16Gb8Gb
RocksDB compaction issuesRegularМинимальны

Step 4: Async state API migration

ForstDB работает асинхронно — все state operations возвращают Future вместо blocking calls. Это требует переписывания stateful logic:

// БЫЛО (RocksDB sync API):
public void processElement(Transaction t, Context ctx, Collector<Alert> out) {
    Long lastSeen = lastSeenState.value();  // blocking read

    if (lastSeen != null && Math.abs(t.timestamp - lastSeen) < 1000) {
        out.collect(new Alert("velocity-fraud", t));
    }

    lastSeenState.update(t.timestamp);  // blocking write
}

// СТАЛО (ForstDB async API):
public void processElement(Transaction t, Context ctx, Collector<Alert> out) {
    lastSeenState.asyncValue().thenAccept(lastSeen -> {
        if (lastSeen != null && Math.abs(t.timestamp - lastSeen) < 1000) {
            out.collect(new Alert("velocity-fraud", t));
        }

        lastSeenState.asyncUpdate(t.timestamp);
    });
}

Альтернатива — keep sync API через ForstDB local cache (state.backend.forst.cache.size), но это дает worse performance. Production-grade ForstDB deployments всегда используют async API для всех hot paths.

Migration complexity: каждый stateful operator нужно переписать. Это main reason почему ForstDB migration занимает 2+ недель — не сам state backend swap, а refactoring всей логики на async.

TIP

Если у вас сотни stateful operators и async migration слишком costly, ForstDB можно использовать в hybrid mode: sync API + большой local cache (state.backend.forst.cache.size=16gb). Это даёт checkpoint benefits, но без full latency benefits async API. Это compromise для legacy code.


Step 5: Production cutover с canary

Production cutover — самый рискованный момент. Strategy: shadow region + gradual canary:

# 1. Создать новый deployment в shadow namespace
kubectl apply -n fraud-shadow -f flink-deployment-2.2-forst.yaml

# 2. Shadow читает из same Kafka topic, пишет в shadow output topic
# Compare outputs из production vs shadow:
python compare-outputs.py \
  --prod-topic fraud-alerts \
  --shadow-topic fraud-alerts-shadow \
  --duration 1h

# 3. Если outputs match (или delta acceptable), proceed to canary
# Canary 10% — split Kafka топик через consumer groups:
# Old job consumes 90% (group=fraud-detection)
# New job consumes 10% (group=fraud-detection-canary с manual partition assignment)

# 4. Monitor canary 24 часа. Метрики:
# - error rate
# - alert quality (precision/recall vs old job)
# - latency p99
# - resource usage

# 5. Если canary OK — ramp to 50%, потом 100%
# 6. Decommission old job (Flink 1.20)
kubectl delete flinkdeployment fraud-detection-legacy -n fraud-team

Rollback plan на случай canary failure:

# Quick rollback: switch consumer groups back
# Old job обрабатывает 100%, новый stopped

kubectl scale flinkdeployment fraud-detection-canary -n fraud-team --replicas=0
kubectl scale flinkdeployment fraud-detection-legacy -n fraud-team --replicas=16

# Investigate root cause в shadow environment

Не decommission old job до тех пор, пока new job не работает full traffic минимум 7 дней.


Validation: что измерять после migration

Post-migration checklist (за 2 недели мониторинга):

Performance:

  • Throughput >= baseline (1.20)
  • Latency p99 <= baseline + 10%
  • Checkpoint duration < 1 min (благодаря ForstDB)
  • Recovery time < 5 min (test через chaos engineering)

Reliability:

  • Zero data loss vs baseline (compare outputs)
  • No new error patterns в logs
  • Checkpoint success rate > 99.9%

Resource:

  • TaskManager memory дропнул (благодаря disaggregated state)
  • S3 cost увеличился (новые ForstDB SST files в remote)
  • Total cost (compute + storage) <= 1.20 baseline

Operational:

  • Savepoint automation working
  • Rollback playbook tested (на staging)
  • On-call team trained on 2.2 + ForstDB specifics

Итоги: ключевые learnings

Migration legacy Flink job на новую версию + новый state backend — это 3-4 месяца работы. Главные принципы:

1. Two changes at once — рецепт катастрофы. Сначала migration к Flink 2.2 (на RocksDB), потом отдельная migration к ForstDB. Каждое изменение валидируется независимо.

2. Staging rehearsal с production state — обязательно. Без него вы узнаете о breaking changes в production. Production savepoint должен restoreить-ся в staging без потерь.

3. Async state API — main complexity for ForstDB migration. Все stateful operators нужно переписать. Estimate effort accordingly.

4. Canary + gradual rollout, не big bang. Shadow region для validation, 10% canary 24h, 50% week, 100% после.

5. Rollback plan на каждом этапе. Savepoint backwards-compatible: можно вернуть с 2.2 на 1.20 (через State Processor API), но это сложно. Лучше keep old job running до full validation new.

Capstone модуль завершён. Если вы прошли все 21 модуль курса — вы понимаете Flink internals глубже, чем 99% команд, и готовы строить production-grade streaming platforms. Поздравляю!

Проверка знанийKnowledge check
Команда мигрирует production Flink 1.20 job (2TB state, 16 parallelism) на 2.2 + ForstDB. Архитект предлагает скопировать prod savepoint в staging, restore на 2.2 + ForstDB сразу (один шаг), проверить throughput за час, deploy в prod. Что не так с этим планом и какой правильный approach?
ОтветAnswer
Главная проблема в plan — combination of multiple risky changes без сепарации. Migration Flink version и migration state backend — каждое само по себе major undertaking с разными failure modes. Делать оба одновременно означает: если staging показывает проблему, ты не знаешь, это от version change или от state backend change. Debugging requires разделения. Production-grade migration: Phase 1 (4-6 weeks) — migration code к 2.2 API + RocksDB на staging + canary в prod. Phase 2 (4-6 weeks) — migration к ForstDB включая async refactor + staging validation + canary. Total: 3-4 месяца. Это long, но это единственный способ избежать catastrophic prod outages в 2TB state production job. Plus каждый phase должен иметь rollback plan — возможность вернуться к старой версии за менее чем 30 минут.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. Migration legacy Flink 1.20 job на 2.2 + ForstDB. Архитект предлагает doing both в один шаг для эффективности. Какие риски и правильная sequence?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5