Learning Platform
Глоссарий Troubleshooting
Урок 08.02 · 24 мин
Продвинутый
Savepoint Anatomy_metadata fileOperator UIDState FilesSavepoint Loader

Анатомия savepoint — что внутри _metadata

Savepoint — это не один файл, а директория со специальной структурой. Понимание этой структуры важно для трёх практических задач: (1) ручной cleanup и migration savepoint-ов между bucket-ами, (2) восстановление при corrupted savepoint (если один файл повреждён, нужно знать, какой именно), (3) использование State Processor API для inspection state — нужно понимать, что API читает изнутри.

Savepoints: создание и восстановление

В этом уроке разбираем структуру savepoint на диске, формат _metadata файла, какие state files создаются на operator и как Flink загружает savepoint при restore.


Структура директории savepoint

После flink savepoint :jobId :savepointDir в DFS появится директория:

savepoints/
  savepoint-12abcd-987654321/
    _metadata                                          # описание savepoint
    -0a1b2c3d4e5f6789                                  # operator state file
    1f2e3d4c5b6a7890                                   # keyed state file
    9876543210fedcba                                   # ...
    abcdef0123456789
    ...

Несколько moments:

  • Префикс savepoint- — convention, не обязательный. Можете указать любой path.
  • savepoint-12abcd-987654321 — directory name содержит prefix savepoint ID (12abcd — короткий ID) и timestamp/random suffix.
  • _metadata — единственный файл с известным именем. Все остальные — opaque blob-files со случайными именами (UUID-based).
  • State files имена — UUID-based, не несут смысла. Содержат binary state. Размер от килобайт до гигабайт.

Для canonical savepoint все state files создаются с нуля (full snapshot, no shared state). Для native savepoint (Flink 1.15+) могут быть references на shared state из других checkpoint-ов, но в practical случае savepoint обычно делается standalone.


Внутренности _metadata файла

_metadata — это binary file с serialized Java объектом SavepointV2 (или V3, V4, формат версионируется). Сериализация через специальный SavepointSerializer, не obychniy Java serialization.

Логическая структура (high-level):

class SavepointV2 {
    long checkpointId;                           // ID операции
    JobID jobId;                                 // job который сделал savepoint
    Collection<OperatorState> operatorStates;    // state каждого оператора
    Collection<MasterState> masterStates;        // state JobManager
    CheckpointProperties properties;             // флаги
    String externalPointer;                      // self-reference на эту директорию
}

class OperatorState {
    OperatorID operatorId;                       // hash от operator UID
    int parallelism;                             // parallelism при создании
    int maxParallelism;                          // max-parallelism (для rescale)
    Map<Integer, OperatorSubtaskState> subtaskStates;  // per subtask
}

class OperatorSubtaskState {
    StateObjectCollection<OperatorStateHandle> managedOperatorState;   // ListState, etc
    StateObjectCollection<KeyedStateHandle> managedKeyedState;          // ValueState, MapState, etc
    StateObjectCollection<OperatorStateHandle> rawOperatorState;        // raw ListState
    StateObjectCollection<KeyedStateHandle> rawKeyedState;              // raw keyed
    StateObjectCollection<InputChannelStateHandle> inputChannelState;   // unaligned ck
    StateObjectCollection<ResultSubpartitionStateHandle> outputChannelState;
}

Главное — это operatorStates map: ключ — OperatorID (хэш от user-задаваемого UID), value — состояние оператора. Для каждого оператора хранится state всех его parallel subtask-ов.

State handles внутри OperatorSubtaskState — это references на blob-files в той же директории. Например, KeyedStateHandle содержит:

class IncrementalRemoteKeyedStateHandle implements KeyedStateHandle {
    UUID backendIdentifier;
    KeyGroupRange keyGroupRange;
    long checkpointId;
    Map<StateHandleID, StreamStateHandle> sharedState;        # ссылки на shared files
    Map<StateHandleID, StreamStateHandle> privateState;       # ссылки на private files
    StreamStateHandle metaStateHandle;                        # meta info file
}

