Learning Platform
Глоссарий Troubleshooting
Урок 05.03 · 24 мин
Продвинутый
Flink 2.0ForStDisaggregated StateCloud-Native StreamingRocksDBIceberg

Flink 2.0: Disaggregated State (ForSt)

Зачем нужен disaggregated state

Flink 2.0 (релиз 24 марта 2025) — первый major релиз за 9 лет. Главное архитектурное изменение — disaggregated state management через ForSt key-value backend. Проблема была в том, что Flink 1.x хранил state в локальном RocksDB на диске TaskManager, что превращало streaming jobs в stateful монолиты с привязкой compute к storage.

Боль Flink 1.x (RocksDB local state):
  TaskManager
    - Operator state в RocksDB на локальном диске
    - Размер state ограничен local disk capacity (1-2 TB на ноду)
    - Rescaling = redistribute state = download с S3 + replay
    - Recovery time ~ size of state (минуты для 100 GB+)
    - Cluster sizing определяется max state size, не CPU/throughput

Cloud-native services (Snowflake, BigQuery):
  - Compute и storage разделены
  - Resize за секунды
  - Scale за load, а не за state

  Flink 1.x не мог так. ForSt — попытка догнать.
Архитектура: RocksDB local vs ForSt disaggregated
Flink 1.x — RocksDB local
Flink 2.0 — ForSt disaggregated

ForSt: KV backend для streaming

ForSt (For Streaming) — fork RocksDB с переписанным storage layer. LSM-tree сохраняется (как в RocksDB), но SST файлы пишутся напрямую в DFS (S3/HDFS), а не на локальный диск. Локальный диск используется только как secondary cache.

ForSt архитектура:
  Memtable (RAM, write buffer)
    ↓ flush
  L0/L1/L2... SST files
    ↓ write to
  Primary storage: S3/HDFS (durable)
    ↓ read with cache
  Local hybrid cache (1-10 GB SSD)
    - Hot working set
    - Recently accessed SST files
    - LRU или reserved-based eviction

Read path:
  1. Check memtable (RAM hit)
  2. Check block cache (RAM hit)
  3. Check local cache (SSD hit)
  4. Read from S3 (network hit)

Write path:
  1. Append to WAL (S3)
  2. Insert into memtable
  3. Periodic flush → SST → S3
  4. Compaction → новые SST в S3
TIP

ForSt = RocksDB переориентированный на DFS. Если читали ClickHouse-курс или storage-formats про MergeTree и LSM-tree — те же принципы, но storage layer теперь S3, а не локальный SSD. Hot data в памяти/cache, cold data в object storage.

LSM-tree fundamentals

Async execution model

Disaggregated state добавляет network round-trip к каждому state access. На локальном RocksDB это ~10 µs, на S3 — ~10 ms. Naïve реализация дала бы 10x замедление.

Решение: async execution model для SQL operators
  Sync (Flink 1.x):
    process_record() {
      val = state.get(key)   // 10 µs (local)
      state.put(key, val+1)
      emit(val+1)
    }

  Async (Flink 2.0):
    process_record() {
      state.getAsync(key, val => {
        state.putAsync(key, val+1, () => {
          emit(val+1)
        })
      })
    }
    // operator продолжает обработку других records,
    // пока I/O в полёте

  Эффект:
    - Без async + cache: ~10x slowdown
    - Async + 50% cache: уже быстрее Flink 1.20
    - Async + 1 GB cache: 75-120% throughput vs RocksDB local

Async snapshot caching

Между checkpoints state в S3 уже durable — повторно его выгружать не нужно. Flink 2.0 кэширует metadata snapshot и переиспользует SST файлы между checkpoints.

Flink 1.x checkpoint:
  - Stop-the-world barrier
  - Flush memtable → SST
  - Upload SST в S3 (минуты для большого state)
  - Resume processing

Flink 2.0 checkpoint:
  - SST уже в S3 (write-through)
  - Только metadata pointer обновляется
  - Стоимость checkpoint ≈ const (не зависит от state size)
  - Async snapshot caching → SST переиспользуются между checkpoints

  Result: lightweight checkpoints, state size не влияет на checkpoint duration

Throughput benchmarks

Цифры из VLDB 2025 paper “Disaggregated State Management in Apache Flink 2.0” и официального release blog.

ForSt throughput vs RocksDB local
Сценарий
ForSt + 1GB cache
ForSt no cache
Ключевое наблюдение:
  - С 1 GB hybrid cache ForSt сравним или быстрее RocksDB local
  - Без cache — 50-83% throughput, но recovery instant
  - При state > 100 GB ForSt — единственный вариант (RocksDB не помещается)

Выбор cache size:
  - Working set < 1 GB → cache 1-2 GB достаточно
  - Working set 1-10 GB → cache 10-20% от state size
  - Cold-heavy access → cache не поможет, S3 throughput = bottleneck

Fast recovery

Главный operational выигрыш — recovery time не зависит от state size.

Recovery scenario: TaskManager crash, нужно восстановить state 200 GB

Flink 1.x:
  1. Allocate new TaskManager
  2. Download checkpoint с S3 → local disk (200 GB / 100 MB/s = ~30 min)
  3. Restore RocksDB на локальном диске
  4. Resume processing
  → Total: 30+ minutes downtime

Flink 2.0:
  1. Allocate new TaskManager
  2. Update metadata pointer на checkpoint в S3
  3. Resume processing (state читается lazy с S3, прогревается cache)
  → Total: ~10 seconds downtime + warm-up period

Тот же эффект для:
  - Rescaling (добавить/убрать TaskManagers)
  - Job restart после code change
  - Failover между регионами
