ForStDB — внутренности cloud-native KV store
ForStDB (For Streaming) — это не просто новый state backend, это специально разработанный KV store для disaggregated stateful streaming. Если RocksDB была фундаментом state в Flink 1.x, то ForStDB — её сводный брат, переориентированный с локального диска на cloud object storage. VLDB 2025 paper “Disaggregated State Management in Apache Flink 2.0” описывает архитектуру в деталях.
В этом уроке мы разбираем, как устроена ForStDB изнутри: LSM-tree adapted для cloud, как Memtable и SST файлы интегрируются с S3, роль local cache, async I/O, и compaction в облачной парадигме.
mmap: память как файл Buffering: три уровня кешей между write и дискомСтартовая точка: что взяли от RocksDB
ForStDB — это fork RocksDB. Авторы (Apache Flink community + Alibaba) сохранили самое ценное:
Из RocksDB:
- LSM-tree структура (Memtable + L0/L1/L2/... SST levels)
- Bloom filter для negative lookups
- Block cache для hot data
- Block-based table format для SST
- Compaction strategies (universal, level)
- WAL для durability
- Column families
- Iterator framework
- Snapshots
Переписали или адаптировали:
- Storage layer (главное изменение)
- Async I/O везде
- Cache architecture
- Compaction triggers (на S3 cost-aware)
- WAL strategy (можно отключить)
Это умное решение: LSM-tree — отличный algorithm для write-heavy workloads (что и есть streaming state), не надо переизобретать. Меняем только что нужно — где живут SST файлы.
Архитектура: ForstDB stack
Write path
Запись в ForstDB проходит несколько слоёв:
state.put(key, value) — что происходит:
1. Async wrapper
- Сразу возвращается StateFuture
- Request добавляется в StateRequestBuffer
- Operator продолжает обработку других records
2. Background I/O thread обрабатывает request
3. WAL append (опционально)
- WAL пишется в S3 (write-through)
- Может быть disable: state.backend.forst.wal.enabled: false
- Без WAL: при crash потеря non-checkpointed данных
- С WAL: full durability ценой latency
4. Memtable insert
- In-memory write buffer
- Sorted by key (skiplist)
- Размер по умолчанию: 64 MB per column family
- Когда заполняется -> flush
5. Flush memtable -> SST file
- SST file записывается напрямую в S3 (write-through)
- Параллельно: local cache hit копирует в SSD
- Format: standard SSTable (sorted strings table)
- Background process, не блокирует operator
6. Background compaction
- Когда уровни overflow, compaction объединяет SST
- Output SST пишется в S3
- Old SST становятся obsolete, удаляются с S3 (lifecycle)
Главное отличие от RocksDB: SST файлы пишутся в S3 первично. Локальный диск — opportunistic cache. Если local диск исчезает (pod evict), данные не теряются — они уже в S3.
Read path
Чтение интересней:
state.get(key) — что происходит:
1. Async wrapper
- Возвращается StateFuture
- Request в StateRequestBuffer
- Operator продолжает работать
2. Background I/O thread обрабатывает
3. Check Memtable (RAM hit, ~1 µs)
- Если key найден в Memtable — return
- Memtable содержит самые свежие writes
4. Check Block cache (RAM hit, ~1 µs)
- LRU cache блоков из SST файлов
- Если block с этим key уже decoded — return
5. Check local SSD cache (SSD hit, ~10 µs)
- Hybrid cache хранит hot SST files на local disk
- Если file present — read с диска, decode block, check key
- Может попасть в block cache на следующий access
6. Read from S3 (network hit, ~10 ms)
- Если ни RAM, ни SSD не содержат данные
- Read SST file (или его части) с S3
- Decode block, check key
- Параллельно: warm up cache (write в SSD)
Optimization:
Bloom filter в каждом SST file
- Skip files которые точно не содержат key
- ~1% false positive
- Сильно ускоряет negative lookups
Storage layer: интеграция с S3
S3 не file system. ForStDB должен mapping LSM-tree операций на S3 API:
Маппинг операций:
Local FS S3
create file PUT object
read file GET object (или GET с Range header для partial)
delete file DELETE object
list directory LIST objects with prefix
rename file не атомарный — PUT new + DELETE old
Сложности:
- List operations медленные (LIST API ~100 ms)
- Eventual consistency для DELETE (некоторые S3 versions)
- Throughput limits per bucket (~5500 GET/3500 PUT per prefix per sec)
- Cost через requests (PUT/GET/LIST все billable)
Solutions:
- Sharding через prefix patterns (jobid/operator/range/file)
- Caching list operations
- Eventually consistent metadata через manifests
- Bulk operations через S3 batch
ForStDB использует abstraction S3Filesystem (или HdfsFilesystem, GcsFilesystem) для interop. Низкоуровневая интеграция через FileSystem API Flink.
WAL: durability vs performance
WAL (Write-Ahead Log) — это log всех изменений до их application к Memtable. Используется для recovery non-checkpointed data при crash. В ForstDB WAL опционален:
WAL включен (default):
Every write -> WAL append (S3 write)
-> latency higher (~10 ms добавочно)
-> full durability: всё что записано — survived crash
-> Tail of unflushed memtable восстанавливается
WAL отключен:
state.backend.forst.wal.enabled: false
-> latency lower
-> только checkpointed data survived crash
-> recovery до последнего checkpoint (1-30 sec потенциальная потеря)
-> для большинства Flink jobs OK — checkpoints частые
Решение: для большинства streaming workloads checkpoints каждые 10-60 секунд достаточны как durability barrier. WAL добавляет латентность, но не критично для recovery (max 1 checkpoint interval потери). Disable WAL — typical production config.
Compaction: LSM consolidation
Compaction объединяет накопленные SST файлы для оптимизации read path. В RocksDB compaction обычно triggered размером файлов на уровне. В ForstDB — те же triggers, но с S3-cost awareness:
Compaction stages (универсальная стратегия):
1. Trigger detection
- Level 0 имеет N+ файлов (default: 4)
- Total size level X > threshold
- Age of files (для old data)
2. Selection
- Выбрать range overlapping SST files
- Choose target level
3. Merge phase
- Stream merge sorted runs из выбранных SST
- Apply latest version per key
- Write new SST в S3 (могут быть несколько files в target level)
4. Atomic switch
- Update Manifest: новые SST visible, old obsolete
- Old SST помечены для удаления (eventually consistent)
5. Cleanup
- S3 lifecycle policy удаляет obsolete SST (через delay для consistency)
- Local cache eviction
Compaction может generation significant S3 PUT/GET traffic. ForstDB поддерживает throttle (state.backend.forst.compaction.throttle) чтобы избежать спайков cost.
Compaction cost — главный operating expense ForStDB. Для большого state с heavy writes compaction может генерить 10x throughput traffic to S3 (write amplification). Mitigate: tune compaction strategy (level vs universal), throttle, размер levels. Monitor S3 PUT requests и cost через CloudWatch.
Hybrid cache: local SSD как accelerator
Локальный SSD — не primary storage, а cache. Архитектура hybrid:
Hybrid cache layers:
Layer 1: Block cache (RAM)
- LRU, decoded blocks из SST
- Default 8 MB per column family
- Tunable: state.backend.forst.block-cache.size: 256MB
- Hottest, fastest access
Layer 2: SSD cache (local disk)
- Whole SST files (или partition by file)
- LRU + reserved space для активных column families
- Default: ~10-30% от working set
- Tunable: state.backend.forst.local-cache.size: 50GB
Layer 3: S3 (primary, all data)
- Все SST files
- 100% durability через S3 SLA
- Read latency ~10 ms
Cache hit rates production:
- Hot workload (good locality): 95%+ Layer 1+2 hit
- Random access workload: 50-70%
- Cold start: 0% (warm-up period 5-30 min)
Async I/O: обязательно
Disaggregation добавляет ~10 ms latency per S3 access. Naïve sync code:
// Sync (Flink 1.x с RocksDB)
public void processElement(Event e) {
Long value = state.value(); // ~10 µs (local RocksDB)
state.update(value + 1); // ~5 µs
out.collect(e); // ~1 µs
}
// Throughput: ~50,000 events/sec per task
// Sync на ForstDB cache miss:
public void processElement(Event e) {
Long value = state.value(); // ~10 ms (S3 read)
state.update(value + 1); // ~10 ms (S3 write для WAL)
out.collect(e); // ~1 µs
}
// Throughput: ~50 events/sec per task — 1000x slowdown!
Async I/O скрывает latency через pipelining:
// Async (Flink 2.0 ForstDB)
public void processElement(Event e) {
state.asyncValue().thenAccept(value -> {
state.asyncUpdate(value + 1).thenAccept(__ -> {
out.collect(e);
});
});
// Не блокируем; продолжаем обрабатывать следующий event
// Параллельно может быть в полёте 100s requests к S3
}
// Throughput: ~10,000-50,000 events/sec per task (зависит от cache hit rate)
Это главное архитектурное изменение Flink 2.0. State V2 API (FLIP-424, подробнее в уроке 3) обязательна для full async benefit.
Snapshot caching: lightweight checkpoints
В RocksDB local checkpoint требовал upload новых SST файлов. В ForstDB они уже в S3 — нужен только metadata pointer:
Checkpoint в RocksDB local (Flink 1.x):
1. Trigger barrier
2. Memtable flush -> SST
3. New SST upload в S3 (минуты для большого state)
4. Метадата checkpoint в JobManager
5. Notify operators
Total: 1-10 минут для 100 GB state
Checkpoint в ForstDB (Flink 2.0):
1. Trigger barrier
2. Memtable flush -> SST -> S3 (как обычно, но это streaming write-through, всегда в полёте)
3. Snapshot manifest: список SST files с этого моментa
4. Метадата + manifest pointer в JobManager
5. Notify operators
Total: 10-30 секунд (почти константа, не зависит от state size)
Async snapshot caching — оптимизация: между checkpoints SST файлы переиспользуются. Не нужно re-uploading того же SST для каждого checkpoint.
SST переиспользование:
Checkpoint N:
Active SST files: f1, f2, f3, f4, f5
Между checkpoints:
- New writes -> Memtable -> flush -> f6
- Compaction f1+f2 -> f7, then f1, f2 obsolete
Checkpoint N+1:
Active SST files: f3, f4, f5, f6, f7
Changes vs N: +f6, +f7, -f1, -f2
Snapshot manifest:
Только список текущих files (or delta)
f3, f4, f5 — переиспользованы, физически не двинуты
f6, f7 — уже в S3 (были записаны при flush/compaction)
Recovery from checkpoint N+1:
Mount manifest
Files доступны в S3
No download required, recovery instant
Recovery: lazy load
Recovery — самое драматическое улучшение:
RocksDB recovery (Flink 1.x):
1. Allocate new TaskManager (~30 sec K8s)
2. Download all SST files с S3 в local disk
200 GB / 100 MB/s = ~30 min
3. Open RocksDB на local disk
4. Resume processing
Total: 30+ min для 200 GB
ForstDB recovery (Flink 2.0):
1. Allocate new TaskManager (~30 sec)
2. Open ForstDB:
- Read snapshot manifest (small, ~MB)
- SST files still in S3, NOT downloaded
3. Resume processing immediately
- Reads from S3 lazy
- Local cache empty (warm up период)
Total: ~30 sec + warm-up period
Warm-up период — несколько minutes пока cache наполнится hot data. Performance во время warm-up 50-80% от warm steady state.
VLDB 2025 paper: ключевые insights
"Disaggregated State Management in Apache Flink 2.0"
Главные contributions:
1. Async execution model для SQL operators
- Re-architect operators для non-blocking I/O
- StateRequestBuffer + async processing
- Без изменений API для пользователя SQL/Table
2. ForStDB архитектура:
- LSM в S3 с минимальной кодовой базой
- Hybrid local cache с настраиваемыми политиками
- Asynchronous snapshot caching
3. Benchmarks:
- Nexmark queries: 75-120% throughput vs RocksDB local
- Recovery: 100x faster (30 min -> 30 sec)
- State size: unlimited (only S3 capacity)
- Cost: 20-40% reduction для large state
4. Production validation:
- Alibaba внутренний deployment
- Multi-PB state, тысячи jobs
- 99.9% SLA для recovery time
Configuration примеры
# flink-conf.yaml для ForstDB
# Включить ForstDB
state.backend.type: forst
# S3 endpoint
state.checkpoints.dir: s3://flink-state-bucket/checkpoints
state.savepoints.dir: s3://flink-state-bucket/savepoints
state.backend.forst.checkpoint-dir: s3://flink-state-bucket/state
# Async state API (рекомендуется)
table.exec.async-state.enabled: true
# Local cache tuning
state.backend.forst.local-cache.path: /mnt/local-cache/forst
state.backend.forst.local-cache.size: 50gb
state.backend.forst.block-cache.size: 1gb
# Compaction
state.backend.forst.compaction.style: LEVEL
state.backend.forst.compaction.throttle: 200mb
# WAL
state.backend.forst.wal.enabled: false # for performance
Чтение source
Flink source:
flink-state-backends/flink-statebackend-forst/
src/main/java/org/apache/flink/state/forst/
ForStStateBackend.java
ForStKeyedStateBackend.java
ForStOptions.java
flink-runtime/src/main/java/org/apache/flink/runtime/state/forst/
ForStRemoteFileSystem.java
ForStLocalCacheManager.java
flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/asyncprocessing/
AsyncKeyedProcessOperator.java
ForstDB native:
Github: https://github.com/ververica/ForSt
Fork RocksDB с переписанным storage
VLDB paper:
"Disaggregated State Management in Apache Flink 2.0"
VLDB 2025
Authors: Apache Flink community + Alibaba
FLIP документы:
FLIP-423: Introduce Disaggregated State Management Framework
FLIP-424: Async State API
FLIP-425: Asynchronous Execution Model