Learning Platform
Глоссарий Troubleshooting
Урок 22.04 · 25 мин
Продвинутый
SavepointAutomationRetentionRollbackDRCronJob

Savepoint automation — periodic, retention, DR copy, rollback

Manual savepoint management — главный источник production-инцидентов в Flink. Команда забывает сделать savepoint перед deploy, при rollback оказывается, что последний savepoint двухнедельной давности, retention policy отсутствует и S3 bucket стоит $5000/month. Автоматизация savepoint управления превращает эти проблемы в неактуальные.

В этом уроке построим полный automation pipeline: периодические savepoints через CronJob, retention policy через Lambda, cross-region copy для DR, rollback playbook с одной командой.

CI/CD для Flink: деплой и rollback

Архитектура savepoint automation

Четыре компонента работают вместе:

Savepoint automation architecture
1. Periodic CronJobCronJob savepoint-trigger запускается каждые 5 минут. Делает POST /jobs/:id/savepoints на JobManager REST. Ждёт COMPLETED status. Logs success/failure в Prometheus.
trigger savepoint
2. Flink JM creates savepointFlink JobManager делает savepoint. Запись в S3 primary bucket по пути s3://flink-savepoints/job-name/savepoint-{id}/.
3. Primary S3 bucketS3 primary bucket flink-savepoints-primary. Содержит все savepoints с retention 30 дней (через S3 lifecycle policy). Cross-region replication enabled.
CRR async
4. DR S3 bucketDR S3 bucket flink-savepoints-dr в другом регионе. Cross-Region Replication с RTC (15 min SLA). Полная копия для DR.
5. Retention LambdaLambda retention-policy запускается раз в день через EventBridge. Удаляет savepoints старше 30 дней (но оставляет milestone savepoints для важных moments).
6. Rollback playbookRollback script (kept в Git). Принимает arg --age=2h. Находит savepoint от ~2 часов назад, патчит FlinkDeployment с initialSavepointPath. Operator перезапускает job.

Periodic CronJob с error handling

Production-grade CronJob — не просто curl POST. Нужно: error handling, monitoring, alerting на failure, deduplication (не start savepoint если предыдущий ещё running).

apiVersion: batch/v1
kind: CronJob
metadata:
  name: fraud-detection-savepoint
  namespace: fraud-team
spec:
  schedule: "*/5 * * * *"  # каждые 5 минут
  concurrencyPolicy: Forbid  # не start новый job пока предыдущий running
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 5
  jobTemplate:
    spec:
      backoffLimit: 0  # fail fast — не retry если savepoint failed
      activeDeadlineSeconds: 600  # 10 min timeout
      template:
        spec:
          serviceAccountName: savepoint-trigger
          restartPolicy: Never
          containers:
          - name: trigger
            image: registry.company.com/savepoint-trigger:1.0
            env:
            - name: JOB_NAME
              value: fraud-detection
            - name: NAMESPACE
              value: fraud-team
            - name: SAVEPOINT_DIR
              value: s3://flink-savepoints-primary/fraud-detection/
            - name: PROMETHEUS_PUSHGATEWAY
              value: http://pushgateway.monitoring:9091
            command:
            - /bin/bash
            - -c
            - |
              set -euo pipefail

              # Find JobManager REST endpoint
              JM_HOST="${JOB_NAME}-rest.${NAMESPACE}:8081"

              # Get current job ID
              JOB_ID=$(curl -sf $JM_HOST/jobs | jq -r '.jobs[] | select(.status=="RUNNING") | .id')

              if [ -z "$JOB_ID" ]; then
                echo "ERROR: No running job found"
                # Push failure metric
                echo "flink_savepoint_status{job=\"$JOB_NAME\"} 0" | \
                  curl --data-binary @- $PROMETHEUS_PUSHGATEWAY/metrics/job/savepoint_cron
                exit 1
              fi

              echo "Triggering savepoint for job $JOB_ID..."
              SAVEPOINT_REQUEST=$(curl -sf -X POST \
                -H "Content-Type: application/json" \
                -d "{\"target-directory\":\"$SAVEPOINT_DIR\",\"cancel-job\":false}" \
                $JM_HOST/jobs/$JOB_ID/savepoints)

              REQUEST_ID=$(echo $SAVEPOINT_REQUEST | jq -r '.["request-id"]')
              echo "Savepoint request ID: $REQUEST_ID"

              # Poll until COMPLETED or FAILED
              START_TIME=$(date +%s)
              while true; do
                STATUS=$(curl -sf $JM_HOST/jobs/$JOB_ID/savepoints/$REQUEST_ID \
                  | jq -r '.status.id')

                if [ "$STATUS" = "COMPLETED" ]; then
                  LOCATION=$(curl -sf $JM_HOST/jobs/$JOB_ID/savepoints/$REQUEST_ID \
                    | jq -r '.operation.location')
                  DURATION=$(($(date +%s) - START_TIME))

                  echo "Savepoint COMPLETED at $LOCATION (took ${DURATION}s)"

                  # Push success metrics
                  cat <<EOF | curl --data-binary @- \
                    $PROMETHEUS_PUSHGATEWAY/metrics/job/savepoint_cron
              flink_savepoint_status{job="$JOB_NAME"} 1
              flink_savepoint_duration_seconds{job="$JOB_NAME"} $DURATION
              flink_savepoint_last_success_timestamp{job="$JOB_NAME"} $(date +%s)
              EOF
                  exit 0
                fi

                if [ "$STATUS" = "FAILED" ]; then
                  REASON=$(curl -sf $JM_HOST/jobs/$JOB_ID/savepoints/$REQUEST_ID \
                    | jq -r '.operation.failure-cause.stack-trace')
                  echo "Savepoint FAILED: $REASON"

                  echo "flink_savepoint_status{job=\"$JOB_NAME\"} 0" | \
                    curl --data-binary @- $PROMETHEUS_PUSHGATEWAY/metrics/job/savepoint_cron
                  exit 1
                fi

                sleep 5
              done

