Savepoints: manual snapshots для upgrade и rollback
Checkpoint — это автоматический механизм для restart после crash. Savepoint — это ручной snapshot, который вы делаете для упорядоченных операций: апгрейд кода, изменение топологии job, миграция state backend, A/B тестирование разных версий job на одних данных.
В этом уроке разберём отличия savepoint и checkpoint, форматы (canonical vs native), CLI команды для production workflow, и типичные сценарии использования.
Чем savepoint отличается от checkpoint
| Аспект | Checkpoint | Savepoint |
|---|---|---|
| Trigger | Автоматически по interval | Ручная команда |
| Цель | Fault tolerance | Upgrade, migration |
| Lifecycle | Управляется Flink (rotation) | Управляется пользователем |
| Format (default) | Native (backend-specific) | Canonical (portable) |
| Storage path | state.checkpoints.dir | state.savepoints.dir |
| Optimization | Скорость записи (incremental) | Portability |
| Когда удаляется | Auto rotation | Только вручную |
| Стоимость по времени | Быстро (incremental) | Медленнее (full snapshot) |
Главное практическое отличие: savepoint переживает изменение версии Flink, изменение state backend, изменение топологии job. Checkpoint — нет (только в рамках того же job на том же Flink).
Формат: canonical vs native
С Flink 1.15+ savepoint может быть в двух форматах:
Canonical (default)
- Backend-agnostic: можно восстановить на любом state backend.
- Используется для миграций: HashMap -> RocksDB и обратно.
- Stable format: переживает upgrade Flink версии (с некоторыми ограничениями).
- Медленнее создаётся: full serialization в общий формат.
Native
- Backend-specific: сохраняет данные в внутреннем формате backend’а (RocksDB SST, HashMap dump).
- Быстрее создаётся (можно сказать «эквивалент checkpoint-формата»).
- Не переживает смену backend’а.
# Canonical (default)
flink savepoint <job-id> s3://savepoints/
# Native (specify format)
flink savepoint --format native <job-id> s3://savepoints/
// В коде
env.getCheckpointConfig().setSavepointFormat(SavepointFormatType.CANONICAL);
// или
env.getCheckpointConfig().setSavepointFormat(SavepointFormatType.NATIVE);
Для большинства production задач используйте CANONICAL — портативность важнее. Используйте NATIVE только для очень больших state (десятки TB), где скорость snapshot критична.
CLI команды: production workflow
Создать savepoint без остановки job
flink savepoint <job-id> s3://flink-savepoints/
# Returns: savepoint completed: s3://flink-savepoints/savepoint-<job-id>-<UUID>
Job продолжает работать. Savepoint — independent snapshot.
Создать savepoint и остановить job (graceful shutdown)
flink stop --savepointPath s3://flink-savepoints/ <job-id>
Это правильный способ остановить job для upgrade. Не используйте flink cancel — он не делает savepoint.
Восстановить из savepoint
flink run -s s3://flink-savepoints/savepoint-<job-id>-<UUID> \
-c com.example.MyJob \
my-job.jar
Флаг -s (savepoint path) — критичен. Без него job стартует с пустым state.
Allow non-restored state
Если новая версия job убрала оператор, который был в savepoint, restore упадёт с ошибкой cannot map operator state. Чтобы restore прошёл, добавьте --allowNonRestoredState:
flink run -s s3://savepoints/... \
--allowNonRestoredState \
-c com.example.MyJob \
my-job.jar
Это говорит Flink: «если в savepoint есть state для оператора, которого нет в новом job — игнорируй».
Формат savepoint изнутри: canonical vs native на уровне байтов--allowNonRestoredState нужно использовать осознанно. Если вы случайно поменяли UID оператора (см. ниже) — restore «потеряет» state, и job стартует с пустым ValueState. Часто такой инцидент незаметен, пока через час не выяснится, что метрики просели.
UID операторов: критично для savepoint
Чтобы savepoint мог быть восстановлен после изменений в job, каждый оператор должен иметь стабильный UID:
DataStream<Event> events = kafkaStream.uid("source-kafka").name("Kafka Source");
DataStream<Enriched> enriched = events
.keyBy(Event::getUserId)
.map(new EnrichFunction())
.uid("enrich-events")
.name("Enrich Events");
Без явного .uid() Flink генерирует UID на основе позиции оператора в DAG. Если вы добавите новый оператор в середину pipeline — UID всех последующих операторов изменится, и restore не сможет связать state с операторами.
Правило production: каждому оператору с state — обязательный .uid(). Имя оператора (.name("...")) не влияет на restore, оно только для Flink UI.
Сценарий 1: Job upgrade
Сценарий: вы обновили версию вашего кода (fix bug, новая feature) и хотите запустить новую версию без потери state.
# 1. Список running jobs
flink list
# 2. Создать savepoint и остановить
flink stop --savepointPath s3://savepoints/ <job-id>
# Returns: ...savepoint-abc123-xyz
# 3. Запустить новую версию с restore
flink run -s s3://savepoints/savepoint-abc123-xyz \
new-version.jar
Job будет восстановлен с того же state, на котором закончил предыдущий.
Сценарий 2: State backend migration
Сценарий: переходите с HashMap на RocksDB (или наоборот).
# 1. Savepoint в canonical формате (важно!)
flink savepoint --format canonical <job-id> s3://savepoints/
# 2. Остановить старую версию
flink cancel <job-id>
# 3. Запустить новую версию с другим state backend и restore
# (В новом коде сменили на EmbeddedRocksDBStateBackend)
flink run -s s3://savepoints/savepoint-... \
same-version-but-rocksdb.jar
Только canonical format переносит state между backend’ами.
Сценарий 3: A/B test разных версий
Сценарий: вы хотите запустить новую версию параллельно со старой на одном state.
# 1. Создать savepoint текущей версии
flink savepoint <job-id> s3://savepoints/
# 2. Запустить старую версию из savepoint (продолжает работать)
flink run -s s3://savepoints/sp-1 old-version.jar
# 3. Запустить новую версию из того же savepoint (параллельно)
flink run -s s3://savepoints/sp-1 new-version.jar
Обе версии начинают с одинакового state. Они будут читать тот же Kafka topic с разными consumer group, обрабатывать события независимо.
Diagram: savepoint lifecycle
Old version running
Старая версия job работает в production, накапливает state.Savepoint to S3
flink stop делает savepoint и останавливает job. State полностью сохранён в S3 в canonical формате.Deploy new JAR
Deploy новой версии JAR в Flink cluster (или image в K8s).Restore state
flink run -s восстанавливает state из savepoint в новую версию job. Все ValueState, ListState, BroadcastState восстановлены.New version running
Новая версия начинает работу с того же state, на котором закончила старая. Без потери данных, без backfill.Rollback option
Если новая версия имеет bug — можно сделать stop --savepoint и вернуться к старой версии с того же savepoint.Что в savepoint, что не в savepoint
В savepoint:
- Все типы state (Keyed State, Operator State, Broadcast State).
- Watermarks (current).
- Kafka/Kinesis source offsets.
- Window contents.
НЕ в savepoint:
- Конфигурация Flink (parallelism, timeouts).
- Job topology (DAG).
- Конфигурация коннекторов (Kafka servers, topics).
- Side outputs definitions.
Это значит, что при restore вы можете изменить:
- Parallelism (см. урок 4).
- Конфиг operator timeouts.
- Названия Kafka топиков (но тогда replay начнётся с нуля).
И не можете изменить:
- UID оператора (или потеряете его state).
- Тип state (например, ValueState<String> -> ValueState<Integer>).
- Серьёзное изменение топологии без
--allowNonRestoredState.
Savepoint vs checkpoint: какой использовать для restore
Если вы хотите восстановить job, у вас есть выбор:
Restore из последнего checkpoint:
flink run -s s3://checkpoints/.../chk-42/_metadata new.jar
Быстро (last state), но native format — только если backend не менялся.
Restore из savepoint:
flink run -s s3://savepoints/sp-xyz new.jar
Чуть медленнее, но canonical format — портабельный.
Production правило: для планового upgrade — savepoint. Для emergency restore после crash — checkpoint (он свежее).
Попробуй сам
- Запусти простой job со state (counter per user).
- Создай savepoint:
flink savepoint <job-id> file:///tmp/savepoints/. - Посмотри файлы savepoint’а в
/tmp/savepoints/. Открой_metadata(это binary). - Останови job, измени код (например, добавь логирование), запусти из savepoint. State должен сохраниться.
- Поменяй UID оператора с state. Попробуй restore — должен упасть. Запусти с
--allowNonRestoredState— state потеряется. Понаблюдай.
Ключевые выводы
- Savepoint — ручной snapshot для upgrade/migration/rollback. Checkpoint — автоматический для fault tolerance.
- Canonical format (default) — portable между backend’ами и версиями Flink. Native — быстрее, но привязан к backend’у.
- Каждому оператору со state — стабильный
.uid(). Без этого restore сломается при изменении DAG. flink stop --savepointPath— правильный graceful shutdown.flink cancelне создаёт savepoint.flink run -s <path>— restore.--allowNonRestoredState— если в новой версии нет оператора из savepoint.- Savepoint содержит state, не topology. Можно менять parallelism, нельзя менять UID/тип state.