Learning Platform
Глоссарий Troubleshooting
Урок 11.01 · 25 мин
Продвинутый
Disaggregated StateCloud NativeKubernetesScalabilityRecoveryOperational Pain

Motivation — зачем нужен disaggregated state

Flink 2.0 (март 2025) — первый major release за 9 лет. Главное изменение архитектуры — disaggregated state management через ForStDB как primary state backend. Чтобы понять, зачем это сделали, нужно посмотреть на operational pain Flink 1.x в современных cloud-native deployments — особенно в Kubernetes.

Этот урок не про техническую реализацию (это в уроке 2). Этот урок — про why. Почему 2025 год потребовал переписать state architecture, какие конкретные проблемы Flink 1.x не мог решить, и почему disaggregation — правильное решение.

Flink на Kubernetes: оператор и деплой

Flink 1.x имел два главных state backends:

1. HashMapStateBackend (старое имя: MemoryStateBackend / FsStateBackend)
   - State в JVM heap
   - Checkpoint в DFS (S3, HDFS) async
   - Размер: ограничен heap (несколько GB)
   - Скорость: сабмикросекундный access
   - Use case: маленькие states

2. EmbeddedRocksDBStateBackend (RocksDB)
   - State в RocksDB на локальном диске TaskManager
   - Checkpoint: RocksDB snapshot uploaded в DFS
   - Размер: ограничен local disk (1-2 TB)
   - Скорость: ~10 µs per access (LSM-tree, page cache)
   - Use case: middle to large state (GB-TB)

RocksDB был де-факто стандартом для production. Но он имел фундаментальное ограничение: состояние привязано к локальному диску TaskManager. Это создавало стек проблем, которые становились всё более болезненными по мере роста объёмов и перехода в облако.

Боль 1: Local disks — anti-pattern в Kubernetes

Kubernetes — declarative system, где pod должен быть ephemeral и stateless по дизайну. Local persistent disks (через PersistentVolumeClaim) работают, но это compromise:

Проблемы local disks в K8s:

1. Привязка pod к node
   - PVC с local storage = pod привязан к конкретному node
   - Node maintenance = pod не может переехать
   - Cluster autoscaler не работает (не может drain node)

2. Disk capacity planning
   - Нужно знать максимальный размер state заранее
   - Over-provisioning = wasted money
   - Under-provisioning = production fail

3. Multi-AZ затруднён
   - Local disks не могут реплицироваться cross-AZ
   - Failover в другой AZ = restore из S3 (медленно)

4. Storage pricing
   - EBS gp3 ~$0.08/GB/month (AWS)
   - 1 TB на TaskManager × 50 TaskManager = $4000/мес только за диски
   - Большая часть пустая или cold

В классической CRUD service архитектура K8s + stateless services + managed storage (RDS, DynamoDB, S3) уже устоялась. Flink с local disks был “странной птицей” в этой экосистеме.

Боль 2: Coupled scaling — compute и storage связаны

Сценарий: streaming job обрабатывает 100 GB/day
  - CPU нагрузка: 10% от 10 vCPU per TaskManager
  - Memory: 8 GB
  - State: 2 TB (накопленное за месяц)

Чтобы вместить 2 TB state, нужен:
  - 10 TaskManagers × 200 GB local disk
  - Или 5 TaskManagers × 400 GB local disk

Cluster sizing определяется state size, НЕ CPU/throughput.

При росте state до 5 TB:
  - Нужно увеличить число TaskManagers (для распределения state)
  - А не CPU
  - Получаем 25 нод, утилизация CPU 4%

Cloud-native warehouses (Snowflake, BigQuery, Trino) разделяют:
  - Compute scale on demand
  - Storage growth independent
  - Cost optimal

Coupled scaling — фундаментальная архитектурная проблема. Stateful streaming jobs становились “толстыми монолитами” с расточительным sizing.

Боль 3: Тяжёлые checkpoints

