Learning Platform
Глоссарий Troubleshooting
Урок 07.05 · 24 мин
Продвинутый
Changelog State BackendFLIP-158Generic IncrementalDurable LogCheckpoint Decoupling

Changelog State Backend (FLIP-158)

Incremental checkpoints на RocksDB решают проблему “не копировать весь state на каждом checkpoint”, но создают другую — duration checkpoint-а непредсказуема. Когда RocksDB делает большую compaction, delta становится огромной, async upload занимает минуты, и весь pipeline эффективно throughput-limited не нагрузкой, а compaction storms.

Kafka log как append-only changelog

Changelog State Backend (FLIP-158, GA с Flink 1.16) решает эту проблему через radically иную модель. Вместо того чтобы snapshot-ить state files state backend-а, мы поддерживаем durable changelog — append-only лог всех state mutations. Checkpoint — это просто синхронизация offset-а в changelog, плюс редкие “materialized” snapshots state backend-а в фоне. Latency checkpoint-а становится практически constant и предсказуемой.

В этом уроке разбираем механизм Changelog backend, его внутренности, и когда его использовать вместо incremental RocksDB.


Проблема “RocksDB tail latency”

На production RocksDB job-ах часто видна такая картинка: 99% checkpoint-ов завершаются за 5-10 секунд, но 1% — за 5-10 минут. Это compaction storms. Большие L0->L1 compactions, которые перепаковывают много SST files, создают delta размером в десятки GB. Async upload в S3 этого delta занимает столько же, сколько full snapshot.

Эта tail latency имеет conkretные последствия:

  • Exactly-once seeding. Для 2PC sink (Kafka transactional, JDBC XA) commit транзакций происходит только после полного завершения checkpoint. Tail latency = end-to-end задержка от события до visibility в downstream sink.
  • Recovery time. Поскольку minimum recoverable checkpoint = последний CompletedCheckpoint, при failure recovery откатывается на этот checkpoint. При tail latency 5 минут вы можете потерять до 5 минут работы.
  • Resource utilization. На время большого checkpoint async upload занимает существенную часть network bandwidth. Это affects throughput main pipeline.

Решение Changelog backend — decouple checkpoint duration от state backend operations. Checkpoint duration становится function of changelog write latency, которая для well-tuned DFS = десятки миллисекунд. Materialization state в фоне — отдельный процесс, не блокирующий checkpoint.


Архитектура Changelog backend

Changelog State Backend — это wrapper над любым другим state backend (RocksDB, HashMap). Он добавляет durable changelog layer:

Changelog backend = wrapper над state backend + durable log
Operator putОператор делает put в state — addToState(K, V).
Changelog wrapperЗапись попадает одновременно в обычный state backend (RocksDB) AND в durable changelog.
Wrapped backend (RocksDB)Underlying state backend (RocksDB, HashMap) — обычное место для query state. Используется для get/scan операций.
+
Durable changelog (S3)Durable changelog — append-only лог всех mutations. Записывается в DFS (S3 или dedicated changelog storage).
Checkpoint = changelog offsetCheckpoint = sync changelog offset + ack. Materialization wrapped backend происходит async и не влияет на checkpoint latency.

Ключевое отличие от incremental RocksDB: checkpoint-ы НЕ зависят от размера delta state backend. Каждый checkpoint фиксирует только текущий offset в changelog (несколько байт metadata). Само state остаётся в durable changelog.


Жизнь mutation в Changelog backend

Когда оператор делает state.update(value):

  1. Write to wrapped backend: значение пишется в RocksDB как обычно. Это нужно для последующих get-операций.
  2. Append to changelog: операция (put, K, V, timestamp) добавляется в in-memory log buffer.
  3. Flush log to DFS: периодически (каждые несколько MB или несколько секунд), in-memory buffer flush-ится в durable changelog в DFS как append-only segment.

Каждый segment changelog — это file в DFS:

changelog/job-X/subtask-0/
  segment-001  <- mutations 0..N
  segment-002  <- mutations N..M
  segment-003  <- mutations M..K (current)

При checkpoint оператор просто фиксирует current changelog offset в state handle:

public class ChangelogStateHandle {
    long checkpointId;
    long changelogOffset;
    List<ChangelogSegmentHandle> segments;
    StateHandle materializedSnapshot;  // optional snapshot of wrapped backend
}

Checkpoint ack-ится мгновенно — нужно только записать offset metadata, не upload-ить state. Полный snapshot — это materializedSnapshot + log replay from offset.


