Learning Platform
Глоссарий Troubleshooting
Урок 11.03 · 20 мин
Средний
SavepointCLIMigrationCanonical Format

Savepoints: manual snapshots для upgrade и rollback

Checkpoint — это автоматический механизм для restart после crash. Savepoint — это ручной snapshot, который вы делаете для упорядоченных операций: апгрейд кода, изменение топологии job, миграция state backend, A/B тестирование разных версий job на одних данных.

В этом уроке разберём отличия savepoint и checkpoint, форматы (canonical vs native), CLI команды для production workflow, и типичные сценарии использования.


Чем savepoint отличается от checkpoint

АспектCheckpointSavepoint
TriggerАвтоматически по intervalРучная команда
ЦельFault toleranceUpgrade, migration
LifecycleУправляется Flink (rotation)Управляется пользователем
Format (default)Native (backend-specific)Canonical (portable)
Storage pathstate.checkpoints.dirstate.savepoints.dir
OptimizationСкорость записи (incremental)Portability
Когда удаляетсяAuto rotationТолько вручную
Стоимость по времениБыстро (incremental)Медленнее (full snapshot)

Главное практическое отличие: savepoint переживает изменение версии Flink, изменение state backend, изменение топологии job. Checkpoint — нет (только в рамках того же job на том же Flink).


Формат: canonical vs native

С Flink 1.15+ savepoint может быть в двух форматах:

Canonical (default)

  • Backend-agnostic: можно восстановить на любом state backend.
  • Используется для миграций: HashMap -> RocksDB и обратно.
  • Stable format: переживает upgrade Flink версии (с некоторыми ограничениями).
  • Медленнее создаётся: full serialization в общий формат.

Native

  • Backend-specific: сохраняет данные в внутреннем формате backend’а (RocksDB SST, HashMap dump).
  • Быстрее создаётся (можно сказать «эквивалент checkpoint-формата»).
  • Не переживает смену backend’а.
# Canonical (default)
flink savepoint <job-id> s3://savepoints/

# Native (specify format)
flink savepoint --format native <job-id> s3://savepoints/
// В коде
env.getCheckpointConfig().setSavepointFormat(SavepointFormatType.CANONICAL);
// или
env.getCheckpointConfig().setSavepointFormat(SavepointFormatType.NATIVE);
TIP

Для большинства production задач используйте CANONICAL — портативность важнее. Используйте NATIVE только для очень больших state (десятки TB), где скорость snapshot критична.


CLI команды: production workflow

Создать savepoint без остановки job

flink savepoint <job-id> s3://flink-savepoints/
# Returns: savepoint completed: s3://flink-savepoints/savepoint-<job-id>-<UUID>

Job продолжает работать. Savepoint — independent snapshot.

Создать savepoint и остановить job (graceful shutdown)

flink stop --savepointPath s3://flink-savepoints/ <job-id>

Это правильный способ остановить job для upgrade. Не используйте flink cancel — он не делает savepoint.

Восстановить из savepoint

flink run -s s3://flink-savepoints/savepoint-<job-id>-<UUID> \
    -c com.example.MyJob \
    my-job.jar

Флаг -s (savepoint path) — критичен. Без него job стартует с пустым state.

Allow non-restored state

Если новая версия job убрала оператор, который был в savepoint, restore упадёт с ошибкой cannot map operator state. Чтобы restore прошёл, добавьте --allowNonRestoredState:

flink run -s s3://savepoints/... \
    --allowNonRestoredState \
    -c com.example.MyJob \
    my-job.jar

Это говорит Flink: «если в savepoint есть state для оператора, которого нет в новом job — игнорируй».

Формат savepoint изнутри: canonical vs native на уровне байтов
WARNING

--allowNonRestoredState нужно использовать осознанно. Если вы случайно поменяли UID оператора (см. ниже) — restore «потеряет» state, и job стартует с пустым ValueState. Часто такой инцидент незаметен, пока через час не выяснится, что метрики просели.


UID операторов: критично для savepoint

Чтобы savepoint мог быть восстановлен после изменений в job, каждый оператор должен иметь стабильный UID:

DataStream<Event> events = kafkaStream.uid("source-kafka").name("Kafka Source");

DataStream<Enriched> enriched = events
    .keyBy(Event::getUserId)
    .map(new EnrichFunction())
    .uid("enrich-events")
    .name("Enrich Events");

Без явного .uid() Flink генерирует UID на основе позиции оператора в DAG. Если вы добавите новый оператор в середину pipeline — UID всех последующих операторов изменится, и restore не сможет связать state с операторами.

Правило production: каждому оператору с state — обязательный .uid(). Имя оператора (.name("...")) не влияет на restore, оно только для Flink UI.


Сценарий 1: Job upgrade

Сценарий: вы обновили версию вашего кода (fix bug, новая feature) и хотите запустить новую версию без потери state.

# 1. Список running jobs
flink list

# 2. Создать savepoint и остановить
flink stop --savepointPath s3://savepoints/ <job-id>
# Returns: ...savepoint-abc123-xyz

# 3. Запустить новую версию с restore
flink run -s s3://savepoints/savepoint-abc123-xyz \
    new-version.jar

Job будет восстановлен с того же state, на котором закончил предыдущий.


Сценарий 2: State backend migration

Сценарий: переходите с HashMap на RocksDB (или наоборот).

# 1. Savepoint в canonical формате (важно!)
flink savepoint --format canonical <job-id> s3://savepoints/

