Learning Platform
Глоссарий Troubleshooting
Урок 08.01 · 24 мин
Продвинутый
SavepointCheckpointCanonical FormatNative FormatState Ownership

Savepoint vs checkpoint — кто чем владеет

На первый взгляд savepoint и checkpoint похожи — оба содержат snapshot state job-а, оба можно использовать для restore, оба пишутся в DFS. Но это разные сущности с разной семантикой ownership, разными форматами и разными use case-ами. Незнание этих различий приводит к классическим production-факапам: “удалили старый checkpoint, не зная что job на него зависит”, или “пытались restore из savepoint в job с другим Flink-версии, не подумав про format”.

Savepoints: manual snapshots для upgrade и rollback Restore и rescale: изменение parallelism

В этом уроке разбираем семантику обоих типов snapshot-ов, их форматы, технические различия в реализации, и когда использовать какой.


Семантика ownership

Главное концептуальное различие — кто owns snapshot.

Checkpoint owned by Flink system. Flink сам решает, когда триггерить, когда subsume старые. Checkpoint существует для того, чтобы Flink мог auto-recover при crash. Удалять checkpoint вручную — это нарушение контракта: Flink может зависеть от него.

Savepoint owned by user. User явно триггерит savepoint через CLI, REST API или programmatically. Flink никогда не удалит savepoint автоматически. User отвечает за cleanup. Savepoint используется для оперативных действий: rolling upgrade, parallelism change, A/B testing, debugging.

Эта разница ownership влияет на retention:

# Checkpoint retention
state.checkpoints.num-retained: 1     # Flink хранит только последний
state.checkpoints.dir: s3://bucket/checkpoints  # автоматический cleanup

# Savepoint retention
state.savepoints.dir: s3://bucket/savepoints    # user-managed cleanup

Когда Flink subsume старый checkpoint, он автоматически удаляет соответствующие files в S3 (с учётом SharedStateRegistry для incremental). Savepoint после создания — это immutable artifact, файлы никогда не удаляются Flink-ом.

NOTE

ExternalizedCheckpoints — это hybrid. Это checkpoint-ы, которые не удаляются Flink-ом автоматически (execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION). Они owned by Flink при running job, но сохраняются при cancel, и тогда переходят в зону ответственности user. Используются часто для “стартануть второй job из последнего state” workflow.


Format: canonical vs native

Технически savepoint и checkpoint могут быть в одном из двух форматов:

Canonical format (portable). Snapshot записан в backend-agnostic формате. Это означает, что вы можете сохранить savepoint с RocksDB-backend и восстановить с HashMap-backend (или наоборот). State сериализован через Flink TypeSerializer-ы в нейтральном формате.

Native format (per-backend). Snapshot записан в формате specific для конкретного backend. Например, RocksDB native — это binary blob SST files. Преимущество: быстрая запись (no сериализация), быстрый restore (direct mount). Недостаток: вы не можете переключить backend без миграции.

Mapping:

ТипDefault formatАльтернативный
CheckpointNative(только native)
SavepointCanonicalNative (через флаг -type=native)

То есть checkpoint всегда native (для performance), а savepoint по умолчанию canonical (для portability). С Flink 1.15+ можно явно запросить native savepoint командой:

flink savepoint :jobId :savepointDir --type native

Native savepoint быстрее, но теряет portability — вы должны restore с тем же state backend type.


Анатомия savepoint vs checkpoint в S3

Структура files в DFS:

Checkpoint:

checkpoints/job-X/
  shared/                            <- shared state files (incremental)
    state-id-1.sst
    state-id-2.sst
    ...
  chk-42/
    _metadata                        <- metadata file
    [no per-checkpoint state files for incremental, only refs in metadata]
  chk-43/
    _metadata
    ...

Savepoint:

savepoints/savepoint-12345-abcdef/
  _metadata                          <- metadata
  XXX                                <- state files (no shared, all self-contained)
  YYY
  ZZZ

Заметьте: savepoint самодостаточен. Все state files в одной директории, никаких ссылок на external shared state. Это намеренно: savepoint можно скопировать в другой bucket, в другой регион, и он будет работать. Checkpoint с incremental не самодостаточен — он refernces files in shared/, которые могут быть references из multiple checkpoint-ов.


