Learning Platform
Глоссарий Troubleshooting
Урок 11.02 · 28 мин
Продвинутый
ForStDBLSM-treeS3 StorageHybrid CacheAsync I/OCompactionVLDB Paper

ForStDB — внутренности cloud-native KV store

ForStDB (For Streaming) — это не просто новый state backend, это специально разработанный KV store для disaggregated stateful streaming. Если RocksDB была фундаментом state в Flink 1.x, то ForStDB — её сводный брат, переориентированный с локального диска на cloud object storage. VLDB 2025 paper “Disaggregated State Management in Apache Flink 2.0” описывает архитектуру в деталях.

В этом уроке мы разбираем, как устроена ForStDB изнутри: LSM-tree adapted для cloud, как Memtable и SST файлы интегрируются с S3, роль local cache, async I/O, и compaction в облачной парадигме.

mmap: память как файл Buffering: три уровня кешей между write и диском

Стартовая точка: что взяли от RocksDB

ForStDB — это fork RocksDB. Авторы (Apache Flink community + Alibaba) сохранили самое ценное:

Из RocksDB:
  - LSM-tree структура (Memtable + L0/L1/L2/... SST levels)
  - Bloom filter для negative lookups
  - Block cache для hot data
  - Block-based table format для SST
  - Compaction strategies (universal, level)
  - WAL для durability
  - Column families
  - Iterator framework
  - Snapshots

Переписали или адаптировали:
  - Storage layer (главное изменение)
  - Async I/O везде
  - Cache architecture
  - Compaction triggers (на S3 cost-aware)
  - WAL strategy (можно отключить)

Это умное решение: LSM-tree — отличный algorithm для write-heavy workloads (что и есть streaming state), не надо переизобретать. Меняем только что нужно — где живут SST файлы.

Архитектура: ForstDB stack

ForStDB stack: от приложения до S3
Flink OperatorFlink operator (Aggregate, Join, Deduplicate) делает state.put / state.get через KeyedStateBackend интерфейс.
state.put / state.get
Async State API (FLIP-424)StateRequestBuffer аккумулирует state requests, организует их для async execution. Часть FLIP-425 Async Execution Model.
Memtable (RAM, 64 MB)In-memory write buffer. По умолчанию 64 MB. Каждая column family имеет свой. Flush в L0 SST когда заполнен или по checkpoint.
flush
Block cache (RAM)LRU + block-based. По умолчанию 8 MB на column family. Кэширует hot blocks из SST files. Можно увеличить.
L0/L1/L2 SST filesL0: untiered, до 4 файлов. L1: ~size_factor x L0. L2: ~size_factor x L1. И так далее. SST файлы immutable.
write-through
Local hybrid cache (SSD)Локальный SSD/NVMe. Hybrid cache: LRU + reserved space. По умолчанию 10-30% от working set. Может быть disable для compute-only.
S3 / HDFS (primary)Primary storage. Все SST files записаны и реплицированы через S3 SLA. Cross-region replication возможна. Lifecycle policies для cold data.
read-through
S3 read path (~10 ms)При cache miss — read с S3. Latency ~10 ms (vs ~10 µs для local). Async I/O скрывает latency через pipelining.

Write path

Запись в ForstDB проходит несколько слоёв:

state.put(key, value) — что происходит:

1. Async wrapper
   - Сразу возвращается StateFuture
   - Request добавляется в StateRequestBuffer
   - Operator продолжает обработку других records

2. Background I/O thread обрабатывает request
3. WAL append (опционально)
   - WAL пишется в S3 (write-through)
   - Может быть disable: state.backend.forst.wal.enabled: false
   - Без WAL: при crash потеря non-checkpointed данных
   - С WAL: full durability ценой latency

4. Memtable insert
   - In-memory write buffer
   - Sorted by key (skiplist)
   - Размер по умолчанию: 64 MB per column family
   - Когда заполняется -> flush

5. Flush memtable -> SST file
   - SST file записывается напрямую в S3 (write-through)
   - Параллельно: local cache hit копирует в SSD
   - Format: standard SSTable (sorted strings table)
   - Background process, не блокирует operator