# 2. Остановить старую версию
flink cancel <job-id>

# 3. Запустить новую версию с другим state backend и restore
# (В новом коде сменили на EmbeddedRocksDBStateBackend)
flink run -s s3://savepoints/savepoint-... \
    same-version-but-rocksdb.jar

Только canonical format переносит state между backend’ами.


Сценарий 3: A/B test разных версий

Сценарий: вы хотите запустить новую версию параллельно со старой на одном state.

# 1. Создать savepoint текущей версии
flink savepoint <job-id> s3://savepoints/

# 2. Запустить старую версию из savepoint (продолжает работать)
flink run -s s3://savepoints/sp-1 old-version.jar

# 3. Запустить новую версию из того же savepoint (параллельно)
flink run -s s3://savepoints/sp-1 new-version.jar

Обе версии начинают с одинакового state. Они будут читать тот же Kafka topic с разными consumer group, обрабатывать события независимо.


Diagram: savepoint lifecycle

Savepoint workflow для upgrade

Old version running

Старая версия job работает в production, накапливает state.
flink stop --savepoint

Savepoint to S3

flink stop делает savepoint и останавливает job. State полностью сохранён в S3 в canonical формате.

Deploy new JAR

Deploy новой версии JAR в Flink cluster (или image в K8s).
flink run -s s3path

Restore state

flink run -s восстанавливает state из savepoint в новую версию job. Все ValueState, ListState, BroadcastState восстановлены.

New version running

Новая версия начинает работу с того же state, на котором закончила старая. Без потери данных, без backfill.
rollback if bug

Rollback option

Если новая версия имеет bug — можно сделать stop --savepoint и вернуться к старой версии с того же savepoint.

Что в savepoint, что не в savepoint

В savepoint:

  • Все типы state (Keyed State, Operator State, Broadcast State).
  • Watermarks (current).
  • Kafka/Kinesis source offsets.
  • Window contents.

НЕ в savepoint:

  • Конфигурация Flink (parallelism, timeouts).
  • Job topology (DAG).
  • Конфигурация коннекторов (Kafka servers, topics).
  • Side outputs definitions.

Это значит, что при restore вы можете изменить:

  • Parallelism (см. урок 4).
  • Конфиг operator timeouts.
  • Названия Kafka топиков (но тогда replay начнётся с нуля).

И не можете изменить:

  • UID оператора (или потеряете его state).
  • Тип state (например, ValueState<String> -> ValueState<Integer>).
  • Серьёзное изменение топологии без --allowNonRestoredState.

Savepoint vs checkpoint: какой использовать для restore

Если вы хотите восстановить job, у вас есть выбор:

Restore из последнего checkpoint:

flink run -s s3://checkpoints/.../chk-42/_metadata new.jar

Быстро (last state), но native format — только если backend не менялся.

Restore из savepoint:

flink run -s s3://savepoints/sp-xyz new.jar

Чуть медленнее, но canonical format — портабельный.

Production правило: для планового upgrade — savepoint. Для emergency restore после crash — checkpoint (он свежее).


Попробуй сам

  1. Запусти простой job со state (counter per user).
  2. Создай savepoint: flink savepoint <job-id> file:///tmp/savepoints/.
  3. Посмотри файлы savepoint’а в /tmp/savepoints/. Открой _metadata (это binary).
  4. Останови job, измени код (например, добавь логирование), запусти из savepoint. State должен сохраниться.
  5. Поменяй UID оператора с state. Попробуй restore — должен упасть. Запусти с --allowNonRestoredState — state потеряется. Понаблюдай.

Ключевые выводы

  1. Savepoint — ручной snapshot для upgrade/migration/rollback. Checkpoint — автоматический для fault tolerance.
  2. Canonical format (default) — portable между backend’ами и версиями Flink. Native — быстрее, но привязан к backend’у.
  3. Каждому оператору со state — стабильный .uid(). Без этого restore сломается при изменении DAG.
  4. flink stop --savepointPath — правильный graceful shutdown. flink cancel не создаёт savepoint.
  5. flink run -s <path> — restore. --allowNonRestoredState — если в новой версии нет оператора из savepoint.
  6. Savepoint содержит state, не topology. Можно менять parallelism, нельзя менять UID/тип state.
Проверка знанийKnowledge check
Вы остановили job через flink cancel (без savepoint), затем задеплоили новую версию. После запуска новой версии job начал с пустого state — counter за каждого user'а обнулился. Что вы сделали не так, и какой корректный workflow для production upgrade?
ОтветAnswer
Ошибка: flink cancel НЕ создаёт savepoint, он просто останавливает job. Если retention settings не RETAIN_ON_CANCELLATION, последний checkpoint тоже удаляется. State потерян. При запуске новой версии Flink не получил никакого -s параметра и начал с пустого state. Корректный workflow: (1) Stop с savepoint: flink stop --savepointPath s3://savepoints/ <job-id> — это атомарно создаёт savepoint и останавливает. (2) Деплой новой версии JAR. (3) Restore: flink run -s s3://savepoints/savepoint-... new.jar. Дополнительные best practices: убедиться, что каждый оператор с state имеет explicit .uid(...) — это критично для restore при изменениях DAG. Включить в production CheckpointConfig retain on cancellation, чтобы даже при случайном flink cancel state не терялся.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В чём принципиальная разница checkpoint и savepoint?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4