Learning Platform
Глоссарий Troubleshooting
Урок 07.04 · 26 мин
Продвинутый
Incremental CheckpointRocksDBSST FilesSharedStateRegistryLSM-tree Snapshot

Incremental checkpoints с RocksDB

В предыдущих уроках мы говорили о checkpoint-ах в абстрактных терминах “snapshot state”. Но что именно происходит, когда RocksDB-backend делает snapshot? Если state — гигабайт RocksDB-данных, и checkpoint interval — 30 секунд, не можем же мы каждые 30 секунд писать гигабайт на S3. Это убьёт network throughput и S3 costs.

Решение — incremental checkpoints. С RocksDB-backend каждый checkpoint содержит только те SST files, которые появились с последнего checkpoint, плюс reference на старые files, переиспользуемые с предыдущих checkpoint-ов. JobManager поддерживает SharedStateRegistry — reference counting structure, которая отслеживает, какие files всё ещё нужны. В этом уроке разбираем механизм incremental, LSM tree поведение под checkpoint-ом, и trade-offs vs full snapshot.

fsync и durability при записи на диск

RocksDB как LSM tree

Чтобы понять incremental, надо понять структуру RocksDB. RocksDB — это Log-Structured Merge tree (LSM tree). Данные хранятся в нескольких уровнях:

  • MemTable — in-memory write buffer. Все put-ы попадают сюда. Когда переполнен (~64MB по умолчанию), flush-ится в L0.
  • L0 (Level 0) — иммутабельные SST files на диске. Файлы могут перекрываться по ключам.
  • L1, L2, … L6 — нижние уровни. Files не перекрываются внутри уровня. Каждый уровень в 10x больше предыдущего по data volume.

Когда уровень переполняется, compaction перепаковывает данные в следующий уровень. SST files старого уровня удаляются, появляются новые в нижнем уровне.

Ключевое свойство для incremental: SST files immutable. Файл, однажды записанный, не меняется до compaction, после которой удаляется и заменяется новыми. Это значит, что если мы запомним “SST файл X.sst был в checkpoint 41”, и проверим, что он всё ещё существует в checkpoint 42 — мы можем переиспользовать его reference без копирования файла.

LSM tree между двумя checkpoint-ами
Состояние RocksDB при checkpoint 41
MemTableIn-memory buffer для новых put-ов. На checkpoint flush в L0 как новый SST.
L0L0 содержит 3 SST files: a.sst, b.sst, c.sst. Эти files пойдут в snapshot 41.
L1L1 содержит 8 SST files, всего 800 MB. Они стабильны.
L2L2 содержит 80 SST files, 8 GB. Большие и стабильные.
Через 30 секунд: новые puts, MemTable flush, L0 compaction в L1
MemTableНовый MemTable снова заполняется.
L0L0 теперь имеет 2 NEW SST: m.sst, n.sst (после flush). Старые a/b/c compacted в L1.
L1L1 имеет MIX: 7 старых SST (d-k не тронуты) + 1 new SST (l.sst — результат compaction a/b/c с частью d-k).
L2L2 не тронут — старые 80 files те же.
Incremental checkpoint 42 = NEW files (m, n, l) + REFs на 7 старых из L1 + REFs на все L2

Из этой картинки видна экономия incremental. Между checkpoint 41 и 42 поменялись 3 файла (m, n, l). Остальные 80+ файлов остались теми же. Если бы мы делали full snapshot — нам пришлось бы скопировать ВСЕ 8.8 GB на S3. Incremental — только 30-50 MB новых файлов плюс metadata.


Анатомия incremental snapshot

Когда Coordinator триггерит checkpoint, для каждой parallel subtask RocksDB-backend делает следующее (синхронная phase):

  1. Flush MemTable: текущий MemTable принудительно flush-ится в новый SST в L0. Это гарантирует, что все puts до момента snapshot будут persisted.
  2. Get list of all SST files: RocksDB-API даёт список всех SST файлов на диске для этой column family.
  3. Hard-link files to snapshot dir: для каждого SST создаётся hard-link в директорию snapshot — это атомарно, и предотвращает удаление файла последующим compaction (он держит file open).

Async phase (background):

  1. For each SST file: проверить, есть ли он в “previous checkpoint files set”. Если есть — добавить shared reference в snapshot metadata. Если нет — upload файла в DFS, добавить private reference.
  2. Write metadata file: создаётся _metadata в snapshot dir с описанием всех файлов snapshot.
  3. Send ack: TaskExecutor отправляет acknowledgeCheckpoint с TaskStateSnapshot, включающим список shared/private state handles.

Snapshot metadata structure:

checkpoint-42/
  task-A-subtask-0/
    chk-42/
      _metadata                      <- TaskStateSnapshot
      [shared] -> ../chk-37/m.sst    <- reference на старый файл
      [shared] -> ../chk-37/n.sst
      private/l.sst                  <- новый файл, частный для этого subtask

Важно: shared files живут отдельно от per-checkpoint директорий. Точная организация зависит от ChannelStateWriteFactory, но общая идея — есть shared/ директория, где лежат файлы переиспользуемые между checkpoint-ами, и private/ директория per-checkpoint для уникальных файлов этого checkpoint-а.


SharedStateRegistry: refcount на JobManager

Самая сложная часть incremental — как удалять старые файлы. Если checkpoint 42 ссылается на m.sst, который originally upload в checkpoint 37, мы не можем удалить m.sst при subsume checkpoint 37 — он ещё нужен. Но когда удалить?

Решение — SharedStateRegistry на JobManager-стороне. Это reference-counting map: для каждого shared file хранится count, сколько живых checkpoint-ов на него ссылаются. File удаляется, когда count достигает 0.

class SharedStateRegistry {
    Map<RegistryKey, Tuple2<StateHandle, Integer>> registry;
    
    public void registerReference(StateHandle handle) {
        // Add or increment count for this handle
    }
    
    public void unregisterReference(StateHandle handle) {
        // Decrement count; if 0, discard handle (delete from DFS)
    }
}

При добавлении CompletedCheckpoint в Coordinator:

  • Для каждого shared file в snapshot — registerReference() (count++).
  • При subsume старого checkpoint — unregisterReference() для всех его shared files (count—).
  • Если count достигает 0 — fileDiscard в DFS.
SharedStateRegistry lifecycle для одного SST файла
Checkpoint 37m.sst впервые upload в DFS. Registry: m.sst -> count=1.
Checkpoint 38m.sst всё ещё нужен (не compacted). Registry: m.sst -> count=2.
Checkpoint 39m.sst опять референсится. Registry: m.sst -> count=3.
Subsume 37Старый checkpoint 37 удалён. unregister m.sst. Registry: m.sst -> count=2.
CompactionRocksDB сделал compaction, m.sst удалён локально. В новом snapshot 40 он не появится — registerReference не вызывается.
Subsume 38, 39Постепенно старые checkpoint-ы удаляются. count m.sst падает до 0.
Discardcount = 0 -> файл физически удаляется из DFS. Окончательное освобождение storage.

Важная деталь: registerReference идемпотентен. Если subtask отправит ack с тем же файлом дважды (например, при retry), count не удвоится. Identity файла определяется по StateHandle (включает file path + checksum-like ID).


Trade-offs incremental vs full

ПараметрIncrementalFull
Checkpoint size в S3Только delta (KB-MB)Весь state (GB)
Sync phase durationHard-link list (мс)То же
Async phase durationUpload delta (секунды)Upload весь state (минуты)
Restore timeDownload всех нужных files (полный state)Download single archive
Cleanup complexitySharedStateRegistry на JM, reference countingПросто DELETE директории
JM memoryO(N) на каждый shared fileМинимальный
S3 request rateВысокий (много мелких files)Низкий (несколько крупных)
Сompactity при compaction stormБольшой spike в deltaСтабильный

Когда incremental выгоден:

  • Большой state (>1 GB) с относительно стабильной структурой.
  • Частые checkpoint-ы (interval менее 5 минут).
  • DFS с дешёвыми storage costs, дорогим bandwidth (типичный S3).

Когда лучше full:

  • Маленький state (менее 100 MB) — incremental overhead больше экономии.
  • Очень частые compaction (write-heavy workload) — delta каждый раз большая, экономии нет.
  • DFS с дорогим storage, дешёвым bandwidth.

В Flink 2.2 incremental на RocksDB — это default. Включается через state.backend.incremental: true. Для full можно явно отключить — в production используется редко.

TIP

Restore time для incremental может быть быстрее, чем для full. Incremental загружает только нужные SST files, и они уже в RocksDB-формате — после download просто открываются. Full загружает один большой archive, который надо распаковать в RocksDB. На очень больших state (десятки GB) разница может быть 2-3x в пользу incremental.


Patología: compaction storm

Главный риск incremental — compaction storm. RocksDB периодически делает большие compactions, переписывая много SST files за один раз. Если это совпало с checkpoint, delta может быть огромной — почти весь state, потому что все старые files compacted и не переиспользуются.

Симптомы:

  • Большинство checkpoint-ов 50-100 MB в S3.
  • Раз в несколько часов — checkpoint 5-10 GB.
  • Spike в async phase duration этих checkpoint-ов.
  • S3 throttling в этот момент.