Метрики, которые этот CronJob экспортирует:

  • flink_savepoint_status{job="..."} — 1 если последний success, 0 если failed.
  • flink_savepoint_duration_seconds — duration последнего savepoint.
  • flink_savepoint_last_success_timestamp — Unix timestamp последнего success.

Алерты:

- alert: SavepointFailed
  expr: flink_savepoint_status == 0
  for: 1m
  labels:
    severity: warning
  annotations:
    summary: "Savepoint failed for {{ $labels.job }}"

- alert: SavepointStale
  expr: (time() - flink_savepoint_last_success_timestamp) > 1800
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: "{{ $labels.job }} has no successful savepoint for > 30 min"
    runbook: https://wiki.company.com/runbooks/savepoint-stale

Retention policy через S3 Lifecycle

S3 не нужен Lambda для basic retention — S3 Lifecycle Policy делает это natively:

cat <<EOF > lifecycle.json
{
  "Rules": [
    {
      "Id": "DeleteOldSavepoints",
      "Status": "Enabled",
      "Filter": {
        "Prefix": ""
      },
      "Expiration": {
        "Days": 30
      },
      "NoncurrentVersionExpiration": {
        "NoncurrentDays": 7
      }
    },
    {
      "Id": "KeepMilestones",
      "Status": "Enabled",
      "Filter": {
        "Tag": {
          "Key": "milestone",
          "Value": "true"
        }
      },
      "Expiration": {
        "Days": 365
      }
    }
  ]
}
EOF

aws s3api put-bucket-lifecycle-configuration \
  --bucket flink-savepoints-primary \
  --lifecycle-configuration file://lifecycle.json

Это удаляет savepoints старше 30 дней, но keep’ит milestone savepoints (с тегом milestone=true) на год. Milestone savepoints — это специальные savepoints перед major releases, для долгосрочного rollback capability.

Создание milestone savepoint:

#!/bin/bash
# create-milestone-savepoint.sh — для важных моментов (перед major release)

JOB_NAME=$1
DESCRIPTION=$2  # e.g., "before-v2.3-deploy"

# Trigger savepoint как обычно
SAVEPOINT_URI=$(trigger-savepoint.sh $JOB_NAME)

# Добавить tag milestone=true
aws s3api put-object-tagging \
  --bucket flink-savepoints-primary \
  --key "${SAVEPOINT_URI#s3://flink-savepoints-primary/}" \
  --tagging "TagSet=[{Key=milestone,Value=true},{Key=description,Value=$DESCRIPTION}]"

echo "Milestone savepoint created: $SAVEPOINT_URI"
echo "Will be retained for 365 days."

Cross-region copy для DR

S3 Cross-Region Replication (CRR) с RTC (Replication Time Control):

