Savepoint-based DR — active-passive паттерн
Active-passive DR через savepoint replication — самый распространённый production-подход для Flink. Идея простая: periodic savepoint в shared object store (S3 с cross-region replication), standby Flink-кластер в DR-регионе всегда готов запустить job. При disaster — restore из последнего реплицированного savepoint, переключение downstream consumers.
В этом уроке разберём механику этого паттерна: как настроить cross-region savepoint replication, как организовать failover процедуру, какие подводные камни ждут на пути к надёжному RTO < 10 минут.
Архитектура savepoint-based DR
Базовая схема состоит из четырёх компонентов:
- Primary Flink cluster в region A — обрабатывает события 24/7.
- Shared savepoint storage — S3 bucket с cross-region replication в region B.
- Standby Flink cluster в region B — либо запущенный пустым (warm), либо вообще не существующий (cold, создаётся по триггеру).
- Triggered savepoint workflow — CronJob (или Flink Operator’s PeriodicSavepoint) триггерит savepoint каждые N минут.
Настройка periodic savepoint и cross-region replication
Базовая настройка периодических savepoint через Flink Kubernetes Operator (2.x):
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: fraud-detection
spec:
image: flink:2.2.0
flinkVersion: v2_2
flinkConfiguration:
state.savepoints.dir: s3://flink-savepoints-primary/fraud-detection/
state.checkpoint.dir: s3://flink-checkpoints-primary/fraud-detection/
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
s3.endpoint: s3.us-east-1.amazonaws.com
s3.access-key: AKIA...
s3.secret-key: ...
jobManager:
resource:
memory: "4096m"
cpu: 2
taskManager:
resource:
memory: "8192m"
cpu: 4
job:
jarURI: s3://flink-jobs/fraud-detection-2.2.jar
parallelism: 16
upgradeMode: savepoint
savepointTriggerNonce: 0
podTemplate:
spec:
serviceAccountName: flink
Operator с upgradeMode: savepoint создаёт savepoint при любом изменении spec, но не периодически. Для periodic savepoint используем CronJob:
apiVersion: batch/v1
kind: CronJob
metadata:
name: fraud-detection-savepoint
spec:
schedule: "*/5 * * * *" # каждые 5 минут
jobTemplate:
spec:
template:
spec:
containers:
- name: savepoint-trigger
image: curlimages/curl:8.0
command:
- sh
- -c
- |
JM=fraud-detection-rest.flink:8081
JOB_ID=$(curl -s $JM/jobs | jq -r '.jobs[0].id')
SAVEPOINT_ID=$(curl -X POST \
$JM/jobs/$JOB_ID/savepoints \
-d '{"target-directory":"s3://flink-savepoints-primary/fraud-detection/"}' \
| jq -r '.["request-id"]')
# poll until ready
while true; do
STATUS=$(curl -s $JM/jobs/$JOB_ID/savepoints/$SAVEPOINT_ID | jq -r '.status.id')
[ "$STATUS" = "COMPLETED" ] && break
sleep 5
done
echo "Savepoint complete"
restartPolicy: OnFailure
Cross-region replication настраивается на уровне S3:
# Включить S3 CRR на bucket flink-savepoints-primary
aws s3api put-bucket-replication \
--bucket flink-savepoints-primary \
--replication-configuration file://crr-config.json
crr-config.json:
{
"Role": "arn:aws:iam::123456789012:role/s3-replication-role",
"Rules": [{
"ID": "ReplicateToDR",
"Priority": 1,
"Status": "Enabled",
"Filter": {},
"Destination": {
"Bucket": "arn:aws:s3:::flink-savepoints-dr",
"ReplicationTime": {
"Status": "Enabled",
"Time": { "Minutes": 15 }
},
"Metrics": {
"Status": "Enabled",
"EventThreshold": { "Minutes": 15 }
}
}
}]
}
S3 CRR гарантирует replication в течение 15 минут (с RTC — Replication Time Control). Без RTC лаг может достигать часов в худшем случае.
Без S3 RTC (Replication Time Control) репликация в worst case может занимать часы. Для DR это неприемлемо. Альтернатива RTC: явно sync через aws s3 sync в той же CronJob, что триггерит savepoint — это даёт детерминированный лаг, но требует maintenance script.
Failover процедура: sequence diagram
Failover — критический момент DR. Любая ошибка здесь увеличивает RTO. Sequence типичной failover процедуры:
Скрипт failover (упрощённый):
#!/bin/bash
# failover.sh — переключиться с primary на DR
set -euo pipefail
JOB_NAME="fraud-detection"
DR_NAMESPACE="flink-dr"
DR_S3_BUCKET="s3://flink-savepoints-dr"
# 1. Найти last savepoint
LAST_SP=$(aws s3 ls $DR_S3_BUCKET/$JOB_NAME/ --recursive \
| grep '_metadata' \
| sort -k1,2 \
| tail -1 \
| awk '{print $4}')
if [ -z "$LAST_SP" ]; then
echo "ERROR: no savepoints found in DR"
exit 1
fi
SAVEPOINT_URI="${DR_S3_BUCKET}/$(dirname $LAST_SP)"
echo "Using savepoint: $SAVEPOINT_URI"
# 2. Создать FlinkDeployment в DR namespace с этим savepoint
kubectl apply -n $DR_NAMESPACE -f - <<EOF
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: $JOB_NAME
spec:
image: flink:2.2.0
flinkVersion: v2_2
flinkConfiguration:
state.savepoints.dir: $DR_S3_BUCKET/$JOB_NAME/
state.checkpoint.dir: s3://flink-checkpoints-dr/$JOB_NAME/
job:
jarURI: s3://flink-jobs-dr/$JOB_NAME-2.2.jar
parallelism: 16
initialSavepointPath: $SAVEPOINT_URI
upgradeMode: savepoint
EOF
# 3. Подождать RUNNING
echo "Waiting for job to start..."
kubectl wait --for=condition=Ready \
-n $DR_NAMESPACE \
flinkdeployment/$JOB_NAME \
--timeout=600s
# 4. Notify
echo "Failover complete at $(date -u)"
Подводные камни: что обычно ломается
RocksDB warmup. После restore из savepoint RocksDB начинает с холодного block cache. Первые 5-15 минут throughput может быть в 2-3 раза ниже стационарного. Это надо учитывать при capacity planning DR-кластера: либо provision на 3x от нормальной нагрузки в первые 15 минут (autoscaling может не успеть), либо warmup-стратегия (предзагрузить hot keys из event stream до switch traffic).
Sink-side state. Если у вас Kafka sink с EXACTLY_ONCE — при restore Flink попытается abort незакоммиченные транзакции с тем же transactional.id. Если в DR-регионе используется другой Kafka cluster, transactional.id будет распознан как новый — старые транзакции в old Kafka останутся pending до transaction.timeout.ms. Это значит дубликаты сообщений для consumers, читающих с read_committed. Решение — либо общий Kafka cluster (multi-region Kafka), либо MirrorMaker 2 с offset translation (урок 03).
Watermark holes. Если событие приходит в Kafka в primary region за секунду до disaster, и MirrorMaker 2 ещё не успел его реплицировать, то после failover это событие никогда не будет обработано. Если watermark уже прошёл момент этого события — оно станет late event и попадёт в side output (или будет dropped). Watermark generator после failover должен использовать allowedLateness с запасом на лаг репликации.
Schema Registry. Subject IDs в Confluent Schema Registry — локальные для cluster. Если primary использует subject id=42 для UserEvent, а DR Schema Registry имеет id=43 для того же subject, то после failover Flink не сможет десериализовать сообщения с binary id=42. Решение: либо общий Schema Registry (multi-region), либо auto.register.schemas=false плюс ручная синхронизация subjects.
Failback (возврат в primary после восстановления) — не зеркальная процедура failover. В период работы из DR primary полностью отстал по offsets. Простое перезапуск job в primary с last savepoint означает reprocessing всех событий за период DR-работы — это часы или дни данных. Правильный failback — это новый savepoint из DR-кластера, копирование в primary, restart там. Это требует cross-region savepoint copy в обе стороны.
RTO budget breakdown
Типичный RTO для savepoint-based DR разбивается так:
Detection: 1-2 min (alerting fires, on-call ack)
Decision: 2-5 min (manual approval of failover)
Find savepoint: 5 sec (S3 ls + sort)
Start cluster: 30 sec (kubectl apply, pod scheduling)
Job init: 2-5 min (download jar, init state, connect Kafka)
Restore state: 1-30 min (depends on state size and S3 throughput)
Warmup: 5-15 min (RocksDB cache, throughput stabilizes)
Switch traffic: 30 sec (DNS / LB)
---
Total RTO: ~15-60 min для маленького state, до нескольких часов для TB-scale
Чтобы укладываться в RTO < 15 минут, нужно:
- Warm standby, не cold — Flink-кластер уже запущен в DR с дублирующими pods, чтобы не ждать scheduling.
- Pre-loaded jar в TaskManager image — не качать с S3 при старте.
- Параллельный download savepoint — несколько TaskManager одновременно качают разные часть state.
- Pre-warm RocksDB — если бизнес-критично, дополнительная стратегия (incremental restore из старых checkpoint в standby).
Итоги
Savepoint-based DR — production-ready подход для большинства Flink-инсталляций. Ключевые компоненты: cross-region savepoint replication (через S3 CRR с RTC), warm standby cluster в DR-регионе, automated failover script, runbook’и. Реалистичный RTO 15-30 минут для state до 100GB, RPO определяется частотой savepoint (обычно 5-15 минут).
В следующем уроке разберём Kafka mirroring — без него savepoint-based DR не закроет полный pipeline, потому что после failover нужен источник событий в DR-регионе.