Learning Platform
Глоссарий Troubleshooting
Урок 21.02 · 25 мин
Продвинутый
SavepointDisaster RecoveryActive-passiveFailoverS3 cross-region

Savepoint-based DR — active-passive паттерн

Active-passive DR через savepoint replication — самый распространённый production-подход для Flink. Идея простая: periodic savepoint в shared object store (S3 с cross-region replication), standby Flink-кластер в DR-регионе всегда готов запустить job. При disaster — restore из последнего реплицированного savepoint, переключение downstream consumers.

В этом уроке разберём механику этого паттерна: как настроить cross-region savepoint replication, как организовать failover процедуру, какие подводные камни ждут на пути к надёжному RTO < 10 минут.

Восстановление из savepoint с rescaling

Архитектура savepoint-based DR

Базовая схема состоит из четырёх компонентов:

  1. Primary Flink cluster в region A — обрабатывает события 24/7.
  2. Shared savepoint storage — S3 bucket с cross-region replication в region B.
  3. Standby Flink cluster в region B — либо запущенный пустым (warm), либо вообще не существующий (cold, создаётся по триггеру).
  4. Triggered savepoint workflow — CronJob (или Flink Operator’s PeriodicSavepoint) триггерит savepoint каждые N минут.
Architecture: savepoint-based DR
Primary cluster (region A)Region A: primary Flink-кластер. Запущен в K8s, обрабатывает Kafka топики, пишет в downstream. CronJob каждые 5 минут вызывает /jobs/:jobid/savepoints через JM REST API.
trigger savepoint
S3 region AS3 bucket flink-savepoints-a с cross-region replication в flink-savepoints-b. Replication через S3 native CRR (Cross-Region Replication) или batch sync через s3 sync в CronJob.
CRR async (1-2 min lag)
S3 region BS3 bucket flink-savepoints-b в region B. Содержит копию всех savepoints с лагом 1-2 минуты (для CRR). Готов для restore в standby cluster.
Standby cluster (region B)Region B: standby Flink-кластер. Запущен в K8s (warm) или не существует (cold). При failover — restore last savepoint из S3 region B.
restore on failover
Kafka (region B)Kafka источник в region B. Может быть либо primary Kafka (если он multi-region), либо реплика через MirrorMaker 2. Offset translation требуется при switch — см. урок 03.

Настройка periodic savepoint и cross-region replication

Базовая настройка периодических savepoint через Flink Kubernetes Operator (2.x):

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: fraud-detection
spec:
  image: flink:2.2.0
  flinkVersion: v2_2
  flinkConfiguration:
    state.savepoints.dir: s3://flink-savepoints-primary/fraud-detection/
    state.checkpoint.dir: s3://flink-checkpoints-primary/fraud-detection/
    execution.checkpointing.interval: 60s
    execution.checkpointing.mode: EXACTLY_ONCE
    s3.endpoint: s3.us-east-1.amazonaws.com
    s3.access-key: AKIA...
    s3.secret-key: ...
  jobManager:
    resource:
      memory: "4096m"
      cpu: 2
  taskManager:
    resource:
      memory: "8192m"
      cpu: 4
  job:
    jarURI: s3://flink-jobs/fraud-detection-2.2.jar
    parallelism: 16
    upgradeMode: savepoint
    savepointTriggerNonce: 0
  podTemplate:
    spec:
      serviceAccountName: flink

Operator с upgradeMode: savepoint создаёт savepoint при любом изменении spec, но не периодически. Для periodic savepoint используем CronJob:

apiVersion: batch/v1
kind: CronJob
metadata:
  name: fraud-detection-savepoint
spec:
  schedule: "*/5 * * * *"  # каждые 5 минут
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: savepoint-trigger
            image: curlimages/curl:8.0
            command:
            - sh
            - -c
            - |
              JM=fraud-detection-rest.flink:8081
              JOB_ID=$(curl -s $JM/jobs | jq -r '.jobs[0].id')
              SAVEPOINT_ID=$(curl -X POST \
                $JM/jobs/$JOB_ID/savepoints \
                -d '{"target-directory":"s3://flink-savepoints-primary/fraud-detection/"}' \
                | jq -r '.["request-id"]')
              # poll until ready
              while true; do
                STATUS=$(curl -s $JM/jobs/$JOB_ID/savepoints/$SAVEPOINT_ID | jq -r '.status.id')
                [ "$STATUS" = "COMPLETED" ] && break
                sleep 5
              done
              echo "Savepoint complete"
          restartPolicy: OnFailure

Cross-region replication настраивается на уровне S3:

# Включить S3 CRR на bucket flink-savepoints-primary
aws s3api put-bucket-replication \
  --bucket flink-savepoints-primary \
  --replication-configuration file://crr-config.json

crr-config.json:

{
  "Role": "arn:aws:iam::123456789012:role/s3-replication-role",
  "Rules": [{
    "ID": "ReplicateToDR",
    "Priority": 1,
    "Status": "Enabled",
    "Filter": {},
    "Destination": {
      "Bucket": "arn:aws:s3:::flink-savepoints-dr",
      "ReplicationTime": {
        "Status": "Enabled",
        "Time": { "Minutes": 15 }
      },
      "Metrics": {
        "Status": "Enabled",
        "EventThreshold": { "Minutes": 15 }
      }
    }
  }]
}

S3 CRR гарантирует replication в течение 15 минут (с RTC — Replication Time Control). Без RTC лаг может достигать часов в худшем случае.

WARNING

Без S3 RTC (Replication Time Control) репликация в worst case может занимать часы. Для DR это неприемлемо. Альтернатива RTC: явно sync через aws s3 sync в той же CronJob, что триггерит savepoint — это даёт детерминированный лаг, но требует maintenance script.


Failover процедура: sequence diagram

Failover — критический момент DR. Любая ошибка здесь увеличивает RTO. Sequence типичной failover процедуры:

Sequence: failover from primary to DR
1. Detect failureTrigger: alerting обнаружил, что primary region недоступен. Это может быть failed health check, network partition, или явный сигнал от cloud provider о исчезновении availability zone.
alert fires
2. Decide failoverOn-call engineer (или automation) принимает решение о failover. Это решение требует человеческого подтверждения в большинстве случаев — false-positive failover хуже, чем 5 минут downtime.
3. Find last savepoint in DRНайти last successful savepoint в S3 region B. Команда: aws s3 ls s3://flink-savepoints-dr/job-id/ | sort | tail -1
savepoint URI
4. Start standby with savepointЗапустить FlinkDeployment в DR-кластере с initialSavepointPath = last savepoint URI. Flink K8s Operator создаст pods и restore from savepoint.
5. Wait for job RUNNINGПодождать пока job в DR-регионе достигнет RUNNING state. Это 2-5 минут на маленький state, до 30+ минут на огромный state (TB-scale). Recovery time = function of state size + RocksDB warmup.
job healthy
6. Switch downstream trafficПереключить downstream consumers на DR-регион. Это может быть DNS switch, load balancer reconfiguration, или ручное переключение endpoint в consumer configs.
7. Update monitoringMark primary как 'failed' в monitoring, чтобы не путать alerts. Дальше начинается investigation primary failure и подготовка failback.
8. Service running in DRFailover complete. Сервис работает из DR-региона. RTO измеряется от шага 1 до шага 6. Дальше работа продолжается в DR пока primary не починят и не сделают failback.

Скрипт failover (упрощённый):

#!/bin/bash
# failover.sh — переключиться с primary на DR

set -euo pipefail

JOB_NAME="fraud-detection"
DR_NAMESPACE="flink-dr"
DR_S3_BUCKET="s3://flink-savepoints-dr"

# 1. Найти last savepoint
LAST_SP=$(aws s3 ls $DR_S3_BUCKET/$JOB_NAME/ --recursive \
  | grep '_metadata' \
  | sort -k1,2 \
  | tail -1 \
  | awk '{print $4}')

if [ -z "$LAST_SP" ]; then
  echo "ERROR: no savepoints found in DR"
  exit 1
fi

SAVEPOINT_URI="${DR_S3_BUCKET}/$(dirname $LAST_SP)"
echo "Using savepoint: $SAVEPOINT_URI"

# 2. Создать FlinkDeployment в DR namespace с этим savepoint
kubectl apply -n $DR_NAMESPACE -f - <<EOF
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: $JOB_NAME
spec:
  image: flink:2.2.0
  flinkVersion: v2_2
  flinkConfiguration:
    state.savepoints.dir: $DR_S3_BUCKET/$JOB_NAME/
    state.checkpoint.dir: s3://flink-checkpoints-dr/$JOB_NAME/
  job:
    jarURI: s3://flink-jobs-dr/$JOB_NAME-2.2.jar
    parallelism: 16
    initialSavepointPath: $SAVEPOINT_URI
    upgradeMode: savepoint
EOF

# 3. Подождать RUNNING
echo "Waiting for job to start..."
kubectl wait --for=condition=Ready \
  -n $DR_NAMESPACE \
  flinkdeployment/$JOB_NAME \
  --timeout=600s

# 4. Notify
echo "Failover complete at $(date -u)"

Подводные камни: что обычно ломается

