Incremental checkpoints с RocksDB
В предыдущих уроках мы говорили о checkpoint-ах в абстрактных терминах “snapshot state”. Но что именно происходит, когда RocksDB-backend делает snapshot? Если state — гигабайт RocksDB-данных, и checkpoint interval — 30 секунд, не можем же мы каждые 30 секунд писать гигабайт на S3. Это убьёт network throughput и S3 costs.
Решение — incremental checkpoints. С RocksDB-backend каждый checkpoint содержит только те SST files, которые появились с последнего checkpoint, плюс reference на старые files, переиспользуемые с предыдущих checkpoint-ов. JobManager поддерживает SharedStateRegistry — reference counting structure, которая отслеживает, какие files всё ещё нужны. В этом уроке разбираем механизм incremental, LSM tree поведение под checkpoint-ом, и trade-offs vs full snapshot.
fsync и durability при записи на дискRocksDB как LSM tree
Чтобы понять incremental, надо понять структуру RocksDB. RocksDB — это Log-Structured Merge tree (LSM tree). Данные хранятся в нескольких уровнях:
- MemTable — in-memory write buffer. Все put-ы попадают сюда. Когда переполнен (~64MB по умолчанию), flush-ится в L0.
- L0 (Level 0) — иммутабельные SST files на диске. Файлы могут перекрываться по ключам.
- L1, L2, … L6 — нижние уровни. Files не перекрываются внутри уровня. Каждый уровень в 10x больше предыдущего по data volume.
Когда уровень переполняется, compaction перепаковывает данные в следующий уровень. SST files старого уровня удаляются, появляются новые в нижнем уровне.
Ключевое свойство для incremental: SST files immutable. Файл, однажды записанный, не меняется до compaction, после которой удаляется и заменяется новыми. Это значит, что если мы запомним “SST файл X.sst был в checkpoint 41”, и проверим, что он всё ещё существует в checkpoint 42 — мы можем переиспользовать его reference без копирования файла.
Из этой картинки видна экономия incremental. Между checkpoint 41 и 42 поменялись 3 файла (m, n, l). Остальные 80+ файлов остались теми же. Если бы мы делали full snapshot — нам пришлось бы скопировать ВСЕ 8.8 GB на S3. Incremental — только 30-50 MB новых файлов плюс metadata.
Анатомия incremental snapshot
Когда Coordinator триггерит checkpoint, для каждой parallel subtask RocksDB-backend делает следующее (синхронная phase):
- Flush MemTable: текущий MemTable принудительно flush-ится в новый SST в L0. Это гарантирует, что все puts до момента snapshot будут persisted.
- Get list of all SST files: RocksDB-API даёт список всех SST файлов на диске для этой column family.
- Hard-link files to snapshot dir: для каждого SST создаётся hard-link в директорию snapshot — это атомарно, и предотвращает удаление файла последующим compaction (он держит file open).
Async phase (background):
- For each SST file: проверить, есть ли он в “previous checkpoint files set”. Если есть — добавить shared reference в snapshot metadata. Если нет — upload файла в DFS, добавить private reference.
- Write metadata file: создаётся
_metadataв snapshot dir с описанием всех файлов snapshot. - Send ack: TaskExecutor отправляет acknowledgeCheckpoint с TaskStateSnapshot, включающим список shared/private state handles.
Snapshot metadata structure:
checkpoint-42/
task-A-subtask-0/
chk-42/
_metadata <- TaskStateSnapshot
[shared] -> ../chk-37/m.sst <- reference на старый файл
[shared] -> ../chk-37/n.sst
private/l.sst <- новый файл, частный для этого subtask
Важно: shared files живут отдельно от per-checkpoint директорий. Точная организация зависит от ChannelStateWriteFactory, но общая идея — есть shared/ директория, где лежат файлы переиспользуемые между checkpoint-ами, и private/ директория per-checkpoint для уникальных файлов этого checkpoint-а.
SharedStateRegistry: refcount на JobManager
Самая сложная часть incremental — как удалять старые файлы. Если checkpoint 42 ссылается на m.sst, который originally upload в checkpoint 37, мы не можем удалить m.sst при subsume checkpoint 37 — он ещё нужен. Но когда удалить?
Решение — SharedStateRegistry на JobManager-стороне. Это reference-counting map: для каждого shared file хранится count, сколько живых checkpoint-ов на него ссылаются. File удаляется, когда count достигает 0.
class SharedStateRegistry {
Map<RegistryKey, Tuple2<StateHandle, Integer>> registry;
public void registerReference(StateHandle handle) {
// Add or increment count for this handle
}
public void unregisterReference(StateHandle handle) {
// Decrement count; if 0, discard handle (delete from DFS)
}
}
При добавлении CompletedCheckpoint в Coordinator:
- Для каждого shared file в snapshot —
registerReference()(count++). - При subsume старого checkpoint —
unregisterReference()для всех его shared files (count—). - Если count достигает 0 — fileDiscard в DFS.
Важная деталь: registerReference идемпотентен. Если subtask отправит ack с тем же файлом дважды (например, при retry), count не удвоится. Identity файла определяется по StateHandle (включает file path + checksum-like ID).
Trade-offs incremental vs full
| Параметр | Incremental | Full |
|---|---|---|
| Checkpoint size в S3 | Только delta (KB-MB) | Весь state (GB) |
| Sync phase duration | Hard-link list (мс) | То же |
| Async phase duration | Upload delta (секунды) | Upload весь state (минуты) |
| Restore time | Download всех нужных files (полный state) | Download single archive |
| Cleanup complexity | SharedStateRegistry на JM, reference counting | Просто DELETE директории |
| JM memory | O(N) на каждый shared file | Минимальный |
| S3 request rate | Высокий (много мелких files) | Низкий (несколько крупных) |
| Сompactity при compaction storm | Большой spike в delta | Стабильный |
Когда incremental выгоден:
- Большой state (>1 GB) с относительно стабильной структурой.
- Частые checkpoint-ы (interval менее 5 минут).
- DFS с дешёвыми storage costs, дорогим bandwidth (типичный S3).
Когда лучше full:
- Маленький state (менее 100 MB) — incremental overhead больше экономии.
- Очень частые compaction (write-heavy workload) — delta каждый раз большая, экономии нет.
- DFS с дорогим storage, дешёвым bandwidth.
В Flink 2.2 incremental на RocksDB — это default. Включается через state.backend.incremental: true. Для full можно явно отключить — в production используется редко.
Restore time для incremental может быть быстрее, чем для full. Incremental загружает только нужные SST files, и они уже в RocksDB-формате — после download просто открываются. Full загружает один большой archive, который надо распаковать в RocksDB. На очень больших state (десятки GB) разница может быть 2-3x в пользу incremental.
Patología: compaction storm
Главный риск incremental — compaction storm. RocksDB периодически делает большие compactions, переписывая много SST files за один раз. Если это совпало с checkpoint, delta может быть огромной — почти весь state, потому что все старые files compacted и не переиспользуются.
Симптомы:
- Большинство checkpoint-ов 50-100 MB в S3.
- Раз в несколько часов — checkpoint 5-10 GB.
- Spike в async phase duration этих checkpoint-ов.
- S3 throttling в этот момент.
Решения:
- rocksdb.compaction.style: level (default) — level compaction делает много мелких compactions вместо одного большого. Это smoother delta.
- state.backend.rocksdb.options-factory — позволяет тонко тюнить rate limit для compaction. Замедление compaction = smoother profile.
- Увеличить max-concurrent-checkpoints до 2-3 — если один checkpoint в шторме, другой может продолжаться.
- Снизить checkpoint interval — checkpoint-ы становятся меньше каждый, шторма становятся реже относительно общего объёма.
SharedStateRegistry edge cases
Edge case 1: JobManager failover. При HA setup JM может рестартовать. Новый JM читает CompletedCheckpoint metadata из ZK/K8s store, восстанавливает SharedStateRegistry в memory. Это reconstruction процесс — для каждого live checkpoint registerReference для всех его shared files. Производительность важна — при больших state с десятками checkpoint-ов registry restore может занять минуты.
Edge case 2: Decline checkpoint. Subtask отправил declineCheckpoint вместо ack. Coordinator должен discard все state handles от subtask-ов, которые УСПЕЛИ acknowledge. Для shared handles — unregisterReference. Это специальная логика abort-flow в PendingCheckpoint.discardOnAbort.
Edge case 3: Rescale из incremental. При rescale (изменение parallelism) Flink перераспределяет state по key groups. Для incremental snapshot это означает, что новая subtask может получить часть state из нескольких старых subtask-ов. Файлы не “режутся” — каждый файл может быть скопирован полностью в несколько new locations, или RocksDB читает только нужные ranges из файла. Это удорожает rescale из incremental, но всё ещё дешевле, чем переигрывать всё с начала.
Чтение source
org.apache.flink.runtime.state.SharedStateRegistry— рефcounting structure.org.apache.flink.runtime.state.SharedStateRegistryFactory— instantiation для JM.org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase— base class для RocksDB snapshot.org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy— incremental implementation.org.apache.flink.contrib.streaming.state.snapshot.RocksDBStateUploader— async upload SST files в DFS.
В тестах смотреть IncrementalKeyedStateHandleTest и SharedStateRegistryTest.
Попробуй сам
-
Посмотрите на структуру checkpoint в S3. На запущенном job с RocksDB+incremental откройте директорию checkpoint в S3/HDFS. Увидите
shared/иchk-N/директории. Подсчитайте, сколько уникальных файлов в shared/ vs сколько ссылок в metadata. Это и есть экономия incremental. -
Метрика lastCheckpointSize. В Web UI или через REST API получите эту метрику за неделю. Увидите spike-ы в момент compactions. Сопоставьте с GC pauses TaskManager — большие compactions часто коррелируют с тяжёлой работой RocksDB.
-
Эксперимент: full vs incremental на одном job. Запустите две версии job на разных Flink-cluster-ах: одна с incremental, другая без. Сравните средний checkpoint duration, S3 storage usage за час, и время восстановления при искусственном failure. На state 5+ GB разница в реальные деньги.