# Включить CRR на primary bucket
aws s3api put-bucket-replication \
  --bucket flink-savepoints-primary \
  --replication-configuration '{
    "Role": "arn:aws:iam::123456789012:role/s3-replication-role",
    "Rules": [{
      "ID": "ReplicateToDR",
      "Priority": 1,
      "Status": "Enabled",
      "Filter": {},
      "DeleteMarkerReplication": {
        "Status": "Enabled"
      },
      "Destination": {
        "Bucket": "arn:aws:s3:::flink-savepoints-dr",
        "ReplicationTime": {
          "Status": "Enabled",
          "Time": {
            "Minutes": 15
          }
        },
        "Metrics": {
          "Status": "Enabled",
          "EventThreshold": {
            "Minutes": 15
          }
        }
      }
    }]
  }'

RTC даёт SLA: 99.9% объектов реплицируются за 15 минут. Без RTC лаг может быть часы в worst case.

Monitoring replication lag:

# Lambda checks replication lag
import boto3

s3 = boto3.client("s3", region_name="us-east-1")
s3_dr = boto3.client("s3", region_name="us-west-2")

def check_replication_lag():
    # Найти newest savepoint в primary
    primary_objects = s3.list_objects_v2(
        Bucket="flink-savepoints-primary",
        Prefix="fraud-detection/"
    )
    if not primary_objects.get("Contents"):
        return
    newest = max(primary_objects["Contents"], key=lambda x: x["LastModified"])

    # Проверить наличие в DR
    try:
        dr_object = s3_dr.head_object(
            Bucket="flink-savepoints-dr",
            Key=newest["Key"]
        )
        lag_seconds = (dr_object["LastModified"] - newest["LastModified"]).total_seconds()
        # Push to CloudWatch
        cloudwatch.put_metric_data(
            Namespace="FlinkSavepoints",
            MetricData=[{
                "MetricName": "ReplicationLagSeconds",
                "Value": lag_seconds,
                "Unit": "Seconds"
            }]
        )
    except s3_dr.exceptions.NoSuchKey:
        # Object ещё не реплицирован
        lag_seconds = (datetime.utcnow() - newest["LastModified"]).total_seconds()
        cloudwatch.put_metric_data(
            Namespace="FlinkSavepoints",
            MetricData=[{
                "MetricName": "ReplicationLagSeconds",
                "Value": lag_seconds,
                "Unit": "Seconds"
            }]
        )

Lambda запускается каждые 5 минут через EventBridge. CloudWatch alarm если ReplicationLagSeconds > 900 (15 min RTC SLA).


Rollback playbook

Rollback — главный use case savepoint. Bad deploy, deadlock, data corruption — нужен возврат на known-good state.

Production rollback script:

#!/bin/bash
# rollback.sh — rollback Flink job to savepoint of certain age
# Usage: ./rollback.sh <job-name> <namespace> <age-spec>
# Examples:
#   ./rollback.sh fraud-detection fraud-team 2h
#   ./rollback.sh fraud-detection fraud-team 1d
#   ./rollback.sh fraud-detection fraud-team latest

set -euo pipefail

JOB_NAME=$1
NAMESPACE=$2
AGE=$3  # e.g., "2h", "1d", "latest"

SAVEPOINT_BUCKET="flink-savepoints-primary"
SAVEPOINT_PREFIX="$JOB_NAME/"

# Confirm action
read -p "Rollback $JOB_NAME to savepoint from $AGE ago? (yes/no) " CONFIRM
if [ "$CONFIRM" != "yes" ]; then
  echo "Aborted"
  exit 1
fi

# Find target savepoint
echo "Listing savepoints in s3://$SAVEPOINT_BUCKET/$SAVEPOINT_PREFIX..."

if [ "$AGE" = "latest" ]; then
  TARGET_SP=$(aws s3 ls s3://$SAVEPOINT_BUCKET/$SAVEPOINT_PREFIX --recursive \
    | grep '_metadata' \
    | sort -k1,2 \
    | tail -1 \
    | awk '{print $4}')
else
  # Calculate target timestamp
  TARGET_TS=$(date -u -d "$AGE ago" +%s)

  # Find savepoint closest to target (but not newer)
  TARGET_SP=$(aws s3 ls s3://$SAVEPOINT_BUCKET/$SAVEPOINT_PREFIX --recursive \
    | grep '_metadata' \
    | awk -v target=$TARGET_TS '{
        # Parse ISO date and convert to Unix timestamp
        gsub(/-/, " ", $1)
        cmd = "date -u -d \"" $1 " " $2 "\" +%s"
        cmd | getline ts
        close(cmd)
        diff = target - ts
        if (diff > 0) {
          print diff, $4
        }
      }' \
    | sort -n \
    | head -1 \
    | awk '{print $2}')