Checkpoint в RocksDB-based backend:

Checkpoint процесс (упрощённо):

1. Trigger checkpoint barrier
2. Все операторы получают barrier
3. State backend делает RocksDB Snapshot
   - Сохраняет current SST файлы как immutable
4. Async upload новых SST файлов в S3
   - Incremental: только новые/изменённые SST с последнего checkpoint
   - Скорость: ограничена network bandwidth и S3 throughput
   - Время: 1-10+ минут для большого state (100 GB+)
5. JobManager собирает acks от всех subtasks
6. Checkpoint complete; notify all operators
7. Перезапуск с этого checkpoint возможен

Проблемы:
  - Async upload может отставать
  - Если новые changes быстрее, чем upload: checkpoint growing skew
  - Backpressure из-за congested network/S3
  - Long checkpoint duration = больше work to replay при failover

Для job с 100 GB state и 1 минута checkpoint duration, во время failover теряется 1 минута обработки данных. Для критичных pipelines — это уже значительная задержка.

Боль 4: Slow recovery

Recovery — самая больная проблема. При failover TaskManager:

Recovery scenario: TaskManager пропал, надо восстановить state 200 GB

Шаги:
1. K8s обнаруживает death pod (~30 sec)
2. Создаёт новый pod
3. Pod подключается к cluster, заявляет ресурсы (~30 sec)
4. JobManager assignsstate subtasks
5. TaskManager скачивает checkpoint с S3:
   - 200 GB / 100 MB/s = 2000 sec = ~33 min
   - При S3 limit'е ~3 KB-sized requests per second через restore
   - Иногда быстрее через multi-part downloads
6. Restore RocksDB на локальном диске
7. Resume processing

Total: 30+ минут downtime для 200 GB state

При state 1 TB: 2-3 часа downtime.

SLA нарушения:
  - real-time alerting: критично
  - fraud detection: критично
  - billing aggregations: критично

Долгий recovery — главный enemy production Flink jobs. Single-digit minute recovery SLA при state > 100 GB был просто невозможен в 1.x.

Боль 5: Rescaling нелёгкий

Rescaling — изменение parallelism — требует переразделения state. State хранится по key groups (фиксированное число, обычно 128). При изменении parallelism с N до M, key groups перераспределяются:

Было: parallelism=4, каждая subtask держит 32 key groups
Стало: parallelism=8, каждая subtask держит 16 key groups

Переход:
1. Stop the job
2. Take savepoint (полный snapshot state в S3)
3. Start job с новым parallelism
4. Каждая новая subtask скачивает свои key groups с S3
   - Это full state restore!
   - Долго для большого state
5. Resume processing с savepoint

Время:
  - Downtime savepoint: минуты
  - Downtime restart: минуты-часы
  - Real autoscaling impossible в 1.x

Autoscaling требует rescaling. Rescaling требует downtime. Поэтому autoscaling Flink был полу-теоретической возможностью в 1.x — настоящего dynamic scaling по нагрузке не было.

Боль 6: Multi-region и DR

Disaster Recovery (DR) в Flink 1.x требовал сложных схем:

DR scheme 1: Periodic replication к standby
  - Production cluster в Region A
  - Standby cluster в Region B
  - Crosse-region S3 replication для checkpoints
  - Periodic restore в Region B (warm standby)
  - RPO: время между restores
  - RTO: время restore + warm-up
  - Expensive: 2x cluster cost

DR scheme 2: Hot active-active
  - Два cluster в разных regions
  - Дублирование source events
  - Merge outputs через consensus
  - Сложно: state не sync, results diverge

DR scheme 3: Cold backup
  - Только checkpoints в S3 cross-region replicated
  - При disaster — провёл новый cluster from scratch
  - RTO: hours-days

Все варианты были compromises. Real Active-Passive с быстрым failover требовал shared state, который RocksDB local не давал.

Cloud-native warehouses показали путь