6. Background compaction
   - Когда уровни overflow, compaction объединяет SST
   - Output SST пишется в S3
   - Old SST становятся obsolete, удаляются с S3 (lifecycle)

Главное отличие от RocksDB: SST файлы пишутся в S3 первично. Локальный диск — opportunistic cache. Если local диск исчезает (pod evict), данные не теряются — они уже в S3.

Read path

Чтение интересней:

state.get(key) — что происходит:

1. Async wrapper
   - Возвращается StateFuture
   - Request в StateRequestBuffer
   - Operator продолжает работать

2. Background I/O thread обрабатывает

3. Check Memtable (RAM hit, ~1 µs)
   - Если key найден в Memtable — return
   - Memtable содержит самые свежие writes

4. Check Block cache (RAM hit, ~1 µs)
   - LRU cache блоков из SST файлов
   - Если block с этим key уже decoded — return

5. Check local SSD cache (SSD hit, ~10 µs)
   - Hybrid cache хранит hot SST files на local disk
   - Если file present — read с диска, decode block, check key
   - Может попасть в block cache на следующий access

6. Read from S3 (network hit, ~10 ms)
   - Если ни RAM, ни SSD не содержат данные
   - Read SST file (или его части) с S3
   - Decode block, check key
   - Параллельно: warm up cache (write в SSD)

Optimization:
  Bloom filter в каждом SST file
    - Skip files которые точно не содержат key
    - ~1% false positive
    - Сильно ускоряет negative lookups

Storage layer: интеграция с S3

S3 не file system. ForStDB должен mapping LSM-tree операций на S3 API:

Маппинг операций:

Local FS                  S3
create file               PUT object
read file                 GET object (или GET с Range header для partial)
delete file               DELETE object
list directory            LIST objects with prefix
rename file               не атомарный — PUT new + DELETE old

Сложности:
  - List operations медленные (LIST API ~100 ms)
  - Eventual consistency для DELETE (некоторые S3 versions)
  - Throughput limits per bucket (~5500 GET/3500 PUT per prefix per sec)
  - Cost через requests (PUT/GET/LIST все billable)

Solutions:
  - Sharding через prefix patterns (jobid/operator/range/file)
  - Caching list operations
  - Eventually consistent metadata через manifests
  - Bulk operations через S3 batch

ForStDB использует abstraction S3Filesystem (или HdfsFilesystem, GcsFilesystem) для interop. Низкоуровневая интеграция через FileSystem API Flink.

WAL: durability vs performance

WAL (Write-Ahead Log) — это log всех изменений до их application к Memtable. Используется для recovery non-checkpointed data при crash. В ForstDB WAL опционален:

WAL включен (default):
  Every write -> WAL append (S3 write)
  -> latency higher (~10 ms добавочно)
  -> full durability: всё что записано — survived crash
  -> Tail of unflushed memtable восстанавливается

WAL отключен:
  state.backend.forst.wal.enabled: false
  -> latency lower
  -> только checkpointed data survived crash
  -> recovery до последнего checkpoint (1-30 sec потенциальная потеря)
  -> для большинства Flink jobs OK — checkpoints частые

Решение: для большинства streaming workloads checkpoints каждые 10-60 секунд достаточны как durability barrier. WAL добавляет латентность, но не критично для recovery (max 1 checkpoint interval потери). Disable WAL — typical production config.

Compaction: LSM consolidation

Compaction объединяет накопленные SST файлы для оптимизации read path. В RocksDB compaction обычно triggered размером файлов на уровне. В ForstDB — те же triggers, но с S3-cost awareness:

Compaction stages (универсальная стратегия):

1. Trigger detection
   - Level 0 имеет N+ файлов (default: 4)
   - Total size level X > threshold
   - Age of files (для old data)

2. Selection
   - Выбрать range overlapping SST files
   - Choose target level

3. Merge phase
   - Stream merge sorted runs из выбранных SST
   - Apply latest version per key
   - Write new SST в S3 (могут быть несколько files в target level)