fi

if [ -z "$TARGET_SP" ]; then
  echo "ERROR: No savepoint found"
  exit 1
fi

SAVEPOINT_URI="s3://$SAVEPOINT_BUCKET/$(dirname $TARGET_SP)"
echo "Found savepoint: $SAVEPOINT_URI"

# Create milestone savepoint of current state BEFORE rollback
echo "Creating safety savepoint of current state..."
./create-milestone-savepoint.sh $JOB_NAME "before-rollback-$(date +%s)"

# Get current FlinkDeployment spec
CURRENT_SPEC=$(kubectl get flinkdeployment $JOB_NAME -n $NAMESPACE -o yaml)

# Patch with initialSavepointPath
kubectl patch flinkdeployment $JOB_NAME -n $NAMESPACE --type='merge' -p "{
  \"spec\": {
    \"job\": {
      \"initialSavepointPath\": \"$SAVEPOINT_URI\",
      \"upgradeMode\": \"savepoint\"
    }
  }
}"

# Trigger rolling restart
kubectl rollout restart deployment/$JOB_NAME -n $NAMESPACE

# Wait for ready
echo "Waiting for job to restart..."
kubectl wait --for=condition=Ready \
  flinkdeployment/$JOB_NAME \
  -n $NAMESPACE \
  --timeout=600s

echo "Rollback complete at $(date -u)"
echo "Job restarted from savepoint: $SAVEPOINT_URI"

Usage:

# Rollback на savepoint от 2 часов назад
./rollback.sh fraud-detection fraud-team 2h

# Rollback на latest savepoint (для быстрого fix)
./rollback.sh fraud-detection fraud-team latest

# Rollback на savepoint от 3 дней назад (для больших проблем)
./rollback.sh fraud-detection fraud-team 3d

Этот script решает 90% production rollback cases. Через ChatOps интеграцию (Slack bot) on-call может triggerить rollback одной командой.

WARNING

Rollback двусторонний: после rollback нельзя восстановить интервал между rollback target и текущим состоянием без специальных мер. Перед rollback всегда делайте safety savepoint текущего состояния. Если rollback оказался mistake, можно восстановить с safety savepoint.


Что мониторить

Metrics dashboard для savepoint management:

1. flink_savepoint_status — health checkbox для каждого job
2. flink_savepoint_duration_seconds — растёт со временем = подозрение на state growth
3. flink_savepoint_last_success_timestamp — алерт если > 2x от cron interval
4. ReplicationLagSeconds (CloudWatch) — DR readiness
5. s3_bucket_size_bytes — стоимость storage, retention working
6. количество milestone savepoints — для audit

Итоги

Savepoint automation pipeline состоит из четырёх компонентов: periodic CronJob с error handling и metrics, retention policy через S3 Lifecycle, cross-region copy через S3 CRR с RTC, rollback playbook script с safety savepoint.

Главные принципы: deduplication (concurrencyPolicy: Forbid), explicit failure metrics (Prometheus), milestone savepoints для долгосрочного rollback, safety savepoint перед каждым rollback.

В финальном уроке capstone — миграция legacy Flink 1.20 джоба на 2.2 с переходом на disaggregated state.

Проверка знанийKnowledge check
CronJob запускает savepoint каждые 5 минут. concurrencyPolicy не указан (default Allow). Savepoint обычно занимает 30 секунд, но при росте state до 500GB начал занимать 8 минут. Что произойдёт и какое правильное настройка?
ОтветAnswer
concurrencyPolicy=Allow (default) означает Kubernetes создаёт новый Job каждый раз по schedule. Если предыдущий savepoint still running, появится 2nd CronJob pod, который пытается trigger savepoint на том же Flink job. Flink JobManager может попытаться обработать 2 одновременных savepoint requests, но typically они конфликтуют (один в progress, второй timeouts или fails). Через 30 минут (6 schedule intervals) у вас 6 pods, JM перегружен, alerts шумят. Правильное решение: (1) concurrencyPolicy=Forbid — Kubernetes не start новый пока активный есть. (2) Increase schedule interval для large state — для 500GB state каждые 15-30 минут вместо 5. (3) Monitor savepoint duration и proactively reschedule. Альтернатива — incremental checkpoints вместо full savepoints для большого state.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. CronJob savepoint-trigger каждые 5 минут. concurrencyPolicy указано Forbid. Что произойдёт если savepoint занимает 7 минут (больше interval)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5