Параллельно с Flink struggle, cloud-native warehouses (Snowflake, BigQuery, Databricks) показали другой подход:

Snowflake architecture:
  - Storage: micro-partitions в S3 (managed by Snowflake)
  - Compute: ephemeral virtual warehouses, auto-suspend
  - Metadata: separate layer (FoundationDB-like)
  - Scaling: instant resize warehouse, transparent
  - Failover: warehouse failure = другой warehouse picks up
  
BigQuery:
  - Storage: Colossus (Google internal DFS)
  - Compute: Borg shared pool
  - Query: Dremel engine, allocate slots on demand
  - Scaling: автоматически по нагрузке

Common pattern:
  - Storage и compute разделены физически
  - Compute scale up/down instantly
  - State персистентный в shared object storage
  - Network bandwidth — common bottleneck, но manageable

Эти warehouses доказали, что disaggregation работает в production. Flink хотел того же — disaggregated state.

Goals для Flink 2.0 state management:

1. Cloud-native: state живёт в shared object storage (S3/GCS/HDFS), не на local disks
2. Lightweight checkpoints: checkpoint duration ≈ const, не растёт со state
3. Fast recovery: seconds, не minutes/hours
4. Painless rescaling: добавить/убрать TaskManagers без downtime
5. True autoscaling: react на load во время минуты, не часы
6. Multi-region DR: shared state в cross-region S3 replication
7. Cost reduction: меньше нод, меньше disks
8. Independent scaling: compute и storage растут независимо

Trade-offs допустимы:
- Per-access latency больше (network vs local SSD)
- Async execution required
- Throughput немного ниже без cache (но с cache — paritet или better)

Как disaggregation решает эти проблемы

Боль                           Решение в Flink 2.0

Local disks anti-pattern  ->   State в S3, локальный диск только cache
Coupled scaling          ->   Compute scale by CPU, storage scale by S3 growth
Heavy checkpoints        ->   SST уже в S3 (write-through), metadata pointer only
Slow recovery            ->   Lazy load из S3, no download phase
Painful rescaling        ->   State уже в S3, новые subtasks lookup, no shuffle
DR complexity            ->   Cross-region S3 replication = state shared

Главный insight: если state уже в S3, его не надо туда заливать на checkpoint и не надо оттуда скачивать на recovery. Checkpoint становится почти бесплатным (только metadata), recovery — практически instantaneous (только metadata pointer обновляется).

Параллели в других системах

Flink не первый, кто пошёл по пути disaggregation. Прецеденты:

RocksDB-cloud (Rockset):
  - Fork RocksDB с support для S3 storage layer
  - Идея: то же что ForStDB, но раньше (2019)
  - Production-используется в Rockset (real-time analytics)

ClickHouse SharedMergeTree (Cloud version):
  - Storage в shared object storage
  - Replicas только compute
  - 2023+ implementation

Kafka tiered storage (KIP-405):
  - Старые сегменты в S3
  - Локальные диски — только hot data
  - Той же идеи hot/cold separation

Apache Spark Structured Streaming State v2:
  - Похожие ambitions
  - Но более конструктивно: state в S3 через snapshots, не runtime read

Snowflake/Iceberg:
  - Analytics warehouses, не streaming
  - Disaggregated storage стандарт для analytical workloads

Flink 2.0 — последняя из больших streaming systems, кто принял disaggregated state. Это естественная эволюция, не революция.

Production sympоms, которые исправляются

Симптом в Flink 1.x          После Flink 2.0 / ForStDB

"Recovery занимает 30 мин"   -> Recovery ~10 sec + warm-up
"Не могу автоскейлить"        -> Reactive Mode с autoscaling работает
"Local disks 50% пусты"       -> Уменьшить до cache size (10-30%)
"Checkpoint duration растёт"  -> Checkpoint ~ const время
"State > 1 TB не работает"    -> Unlimited state size
"Cluster expensive"           -> 20-40% cheaper для большого state
"DR через manual restore"     -> S3 cross-region replication
"Не могу мигрировать AZ"      -> Stateless TaskManagers, free migration

