Changelog State Backend (FLIP-158)
Incremental checkpoints на RocksDB решают проблему “не копировать весь state на каждом checkpoint”, но создают другую — duration checkpoint-а непредсказуема. Когда RocksDB делает большую compaction, delta становится огромной, async upload занимает минуты, и весь pipeline эффективно throughput-limited не нагрузкой, а compaction storms.
Kafka log как append-only changelogChangelog State Backend (FLIP-158, GA с Flink 1.16) решает эту проблему через radically иную модель. Вместо того чтобы snapshot-ить state files state backend-а, мы поддерживаем durable changelog — append-only лог всех state mutations. Checkpoint — это просто синхронизация offset-а в changelog, плюс редкие “materialized” snapshots state backend-а в фоне. Latency checkpoint-а становится практически constant и предсказуемой.
В этом уроке разбираем механизм Changelog backend, его внутренности, и когда его использовать вместо incremental RocksDB.
Проблема “RocksDB tail latency”
На production RocksDB job-ах часто видна такая картинка: 99% checkpoint-ов завершаются за 5-10 секунд, но 1% — за 5-10 минут. Это compaction storms. Большие L0->L1 compactions, которые перепаковывают много SST files, создают delta размером в десятки GB. Async upload в S3 этого delta занимает столько же, сколько full snapshot.
Эта tail latency имеет conkretные последствия:
- Exactly-once seeding. Для 2PC sink (Kafka transactional, JDBC XA) commit транзакций происходит только после полного завершения checkpoint. Tail latency = end-to-end задержка от события до visibility в downstream sink.
- Recovery time. Поскольку minimum recoverable checkpoint = последний CompletedCheckpoint, при failure recovery откатывается на этот checkpoint. При tail latency 5 минут вы можете потерять до 5 минут работы.
- Resource utilization. На время большого checkpoint async upload занимает существенную часть network bandwidth. Это affects throughput main pipeline.
Решение Changelog backend — decouple checkpoint duration от state backend operations. Checkpoint duration становится function of changelog write latency, которая для well-tuned DFS = десятки миллисекунд. Materialization state в фоне — отдельный процесс, не блокирующий checkpoint.
Архитектура Changelog backend
Changelog State Backend — это wrapper над любым другим state backend (RocksDB, HashMap). Он добавляет durable changelog layer:
Ключевое отличие от incremental RocksDB: checkpoint-ы НЕ зависят от размера delta state backend. Каждый checkpoint фиксирует только текущий offset в changelog (несколько байт metadata). Само state остаётся в durable changelog.
Жизнь mutation в Changelog backend
Когда оператор делает state.update(value):
- Write to wrapped backend: значение пишется в RocksDB как обычно. Это нужно для последующих get-операций.
- Append to changelog: операция (put, K, V, timestamp) добавляется в in-memory log buffer.
- Flush log to DFS: периодически (каждые несколько MB или несколько секунд), in-memory buffer flush-ится в durable changelog в DFS как append-only segment.
Каждый segment changelog — это file в DFS:
changelog/job-X/subtask-0/
segment-001 <- mutations 0..N
segment-002 <- mutations N..M
segment-003 <- mutations M..K (current)
При checkpoint оператор просто фиксирует current changelog offset в state handle:
public class ChangelogStateHandle {
long checkpointId;
long changelogOffset;
List<ChangelogSegmentHandle> segments;
StateHandle materializedSnapshot; // optional snapshot of wrapped backend
}
Checkpoint ack-ится мгновенно — нужно только записать offset metadata, не upload-ить state. Полный snapshot — это materializedSnapshot + log replay from offset.
Materialization: фоновое compaction логов
Если бы Changelog backend только писал лог, restore time стал бы катастрофой — нужно было бы реплеить весь log за всё время работы job-а. Чтобы этого не происходило, периодически выполняется materialization — snapshot wrapped backend (например, RocksDB snapshot), который “захлопывает” log на этой точке.
После materialization:
- Materialized snapshot хранит state на момент M.
- Changelog содержит только mutations после M (новый сегмент).
- При restore: load materialized snapshot, replay changelog from M to current.
Materialization запускается async и НЕ блокирует checkpoint. Параметры:
- state.backend.changelog.periodic-materialize.interval — как часто запускать. Default 10 минут.
- state.backend.changelog.periodic-materialize.max-failures-allowed — terminate job, если materialization failed N раз подряд.
После завершения materialization соответствующая часть changelog становится “subsumed” — её можно удалить, потому что materialized snapshot её “содержит”.
При restore из ck-K: download M1 (один materialized snapshot, аналог full snapshot RocksDB), download changelog segments [M1..N11], replay log на M1 (применить mutations). State восстановлен.
Trade-offs vs incremental RocksDB
| Параметр | Changelog | Incremental RocksDB |
|---|---|---|
| Checkpoint duration | Constant, ~50-200ms | Variable, может быть минуты при compaction |
| Checkpoint duration p99 | ~constant | High tail latency |
| State storage в DFS | Большее (state + changelog) | Только state |
| Restore time | Slower (state + replay) | Faster (только state) |
| Materialization cost | Background, не влияет на pipeline | N/A (нет дополнительной работы) |
| Recovery от latest checkpoint | Меньше data loss (короче interval) | Больше data loss (длиннее interval) |
| Дополнительные DFS writes | Постоянный поток (changelog) | Bursty (на каждом checkpoint) |
| 2PC sink end-to-end latency | Низкая (быстрый checkpoint) | Большая на compaction storms |
| Сложность операций | Выше (нужен мониторинг materialization) | Ниже |
Когда Changelog побеждает:
- Strict latency SLA на checkpoint (exactly-once с 2PC, регуляторы).
- High variability в state mutations (compaction storms сильно влияют).
- Большой state (50+ GB), где tail latency RocksDB checkpoint значительна.
Когда лучше остаться с incremental RocksDB:
- Bandwidth-constrained DFS (Changelog даёт постоянный поток writes).
- Restore time критичен (Changelog медленнее на recovery).
- Маленький state (incremental RocksDB уже даёт checkpoint-ы за секунды).
Где changelog хранится
Changelog можно хранить в двух местах:
Option 1: DFS, тот же что для checkpoint-ов (default). Простота: один storage location, один set credentials. Минус — generic DFS не оптимизирован для append-heavy workload. На S3 это означает много мелких PUT-запросов, что может попасть в throttling.
Option 2: Dedicated changelog storage (например, Apache BookKeeper, Kafka, dedicated S3 bucket с optimized config). Лучше performance, но дополнительная инфраструктура.
В Flink 2.2 default — DFS-based через state.changelog.storage: filesystem. Можно переключить на kafka для Kafka-based changelog (experimental).
# flink-conf.yaml
state.backend: rocksdb
state.backend.changelog.enabled: true
state.changelog.storage: filesystem
state.changelog.dstl.dfs.base-path: s3://my-bucket/changelog
Internal: TaskChangelogRegistry
Аналог SharedStateRegistry для changelog — это TaskChangelogRegistry, который отслеживает, какие changelog segments всё ещё нужны.
Поскольку каждый checkpoint фиксирует offset в changelog, segments между checkpoint offsets всё ещё нужны для restore. Удалить segment можно, только если:
- Все live checkpoint-ы имеют offset > end of this segment, ИЛИ
- Materialization прошла после этого segment (т.е. wrapped backend snapshot уже содержит эти mutations).
Cleanup logic:
class TaskChangelogRegistry {
void registerSegment(ChangelogSegmentHandle handle, long checkpointId);
void unregisterCheckpoint(long checkpointId); // на subsume
void onMaterializationCompleted(long materializedUpTo);
// Internally: discard segments that are no longer referenced by any live ck
// AND covered by materialization
}
Materialization frequency определяет, как быстро old changelog segments становятся eligible for deletion. Слишком редкий materialize -> changelog растёт неограниченно, дорого по storage и slow restore. Слишком частый -> high background load, materialization congestion.
В первый раз тюнинг materialize interval — основная сложность Changelog backend. Default 10 минут подходит для большинства job-ов, но при очень heavy write rate можно нужно уменьшить до 1-2 минут. Мониторить changelog directory size в DFS — должен быть predictable и не расти unbounded. Если растёт — materialization не успевает, нужно или увеличить ресурсы для materialize, или уменьшить interval.
Migration: enable Changelog на existing job
Можно ли включить Changelog backend на запущенном job без рестарта с начала? Да, через savepoint:
- Сделать savepoint с current backend (RocksDB).
- Остановить job.
- Включить
state.backend.changelog.enabled: trueв config. - Restore из savepoint.
Restored state будет использовать Changelog backend. Внутренне Flink сначала загрузит state в wrapped RocksDB, потом начнёт писать changelog для всех новых mutations.
Обратная миграция (Changelog -> back to RocksDB only) тоже через savepoint, потому что savepoint всегда canonical format (а не Changelog-specific).
Чтение source
org.apache.flink.state.changelog.ChangelogStateBackend— главный backend, wrapper.org.apache.flink.state.changelog.AbstractChangelogStateBackend— abstract base с logic управления changelog.org.apache.flink.runtime.state.changelog.StateChangelogWriter— writer для durable log.org.apache.flink.state.changelog.PeriodicMaterializationManager— async materialization scheduler.- FLIP-158 (Generic log-based incremental checkpoint) в Apache Flink Wiki — design doc.
Попробуй сам
-
Enable changelog на staging job. Take savepoint, enable
state.backend.changelog.enabled, restore. Через час сравните метрики checkpoint duration p99: с Changelog должна быть значительно ниже. -
Мониторинг materialization. Метрики
materializationDuration,lastMaterializationTimestamp. Если materialization долго не запускается или fails — alert. -
Эксперимент с большим state. Создайте synthetic job с 10+ GB state. Сравните 100 checkpoint-ов: incremental RocksDB vs Changelog. Constructie distribution duration (p50, p95, p99, max). Должны увидеть, как Changelog даёт constant duration vs RocksDB с tail.