Решения:

  • rocksdb.compaction.style: level (default) — level compaction делает много мелких compactions вместо одного большого. Это smoother delta.
  • state.backend.rocksdb.options-factory — позволяет тонко тюнить rate limit для compaction. Замедление compaction = smoother profile.
  • Увеличить max-concurrent-checkpoints до 2-3 — если один checkpoint в шторме, другой может продолжаться.
  • Снизить checkpoint interval — checkpoint-ы становятся меньше каждый, шторма становятся реже относительно общего объёма.

SharedStateRegistry edge cases

Edge case 1: JobManager failover. При HA setup JM может рестартовать. Новый JM читает CompletedCheckpoint metadata из ZK/K8s store, восстанавливает SharedStateRegistry в memory. Это reconstruction процесс — для каждого live checkpoint registerReference для всех его shared files. Производительность важна — при больших state с десятками checkpoint-ов registry restore может занять минуты.

Edge case 2: Decline checkpoint. Subtask отправил declineCheckpoint вместо ack. Coordinator должен discard все state handles от subtask-ов, которые УСПЕЛИ acknowledge. Для shared handles — unregisterReference. Это специальная логика abort-flow в PendingCheckpoint.discardOnAbort.

Edge case 3: Rescale из incremental. При rescale (изменение parallelism) Flink перераспределяет state по key groups. Для incremental snapshot это означает, что новая subtask может получить часть state из нескольких старых subtask-ов. Файлы не “режутся” — каждый файл может быть скопирован полностью в несколько new locations, или RocksDB читает только нужные ranges из файла. Это удорожает rescale из incremental, но всё ещё дешевле, чем переигрывать всё с начала.


Чтение source

  • org.apache.flink.runtime.state.SharedStateRegistry — рефcounting structure.
  • org.apache.flink.runtime.state.SharedStateRegistryFactory — instantiation для JM.
  • org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase — base class для RocksDB snapshot.
  • org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy — incremental implementation.
  • org.apache.flink.contrib.streaming.state.snapshot.RocksDBStateUploader — async upload SST files в DFS.

В тестах смотреть IncrementalKeyedStateHandleTest и SharedStateRegistryTest.


Попробуй сам

  1. Посмотрите на структуру checkpoint в S3. На запущенном job с RocksDB+incremental откройте директорию checkpoint в S3/HDFS. Увидите shared/ и chk-N/ директории. Подсчитайте, сколько уникальных файлов в shared/ vs сколько ссылок в metadata. Это и есть экономия incremental.

  2. Метрика lastCheckpointSize. В Web UI или через REST API получите эту метрику за неделю. Увидите spike-ы в момент compactions. Сопоставьте с GC pauses TaskManager — большие compactions часто коррелируют с тяжёлой работой RocksDB.

  3. Эксперимент: full vs incremental на одном job. Запустите две версии job на разных Flink-cluster-ах: одна с incremental, другая без. Сравните средний checkpoint duration, S3 storage usage за час, и время восстановления при искусственном failure. На state 5+ GB разница в реальные деньги.

Проверка знанийKnowledge check
У вас RocksDB-job с state 50 GB и 30-секундным checkpoint interval. Решили retention 5 checkpoint-ов (state.checkpoints.num-retained=5). Через неделю заметили, что S3 storage директории checkpoint-а растёт неконтролируемо — уже 2 TB. CheckpointCoordinator работает нормально, метрики lastCheckpointSize стабильные ~100 MB. В чём может быть проблема?
ОтветAnswer
Это типичный симптом утечки в SharedStateRegistry или discard-логике. Возможные причины: 1) Bug в subsume — старые checkpoint-ы помечены как удалённые в metadata, но physical delete в S3 не произошёл (S3 eventual consistency, race condition, бракованный CleanupExecutor). Проверить логи Coordinator на "Failed to discard". 2) Orphaned shared files — SharedStateRegistry потерял refcount из-за JM crash без proper shutdown. Файлы physically есть, но "никому не принадлежат". При следующем restore они не считаются нужными, но и не удаляются. 3) Long-living savepoint-ы или externally-triggered checkpoint-ы, которые не subsumed автоматически (retention politely игнорирует savepoint-ы). Шаги диагностики: count files in checkpoint dir, compare с reference count в metadata; найти файлы starше, чем oldest live checkpoint — это orphans. Решение: периодический cleanup job, который сканирует S3 на orphaned files (старше определённого порога и не упомянутые в metadata любого live checkpoint), удаляет их. Также включить в monitoring alert на growth checkpoint directory size > expected (5 * lastCheckpointSize в простом случае).

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. Какое фундаментальное свойство SST files в RocksDB позволяет incremental checkpoint?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5