Trigger механизмы

Checkpoint trigger:

CheckpointCoordinator periodic timer  -> auto trigger каждые interval
REST API POST /jobs/{id}/checkpoints  -> manual trigger через API (since 1.15)

Savepoint trigger:

flink savepoint :jobId :savepointDir                          -> CLI
flink stop --savepointPath :savepointDir :jobId               -> stop + savepoint atomic
flink cancel --savepointPath :savepointDir :jobId             -> deprecated, prefer stop
REST POST /jobs/{id}/savepoints                                -> REST API
SavepointMetadata savepoint = client.triggerSavepoint(...)    -> programmatic

Самый частый workflow в production: flink stop --savepointPath для graceful upgrade. Это атомарная операция: job заканчивается ровно после savepoint, не обрабатывая больше событий. Новый job стартует из savepoint.


Использование: когда что

Savepoint use cases:

  • Rolling upgrade. Stop job with savepoint, upgrade Flink/code, restart from savepoint. Это стандартный workflow для production updates.
  • Parallelism change (rescale). Take savepoint, change parallelism в config, restart. Это аналогично rescale на checkpoint, но с user control timing.
  • Backend migration. Switch from RocksDB to HashMap или наоборот через canonical savepoint.
  • A/B testing. Stop job, take savepoint, start two jobs с разными версиями code, оба восстанавливаются из одного savepoint.
  • Disaster recovery snapshot. Periodic savepoint в другой регион/bucket для DR scenarios.
  • State debugging. Take savepoint, читать его через State Processor API для inspection.

Checkpoint use cases:

  • Auto-recovery. Flink сам делает restore при crash из последнего CompletedCheckpoint.
  • Backpressure detection. Метрики checkpoint duration — primary signal для health monitoring.
  • 2PC sink commit. notifyCheckpointComplete после checkpoint = commit транзакций в Kafka/JDBC.

Не путайте: вы НЕ должны использовать checkpoint для manual restore в большинстве случаев. Checkpoint owned by system — Flink может удалить его в любой момент. Если вам нужен manual restore — используйте savepoint или externalized checkpoint.


Performance comparison

Время создания и restore: savepoint vs checkpoint
Create checkpointIncremental RocksDB checkpoint: 10-30s для 10 GB state, поскольку только delta upload-ится в DFS.
Create savepointCanonical savepoint: 2-5 минут для 10 GB state, поскольку всё state нужно сериализовать и записать самодостаточно.
Restore checkpointRestore incremental: download всех нужных SST files из shared/ + chk-N/. Параллельный download.
Restore savepointRestore canonical: download всех files + deserialize в backend format. Дольше из-за deserialization.
Storage costIncremental checkpoint: minimal storage (только новые SST + shared).
SavepointSelf-contained: hold весь state. 10x больше места.

Из-за этой разницы trade-off ясен: savepoint медленный, но portable и self-contained. Checkpoint быстрый, но tied to system.

Native savepoint частично mitigates trade-off: время создания и restore близко к checkpoint, но теряется portability backend.


Comparison table

ПараметрCheckpointSavepoint
Triggered byFlink schedulerUser CLI/REST/API
Format defaultNative (per-backend)Canonical (portable)
Format alternativeN/ANative (since 1.15)
State containmentМожет share с другими ck (incremental)Always self-contained
OwnershipFlink systemUser
Auto-deletionYes, subsume policyNever
SpeedFast (incremental)Slow (full + canonical)
StorageMinimalSelf-contained, full state
Backend switchNot supportedSupported (canonical)
Use caseAuto-recoveryOperational ops
RescaleSupported (limited cases)Always supported
Schema evolutionLimitedSupported (with State Processor API help)
State inspectionAwkward (need to know internals)Easy via State Processor API

Edge cases и gotchas

Gotcha 1: Удаление savepoint при running job. Savepoint — это immutable artifact. Если job был restored from savepoint A, и потом savepoint A был удалён — job продолжает работать (state уже в memory/RocksDB), но в случае crash вы потеряли rollback point. Решение: keep savepoint минимум до подтверждения, что job стабильно работает с новой версией.

Gotcha 2: Pre-Flink-1.15 incremental savepoint. До 1.15 savepoint всегда canonical (full snapshot). На large state это означает многоминутные savepoint operations и блокировку pipeline (синхронная phase). С 1.15+ можно использовать native savepoint для быстрого operational workflow.

