Анатомия savepoint — что внутри _metadata
Savepoint — это не один файл, а директория со специальной структурой. Понимание этой структуры важно для трёх практических задач: (1) ручной cleanup и migration savepoint-ов между bucket-ами, (2) восстановление при corrupted savepoint (если один файл повреждён, нужно знать, какой именно), (3) использование State Processor API для inspection state — нужно понимать, что API читает изнутри.
Savepoints: создание и восстановлениеВ этом уроке разбираем структуру savepoint на диске, формат _metadata файла, какие state files создаются на operator и как Flink загружает savepoint при restore.
Структура директории savepoint
После flink savepoint :jobId :savepointDir в DFS появится директория:
savepoints/
savepoint-12abcd-987654321/
_metadata # описание savepoint
-0a1b2c3d4e5f6789 # operator state file
1f2e3d4c5b6a7890 # keyed state file
9876543210fedcba # ...
abcdef0123456789
...
Несколько moments:
- Префикс savepoint- — convention, не обязательный. Можете указать любой path.
- savepoint-12abcd-987654321 — directory name содержит prefix savepoint ID (12abcd — короткий ID) и timestamp/random suffix.
- _metadata — единственный файл с известным именем. Все остальные — opaque blob-files со случайными именами (UUID-based).
- State files имена — UUID-based, не несут смысла. Содержат binary state. Размер от килобайт до гигабайт.
Для canonical savepoint все state files создаются с нуля (full snapshot, no shared state). Для native savepoint (Flink 1.15+) могут быть references на shared state из других checkpoint-ов, но в practical случае savepoint обычно делается standalone.
Внутренности _metadata файла
_metadata — это binary file с serialized Java объектом SavepointV2 (или V3, V4, формат версионируется). Сериализация через специальный SavepointSerializer, не obychniy Java serialization.
Логическая структура (high-level):
class SavepointV2 {
long checkpointId; // ID операции
JobID jobId; // job который сделал savepoint
Collection<OperatorState> operatorStates; // state каждого оператора
Collection<MasterState> masterStates; // state JobManager
CheckpointProperties properties; // флаги
String externalPointer; // self-reference на эту директорию
}
class OperatorState {
OperatorID operatorId; // hash от operator UID
int parallelism; // parallelism при создании
int maxParallelism; // max-parallelism (для rescale)
Map<Integer, OperatorSubtaskState> subtaskStates; // per subtask
}
class OperatorSubtaskState {
StateObjectCollection<OperatorStateHandle> managedOperatorState; // ListState, etc
StateObjectCollection<KeyedStateHandle> managedKeyedState; // ValueState, MapState, etc
StateObjectCollection<OperatorStateHandle> rawOperatorState; // raw ListState
StateObjectCollection<KeyedStateHandle> rawKeyedState; // raw keyed
StateObjectCollection<InputChannelStateHandle> inputChannelState; // unaligned ck
StateObjectCollection<ResultSubpartitionStateHandle> outputChannelState;
}
Главное — это operatorStates map: ключ — OperatorID (хэш от user-задаваемого UID), value — состояние оператора. Для каждого оператора хранится state всех его parallel subtask-ов.
State handles внутри OperatorSubtaskState — это references на blob-files в той же директории. Например, KeyedStateHandle содержит:
class IncrementalRemoteKeyedStateHandle implements KeyedStateHandle {
UUID backendIdentifier;
KeyGroupRange keyGroupRange;
long checkpointId;
Map<StateHandleID, StreamStateHandle> sharedState; # ссылки на shared files
Map<StateHandleID, StreamStateHandle> privateState; # ссылки на private files
StreamStateHandle metaStateHandle; # meta info file
}
StreamStateHandle для savepoint обычно — FileStateHandle, содержащий path в DFS и size. Этот path и указывает на blob-file в savepoint directory.
OperatorID — хэш от UID
Когда вы пишете в коде:
.uid("my-fraud-detector")
Flink при job submission вычисляет OperatorID = hash("my-fraud-detector") через MD5 или SHA-256. Этот OperatorID становится ключом в metadata savepoint-а.
Если UID не задан явно, Flink fall-back на topology-based hash: хэш операторов от их позиции в графе. Это работает, но fragile — любое изменение DAG меняет hash.
Пример: вы добавили .uid("source-A") для source и .uid("filter-B") для filter. При savepoint в metadata будут:
operatorStates = {
OperatorID(hash("source-A")) -> { parallelism: 4, ... },
OperatorID(hash("filter-B")) -> { parallelism: 4, ... }
}
При restore Flink сматчит operator из nового DAG с state по OperatorID. Если в новом коде вы изменили .uid("filter-B") на .uid("filter-B-v2") — restore не найдёт state для нового UID и оператор стартует с empty state. Возможно, это то, что вы хотели (например, drop old state). Но чаще — это accident, который надо detectить.
Flink поддерживает флаг —allowNonRestoredState при restore. Без него restore failes, если в savepoint есть state для UID, которого нет в новом DAG (например, удалили operator). С флагом — Flink skipпит этот state, но это implicit data loss. ВСЕГДА думайте, безопасно ли это для вашего use case.
Один operator на диске
Возьмём конкретный операторный snapshot. Допустим, в job есть KeyedProcessFunction с UID=“fraud-detector”, parallelism=4, использует ValueState<UserProfile>.
В savepoint metadata появится:
OperatorID(hash("fraud-detector")):
parallelism: 4
maxParallelism: 128
subtaskStates: {
0: {managedKeyedState: [Handle->-0a1b2c3d4e5f6789]}
1: {managedKeyedState: [Handle->1f2e3d4c5b6a7890]}
2: {managedKeyedState: [Handle->2a3b4c5d6e7f8901]}
3: {managedKeyedState: [Handle->3c4d5e6f7a8b9012]}
}
Каждая subtask имеет свой state file (для simple case без incremental). File -0a1b2c3d4e5f6789 — это RocksDB snapshot или HashMap state subtask 0, содержащий keyed entries для key groups 0-31 (при maxParallelism 128, parallelism 4).
Структура одного state file (RocksDB native):
- Это binary SST file directly из RocksDB.
- Содержит entries: (column-family-id, serialized-key, serialized-value).
- column-family — один per state descriptor (ValueState с именем “userProfile” имеет свой column family).
Для canonical format структура другая:
- Header: magic bytes, version, key group range.
- Per key group: list of (key-bytes, value-bytes) tuples.
- Footer: index, checksum.
Restore flow: что делает Flink
Один важный момент: JobManager не загружает state files. Он только читает metadata (KB), маппит operators, и отдаёт pointers TaskManager-ам. State files (GB) загружаются параллельно на TaskManager-ах. Это distributes network load и parallelизует restore.
Operator UID и rescale
Когда вы rescale (изменяете parallelism), Flink перераспределяет key groups между новыми subtasks. Это работает потому, что в metadata сохранён key group range для каждого state handle.
Пример: savepoint при parallelism=4, maxParallelism=128:
Subtask 0: keyGroupRange = [0-31]
Subtask 1: keyGroupRange = [32-63]
Subtask 2: keyGroupRange = [64-95]
Subtask 3: keyGroupRange = [96-127]
При restore с parallelism=8:
Subtask 0: needs [0-15] -> partial read from old subtask 0
Subtask 1: needs [16-31] -> partial read from old subtask 0
Subtask 2: needs [32-47] -> partial read from old subtask 1
...
Каждый new subtask может читать ranges из нескольких state files. Это implementation в RocksDBIncrementalRestoreOperation (для RocksDB) или HeapKeyedStateBackend для HashMap.
Для canonical savepoint rescale работает однозначно — partitioning by key groups. Для native (RocksDB SST format) savepoint rescale тоже работает, но более expensive — нужно reorganize SST files по новым key group boundaries.
Self-pointer и portability
В _metadata есть поле externalPointer — это absolute path к savepoint directory. На первый взгляд это redundant: мы знаем path, потому что мы его открыли. Но это поле используется для двух кейсов:
Use case 1: Logging и observability. В logs JobManager-а и в Web UI этот pointer виден — operator может найти savepoint в DFS, проверить, что именно был restored.
Use case 2: Validation. При restore Flink проверяет: externalPointer in metadata matches path which we loaded from? Если нет (savepoint был copied в другой bucket без update metadata), он warning-ует и предлагает обновить.
Проблема: если вы copy savepoint в другой bucket (aws s3 cp --recursive s3://old s3://new), externalPointer в _metadata всё ещё указывает на old. Решение: pre-1.13 — manually edit metadata file (awkward). С 1.13+ Flink игнорирует externalPointer при load и использует path который user задал, что чище.
Чтение source
org.apache.flink.runtime.checkpoint.metadata.MetadataV2Serializer— serialization формата V2.org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer— текущий format (post-1.13).org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader— высокоуровневый loader.org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle— типичный state handle для RocksDB.org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot— meta info для каждого state descriptor.
Попробуй сам
-
Откройте _metadata через State Processor API. Используйте SavepointReader (см следующий урок) для inspection metadata вашего savepoint. Увидите list operator UID, parallelism, types.
-
Manually inspect blob files. Возьмите small savepoint, скачайте локально. Откройте non-_metadata file через
hexdump -C | head. Для RocksDB native увидите magic bytes “RocksDB”. Для canonical — Flink-specific header. -
Тест rescale из savepoint. Сделайте savepoint при parallelism=4, restart с parallelism=2 и parallelism=8. Сравните метрики processing rate и state size per subtask. Должны увидеть, что state равномерно распределён по key groups, независимо от parallelism.