Savepoint automation: periodic, retention, DR copy, rollback
Manual savepoint management is a major source of production incidents in Flink. The team forgets to take a savepoint before a deploy, at rollback time the latest savepoint turns out to be two weeks old, there is no retention policy, and the S3 bucket costs $5000/month. Automating savepoint management makes these problems go away.
In this lesson we build a complete automation pipeline: periodic savepoints via a CronJob, a retention policy via S3 Lifecycle, cross-region copy for DR, and a one-command rollback playbook.
Savepoint automation architecture
Four components work together: a periodic CronJob that triggers savepoints, an S3 Lifecycle retention policy, cross-region replication for DR, and a rollback script.
Periodic CronJob with error handling
A production-grade CronJob is more than a curl POST. It needs error handling, monitoring, alerting on failure, and deduplication (do not start a savepoint while the previous one is still running).
apiVersion: batch/v1
kind: CronJob
metadata:
  name: fraud-detection-savepoint
  namespace: fraud-team
spec:
  schedule: "*/5 * * * *"        # every 5 minutes
  concurrencyPolicy: Forbid      # do not start a new job while the previous one is running
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 5
  jobTemplate:
    spec:
      backoffLimit: 0              # fail fast: do not retry a failed savepoint
      activeDeadlineSeconds: 600   # 10 min timeout
      template:
        spec:
          serviceAccountName: savepoint-trigger
          restartPolicy: Never
          containers:
            - name: trigger
              image: registry.company.com/savepoint-trigger:1.0
              env:
                - name: JOB_NAME
                  value: fraud-detection
                - name: NAMESPACE
                  value: fraud-team
                - name: SAVEPOINT_DIR
                  value: s3://flink-savepoints-primary/fraud-detection/
                - name: PROMETHEUS_PUSHGATEWAY
                  value: http://pushgateway.monitoring:9091
              command:
                - /bin/bash
                - -c
                - |
                  set -euo pipefail
                  # JobManager REST endpoint
                  JM_HOST="http://${JOB_NAME}-rest.${NAMESPACE}:8081"
                  # Group pushed metrics by Flink job name: Pushgateway overwrites a
                  # `job` label in the body with the one taken from the URL
                  PUSH_URL="$PROMETHEUS_PUSHGATEWAY/metrics/job/$JOB_NAME"
                  push_failure() {
                    echo "flink_savepoint_status{job=\"$JOB_NAME\"} 0" \
                      | curl -s --data-binary @- "$PUSH_URL"
                  }
                  # Get the ID of the running job (the first one if several are listed)
                  JOB_ID=$(curl -sf "$JM_HOST/jobs" \
                    | jq -r '[.jobs[] | select(.status=="RUNNING") | .id][0] // empty' || true)
                  if [ -z "$JOB_ID" ]; then
                    echo "ERROR: No running job found"
                    push_failure
                    exit 1
                  fi
                  echo "Triggering savepoint for job $JOB_ID..."
                  SAVEPOINT_REQUEST=$(curl -sf -X POST \
                    -H "Content-Type: application/json" \
                    -d "{\"target-directory\":\"$SAVEPOINT_DIR\",\"cancel-job\":false}" \
                    "$JM_HOST/jobs/$JOB_ID/savepoints") || { push_failure; exit 1; }
                  REQUEST_ID=$(echo "$SAVEPOINT_REQUEST" | jq -r '.["request-id"]')
                  echo "Savepoint request ID: $REQUEST_ID"
                  # Poll until the operation finishes. Flink reports status.id as
                  # IN_PROGRESS or COMPLETED; a failed savepoint is COMPLETED with a
                  # failure-cause instead of a location.
                  START_TIME=$(date +%s)
                  while true; do
                    RESPONSE=$(curl -sf "$JM_HOST/jobs/$JOB_ID/savepoints/$REQUEST_ID") \
                      || { push_failure; exit 1; }
                    STATUS=$(echo "$RESPONSE" | jq -r '.status.id')
                    if [ "$STATUS" = "COMPLETED" ]; then
                      LOCATION=$(echo "$RESPONSE" | jq -r '.operation.location // empty')
                      if [ -z "$LOCATION" ]; then
                        # Hyphenated keys need bracket syntax in jq
                        REASON=$(echo "$RESPONSE" \
                          | jq -r '.operation["failure-cause"]["stack-trace"] // "unknown"')
                        echo "Savepoint FAILED: $REASON"
                        push_failure
                        exit 1
                      fi
                      DURATION=$(($(date +%s) - START_TIME))
                      echo "Savepoint COMPLETED at $LOCATION (took ${DURATION}s)"
                      # Push success metrics
                      {
                        echo "flink_savepoint_status{job=\"$JOB_NAME\"} 1"
                        echo "flink_savepoint_duration_seconds{job=\"$JOB_NAME\"} $DURATION"
                        echo "flink_savepoint_last_success_timestamp{job=\"$JOB_NAME\"} $(date +%s)"
                      } | curl -s --data-binary @- "$PUSH_URL"
                      exit 0
                    fi
                    sleep 5
                  done
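How the status response is read decides whether a failed savepoint is ever noticed. The sketch below shows one way to interpret it in Python, assuming the response shape of Flink's asynchronous savepoint operations: a `status.id` field plus an `operation` object that carries either `location` or `failure-cause`. The function name `interpret_savepoint_status` and its return values are illustrative and not part of any Flink client.

```python
from typing import Optional, Tuple


def interpret_savepoint_status(response: dict) -> Tuple[str, Optional[str]]:
    """Map a savepoint status response to (state, detail).

    state is "IN_PROGRESS", "COMPLETED" or "FAILED"; detail is the savepoint
    location on success and a failure description on failure.
    """
    status_id = (response.get("status") or {}).get("id")
    if status_id != "COMPLETED":
        # The asynchronous operation has not finished yet.
        return "IN_PROGRESS", None

    operation = response.get("operation") or {}
    failure = operation.get("failure-cause")
    if failure is not None:
        # Assumption: a finished-but-failed operation carries "failure-cause".
        trace = failure.get("stack-trace") if isinstance(failure, dict) else str(failure)
        return "FAILED", trace or "unknown failure"

    location = operation.get("location")
    if location:
        return "COMPLETED", location
    return "FAILED", "operation finished without a savepoint location"


if __name__ == "__main__":
    done = {"status": {"id": "COMPLETED"}, "operation": {"location": "s3://bucket/savepoint-1"}}
    print(interpret_savepoint_status(done))
```

Keeping this mapping in one pure function makes it straightforward to unit-test the failure path without a running cluster.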
Metrics exported by this CronJob:
flink_savepoint_status{job="..."}: 1 if the last savepoint succeeded, 0 if it failed.
flink_savepoint_duration_seconds: duration of the last savepoint.
flink_savepoint_last_success_timestamp: Unix timestamp of the last success.
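The payload pushed to the Pushgateway is plain Prometheus text exposition format, one sample per line. A minimal sketch of rendering it in Python follows; `render_savepoint_metrics` is a hypothetical helper, and only the metric names come from the CronJob.

```python
from typing import Optional


def render_savepoint_metrics(
    job: str,
    success: bool,
    duration_s: Optional[int] = None,
    timestamp: Optional[int] = None,
) -> str:
    """Render savepoint metrics in Prometheus text exposition format."""
    lines = [f'flink_savepoint_status{{job="{job}"}} {1 if success else 0}']
    if success and duration_s is not None:
        lines.append(f'flink_savepoint_duration_seconds{{job="{job}"}} {duration_s}')
    if success and timestamp is not None:
        lines.append(f'flink_savepoint_last_success_timestamp{{job="{job}"}} {timestamp}')
    # The text format expects the body to end with a newline.
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_savepoint_metrics("fraud-detection", True, 42, 1700000000), end="")
```

On failure only the status sample is emitted, so the last success timestamp already stored in the Pushgateway is left untouched.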
Alerts:
- alert: SavepointFailed
  expr: flink_savepoint_status == 0
  for: 1m
  labels:
    severity: warning
  annotations:
    summary: "Savepoint failed for {{ $labels.job }}"
- alert: SavepointStale
  expr: (time() - flink_savepoint_last_success_timestamp) > 1800
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: "{{ $labels.job }} has no successful savepoint for > 30 min"
    runbook: https://wiki.company.com/runbooks/savepoint-stale
Retention policy via S3 Lifecycle
Basic retention on S3 does not need a Lambda: an S3 Lifecycle policy handles it natively:
cat <<EOF > lifecycle.json
{
  "Rules": [
    {
      "Id": "DeleteOldSavepoints",
      "Status": "Enabled",
      "Filter": {
        "Prefix": ""
      },
      "Expiration": {
        "Days": 30
      },
      "NoncurrentVersionExpiration": {
        "NoncurrentDays": 7
      }
    },
    {
      "Id": "KeepMilestones",
      "Status": "Enabled",
      "Filter": {
        "Tag": {
          "Key": "milestone",
          "Value": "true"
        }
      },
      "Expiration": {
        "Days": 365
      }
    }
  ]
}
EOF
aws s3api put-bucket-lifecycle-configuration \
  --bucket flink-savepoints-primary \
  --lifecycle-configuration file://lifecycle.json
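This deletes savepoints older than 30 days and is meant to keep milestone savepoints (tagged milestone=true) for a year. Milestone savepoints are special savepoints taken before major releases, for long-term rollback capability. One caveat: the empty prefix in DeleteOldSavepoints also matches tagged objects, and when expiration rules overlap S3 applies the shorter period, so the 30-day rule has to be scoped (for example to a prefix that milestone savepoints do not use) for the one-year retention to take effect.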
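How the two lifecycle rules interact for a given object can be checked with a small Python model. This is only a sketch: the `Rule` type, `effective_expiration_days`, and the `fraud-detection/periodic/` prefix are hypothetical, and the model encodes a single S3 behavior, namely that when several expiration rules match the same object the shortest period applies.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Rule:
    rule_id: str
    days: int
    prefix: str = ""  # an empty prefix matches every key
    tags: Dict[str, str] = field(default_factory=dict)  # all tags must match


def matches(rule: Rule, key: str, object_tags: Dict[str, str]) -> bool:
    if not key.startswith(rule.prefix):
        return False
    return all(object_tags.get(k) == v for k, v in rule.tags.items())


def effective_expiration_days(
    rules: List[Rule], key: str, object_tags: Dict[str, str]
) -> Optional[int]:
    """Shortest expiration among all matching rules, or None if none match."""
    matching = [rule.days for rule in rules if matches(rule, key, object_tags)]
    return min(matching) if matching else None


# The rules as written in lifecycle.json.
RULES = [
    Rule("DeleteOldSavepoints", days=30),
    Rule("KeepMilestones", days=365, tags={"milestone": "true"}),
]

# Hypothetical variant: the 30-day rule only covers a prefix for periodic savepoints.
SCOPED_RULES = [
    Rule("DeleteOldSavepoints", days=30, prefix="fraud-detection/periodic/"),
    Rule("KeepMilestones", days=365, tags={"milestone": "true"}),
]

if __name__ == "__main__":
    key = "fraud-detection/milestones/savepoint-abc/_metadata"
    print(effective_expiration_days(RULES, key, {"milestone": "true"}))
    print(effective_expiration_days(SCOPED_RULES, key, {"milestone": "true"}))
```

With the original rules a tagged object still resolves to 30 days; with the scoped variant it resolves to 365, which is the intended retention.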
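Creating a milestone savepoint:
#!/bin/bash
# create-milestone-savepoint.sh: for important moments (before a major release)
set -euo pipefail
JOB_NAME=$1
DESCRIPTION=$2   # e.g. "before-v2.3-deploy"
BUCKET=flink-savepoints-primary
# Trigger a savepoint as usual; trigger-savepoint.sh is expected to print its s3:// URI
SAVEPOINT_URI=$(trigger-savepoint.sh "$JOB_NAME")
PREFIX="${SAVEPOINT_URI#s3://$BUCKET/}"
# A savepoint is a directory of objects (_metadata plus state files) and S3 tags
# are set per object, so tag every key under the savepoint prefix
aws s3api list-objects-v2 --bucket "$BUCKET" --prefix "$PREFIX" \
    --query 'Contents[].Key' --output text | tr '\t' '\n' | while read -r KEY; do
  aws s3api put-object-tagging \
    --bucket "$BUCKET" \
    --key "$KEY" \
    --tagging "TagSet=[{Key=milestone,Value=true},{Key=description,Value=$DESCRIPTION}]"
done
echo "Milestone savepoint created: $SAVEPOINT_URI"
echo "Will be retained for 365 days."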
Cross-region copy for DR
S3 Cross-Region Replication (CRR) with RTC (Replication Time Control):
# Enable CRR on the primary bucket
# (replication requires versioning to be enabled on both buckets)
aws s3api put-bucket-replication \
  --bucket flink-savepoints-primary \
  --replication-configuration '{
    "Role": "arn:aws:iam::123456789012:role/s3-replication-role",
    "Rules": [{
      "ID": "ReplicateToDR",
      "Priority": 1,
      "Status": "Enabled",
      "Filter": {},
      "DeleteMarkerReplication": {
        "Status": "Enabled"
      },
      "Destination": {
        "Bucket": "arn:aws:s3:::flink-savepoints-dr",
        "ReplicationTime": {
          "Status": "Enabled",
          "Time": {
            "Minutes": 15
          }
        },
        "Metrics": {
          "Status": "Enabled",
          "EventThreshold": {
            "Minutes": 15
          }
        }
      }
    }]
  }'
RTC comes with an SLA: 99.9% of objects are replicated within 15 minutes. Without RTC, the lag can reach hours in the worst case.
Monitoring replication lag:
# Lambda that checks replication lag
from datetime import datetime, timezone

import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3", region_name="us-east-1")
s3_dr = boto3.client("s3", region_name="us-west-2")
cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")


def check_replication_lag():
    # Find the newest object in the primary bucket. Paginate, because
    # list_objects_v2 returns at most 1000 keys per call.
    paginator = s3.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket="flink-savepoints-primary", Prefix="fraud-detection/")
    objects = [obj for page in pages for obj in page.get("Contents", [])]
    if not objects:
        return
    newest = max(objects, key=lambda obj: obj["LastModified"])

    # Check whether it is present in the DR bucket
    try:
        dr_object = s3_dr.head_object(Bucket="flink-savepoints-dr", Key=newest["Key"])
        lag_seconds = (dr_object["LastModified"] - newest["LastModified"]).total_seconds()
    except ClientError as err:
        # head_object reports a missing key as a generic 404 ClientError
        if err.response["Error"]["Code"] not in ("404", "NoSuchKey"):
            raise
        # Object not replicated yet: the lag is its age so far. LastModified is
        # timezone-aware, so compare it against an aware "now".
        lag_seconds = (datetime.now(timezone.utc) - newest["LastModified"]).total_seconds()

    # Push to CloudWatch
    cloudwatch.put_metric_data(
        Namespace="FlinkSavepoints",
        MetricData=[{
            "MetricName": "ReplicationLagSeconds",
            "Value": lag_seconds,
            "Unit": "Seconds",
        }],
    )


def lambda_handler(event, context):
    check_replication_lag()
The Lambda runs every 5 minutes via EventBridge. A CloudWatch alarm fires if ReplicationLagSeconds > 900 (the 15 min RTC SLA).
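The lag arithmetic can be separated from the AWS calls so that it is testable on its own. The following is a sketch under that assumption; `replication_lag_seconds` and `breaches_rtc_sla` are hypothetical names, and 900 seconds is the alarm threshold quoted above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

RTC_SLA_SECONDS = 900  # 15 minutes


def replication_lag_seconds(
    primary_last_modified: datetime,
    dr_last_modified: Optional[datetime],
    now: datetime,
) -> float:
    """Lag of the newest primary object; all datetimes must be timezone-aware.

    If the object is not in the DR bucket yet (dr_last_modified is None),
    the lag is the object's age so far.
    """
    reference = dr_last_modified if dr_last_modified is not None else now
    return max(0.0, (reference - primary_last_modified).total_seconds())


def breaches_rtc_sla(lag_seconds: float) -> bool:
    return lag_seconds > RTC_SLA_SECONDS


if __name__ == "__main__":
    now = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
    written = now - timedelta(minutes=20)
    lag = replication_lag_seconds(written, None, now)
    print(lag, breaches_rtc_sla(lag))
```

Passing `now` in explicitly, instead of reading the clock inside the function, keeps the calculation deterministic in tests.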
Rollback playbook
Rollback is the main use case for savepoints. After a bad deploy, a deadlock, or data corruption, you need to return to a known-good state.
Production rollback script:
#!/bin/bash
# rollback.sh: roll a Flink job back to a savepoint of a given age
# Usage: ./rollback.sh <job-name> <namespace> <age-spec>
# Examples:
#   ./rollback.sh fraud-detection fraud-team 2h
#   ./rollback.sh fraud-detection fraud-team 1d
#   ./rollback.sh fraud-detection fraud-team latest
set -euo pipefail
JOB_NAME=$1
NAMESPACE=$2
AGE=$3   # e.g. "2h", "1d", "latest"
SAVEPOINT_BUCKET="flink-savepoints-primary"
SAVEPOINT_PREFIX="$JOB_NAME/"
# Confirm action
if [ "$AGE" = "latest" ]; then TARGET_DESC="the latest savepoint"; else TARGET_DESC="the savepoint from $AGE ago"; fi
read -r -p "Rollback $JOB_NAME to $TARGET_DESC? (yes/no) " CONFIRM
if [ "$CONFIRM" != "yes" ]; then
  echo "Aborted"
  exit 1
fi
# Print "<LastModified><TAB><key>" for every savepoint _metadata object, oldest first.
# s3api returns ISO 8601 UTC timestamps, which sort lexicographically.
list_savepoints() {
  aws s3api list-objects-v2 --bucket "$SAVEPOINT_BUCKET" --prefix "$SAVEPOINT_PREFIX" \
    --query "Contents[?ends_with(Key, '_metadata')].[LastModified, Key]" --output text \
    | grep '_metadata$' | sort || true
}
# Find target savepoint
echo "Listing savepoints in s3://$SAVEPOINT_BUCKET/$SAVEPOINT_PREFIX..."
if [ "$AGE" = "latest" ]; then
  TARGET_SP=$(list_savepoints | tail -1 | cut -f2)
else
  # Convert the age spec to seconds (GNU date does not read "2h" or "1d" as a duration)
  case "$AGE" in
    *m) AGE_SECONDS=$(( ${AGE%m} * 60 )) ;;
    *h) AGE_SECONDS=$(( ${AGE%h} * 3600 )) ;;
    *d) AGE_SECONDS=$(( ${AGE%d} * 86400 )) ;;
    *) echo "ERROR: Unsupported age spec '$AGE'"; exit 1 ;;
  esac
  CUTOFF=$(date -u -d "@$(( $(date +%s) - AGE_SECONDS ))" +%Y-%m-%dT%H:%M:%S)
  # Newest savepoint that is not newer than the cutoff
  TARGET_SP=$(list_savepoints \
    | awk -F'\t' -v cutoff="$CUTOFF" 'substr($1, 1, 19) <= cutoff' \
    | tail -1 | cut -f2)
fi
if [ -z "$TARGET_SP" ]; then
  echo "ERROR: No savepoint found"
  exit 1
fi
SAVEPOINT_URI="s3://$SAVEPOINT_BUCKET/$(dirname "$TARGET_SP")"
echo "Found savepoint: $SAVEPOINT_URI"
# Create a milestone savepoint of the current state BEFORE the rollback
echo "Creating safety savepoint of current state..."
./create-milestone-savepoint.sh "$JOB_NAME" "before-rollback-$(date +%s)"
# Keep a copy of the current FlinkDeployment spec for reference
kubectl get flinkdeployment "$JOB_NAME" -n "$NAMESPACE" -o yaml > "$JOB_NAME-before-rollback.yaml"
# Point the job at the target savepoint. Changing savepointRedeployNonce asks the
# Flink Kubernetes Operator to redeploy from initialSavepointPath (this assumes an
# operator version that supports the field). The operator performs the restart
# itself, so no separate `kubectl rollout restart` is needed.
kubectl patch flinkdeployment "$JOB_NAME" -n "$NAMESPACE" --type='merge' -p "{
  \"spec\": {
    \"job\": {
      \"initialSavepointPath\": \"$SAVEPOINT_URI\",
      \"savepointRedeployNonce\": $(date +%s),
      \"upgradeMode\": \"savepoint\"
    }
  }
}"
# Give the operator a moment to pick up the change, then wait for the job to run again
echo "Waiting for job to restart..."
sleep 15
kubectl wait --for=jsonpath='{.status.jobStatus.state}'=RUNNING \
  flinkdeployment/"$JOB_NAME" \
  -n "$NAMESPACE" \
  --timeout=600s
echo "Rollback complete at $(date -u)"
echo "Job restarted from savepoint: $SAVEPOINT_URI"
Usage:
# Roll back to the savepoint from 2 hours ago
./rollback.sh fraud-detection fraud-team 2h
# Roll back to the latest savepoint (for a quick fix)
./rollback.sh fraud-detection fraud-team latest
# Roll back to the savepoint from 3 days ago (for bigger problems)
./rollback.sh fraud-detection fraud-team 3d
This script covers most production rollback cases. With a ChatOps integration (a Slack bot), the on-call engineer can trigger a rollback with a single command.
Rollback is a one-way operation: after a rollback, the interval between the rollback target and the current state cannot be recovered without special measures. Always take a safety savepoint of the current state before rolling back. If the rollback turns out to be a mistake, you can restore from the safety savepoint.
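The selection rule behind the age argument (pick the newest savepoint that is not newer than the cutoff) is easy to test in isolation. Here is a minimal Python sketch of that rule; `parse_age` and `pick_savepoint` are hypothetical helpers, and the supported suffixes (m, h, d) are an assumption based on the usage examples.

```python
import re
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple

_UNITS = {"m": "minutes", "h": "hours", "d": "days"}


def parse_age(spec: str) -> timedelta:
    """Turn an age spec such as "2h" or "3d" into a timedelta."""
    match = re.fullmatch(r"(\d+)([mhd])", spec)
    if match is None:
        raise ValueError(f"unsupported age spec: {spec!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


def pick_savepoint(
    savepoints: List[Tuple[datetime, str]], age_spec: str, now: datetime
) -> Optional[str]:
    """Return the path of the target savepoint, or None if nothing qualifies.

    savepoints is a list of (creation time, path) pairs.
    """
    if age_spec == "latest":
        candidates = savepoints
    else:
        cutoff = now - parse_age(age_spec)
        candidates = [sp for sp in savepoints if sp[0] <= cutoff]
    if not candidates:
        return None
    return max(candidates, key=lambda sp: sp[0])[1]


if __name__ == "__main__":
    now = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
    savepoints = [
        (now - timedelta(minutes=5), "s3://bucket/job/savepoint-c"),
        (now - timedelta(hours=1, minutes=55), "s3://bucket/job/savepoint-b"),
        (now - timedelta(hours=3), "s3://bucket/job/savepoint-a"),
    ]
    print(pick_savepoint(savepoints, "2h", now))
```

Returning None when no savepoint is old enough mirrors the script's "No savepoint found" exit, so the caller has to handle that case explicitly.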
What to monitor
Metrics dashboard for savepoint management:
1. flink_savepoint_status: health indicator for each job
2. flink_savepoint_duration_seconds: growth over time suggests state growth
3. flink_savepoint_last_success_timestamp: alert if older than 2x the cron interval
4. ReplicationLagSeconds (CloudWatch): DR readiness
5. s3_bucket_size_bytes: storage cost, confirms that retention is working
6. number of milestone savepoints: for audit
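Two of these checks reduce to simple arithmetic: staleness relative to the cron interval, and growth of savepoint duration. A sketch of both in Python follows; the function names, the window size, and the use of a mean ratio as the growth signal are illustrative choices and not something the dashboard prescribes.

```python
from typing import Sequence


def is_stale(
    last_success_ts: float, now_ts: float, cron_interval_s: float, factor: float = 2.0
) -> bool:
    """True if the last successful savepoint is older than factor x the cron interval."""
    return (now_ts - last_success_ts) > factor * cron_interval_s


def duration_growth_ratio(durations: Sequence[float], window: int = 5) -> float:
    """Mean of the last `window` durations divided by the mean of the first `window`.

    Returns 1.0 when there is not enough history to compare two full windows.
    """
    if window <= 0 or len(durations) < 2 * window:
        return 1.0
    first = sum(durations[:window]) / window
    last = sum(durations[-window:]) / window
    return last / first if first > 0 else 1.0


if __name__ == "__main__":
    # 5-minute cron: stale once more than 10 minutes have passed
    print(is_stale(last_success_ts=0, now_ts=601, cron_interval_s=300))
    print(duration_growth_ratio([10, 10, 10, 10, 10, 20, 20, 20, 20, 20]))
```

A ratio well above 1 over a long history is a hint that state is growing and that the savepoint timeout may need revisiting.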
Summary
The savepoint automation pipeline consists of four components: a periodic CronJob with error handling and metrics, a retention policy via S3 Lifecycle, cross-region copy via S3 CRR with RTC, and a rollback playbook script with a safety savepoint.
Key principles: deduplication (concurrencyPolicy: Forbid), explicit failure metrics (Prometheus), milestone savepoints for long-term rollback, and a safety savepoint before every rollback.
The final lesson is the capstone: migrating a legacy Flink 1.20 job to 2.2 with a move to disaggregated state.