WARNING

Trade-off: recovery быстрый, но первые секунды медленные. После failover cache холодный, performance 50-80% от warm. Если SLA требует instant warm performance — pre-warm cache на standby TaskManager или используйте active-active с consensus. Для большинства streaming jobs (analytics, enrichment) lazy warm-up приемлем.

Cluster sizing implications

Disaggregated state меняет sizing strategy кластера.

Cluster sizing: до и после ForSt
Параметр
Flink 1.x (RocksDB local)
Flink 2.0 (ForSt)
Sizing rule of thumb для ForSt:
  - TaskManagers по CPU/throughput, не по state size
  - Local disk: cache_size = 10-30% от state working set
  - S3 throughput: ~5000 PUT/s на bucket (limit), shard prefixes для scaling
  - Network: 10 Gbps NIC рекомендуется (S3 traffic)

Cost модель меняется:
  - Меньше нод (state не bound by local disk)
  - Меньше disks (cache size << full state)
  - Больше S3 requests (PUT/GET за каждый flush/compaction)
  - В итоге обычно 20-40% дешевле для large-state jobs

Cloud-native parity

Flink 2.0 наконец сравнимо с cloud-native warehouses по operational характеристикам.

Cloud-native parity достигнута по:

  ✓ Compute/storage separation
    Flink 1.x: bound. Flink 2.0: разделено.

  ✓ Instant rescaling
    Flink 1.x: minutes. Flink 2.0: seconds.

  ✓ Multi-tenancy
    Flink 1.x: noisy neighbour через локальный диск. Flink 2.0: S3 как shared layer.

  ✓ Geo-replication
    Flink 1.x: manual. Flink 2.0: cross-region S3 replication работает.

  ✓ Disaster recovery
    Flink 1.x: replay с Kafka (минуты-часы). Flink 2.0: lazy warm-up из S3 (секунды).

Stateful Flink jobs становятся такими же elastic, как stateless web services.
Kafka tiered storage — параллель с ForSt

Integration с Iceberg

Flink 2.0 + ForSt + Iceberg = streaming-lakehouse stack из коробки.

Pattern: Flink 2.0 как stream processor для Iceberg lakehouse

  Kafka topic (events) →
  Flink 2.0 (stateful processing, ForSt state в S3) →
  Iceberg table (S3 Parquet)

Преимущества:
  1. Один S3 bucket для state и для analytical tables
  2. Recovery state читает из того же S3, что Iceberg metadata
  3. Sink writer для Iceberg в Flink 2.0 поддерживает exactly-once
  4. Cross-job state sharing через keyed state с TTL
  5. Schema evolution в Iceberg + state schema evolution в ForSt

Реальный stack 2025:
  - Source: Kafka / Pulsar
  - Compute: Flink 2.0
  - State: ForSt → S3
  - Sink: Iceberg (через Flink Dynamic Iceberg Sink)
  - Query: Trino / StarRocks / Snowflake / Spark

Сравнение с Spark Structured Streaming

Spark 4.0 (апрель 2025) выпустил transformWithState (State API v2) — аналогичные ambitions для stateful streaming, но другой подход к storage.

Flink 2.0 ForSt vs Spark 4.0 State API v2
Аспект
Flink 2.0 ForSt
Spark 4.0 State v2
Arbitrary State API v2 в Spark 4.0
Сильный сигнал для миграции:
  - State size > 100 GB на TaskManager
  - Регулярный rescaling (autoscaling по нагрузке)
  - Recovery SLA менее 1 минуты
  - Cluster sizing определяется state size, не CPU
  - Multi-region deployment с DR
  - Cost оптимизация: уменьшить EBS, увеличить S3

Слабый сигнал (можно остаться на 1.x):
  - State < 10 GB
  - Низкая частота failover
  - Stateless или near-stateless jobs
  - Critical latency (sub-ms) и cache miss недопустим

Migration path:
  1. Flink 1.20 (LTS) → Flink 2.0 без code changes для большинства jobs
  2. Сохраняется backward compat для DataStream API
  3. ForSt включается через config: state.backend.type: forst
  4. Параллельный запуск 1.x и 2.0 jobs возможен на одном кластере
WARNING

Не все API совместимы. Flink 2.0 удалил DataSet API (deprecated с 1.12), Scala API в основном (рекомендуется Java/Python), некоторые legacy connectors. Проверьте release notes перед миграцией. SQL и Table API работают без изменений.

Production checklist для ForSt

1. Storage:
   - S3 bucket с правильным prefix sharding (для high request rate)
   - Region такой же, как Flink cluster (cross-region traffic дорог)
   - Lifecycle policy: старые checkpoints → Glacier через 7 дней
   - Cross-region replication для DR

2. Cache:
   - Local SSD (NVMe для high IOPS)
   - cache_size = 10-30% от working set
   - Reserved space policy для критичных jobs

3. Network:
   - 10 Gbps+ NIC на TaskManager (S3 traffic)
   - VPC endpoint для S3 (не идти через internet gateway)
   - Bandwidth monitoring (S3 throughput может стать bottleneck)

4. Monitoring:
   - Cache hit rate (целить > 80%)
   - S3 PUT/GET rate (для cost)
   - State size growth rate
   - Checkpoint duration (должна быть const, не расти со state)
   - Recovery time after failover

5. Cost:
   - S3 PUT requests (5/1000 = $0.005)
   - S3 storage (Standard $0.023/GB/мес)
   - Cross-AZ traffic (free in-region, но проверить cross-region)
   - EBS reduction (cache only, обычно 80% снижение)
Проверка знанийKnowledge check
ОтветAnswer

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3