Materialization: фоновое compaction логов

Если бы Changelog backend только писал лог, restore time стал бы катастрофой — нужно было бы реплеить весь log за всё время работы job-а. Чтобы этого не происходило, периодически выполняется materialization — snapshot wrapped backend (например, RocksDB snapshot), который “захлопывает” log на этой точке.

После materialization:

  • Materialized snapshot хранит state на момент M.
  • Changelog содержит только mutations после M (новый сегмент).
  • При restore: load materialized snapshot, replay changelog from M to current.

Materialization запускается async и НЕ блокирует checkpoint. Параметры:

  • state.backend.changelog.periodic-materialize.interval — как часто запускать. Default 10 минут.
  • state.backend.changelog.periodic-materialize.max-failures-allowed — terminate job, если materialization failed N раз подряд.

После завершения materialization соответствующая часть changelog становится “subsumed” — её можно удалить, потому что materialized snapshot её “содержит”.

Lifecycle changelog и materialization
T=0Job стартует. Empty state, empty changelog.
T=1minПрошёл 1 минута. Несколько MB changelog накопилось. Checkpoint фиксирует offset = N1.
T=5min5 минут работы. changelog растёт. Каждый checkpoint фиксирует свой offset, restore от любого ck = load empty + replay all.
T=10min: MaterializationBackground materialization. Wrapped RocksDB snapshot создан = M1. Changelog до M1.offset = subsumed.
T=11min: ck-KНовый checkpoint K после materialization. state handle = M1 + changelog offset = N11 (M1 + 1 минута новых mutations).

При restore из ck-K: download M1 (один materialized snapshot, аналог full snapshot RocksDB), download changelog segments [M1..N11], replay log на M1 (применить mutations). State восстановлен.


Trade-offs vs incremental RocksDB

ПараметрChangelogIncremental RocksDB
Checkpoint durationConstant, ~50-200msVariable, может быть минуты при compaction
Checkpoint duration p99~constantHigh tail latency
State storage в DFSБольшее (state + changelog)Только state
Restore timeSlower (state + replay)Faster (только state)
Materialization costBackground, не влияет на pipelineN/A (нет дополнительной работы)
Recovery от latest checkpointМеньше data loss (короче interval)Больше data loss (длиннее interval)
Дополнительные DFS writesПостоянный поток (changelog)Bursty (на каждом checkpoint)
2PC sink end-to-end latencyНизкая (быстрый checkpoint)Большая на compaction storms
Сложность операцийВыше (нужен мониторинг materialization)Ниже

Когда Changelog побеждает:

  • Strict latency SLA на checkpoint (exactly-once с 2PC, регуляторы).
  • High variability в state mutations (compaction storms сильно влияют).
  • Большой state (50+ GB), где tail latency RocksDB checkpoint значительна.

Когда лучше остаться с incremental RocksDB:

  • Bandwidth-constrained DFS (Changelog даёт постоянный поток writes).
  • Restore time критичен (Changelog медленнее на recovery).
  • Маленький state (incremental RocksDB уже даёт checkpoint-ы за секунды).

Где changelog хранится

Changelog можно хранить в двух местах:

Option 1: DFS, тот же что для checkpoint-ов (default). Простота: один storage location, один set credentials. Минус — generic DFS не оптимизирован для append-heavy workload. На S3 это означает много мелких PUT-запросов, что может попасть в throttling.

Option 2: Dedicated changelog storage (например, Apache BookKeeper, Kafka, dedicated S3 bucket с optimized config). Лучше performance, но дополнительная инфраструктура.

В Flink 2.2 default — DFS-based через state.changelog.storage: filesystem. Можно переключить на kafka для Kafka-based changelog (experimental).

# flink-conf.yaml
state.backend: rocksdb
state.backend.changelog.enabled: true
state.changelog.storage: filesystem
state.changelog.dstl.dfs.base-path: s3://my-bucket/changelog

Internal: TaskChangelogRegistry

Аналог SharedStateRegistry для changelog — это TaskChangelogRegistry, который отслеживает, какие changelog segments всё ещё нужны.

Поскольку каждый checkpoint фиксирует offset в changelog, segments между checkpoint offsets всё ещё нужны для restore. Удалить segment можно, только если:

  • Все live checkpoint-ы имеют offset > end of this segment, ИЛИ
  • Materialization прошла после этого segment (т.е. wrapped backend snapshot уже содержит эти mutations).