Trade-offs disaggregation

Disaggregation не бесплатна. Главные trade-offs:

1. Per-access latency higher
   - Local RocksDB: ~10 µs
   - S3 cache miss: ~10 ms
   - 1000x slower for cache miss
   - Mitigated by: async execution, hybrid cache

2. Cache hit rate critical
   - При low hit rate throughput сильно падает
   - Требует thoughtful sizing cache
   - Hot workloads OK, random access — challenge

3. S3 как single point of dependency
   - S3 outage = job stops
   - В 1.x только checkpoint failures, processing продолжается
   - Mitigated by: S3 high SLA (99.99%+), retry logic

4. Network bandwidth utilization
   - Constant traffic к S3
   - Cost через cross-AZ traffic possible
   - Mitigated by: VPC endpoints, region locality

5. Operational complexity
   - Новый backend, новые metrics
   - Tuning требует понимания cache behavior
   - Mitigated by: docs, defaults, observability
WARNING

Не все workloads выигрывают от disaggregation. Для маленьких states (менее 10 GB), стабильных нагрузок без rescaling, низкой failure rate — local RocksDB может оставаться лучшим выбором. ForStDB shine для большого state и dynamic environments. Решение должно быть осознанным.

Cloud-native parity score

После Flink 2.0 / ForStDB Apache Flink стал значительно ближе к cloud-native warehouses по operational характеристикам:

Operational parity Flink с cloud-native warehouses
Aspect
Flink 1.x
Flink 2.0+
Snowflake/BQ

Flink 2.0 — единственная streaming system, которая сравнима по операционным характеристикам с cloud-native analytical warehouses, оставаясь sub-second latency true streaming. Snowflake и BQ всё ещё бэтч/micro-batch, Flink — true streaming + cloud-native ops.

Когда стоит мигрировать на ForStDB

Сильные сигналы для миграции:
  - State size > 100 GB on TaskManager
  - Регулярный rescaling (autoscaling по нагрузке)
  - Recovery SLA < 1 минуты
  - Cluster sizing определяется state size, не CPU
  - Multi-region deployment с DR
  - Cost optimization: убрать EBS, передать в S3
  - Operational pain: long recovery, painful rescaling

Слабые сигналы (можно остаться на 1.x/RocksDB):
  - State < 10 GB
  - Низкая частота failover
  - Stateless или near-stateless jobs
  - Critical latency (sub-ms) и cache miss недопустим
  - Простой single-region deployment

Migration path:
  1. Flink 1.20 (LTS) -> Flink 2.0 без code changes для большинства jobs
  2. ForStDB включается через config: state.backend.type: forst
  3. State V2 API нужен для full async benefits
  4. Параллельный запуск 1.x и 2.0 jobs возможен на одном кластере
  5. Подробно — в уроке 5 этого модуля

Source

Apache Flink 2.0 release blog (March 2025):
  "Apache Flink 2.0: Disaggregated State Management"

VLDB 2025 paper:
  Authors: Apache Flink community
  Title: "Disaggregated State Management in Apache Flink 2.0"
  Key contributions:
    - ForStDB architecture (LSM-tree adapted for cloud)
    - Async execution model для SQL operators
    - Async snapshot caching
    - Performance benchmarks vs RocksDB local

Flink source:
  flink-state-backends/flink-statebackend-forst/
  flink-runtime/src/main/java/org/apache/flink/runtime/state/forst/

FLIP документы:
  FLIP-423: Introduce Disaggregated State Management Framework
  FLIP-424: Async State API
  FLIP-425: Asynchronous Execution Model
Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. Какая фундаментальная проблема Flink 1.x с RocksDB на локальном диске была главной мотивацией для disaggregation?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5