Flink 2.0: Disaggregated State (ForSt)
Зачем нужен disaggregated state
Flink 2.0 (релиз 24 марта 2025) — первый major релиз за 9 лет. Главное архитектурное изменение — disaggregated state management через ForSt key-value backend. Проблема была в том, что Flink 1.x хранил state в локальном RocksDB на диске TaskManager, что превращало streaming jobs в stateful монолиты с привязкой compute к storage.
Боль Flink 1.x (RocksDB local state):
TaskManager
- Operator state в RocksDB на локальном диске
- Размер state ограничен local disk capacity (1-2 TB на ноду)
- Rescaling = redistribute state = download с S3 + replay
- Recovery time ~ size of state (минуты для 100 GB+)
- Cluster sizing определяется max state size, не CPU/throughput
Cloud-native services (Snowflake, BigQuery):
- Compute и storage разделены
- Resize за секунды
- Scale за load, а не за state
Flink 1.x не мог так. ForSt — попытка догнать.
ForSt: KV backend для streaming
ForSt (For Streaming) — fork RocksDB с переписанным storage layer. LSM-tree сохраняется (как в RocksDB), но SST файлы пишутся напрямую в DFS (S3/HDFS), а не на локальный диск. Локальный диск используется только как secondary cache.
ForSt архитектура:
Memtable (RAM, write buffer)
↓ flush
L0/L1/L2... SST files
↓ write to
Primary storage: S3/HDFS (durable)
↓ read with cache
Local hybrid cache (1-10 GB SSD)
- Hot working set
- Recently accessed SST files
- LRU или reserved-based eviction
Read path:
1. Check memtable (RAM hit)
2. Check block cache (RAM hit)
3. Check local cache (SSD hit)
4. Read from S3 (network hit)
Write path:
1. Append to WAL (S3)
2. Insert into memtable
3. Periodic flush → SST → S3
4. Compaction → новые SST в S3
ForSt = RocksDB переориентированный на DFS. Если читали ClickHouse-курс или storage-formats про MergeTree и LSM-tree — те же принципы, но storage layer теперь S3, а не локальный SSD. Hot data в памяти/cache, cold data в object storage.
Async execution model
Disaggregated state добавляет network round-trip к каждому state access. На локальном RocksDB это ~10 µs, на S3 — ~10 ms. Naïve реализация дала бы 10x замедление.
Решение: async execution model для SQL operators
Sync (Flink 1.x):
process_record() {
val = state.get(key) // 10 µs (local)
state.put(key, val+1)
emit(val+1)
}
Async (Flink 2.0):
process_record() {
state.getAsync(key, val => {
state.putAsync(key, val+1, () => {
emit(val+1)
})
})
}
// operator продолжает обработку других records,
// пока I/O в полёте
Эффект:
- Без async + cache: ~10x slowdown
- Async + 50% cache: уже быстрее Flink 1.20
- Async + 1 GB cache: 75-120% throughput vs RocksDB local
Async snapshot caching
Между checkpoints state в S3 уже durable — повторно его выгружать не нужно. Flink 2.0 кэширует metadata snapshot и переиспользует SST файлы между checkpoints.
Flink 1.x checkpoint:
- Stop-the-world barrier
- Flush memtable → SST
- Upload SST в S3 (минуты для большого state)
- Resume processing
Flink 2.0 checkpoint:
- SST уже в S3 (write-through)
- Только metadata pointer обновляется
- Стоимость checkpoint ≈ const (не зависит от state size)
- Async snapshot caching → SST переиспользуются между checkpoints
Result: lightweight checkpoints, state size не влияет на checkpoint duration
Throughput benchmarks
Цифры из VLDB 2025 paper “Disaggregated State Management in Apache Flink 2.0” и официального release blog.
Ключевое наблюдение:
- С 1 GB hybrid cache ForSt сравним или быстрее RocksDB local
- Без cache — 50-83% throughput, но recovery instant
- При state > 100 GB ForSt — единственный вариант (RocksDB не помещается)
Выбор cache size:
- Working set < 1 GB → cache 1-2 GB достаточно
- Working set 1-10 GB → cache 10-20% от state size
- Cold-heavy access → cache не поможет, S3 throughput = bottleneck
Fast recovery
Главный operational выигрыш — recovery time не зависит от state size.
Recovery scenario: TaskManager crash, нужно восстановить state 200 GB
Flink 1.x:
1. Allocate new TaskManager
2. Download checkpoint с S3 → local disk (200 GB / 100 MB/s = ~30 min)
3. Restore RocksDB на локальном диске
4. Resume processing
→ Total: 30+ minutes downtime
Flink 2.0:
1. Allocate new TaskManager
2. Update metadata pointer на checkpoint в S3
3. Resume processing (state читается lazy с S3, прогревается cache)
→ Total: ~10 seconds downtime + warm-up period
Тот же эффект для:
- Rescaling (добавить/убрать TaskManagers)
- Job restart после code change
- Failover между регионами
Trade-off: recovery быстрый, но первые секунды медленные. После failover cache холодный, performance 50-80% от warm. Если SLA требует instant warm performance — pre-warm cache на standby TaskManager или используйте active-active с consensus. Для большинства streaming jobs (analytics, enrichment) lazy warm-up приемлем.
Cluster sizing implications
Disaggregated state меняет sizing strategy кластера.
Sizing rule of thumb для ForSt:
- TaskManagers по CPU/throughput, не по state size
- Local disk: cache_size = 10-30% от state working set
- S3 throughput: ~5000 PUT/s на bucket (limit), shard prefixes для scaling
- Network: 10 Gbps NIC рекомендуется (S3 traffic)
Cost модель меняется:
- Меньше нод (state не bound by local disk)
- Меньше disks (cache size << full state)
- Больше S3 requests (PUT/GET за каждый flush/compaction)
- В итоге обычно 20-40% дешевле для large-state jobs
Cloud-native parity
Flink 2.0 наконец сравнимо с cloud-native warehouses по operational характеристикам.
Cloud-native parity достигнута по:
✓ Compute/storage separation
Flink 1.x: bound. Flink 2.0: разделено.
✓ Instant rescaling
Flink 1.x: minutes. Flink 2.0: seconds.
✓ Multi-tenancy
Flink 1.x: noisy neighbour через локальный диск. Flink 2.0: S3 как shared layer.
✓ Geo-replication
Flink 1.x: manual. Flink 2.0: cross-region S3 replication работает.
✓ Disaster recovery
Flink 1.x: replay с Kafka (минуты-часы). Flink 2.0: lazy warm-up из S3 (секунды).
Stateful Flink jobs становятся такими же elastic, как stateless web services.
Kafka tiered storage — параллель с ForSt
Integration с Iceberg
Flink 2.0 + ForSt + Iceberg = streaming-lakehouse stack из коробки.
Pattern: Flink 2.0 как stream processor для Iceberg lakehouse
Kafka topic (events) →
Flink 2.0 (stateful processing, ForSt state в S3) →
Iceberg table (S3 Parquet)
Преимущества:
1. Один S3 bucket для state и для analytical tables
2. Recovery state читает из того же S3, что Iceberg metadata
3. Sink writer для Iceberg в Flink 2.0 поддерживает exactly-once
4. Cross-job state sharing через keyed state с TTL
5. Schema evolution в Iceberg + state schema evolution в ForSt
Реальный stack 2025:
- Source: Kafka / Pulsar
- Compute: Flink 2.0
- State: ForSt → S3
- Sink: Iceberg (через Flink Dynamic Iceberg Sink)
- Query: Trino / StarRocks / Snowflake / Spark
Сравнение с Spark Structured Streaming
Spark 4.0 (апрель 2025) выпустил transformWithState (State API v2) — аналогичные ambitions для stateful streaming, но другой подход к storage.
Когда переходить на Flink 2.0
Сильный сигнал для миграции:
- State size > 100 GB на TaskManager
- Регулярный rescaling (autoscaling по нагрузке)
- Recovery SLA менее 1 минуты
- Cluster sizing определяется state size, не CPU
- Multi-region deployment с DR
- Cost оптимизация: уменьшить EBS, увеличить S3
Слабый сигнал (можно остаться на 1.x):
- State < 10 GB
- Низкая частота failover
- Stateless или near-stateless jobs
- Critical latency (sub-ms) и cache miss недопустим
Migration path:
1. Flink 1.20 (LTS) → Flink 2.0 без code changes для большинства jobs
2. Сохраняется backward compat для DataStream API
3. ForSt включается через config: state.backend.type: forst
4. Параллельный запуск 1.x и 2.0 jobs возможен на одном кластере
Не все API совместимы. Flink 2.0 удалил DataSet API (deprecated с 1.12), Scala API в основном (рекомендуется Java/Python), некоторые legacy connectors. Проверьте release notes перед миграцией. SQL и Table API работают без изменений.
Production checklist для ForSt
1. Storage:
- S3 bucket с правильным prefix sharding (для high request rate)
- Region такой же, как Flink cluster (cross-region traffic дорог)
- Lifecycle policy: старые checkpoints → Glacier через 7 дней
- Cross-region replication для DR
2. Cache:
- Local SSD (NVMe для high IOPS)
- cache_size = 10-30% от working set
- Reserved space policy для критичных jobs
3. Network:
- 10 Gbps+ NIC на TaskManager (S3 traffic)
- VPC endpoint для S3 (не идти через internet gateway)
- Bandwidth monitoring (S3 throughput может стать bottleneck)
4. Monitoring:
- Cache hit rate (целить > 80%)
- S3 PUT/GET rate (для cost)
- State size growth rate
- Checkpoint duration (должна быть const, не расти со state)
- Recovery time after failover
5. Cost:
- S3 PUT requests (5/1000 = $0.005)
- S3 storage (Standard $0.023/GB/мес)
- Cross-AZ traffic (free in-region, но проверить cross-region)
- EBS reduction (cache only, обычно 80% снижение)