Cleanup logic:

class TaskChangelogRegistry {
    void registerSegment(ChangelogSegmentHandle handle, long checkpointId);
    void unregisterCheckpoint(long checkpointId);  // на subsume
    void onMaterializationCompleted(long materializedUpTo);
    // Internally: discard segments that are no longer referenced by any live ck
    //             AND covered by materialization
}

Materialization frequency определяет, как быстро old changelog segments становятся eligible for deletion. Слишком редкий materialize -> changelog растёт неограниченно, дорого по storage и slow restore. Слишком частый -> high background load, materialization congestion.

WARNING

В первый раз тюнинг materialize interval — основная сложность Changelog backend. Default 10 минут подходит для большинства job-ов, но при очень heavy write rate можно нужно уменьшить до 1-2 минут. Мониторить changelog directory size в DFS — должен быть predictable и не расти unbounded. Если растёт — materialization не успевает, нужно или увеличить ресурсы для materialize, или уменьшить interval.


Migration: enable Changelog на existing job

Можно ли включить Changelog backend на запущенном job без рестарта с начала? Да, через savepoint:

  1. Сделать savepoint с current backend (RocksDB).
  2. Остановить job.
  3. Включить state.backend.changelog.enabled: true в config.
  4. Restore из savepoint.

Restored state будет использовать Changelog backend. Внутренне Flink сначала загрузит state в wrapped RocksDB, потом начнёт писать changelog для всех новых mutations.

Обратная миграция (Changelog -> back to RocksDB only) тоже через savepoint, потому что savepoint всегда canonical format (а не Changelog-specific).


Чтение source

  • org.apache.flink.state.changelog.ChangelogStateBackend — главный backend, wrapper.
  • org.apache.flink.state.changelog.AbstractChangelogStateBackend — abstract base с logic управления changelog.
  • org.apache.flink.runtime.state.changelog.StateChangelogWriter — writer для durable log.
  • org.apache.flink.state.changelog.PeriodicMaterializationManager — async materialization scheduler.
  • FLIP-158 (Generic log-based incremental checkpoint) в Apache Flink Wiki — design doc.

Попробуй сам

  1. Enable changelog на staging job. Take savepoint, enable state.backend.changelog.enabled, restore. Через час сравните метрики checkpoint duration p99: с Changelog должна быть значительно ниже.

  2. Мониторинг materialization. Метрики materializationDuration, lastMaterializationTimestamp. Если materialization долго не запускается или fails — alert.

  3. Эксперимент с большим state. Создайте synthetic job с 10+ GB state. Сравните 100 checkpoint-ов: incremental RocksDB vs Changelog. Constructie distribution duration (p50, p95, p99, max). Должны увидеть, как Changelog даёт constant duration vs RocksDB с tail.

Проверка знанийKnowledge check
Вы включили Changelog backend для job с большим state, где раньше были проблемы с tail latency checkpoint-ов. Через неделю чекпойнты стабильно проходят за 100ms, но recovery time после crash увеличилось с 30 секунд до 8 минут. Также DFS storage usage директории checkpoint вырос в 3 раза. Что происходит и как тюнить?
ОтветAnswer
Это classic trade-off Changelog: checkpoint быстрый, но full state = materialized snapshot + длинный log to replay. Если materialization запускается редко относительно нагрузки, между materialization-ами накапливается много changelog, который при restore нужно replay-ить — это и есть 8 минут. Storage usage растёт, потому что changelog между materialization-ами hangs around, плюс не subsumed segments. Решения: 1) Увеличить frequency materialization — изменить state.backend.changelog.periodic-materialize.interval с 10 минут на 2-3 минуты. Materialization будет happen чаще, log будет короче, restore быстрее. Но это увеличит background load на wrapped backend (RocksDB будет чаще делать full snapshot). 2) Проверить, что materialization не fails — если failures, log накапливается. Метрика materializationFailureCount. 3) Если RocksDB не успевает делать частые materialize — увеличить resources для TaskManager (RAM для buffer cache, CPU для compaction). 4) Long-term: рассмотреть hybrid — Changelog для some operators (low-latency exactly-once), regular RocksDB для others (low storage cost). Цель — найти sweet spot между checkpoint latency и restore latency, который соответствует SLA вашего use case.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Какую главную проблему incremental RocksDB checkpoint решает Changelog State Backend?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5