Motivation — зачем нужен disaggregated state
Flink 2.0 (март 2025) — первый major release за 9 лет. Главное изменение архитектуры — disaggregated state management через ForStDB как primary state backend. Чтобы понять, зачем это сделали, нужно посмотреть на operational pain Flink 1.x в современных cloud-native deployments — особенно в Kubernetes.
Этот урок не про техническую реализацию (это в уроке 2). Этот урок — про why. Почему 2025 год потребовал переписать state architecture, какие конкретные проблемы Flink 1.x не мог решить, и почему disaggregation — правильное решение.
Flink на Kubernetes: оператор и деплойКонтекст: как Flink 1.x хранил state
Flink 1.x имел два главных state backends:
1. HashMapStateBackend (старое имя: MemoryStateBackend / FsStateBackend)
- State в JVM heap
- Checkpoint в DFS (S3, HDFS) async
- Размер: ограничен heap (несколько GB)
- Скорость: сабмикросекундный access
- Use case: маленькие states
2. EmbeddedRocksDBStateBackend (RocksDB)
- State в RocksDB на локальном диске TaskManager
- Checkpoint: RocksDB snapshot uploaded в DFS
- Размер: ограничен local disk (1-2 TB)
- Скорость: ~10 µs per access (LSM-tree, page cache)
- Use case: middle to large state (GB-TB)
RocksDB был де-факто стандартом для production. Но он имел фундаментальное ограничение: состояние привязано к локальному диску TaskManager. Это создавало стек проблем, которые становились всё более болезненными по мере роста объёмов и перехода в облако.
Боль 1: Local disks — anti-pattern в Kubernetes
Kubernetes — declarative system, где pod должен быть ephemeral и stateless по дизайну. Local persistent disks (через PersistentVolumeClaim) работают, но это compromise:
Проблемы local disks в K8s:
1. Привязка pod к node
- PVC с local storage = pod привязан к конкретному node
- Node maintenance = pod не может переехать
- Cluster autoscaler не работает (не может drain node)
2. Disk capacity planning
- Нужно знать максимальный размер state заранее
- Over-provisioning = wasted money
- Under-provisioning = production fail
3. Multi-AZ затруднён
- Local disks не могут реплицироваться cross-AZ
- Failover в другой AZ = restore из S3 (медленно)
4. Storage pricing
- EBS gp3 ~$0.08/GB/month (AWS)
- 1 TB на TaskManager × 50 TaskManager = $4000/мес только за диски
- Большая часть пустая или cold
В классической CRUD service архитектура K8s + stateless services + managed storage (RDS, DynamoDB, S3) уже устоялась. Flink с local disks был “странной птицей” в этой экосистеме.
Боль 2: Coupled scaling — compute и storage связаны
Сценарий: streaming job обрабатывает 100 GB/day
- CPU нагрузка: 10% от 10 vCPU per TaskManager
- Memory: 8 GB
- State: 2 TB (накопленное за месяц)
Чтобы вместить 2 TB state, нужен:
- 10 TaskManagers × 200 GB local disk
- Или 5 TaskManagers × 400 GB local disk
Cluster sizing определяется state size, НЕ CPU/throughput.
При росте state до 5 TB:
- Нужно увеличить число TaskManagers (для распределения state)
- А не CPU
- Получаем 25 нод, утилизация CPU 4%
Cloud-native warehouses (Snowflake, BigQuery, Trino) разделяют:
- Compute scale on demand
- Storage growth independent
- Cost optimal
Coupled scaling — фундаментальная архитектурная проблема. Stateful streaming jobs становились “толстыми монолитами” с расточительным sizing.
Боль 3: Тяжёлые checkpoints
Checkpoint в RocksDB-based backend:
Checkpoint процесс (упрощённо):
1. Trigger checkpoint barrier
2. Все операторы получают barrier
3. State backend делает RocksDB Snapshot
- Сохраняет current SST файлы как immutable
4. Async upload новых SST файлов в S3
- Incremental: только новые/изменённые SST с последнего checkpoint
- Скорость: ограничена network bandwidth и S3 throughput
- Время: 1-10+ минут для большого state (100 GB+)
5. JobManager собирает acks от всех subtasks
6. Checkpoint complete; notify all operators
7. Перезапуск с этого checkpoint возможен
Проблемы:
- Async upload может отставать
- Если новые changes быстрее, чем upload: checkpoint growing skew
- Backpressure из-за congested network/S3
- Long checkpoint duration = больше work to replay при failover
Для job с 100 GB state и 1 минута checkpoint duration, во время failover теряется 1 минута обработки данных. Для критичных pipelines — это уже значительная задержка.
Боль 4: Slow recovery
Recovery — самая больная проблема. При failover TaskManager:
Recovery scenario: TaskManager пропал, надо восстановить state 200 GB
Шаги:
1. K8s обнаруживает death pod (~30 sec)
2. Создаёт новый pod
3. Pod подключается к cluster, заявляет ресурсы (~30 sec)
4. JobManager assignsstate subtasks
5. TaskManager скачивает checkpoint с S3:
- 200 GB / 100 MB/s = 2000 sec = ~33 min
- При S3 limit'е ~3 KB-sized requests per second через restore
- Иногда быстрее через multi-part downloads
6. Restore RocksDB на локальном диске
7. Resume processing
Total: 30+ минут downtime для 200 GB state
При state 1 TB: 2-3 часа downtime.
SLA нарушения:
- real-time alerting: критично
- fraud detection: критично
- billing aggregations: критично
Долгий recovery — главный enemy production Flink jobs. Single-digit minute recovery SLA при state > 100 GB был просто невозможен в 1.x.
Боль 5: Rescaling нелёгкий
Rescaling — изменение parallelism — требует переразделения state. State хранится по key groups (фиксированное число, обычно 128). При изменении parallelism с N до M, key groups перераспределяются:
Было: parallelism=4, каждая subtask держит 32 key groups
Стало: parallelism=8, каждая subtask держит 16 key groups
Переход:
1. Stop the job
2. Take savepoint (полный snapshot state в S3)
3. Start job с новым parallelism
4. Каждая новая subtask скачивает свои key groups с S3
- Это full state restore!
- Долго для большого state
5. Resume processing с savepoint
Время:
- Downtime savepoint: минуты
- Downtime restart: минуты-часы
- Real autoscaling impossible в 1.x
Autoscaling требует rescaling. Rescaling требует downtime. Поэтому autoscaling Flink был полу-теоретической возможностью в 1.x — настоящего dynamic scaling по нагрузке не было.
Боль 6: Multi-region и DR
Disaster Recovery (DR) в Flink 1.x требовал сложных схем:
DR scheme 1: Periodic replication к standby
- Production cluster в Region A
- Standby cluster в Region B
- Crosse-region S3 replication для checkpoints
- Periodic restore в Region B (warm standby)
- RPO: время между restores
- RTO: время restore + warm-up
- Expensive: 2x cluster cost
DR scheme 2: Hot active-active
- Два cluster в разных regions
- Дублирование source events
- Merge outputs через consensus
- Сложно: state не sync, results diverge
DR scheme 3: Cold backup
- Только checkpoints в S3 cross-region replicated
- При disaster — провёл новый cluster from scratch
- RTO: hours-days
Все варианты были compromises. Real Active-Passive с быстрым failover требовал shared state, который RocksDB local не давал.
Cloud-native warehouses показали путь
Параллельно с Flink struggle, cloud-native warehouses (Snowflake, BigQuery, Databricks) показали другой подход:
Snowflake architecture:
- Storage: micro-partitions в S3 (managed by Snowflake)
- Compute: ephemeral virtual warehouses, auto-suspend
- Metadata: separate layer (FoundationDB-like)
- Scaling: instant resize warehouse, transparent
- Failover: warehouse failure = другой warehouse picks up
BigQuery:
- Storage: Colossus (Google internal DFS)
- Compute: Borg shared pool
- Query: Dremel engine, allocate slots on demand
- Scaling: автоматически по нагрузке
Common pattern:
- Storage и compute разделены физически
- Compute scale up/down instantly
- State персистентный в shared object storage
- Network bandwidth — common bottleneck, но manageable
Эти warehouses доказали, что disaggregation работает в production. Flink хотел того же — disaggregated state.
Что хотели от Flink 2.0
Goals для Flink 2.0 state management:
1. Cloud-native: state живёт в shared object storage (S3/GCS/HDFS), не на local disks
2. Lightweight checkpoints: checkpoint duration ≈ const, не растёт со state
3. Fast recovery: seconds, не minutes/hours
4. Painless rescaling: добавить/убрать TaskManagers без downtime
5. True autoscaling: react на load во время минуты, не часы
6. Multi-region DR: shared state в cross-region S3 replication
7. Cost reduction: меньше нод, меньше disks
8. Independent scaling: compute и storage растут независимо
Trade-offs допустимы:
- Per-access latency больше (network vs local SSD)
- Async execution required
- Throughput немного ниже без cache (но с cache — paritet или better)
Как disaggregation решает эти проблемы
Боль Решение в Flink 2.0
Local disks anti-pattern -> State в S3, локальный диск только cache
Coupled scaling -> Compute scale by CPU, storage scale by S3 growth
Heavy checkpoints -> SST уже в S3 (write-through), metadata pointer only
Slow recovery -> Lazy load из S3, no download phase
Painful rescaling -> State уже в S3, новые subtasks lookup, no shuffle
DR complexity -> Cross-region S3 replication = state shared
Главный insight: если state уже в S3, его не надо туда заливать на checkpoint и не надо оттуда скачивать на recovery. Checkpoint становится почти бесплатным (только metadata), recovery — практически instantaneous (только metadata pointer обновляется).
Параллели в других системах
Flink не первый, кто пошёл по пути disaggregation. Прецеденты:
RocksDB-cloud (Rockset):
- Fork RocksDB с support для S3 storage layer
- Идея: то же что ForStDB, но раньше (2019)
- Production-используется в Rockset (real-time analytics)
ClickHouse SharedMergeTree (Cloud version):
- Storage в shared object storage
- Replicas только compute
- 2023+ implementation
Kafka tiered storage (KIP-405):
- Старые сегменты в S3
- Локальные диски — только hot data
- Той же идеи hot/cold separation
Apache Spark Structured Streaming State v2:
- Похожие ambitions
- Но более конструктивно: state в S3 через snapshots, не runtime read
Snowflake/Iceberg:
- Analytics warehouses, не streaming
- Disaggregated storage стандарт для analytical workloads
Flink 2.0 — последняя из больших streaming systems, кто принял disaggregated state. Это естественная эволюция, не революция.
Production sympоms, которые исправляются
Симптом в Flink 1.x После Flink 2.0 / ForStDB
"Recovery занимает 30 мин" -> Recovery ~10 sec + warm-up
"Не могу автоскейлить" -> Reactive Mode с autoscaling работает
"Local disks 50% пусты" -> Уменьшить до cache size (10-30%)
"Checkpoint duration растёт" -> Checkpoint ~ const время
"State > 1 TB не работает" -> Unlimited state size
"Cluster expensive" -> 20-40% cheaper для большого state
"DR через manual restore" -> S3 cross-region replication
"Не могу мигрировать AZ" -> Stateless TaskManagers, free migration
Trade-offs disaggregation
Disaggregation не бесплатна. Главные trade-offs:
1. Per-access latency higher
- Local RocksDB: ~10 µs
- S3 cache miss: ~10 ms
- 1000x slower for cache miss
- Mitigated by: async execution, hybrid cache
2. Cache hit rate critical
- При low hit rate throughput сильно падает
- Требует thoughtful sizing cache
- Hot workloads OK, random access — challenge
3. S3 как single point of dependency
- S3 outage = job stops
- В 1.x только checkpoint failures, processing продолжается
- Mitigated by: S3 high SLA (99.99%+), retry logic
4. Network bandwidth utilization
- Constant traffic к S3
- Cost через cross-AZ traffic possible
- Mitigated by: VPC endpoints, region locality
5. Operational complexity
- Новый backend, новые metrics
- Tuning требует понимания cache behavior
- Mitigated by: docs, defaults, observability
Не все workloads выигрывают от disaggregation. Для маленьких states (менее 10 GB), стабильных нагрузок без rescaling, низкой failure rate — local RocksDB может оставаться лучшим выбором. ForStDB shine для большого state и dynamic environments. Решение должно быть осознанным.
Cloud-native parity score
После Flink 2.0 / ForStDB Apache Flink стал значительно ближе к cloud-native warehouses по operational характеристикам:
Flink 2.0 — единственная streaming system, которая сравнима по операционным характеристикам с cloud-native analytical warehouses, оставаясь sub-second latency true streaming. Snowflake и BQ всё ещё бэтч/micro-batch, Flink — true streaming + cloud-native ops.
Когда стоит мигрировать на ForStDB
Сильные сигналы для миграции:
- State size > 100 GB on TaskManager
- Регулярный rescaling (autoscaling по нагрузке)
- Recovery SLA < 1 минуты
- Cluster sizing определяется state size, не CPU
- Multi-region deployment с DR
- Cost optimization: убрать EBS, передать в S3
- Operational pain: long recovery, painful rescaling
Слабые сигналы (можно остаться на 1.x/RocksDB):
- State < 10 GB
- Низкая частота failover
- Stateless или near-stateless jobs
- Critical latency (sub-ms) и cache miss недопустим
- Простой single-region deployment
Migration path:
1. Flink 1.20 (LTS) -> Flink 2.0 без code changes для большинства jobs
2. ForStDB включается через config: state.backend.type: forst
3. State V2 API нужен для full async benefits
4. Параллельный запуск 1.x и 2.0 jobs возможен на одном кластере
5. Подробно — в уроке 5 этого модуля
Source
Apache Flink 2.0 release blog (March 2025):
"Apache Flink 2.0: Disaggregated State Management"
VLDB 2025 paper:
Authors: Apache Flink community
Title: "Disaggregated State Management in Apache Flink 2.0"
Key contributions:
- ForStDB architecture (LSM-tree adapted for cloud)
- Async execution model для SQL operators
- Async snapshot caching
- Performance benchmarks vs RocksDB local
Flink source:
flink-state-backends/flink-statebackend-forst/
flink-runtime/src/main/java/org/apache/flink/runtime/state/forst/
FLIP документы:
FLIP-423: Introduce Disaggregated State Management Framework
FLIP-424: Async State API
FLIP-425: Asynchronous Execution Model