RocksDB warmup. После restore из savepoint RocksDB начинает с холодного block cache. Первые 5-15 минут throughput может быть в 2-3 раза ниже стационарного. Это надо учитывать при capacity planning DR-кластера: либо provision на 3x от нормальной нагрузки в первые 15 минут (autoscaling может не успеть), либо warmup-стратегия (предзагрузить hot keys из event stream до switch traffic).

Sink-side state. Если у вас Kafka sink с EXACTLY_ONCE — при restore Flink попытается abort незакоммиченные транзакции с тем же transactional.id. Если в DR-регионе используется другой Kafka cluster, transactional.id будет распознан как новый — старые транзакции в old Kafka останутся pending до transaction.timeout.ms. Это значит дубликаты сообщений для consumers, читающих с read_committed. Решение — либо общий Kafka cluster (multi-region Kafka), либо MirrorMaker 2 с offset translation (урок 03).

Watermark holes. Если событие приходит в Kafka в primary region за секунду до disaster, и MirrorMaker 2 ещё не успел его реплицировать, то после failover это событие никогда не будет обработано. Если watermark уже прошёл момент этого события — оно станет late event и попадёт в side output (или будет dropped). Watermark generator после failover должен использовать allowedLateness с запасом на лаг репликации.

Schema Registry. Subject IDs в Confluent Schema Registry — локальные для cluster. Если primary использует subject id=42 для UserEvent, а DR Schema Registry имеет id=43 для того же subject, то после failover Flink не сможет десериализовать сообщения с binary id=42. Решение: либо общий Schema Registry (multi-region), либо auto.register.schemas=false плюс ручная синхронизация subjects.

NOTE

Failback (возврат в primary после восстановления) — не зеркальная процедура failover. В период работы из DR primary полностью отстал по offsets. Простое перезапуск job в primary с last savepoint означает reprocessing всех событий за период DR-работы — это часы или дни данных. Правильный failback — это новый savepoint из DR-кластера, копирование в primary, restart там. Это требует cross-region savepoint copy в обе стороны.


RTO budget breakdown

Типичный RTO для savepoint-based DR разбивается так:

Detection:        1-2 min   (alerting fires, on-call ack)
Decision:         2-5 min   (manual approval of failover)
Find savepoint:   5 sec     (S3 ls + sort)
Start cluster:    30 sec    (kubectl apply, pod scheduling)
Job init:         2-5 min   (download jar, init state, connect Kafka)
Restore state:    1-30 min  (depends on state size and S3 throughput)
Warmup:           5-15 min  (RocksDB cache, throughput stabilizes)
Switch traffic:   30 sec    (DNS / LB)
---
Total RTO:        ~15-60 min для маленького state, до нескольких часов для TB-scale

Чтобы укладываться в RTO < 15 минут, нужно:

  • Warm standby, не cold — Flink-кластер уже запущен в DR с дублирующими pods, чтобы не ждать scheduling.
  • Pre-loaded jar в TaskManager image — не качать с S3 при старте.
  • Параллельный download savepoint — несколько TaskManager одновременно качают разные часть state.
  • Pre-warm RocksDB — если бизнес-критично, дополнительная стратегия (incremental restore из старых checkpoint в standby).

Итоги

Savepoint-based DR — production-ready подход для большинства Flink-инсталляций. Ключевые компоненты: cross-region savepoint replication (через S3 CRR с RTC), warm standby cluster в DR-регионе, automated failover script, runbook’и. Реалистичный RTO 15-30 минут для state до 100GB, RPO определяется частотой savepoint (обычно 5-15 минут).

В следующем уроке разберём Kafka mirroring — без него savepoint-based DR не закроет полный pipeline, потому что после failover нужен источник событий в DR-регионе.

Проверка знанийKnowledge check
После failover в DR-регион Flink job стартует, но throughput в 3 раза ниже стационарного и не растёт даже через 30 минут. RocksDB metrics показывают высокий cache miss rate. State весит 500GB. Какая наиболее вероятная причина и решение?
ОтветAnswer
Симптомы (низкий throughput после restore, высокий cache miss rate) — классический cold RocksDB cache. State 500GB не помещается в block cache (обычно 256MB-2GB), и каждый lookup за state, не попавший в cache, требует чтение SST file с диска или S3. После warmup hot keys оказываются в cache и throughput восстанавливается, но при таком большом state warmup может занимать часы. Решения: больший block cache (для hot working set), pre-warm через replay events в standby, или использовать disaggregated state (Flink 2.0+) с remote SST в S3 для lightweight recovery.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. S3 Cross-Region Replication без RTC (Replication Time Control) даёт какой реальный SLA для replication delay, и почему это критично для DR?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4