StreamStateHandle для savepoint обычно — FileStateHandle, содержащий path в DFS и size. Этот path и указывает на blob-file в savepoint directory.


OperatorID — хэш от UID

Когда вы пишете в коде:

.uid("my-fraud-detector")

Flink при job submission вычисляет OperatorID = hash("my-fraud-detector") через MD5 или SHA-256. Этот OperatorID становится ключом в metadata savepoint-а.

Если UID не задан явно, Flink fall-back на topology-based hash: хэш операторов от их позиции в графе. Это работает, но fragile — любое изменение DAG меняет hash.

Пример: вы добавили .uid("source-A") для source и .uid("filter-B") для filter. При savepoint в metadata будут:

operatorStates = {
    OperatorID(hash("source-A")) -> { parallelism: 4, ... },
    OperatorID(hash("filter-B")) -> { parallelism: 4, ... }
}

При restore Flink сматчит operator из nового DAG с state по OperatorID. Если в новом коде вы изменили .uid("filter-B") на .uid("filter-B-v2") — restore не найдёт state для нового UID и оператор стартует с empty state. Возможно, это то, что вы хотели (например, drop old state). Но чаще — это accident, который надо detectить.

WARNING

Flink поддерживает флаг —allowNonRestoredState при restore. Без него restore failes, если в savepoint есть state для UID, которого нет в новом DAG (например, удалили operator). С флагом — Flink skipпит этот state, но это implicit data loss. ВСЕГДА думайте, безопасно ли это для вашего use case.


Один operator на диске

Возьмём конкретный операторный snapshot. Допустим, в job есть KeyedProcessFunction с UID=“fraud-detector”, parallelism=4, использует ValueState<UserProfile>.

В savepoint metadata появится:

OperatorID(hash("fraud-detector")):
  parallelism: 4
  maxParallelism: 128
  subtaskStates: {
    0: {managedKeyedState: [Handle->-0a1b2c3d4e5f6789]}
    1: {managedKeyedState: [Handle->1f2e3d4c5b6a7890]}
    2: {managedKeyedState: [Handle->2a3b4c5d6e7f8901]}
    3: {managedKeyedState: [Handle->3c4d5e6f7a8b9012]}
  }

Каждая subtask имеет свой state file (для simple case без incremental). File -0a1b2c3d4e5f6789 — это RocksDB snapshot или HashMap state subtask 0, содержащий keyed entries для key groups 0-31 (при maxParallelism 128, parallelism 4).

Структура одного state file (RocksDB native):

  • Это binary SST file directly из RocksDB.
  • Содержит entries: (column-family-id, serialized-key, serialized-value).
  • column-family — один per state descriptor (ValueState с именем “userProfile” имеет свой column family).

Для canonical format структура другая:

  • Header: magic bytes, version, key group range.
  • Per key group: list of (key-bytes, value-bytes) tuples.
  • Footer: index, checksum.

Loading savepoint при restore
User CLI
JobManager
SavepointLoader
DFS (S3/HDFS)
TaskManager
submit job -s :savepointDirloadCheckpointMetadata(path)open _metadatabinary metadataSavepointMetadata{operatorStates}match operators by UID hashdeployTask(execution, taskRestore)download state filesstate file contentTaskInState READY

Один важный момент: JobManager не загружает state files. Он только читает metadata (KB), маппит operators, и отдаёт pointers TaskManager-ам. State files (GB) загружаются параллельно на TaskManager-ах. Это distributes network load и parallelизует restore.


Operator UID и rescale

Когда вы rescale (изменяете parallelism), Flink перераспределяет key groups между новыми subtasks. Это работает потому, что в metadata сохранён key group range для каждого state handle.

Пример: savepoint при parallelism=4, maxParallelism=128:

Subtask 0: keyGroupRange = [0-31]
Subtask 1: keyGroupRange = [32-63]
Subtask 2: keyGroupRange = [64-95]
Subtask 3: keyGroupRange = [96-127]

При restore с parallelism=8:

Subtask 0: needs [0-15]   -> partial read from old subtask 0
Subtask 1: needs [16-31]  -> partial read from old subtask 0
Subtask 2: needs [32-47]  -> partial read from old subtask 1
...

Каждый new subtask может читать ranges из нескольких state files. Это implementation в RocksDBIncrementalRestoreOperation (для RocksDB) или HeapKeyedStateBackend для HashMap.

Для canonical savepoint rescale работает однозначно — partitioning by key groups. Для native (RocksDB SST format) savepoint rescale тоже работает, но более expensive — нужно reorganize SST files по новым key group boundaries.


Self-pointer и portability

В _metadata есть поле externalPointer — это absolute path к savepoint directory. На первый взгляд это redundant: мы знаем path, потому что мы его открыли. Но это поле используется для двух кейсов:

Use case 1: Logging и observability. В logs JobManager-а и в Web UI этот pointer виден — operator может найти savepoint в DFS, проверить, что именно был restored.

Use case 2: Validation. При restore Flink проверяет: externalPointer in metadata matches path which we loaded from? Если нет (savepoint был copied в другой bucket без update metadata), он warning-ует и предлагает обновить.

Проблема: если вы copy savepoint в другой bucket (aws s3 cp --recursive s3://old s3://new), externalPointer в _metadata всё ещё указывает на old. Решение: pre-1.13 — manually edit metadata file (awkward). С 1.13+ Flink игнорирует externalPointer при load и использует path который user задал, что чище.


Чтение source

  • org.apache.flink.runtime.checkpoint.metadata.MetadataV2Serializer — serialization формата V2.
  • org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer — текущий format (post-1.13).
  • org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader — высокоуровневый loader.
  • org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle — типичный state handle для RocksDB.
  • org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot — meta info для каждого state descriptor.

Попробуй сам

  1. Откройте _metadata через State Processor API. Используйте SavepointReader (см следующий урок) для inspection metadata вашего savepoint. Увидите list operator UID, parallelism, types.

  2. Manually inspect blob files. Возьмите small savepoint, скачайте локально. Откройте non-_metadata file через hexdump -C | head. Для RocksDB native увидите magic bytes “RocksDB”. Для canonical — Flink-specific header.

  3. Тест rescale из savepoint. Сделайте savepoint при parallelism=4, restart с parallelism=2 и parallelism=8. Сравните метрики processing rate и state size per subtask. Должны увидеть, что state равномерно распределён по key groups, независимо от parallelism.

Проверка знанийKnowledge check
У вас savepoint-12345-abcd с _metadata и 64 blob files. Вы скопировали savepoint в другой S3 bucket через aws s3 cp --recursive, но получили ошибку при restore: 'cannot find state file s3://old-bucket/savepoints/savepoint-12345-abcd/-1a2b3c4d'. Что произошло и как исправить (без перезапуска оригинального savepoint)?
ОтветAnswer
State files в savepoint metadata записаны с absolute path. При copy в другой bucket абсолютные paths внутри _metadata всё ещё указывают на old bucket. Когда TaskManager пытается download state file, он использует path из metadata, идёт в old bucket, и если у него нет доступа (или old bucket удалён), failes. Решение зависит от Flink version: 1) Flink 1.13+: при restore можно использовать FileSystemPath path-substitution через config. Передать -Dstate.savepoint.storage.path=NEW_BUCKET, Flink translates всех paths internally. 2) Pre-1.13 или если path-substitution не работает: нужно регенерировать _metadata. Можно через SavepointLoader.loadCheckpointMetadata() прочитать, потом manually обновить FileStateHandle paths, и rewrite metadata с новыми paths через SavepointSerializer. 3) Альтернатива: использовать State Processor API для конвертации savepoint — read old savepoint, write новый в new location. Это самый надёжный способ. 4) Best practice для DR: используйте filesystem с consistent path namespace (S3 cross-region replication, или relative paths через customized FsCheckpointStorage). На production важно тестировать DR workflow до того, как он понадобится.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какой файл всегда присутствует в директории savepoint и что он содержит?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4