4. Atomic switch
   - Update Manifest: новые SST visible, old obsolete
   - Old SST помечены для удаления (eventually consistent)

5. Cleanup
   - S3 lifecycle policy удаляет obsolete SST (через delay для consistency)
   - Local cache eviction

Compaction может generation significant S3 PUT/GET traffic. ForstDB поддерживает throttle (state.backend.forst.compaction.throttle) чтобы избежать спайков cost.

TIP

Compaction cost — главный operating expense ForStDB. Для большого state с heavy writes compaction может генерить 10x throughput traffic to S3 (write amplification). Mitigate: tune compaction strategy (level vs universal), throttle, размер levels. Monitor S3 PUT requests и cost через CloudWatch.

Hybrid cache: local SSD как accelerator

Локальный SSD — не primary storage, а cache. Архитектура hybrid:

Hybrid cache layers:

Layer 1: Block cache (RAM)
  - LRU, decoded blocks из SST
  - Default 8 MB per column family
  - Tunable: state.backend.forst.block-cache.size: 256MB
  - Hottest, fastest access

Layer 2: SSD cache (local disk)
  - Whole SST files (или partition by file)
  - LRU + reserved space для активных column families
  - Default: ~10-30% от working set
  - Tunable: state.backend.forst.local-cache.size: 50GB

Layer 3: S3 (primary, all data)
  - Все SST files
  - 100% durability через S3 SLA
  - Read latency ~10 ms

Cache hit rates production:
  - Hot workload (good locality): 95%+ Layer 1+2 hit
  - Random access workload: 50-70%
  - Cold start: 0% (warm-up period 5-30 min)

Async I/O: обязательно

Disaggregation добавляет ~10 ms latency per S3 access. Naïve sync code:

// Sync (Flink 1.x с RocksDB)
public void processElement(Event e) {
    Long value = state.value();   // ~10 µs (local RocksDB)
    state.update(value + 1);       // ~5 µs
    out.collect(e);                // ~1 µs
}
// Throughput: ~50,000 events/sec per task

// Sync на ForstDB cache miss:
public void processElement(Event e) {
    Long value = state.value();   // ~10 ms (S3 read)
    state.update(value + 1);       // ~10 ms (S3 write для WAL)
    out.collect(e);                // ~1 µs
}
// Throughput: ~50 events/sec per task — 1000x slowdown!

Async I/O скрывает latency через pipelining:

// Async (Flink 2.0 ForstDB)
public void processElement(Event e) {
    state.asyncValue().thenAccept(value -> {
        state.asyncUpdate(value + 1).thenAccept(__ -> {
            out.collect(e);
        });
    });
    // Не блокируем; продолжаем обрабатывать следующий event
    // Параллельно может быть в полёте 100s requests к S3
}
// Throughput: ~10,000-50,000 events/sec per task (зависит от cache hit rate)

Это главное архитектурное изменение Flink 2.0. State V2 API (FLIP-424, подробнее в уроке 3) обязательна для full async benefit.

Snapshot caching: lightweight checkpoints

В RocksDB local checkpoint требовал upload новых SST файлов. В ForstDB они уже в S3 — нужен только metadata pointer:

Checkpoint в RocksDB local (Flink 1.x):
  1. Trigger barrier
  2. Memtable flush -> SST
  3. New SST upload в S3 (минуты для большого state)
  4. Метадата checkpoint в JobManager
  5. Notify operators

  Total: 1-10 минут для 100 GB state

Checkpoint в ForstDB (Flink 2.0):
  1. Trigger barrier
  2. Memtable flush -> SST -> S3 (как обычно, но это streaming write-through, всегда в полёте)
  3. Snapshot manifest: список SST files с этого моментa
  4. Метадата + manifest pointer в JobManager
  5. Notify operators

  Total: 10-30 секунд (почти константа, не зависит от state size)

Async snapshot caching — оптимизация: между checkpoints SST файлы переиспользуются. Не нужно re-uploading того же SST для каждого checkpoint.

SST переиспользование:

Checkpoint N:
  Active SST files: f1, f2, f3, f4, f5