Gotcha 3: Stop with savepoint failure. Команда flink stop --savepointPath атомарная — либо savepoint успешно создан и job остановлен, либо ни то ни другое. Если savepoint failed (например, S3 throttling), job продолжает работать. На production это означает, что upgrade workflow должен retry-ить с экспоненциальным backoff.

Gotcha 4: Restore из чужого checkpoint. Можно ли restore job из checkpoint другого job? Технически да через флаг -s :path/chk-N, но это antipattern. Checkpoint owned by source job; если source job ещё работает, он может удалить shared state files. Используйте savepoint для cross-job restores.

Gotcha 5: Operator UID mismatch. При restore Flink матчит state к operator-ам по UID (set explicitly через .uid("my-uid")). Если UID изменился между jobs, restore failes with cannot map operator state to job topology. Это критично для production — всегда задавайте UID для всех stateful operators.

WARNING

Никогда не используйте сгенерированные UID для stateful operators в production. Если вы не задаёте .uid(), Flink генерирует UID на основе topology hash. Любое изменение DAG (даже добавление нового оператора в другом месте) ломает hash, и при restore state не находит свою цель. ВСЕГДА задавайте .uid("descriptive-name") для каждого stateful operator. Это критическое правило при production.


Чтение source

  • org.apache.flink.runtime.checkpoint.Checkpoint — interface для CompletedCheckpoint и SavepointMetadata.
  • org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer — формат сериализации savepoint metadata.
  • org.apache.flink.runtime.state.SnapshotStrategyRunner — strategy для создания snapshot (canonical vs native).
  • org.apache.flink.runtime.state.CheckpointStorageLocation — abstraction для location savepoint/checkpoint.
  • FLIP-203 (Incremental savepoints) в Apache Flink Wiki — design для native savepoint.

Попробуй сам

  1. Сравните structure в S3. Возьмите working job, сделайте savepoint и подождите checkpoint. Сравните директории через aws s3 ls --recursive. Увидите разницу в самодостаточности: savepoint в одной директории, checkpoint references shared/.

  2. Сделайте rolling upgrade. Take savepoint, stop job, update code (например, добавьте print в operator), restart from savepoint. Никаких потерь state — operator продолжает работу с того же места.

  3. Switch backend через canonical savepoint. Создайте job с HashMap backend, take savepoint, остановите. Поменяйте config на RocksDB backend, restore. Job работает, но теперь state в RocksDB. Это полезно для миграции HashMap -> RocksDB при росте state.

Проверка знанийKnowledge check
Production-job работает с state 50 GB на RocksDB-backend. Вы хотите сделать rolling upgrade — обновить Flink с 2.0 на 2.2. Какие три критические проверки нужно сделать перед операцией, и какой workflow обеспечит safe rollback при проблемах?
ОтветAnswer
Три критические проверки: 1) Все operator UID set explicitly через .uid("...") в коде. Без этого после upgrade UID могут сгенерироваться по-другому, и state не свяжется с новыми operators. 2) Compatibility savepoint format между 2.0 и 2.2 — обычно Flink поддерживает rolling upgrade в пределах major version, но проверьте release notes для breaking changes (особенно в типах сериализации). 3) Backup state separately — сделайте offline copy savepoint в отдельный bucket/региона перед операцией. Workflow с rollback: a) Take savepoint v2.0 (canonical для portability), copy в DR bucket. b) Verify savepoint complete — посмотрите _metadata file, размер директории. c) Stop job через flink stop --savepointPath (атомарно). d) Backup current Flink binaries и config. e) Deploy 2.2 binaries. f) Start job from savepoint. g) Monitor stability 30+ минут: error rate, checkpoint duration, state size, lag. h) Если проблемы — stop new job, downgrade binaries обратно на 2.0, restart from original savepoint v2.0. State не потерян, потери только в новых событиях за время downgrade. Главный риск без подготовки: невозможность rollback, потому что savepoint v2.2 не загрузится в Flink 2.0 (forward compatibility не гарантирована). Поэтому критично сохранить v2.0 savepoint перед upgrade.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что главное отличие между savepoint и checkpoint?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4