Savepoint vs checkpoint — кто чем владеет
На первый взгляд savepoint и checkpoint похожи — оба содержат snapshot state job-а, оба можно использовать для restore, оба пишутся в DFS. Но это разные сущности с разной семантикой ownership, разными форматами и разными use case-ами. Незнание этих различий приводит к классическим production-факапам: “удалили старый checkpoint, не зная что job на него зависит”, или “пытались restore из savepoint в job с другим Flink-версии, не подумав про format”.
Savepoints: manual snapshots для upgrade и rollback Restore и rescale: изменение parallelismВ этом уроке разбираем семантику обоих типов snapshot-ов, их форматы, технические различия в реализации, и когда использовать какой.
Семантика ownership
Главное концептуальное различие — кто owns snapshot.
Checkpoint owned by Flink system. Flink сам решает, когда триггерить, когда subsume старые. Checkpoint существует для того, чтобы Flink мог auto-recover при crash. Удалять checkpoint вручную — это нарушение контракта: Flink может зависеть от него.
Savepoint owned by user. User явно триггерит savepoint через CLI, REST API или programmatically. Flink никогда не удалит savepoint автоматически. User отвечает за cleanup. Savepoint используется для оперативных действий: rolling upgrade, parallelism change, A/B testing, debugging.
Эта разница ownership влияет на retention:
# Checkpoint retention
state.checkpoints.num-retained: 1 # Flink хранит только последний
state.checkpoints.dir: s3://bucket/checkpoints # автоматический cleanup
# Savepoint retention
state.savepoints.dir: s3://bucket/savepoints # user-managed cleanup
Когда Flink subsume старый checkpoint, он автоматически удаляет соответствующие files в S3 (с учётом SharedStateRegistry для incremental). Savepoint после создания — это immutable artifact, файлы никогда не удаляются Flink-ом.
ExternalizedCheckpoints — это hybrid. Это checkpoint-ы, которые не удаляются Flink-ом автоматически (execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION). Они owned by Flink при running job, но сохраняются при cancel, и тогда переходят в зону ответственности user. Используются часто для “стартануть второй job из последнего state” workflow.
Format: canonical vs native
Технически savepoint и checkpoint могут быть в одном из двух форматов:
Canonical format (portable). Snapshot записан в backend-agnostic формате. Это означает, что вы можете сохранить savepoint с RocksDB-backend и восстановить с HashMap-backend (или наоборот). State сериализован через Flink TypeSerializer-ы в нейтральном формате.
Native format (per-backend). Snapshot записан в формате specific для конкретного backend. Например, RocksDB native — это binary blob SST files. Преимущество: быстрая запись (no сериализация), быстрый restore (direct mount). Недостаток: вы не можете переключить backend без миграции.
Mapping:
| Тип | Default format | Альтернативный |
|---|---|---|
| Checkpoint | Native | (только native) |
| Savepoint | Canonical | Native (через флаг -type=native) |
То есть checkpoint всегда native (для performance), а savepoint по умолчанию canonical (для portability). С Flink 1.15+ можно явно запросить native savepoint командой:
flink savepoint :jobId :savepointDir --type native
Native savepoint быстрее, но теряет portability — вы должны restore с тем же state backend type.
Анатомия savepoint vs checkpoint в S3
Структура files в DFS:
Checkpoint:
checkpoints/job-X/
shared/ <- shared state files (incremental)
state-id-1.sst
state-id-2.sst
...
chk-42/
_metadata <- metadata file
[no per-checkpoint state files for incremental, only refs in metadata]
chk-43/
_metadata
...
Savepoint:
savepoints/savepoint-12345-abcdef/
_metadata <- metadata
XXX <- state files (no shared, all self-contained)
YYY
ZZZ
Заметьте: savepoint самодостаточен. Все state files в одной директории, никаких ссылок на external shared state. Это намеренно: savepoint можно скопировать в другой bucket, в другой регион, и он будет работать. Checkpoint с incremental не самодостаточен — он refernces files in shared/, которые могут быть references из multiple checkpoint-ов.
Trigger механизмы
Checkpoint trigger:
CheckpointCoordinator periodic timer -> auto trigger каждые interval
REST API POST /jobs/{id}/checkpoints -> manual trigger через API (since 1.15)
Savepoint trigger:
flink savepoint :jobId :savepointDir -> CLI
flink stop --savepointPath :savepointDir :jobId -> stop + savepoint atomic
flink cancel --savepointPath :savepointDir :jobId -> deprecated, prefer stop
REST POST /jobs/{id}/savepoints -> REST API
SavepointMetadata savepoint = client.triggerSavepoint(...) -> programmatic
Самый частый workflow в production: flink stop --savepointPath для graceful upgrade. Это атомарная операция: job заканчивается ровно после savepoint, не обрабатывая больше событий. Новый job стартует из savepoint.
Использование: когда что
Savepoint use cases:
- Rolling upgrade. Stop job with savepoint, upgrade Flink/code, restart from savepoint. Это стандартный workflow для production updates.
- Parallelism change (rescale). Take savepoint, change parallelism в config, restart. Это аналогично rescale на checkpoint, но с user control timing.
- Backend migration. Switch from RocksDB to HashMap или наоборот через canonical savepoint.
- A/B testing. Stop job, take savepoint, start two jobs с разными версиями code, оба восстанавливаются из одного savepoint.
- Disaster recovery snapshot. Periodic savepoint в другой регион/bucket для DR scenarios.
- State debugging. Take savepoint, читать его через State Processor API для inspection.
Checkpoint use cases:
- Auto-recovery. Flink сам делает restore при crash из последнего CompletedCheckpoint.
- Backpressure detection. Метрики checkpoint duration — primary signal для health monitoring.
- 2PC sink commit. notifyCheckpointComplete после checkpoint = commit транзакций в Kafka/JDBC.
Не путайте: вы НЕ должны использовать checkpoint для manual restore в большинстве случаев. Checkpoint owned by system — Flink может удалить его в любой момент. Если вам нужен manual restore — используйте savepoint или externalized checkpoint.
Performance comparison
Из-за этой разницы trade-off ясен: savepoint медленный, но portable и self-contained. Checkpoint быстрый, но tied to system.
Native savepoint частично mitigates trade-off: время создания и restore близко к checkpoint, но теряется portability backend.
Comparison table
| Параметр | Checkpoint | Savepoint |
|---|---|---|
| Triggered by | Flink scheduler | User CLI/REST/API |
| Format default | Native (per-backend) | Canonical (portable) |
| Format alternative | N/A | Native (since 1.15) |
| State containment | Может share с другими ck (incremental) | Always self-contained |
| Ownership | Flink system | User |
| Auto-deletion | Yes, subsume policy | Never |
| Speed | Fast (incremental) | Slow (full + canonical) |
| Storage | Minimal | Self-contained, full state |
| Backend switch | Not supported | Supported (canonical) |
| Use case | Auto-recovery | Operational ops |
| Rescale | Supported (limited cases) | Always supported |
| Schema evolution | Limited | Supported (with State Processor API help) |
| State inspection | Awkward (need to know internals) | Easy via State Processor API |
Edge cases и gotchas
Gotcha 1: Удаление savepoint при running job. Savepoint — это immutable artifact. Если job был restored from savepoint A, и потом savepoint A был удалён — job продолжает работать (state уже в memory/RocksDB), но в случае crash вы потеряли rollback point. Решение: keep savepoint минимум до подтверждения, что job стабильно работает с новой версией.
Gotcha 2: Pre-Flink-1.15 incremental savepoint. До 1.15 savepoint всегда canonical (full snapshot). На large state это означает многоминутные savepoint operations и блокировку pipeline (синхронная phase). С 1.15+ можно использовать native savepoint для быстрого operational workflow.
Gotcha 3: Stop with savepoint failure. Команда flink stop --savepointPath атомарная — либо savepoint успешно создан и job остановлен, либо ни то ни другое. Если savepoint failed (например, S3 throttling), job продолжает работать. На production это означает, что upgrade workflow должен retry-ить с экспоненциальным backoff.
Gotcha 4: Restore из чужого checkpoint. Можно ли restore job из checkpoint другого job? Технически да через флаг -s :path/chk-N, но это antipattern. Checkpoint owned by source job; если source job ещё работает, он может удалить shared state files. Используйте savepoint для cross-job restores.
Gotcha 5: Operator UID mismatch. При restore Flink матчит state к operator-ам по UID (set explicitly через .uid("my-uid")). Если UID изменился между jobs, restore failes with cannot map operator state to job topology. Это критично для production — всегда задавайте UID для всех stateful operators.
Никогда не используйте сгенерированные UID для stateful operators в production. Если вы не задаёте .uid(), Flink генерирует UID на основе topology hash. Любое изменение DAG (даже добавление нового оператора в другом месте) ломает hash, и при restore state не находит свою цель. ВСЕГДА задавайте .uid("descriptive-name") для каждого stateful operator. Это критическое правило при production.
Чтение source
org.apache.flink.runtime.checkpoint.Checkpoint— interface для CompletedCheckpoint и SavepointMetadata.org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer— формат сериализации savepoint metadata.org.apache.flink.runtime.state.SnapshotStrategyRunner— strategy для создания snapshot (canonical vs native).org.apache.flink.runtime.state.CheckpointStorageLocation— abstraction для location savepoint/checkpoint.- FLIP-203 (Incremental savepoints) в Apache Flink Wiki — design для native savepoint.
Попробуй сам
-
Сравните structure в S3. Возьмите working job, сделайте savepoint и подождите checkpoint. Сравните директории через
aws s3 ls --recursive. Увидите разницу в самодостаточности: savepoint в одной директории, checkpoint references shared/. -
Сделайте rolling upgrade. Take savepoint, stop job, update code (например, добавьте
printв operator), restart from savepoint. Никаких потерь state — operator продолжает работу с того же места. -
Switch backend через canonical savepoint. Создайте job с HashMap backend, take savepoint, остановите. Поменяйте config на RocksDB backend, restore. Job работает, но теперь state в RocksDB. Это полезно для миграции HashMap -> RocksDB при росте state.