Между checkpoints:
  - New writes -> Memtable -> flush -> f6
  - Compaction f1+f2 -> f7, then f1, f2 obsolete

Checkpoint N+1:
  Active SST files: f3, f4, f5, f6, f7
  Changes vs N: +f6, +f7, -f1, -f2

Snapshot manifest:
  Только список текущих files (or delta)
  f3, f4, f5 — переиспользованы, физически не двинуты
  f6, f7 — уже в S3 (были записаны при flush/compaction)

Recovery from checkpoint N+1:
  Mount manifest
  Files доступны в S3
  No download required, recovery instant

Recovery: lazy load

Recovery — самое драматическое улучшение:

RocksDB recovery (Flink 1.x):
  1. Allocate new TaskManager (~30 sec K8s)
  2. Download all SST files с S3 в local disk
     200 GB / 100 MB/s = ~30 min
  3. Open RocksDB на local disk
  4. Resume processing

  Total: 30+ min для 200 GB

ForstDB recovery (Flink 2.0):
  1. Allocate new TaskManager (~30 sec)
  2. Open ForstDB:
     - Read snapshot manifest (small, ~MB)
     - SST files still in S3, NOT downloaded
  3. Resume processing immediately
     - Reads from S3 lazy
     - Local cache empty (warm up период)

  Total: ~30 sec + warm-up period

Warm-up период — несколько minutes пока cache наполнится hot data. Performance во время warm-up 50-80% от warm steady state.

VLDB 2025 paper: ключевые insights

"Disaggregated State Management in Apache Flink 2.0"

Главные contributions:

1. Async execution model для SQL operators
   - Re-architect operators для non-blocking I/O
   - StateRequestBuffer + async processing
   - Без изменений API для пользователя SQL/Table

2. ForStDB архитектура:
   - LSM в S3 с минимальной кодовой базой
   - Hybrid local cache с настраиваемыми политиками
   - Asynchronous snapshot caching

3. Benchmarks:
   - Nexmark queries: 75-120% throughput vs RocksDB local
   - Recovery: 100x faster (30 min -> 30 sec)
   - State size: unlimited (only S3 capacity)
   - Cost: 20-40% reduction для large state

4. Production validation:
   - Alibaba внутренний deployment
   - Multi-PB state, тысячи jobs
   - 99.9% SLA для recovery time

Configuration примеры

# flink-conf.yaml для ForstDB

# Включить ForstDB
state.backend.type: forst

# S3 endpoint
state.checkpoints.dir: s3://flink-state-bucket/checkpoints
state.savepoints.dir: s3://flink-state-bucket/savepoints
state.backend.forst.checkpoint-dir: s3://flink-state-bucket/state

# Async state API (рекомендуется)
table.exec.async-state.enabled: true

# Local cache tuning
state.backend.forst.local-cache.path: /mnt/local-cache/forst
state.backend.forst.local-cache.size: 50gb
state.backend.forst.block-cache.size: 1gb

# Compaction
state.backend.forst.compaction.style: LEVEL
state.backend.forst.compaction.throttle: 200mb

# WAL
state.backend.forst.wal.enabled: false  # for performance

Чтение source

Flink source:
  flink-state-backends/flink-statebackend-forst/
    src/main/java/org/apache/flink/state/forst/
      ForStStateBackend.java
      ForStKeyedStateBackend.java
      ForStOptions.java

  flink-runtime/src/main/java/org/apache/flink/runtime/state/forst/
      ForStRemoteFileSystem.java
      ForStLocalCacheManager.java

  flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/asyncprocessing/
      AsyncKeyedProcessOperator.java

ForstDB native:
  Github: https://github.com/ververica/ForSt
  Fork RocksDB с переписанным storage

VLDB paper:
  "Disaggregated State Management in Apache Flink 2.0"
  VLDB 2025
  Authors: Apache Flink community + Alibaba

FLIP документы:
  FLIP-423: Introduce Disaggregated State Management Framework
  FLIP-424: Async State API
  FLIP-425: Asynchronous Execution Model
Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. В чём ключевое отличие ForstDB от RocksDB на уровне storage?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5