HashMap vs RocksDB: выбор state backend
State backend в Flink — это компонент, который хранит state операторов в runtime и упаковывает его в checkpoint. Выбор между HashMapStateBackend и EmbeddedRocksDBStateBackend — одно из самых важных архитектурных решений production deployment. Неправильный выбор приводит к OOM (если выбрали HashMap при большом state) или к медленным операциям (если выбрали RocksDB при маленьком state).
В этом уроке разберём, как устроен каждый backend, какие у них характеристики по памяти/latency/checkpoint speed, и научимся выбирать правильный для конкретной задачи.
Два backend’а: где хранится state
Flink 1.13+ предлагает два production-grade backend’а (third — MemoryStateBackend — deprecated для production):
HashMapStateBackend
State хранится в обычной Java heap memory в виде HashMap. Быстрый доступ (O(1) или O(log n)), но размер ограничен JVM heap.
env.setStateBackend(new HashMapStateBackend());
EmbeddedRocksDBStateBackend
State хранится в RocksDB — embedded LSM-tree key-value store, который пишет на локальный диск TaskManager’а. Размер ограничен только диском (десятки TB на современном NVMe). Доступ медленнее (off-heap + serialization).
RocksDB как LSM-tree: MemTable, SST, compactionenv.setStateBackend(new EmbeddedRocksDBStateBackend());
Что в памяти, что на диске
HashMap backend
HashMapStateBackend: всё state — объекты Java в JVM heap. Быстро, но размер ограничен heap.JVM heap with state objects
JVM heap — например 4GB. Каждое значение state — отдельный Java объект. Mutation = прямая модификация ссылки.RocksDB backend
RocksDB backend: state хранится в RocksDB на локальном диске. Off-heap memory для cache.Local SSD with RocksDB files
State files на NVMe SSD: SST файлы, WAL, MemTable in off-heap memory. Mutation = serialize + write.Snapshot HashMap
HashMap: snapshot = serialize всех Java объектов в bytes. Полный snapshot каждый checkpoint.S3 full snapshot
Full snapshot to S3. Размер snapshot = размер state. Долго для больших state.Snapshot RocksDB
RocksDB: incremental checkpoint — только новые SST файлы загружаются в S3. Старые остаются связанными.S3 incremental snapshot
Incremental snapshot to S3. Только delta с предыдущего checkpoint. Быстро даже при больших state.Латентность операций: характеристики
| Операция | HashMap | RocksDB |
|---|---|---|
value.value() (read) | ~50 ns | ~5-50 microseconds |
value.update(v) | ~100 ns | ~10-100 microseconds |
map.put(k, v) | ~100 ns | ~20-100 microseconds |
map.iterator() over 1000 keys | ~10 microseconds | ~1-10 ms |
Разница в порядке 100-1000x на каждую операцию. Для job, который делает много per-event state operations, это критично.
С другой стороны, RocksDB:
- Не ограничен heap — можно держать терабайт state.
- Incremental checkpoint = быстрый snapshot даже на large state.
- Compaction в фоне держит данные компактно.
Размер state: главный фактор выбора
| Размер state | Backend | Почему |
|---|---|---|
| Меньше 100 MB | HashMap | На heap легко помещается, latency на порядки ниже |
| 100 MB - 1 GB | HashMap (если есть heap) | Граничный случай, надо считать |
| 1 GB - 10 GB | RocksDB | Heap размером 16-32GB — это уже не «лёгкий» |
| 10 GB - 100 GB | RocksDB обязательно | На heap невозможно — OOM |
| Больше 100 GB | RocksDB + tuning | Возможно, но требует серьёзного config |
Как оценить размер state:
state_size ~ N_keys x (key_size + state_per_key_size)
Пример: счётчик за user_id, 10M users, по 100 байт состояния = 1 GB. Это уже RocksDB-зона.
Checkpoint speed: incremental vs full
Full snapshot (HashMap):
- Каждый checkpoint сериализует весь state и загружает в S3.
- Размер snapshot = размер state.
- Для 1 GB state с upload speed 100 MB/s — 10 секунд только на upload.
- При большом state checkpoint становится узким местом.
Incremental snapshot (RocksDB):
- RocksDB хранит state в immutable SST файлах. Когда state меняется — создаются новые файлы, старые остаются.
- При checkpoint Flink загружает только новые SST файлы.
- Если в interval изменился 1% state — snapshot тоже только 1%.
- Для 100 GB state с 1% mutation per checkpoint — upload 1 GB ~ 10 секунд.
Это значит, что RocksDB сильно лучше масштабируется по checkpoint time с ростом state.
Recovery time: HashMap быстрее
При restore из checkpoint:
HashMap:
- Скачиваем full snapshot из S3.
- Десериализуем все Java объекты в heap.
- Готовы к работе.
- Time =
state_size / network_speed+ deserialization.
RocksDB:
- Скачиваем все SST файлы из S3.
- Загружаем RocksDB instance с этими файлами.
- Возможно, нужна compaction.
- Time =
state_size / network_speed+ RocksDB startup.
RocksDB recovery немного медленнее из-за RocksDB startup, но не существенно. Для огромных state (100+ GB) — задержка десятки секунд.
Tuning RocksDB: ключевые параметры
# flink-conf.yaml
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.memory.fixed-per-slot: 256mb
state.backend.rocksdb.compaction.style: LEVEL
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.writebuffer.size: 64mb
state.backend.rocksdb.writebuffer.count: 4
memory.managed: true — RocksDB использует Flink-managed memory (off-heap). По умолчанию выделяется ~30% TaskManager memory.
writebuffer.size — размер MemTable. Больше = реже flush на диск = больше memory.
thread.num — параллелизм для background compaction.
RocksDB tuning — большая тема. Дефолты Flink хороши для большинства случаев. Не тюньте RocksDB без чёткого понимания, что вы делаете — можно сделать сильно хуже.
Сравнительная таблица
| Характеристика | HashMap | RocksDB |
|---|---|---|
| Хранилище | JVM heap | Локальный диск + off-heap cache |
| Max state size | 10-20 GB (heap limit) | TB (disk limit) |
| Per-op latency | 50-100 ns | 5-100 microseconds |
| Checkpoint type | Full | Incremental |
| Checkpoint speed (large state) | Медленный (full upload) | Быстрый (delta upload) |
| Recovery speed | Быстрый | Чуть медленнее (RocksDB startup) |
| Memory pressure | Высокая (всё на heap) | Низкая (cache off-heap) |
| GC pauses (heap) | Чувствителен — long pauses | Не страдает |
| TTL support | Да | Да |
| Хорошо для | Маленький state, low latency | Большой state, scalability |
Когда какой выбирать
Кейс 1: real-time fraud detection
- State: keyed state per user_id, ~50M пользователей, ~200 байт per user.
- Total state: ~10 GB.
- Latency требование: per-event
< 50 ms.
Выбор: RocksDB. State слишком большой для HashMap. Latency 50 ms terpit RocksDB overhead.
Кейс 2: event aggregation
- State: ~10 ключей (per event type), большие коллекции в каждом (тысячи элементов).
- Total state: ~100 MB.
- Latency требование: per-event
< 10 ms.
Выбор: HashMap. State умещается в heap, low latency requirements.
Кейс 3: stateful enrichment
- State: side input lookup, ~500K entries, ~1 KB каждая.
- Total state: ~500 MB.
- Latency требование: per-event
< 5 ms.
Выбор: HashMap (если heap позволяет). RocksDB добавит 100-1000x latency.
Кейс 4: session windows за месяц
- State: per user_id, потенциально миллионы активных сессий.
- Total state: 50+ GB.
- Latency: per-event
< 100 ms.
Выбор: RocksDB. Heap столько не вместит.
Изменение state backend в runtime
flink-conf.yaml:
state.backend: rocksdb
# или
state.backend: hashmap
Можно переопределить per-job в коде:
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental
Можно ли переключить с одного backend на другой?
Да, через savepoint:
- Сделать savepoint текущего job (
flink savepoint <job-id>). - Остановить job.
- Поменять
state.backendв conf или коде. - Восстановить из savepoint (
flink run -s <savepoint-path>).
Savepoint format — backend-agnostic (canonical format), поэтому переключение работает. Не работает с native format (см. урок 3).
Попробуй сам
- Создай job, который агрегирует counter per userId (keyed state ValueState<Long>).
- Сгенерируй искусственный поток с 1M уникальных user_id.
- Запусти с HashMapStateBackend. Замерь throughput и lastCheckpointDuration.
- Запусти с EmbeddedRocksDBStateBackend. Замерь то же самое.
- Увеличь до 10M user_id. Какой backend выживет?
Ключевые выводы
- HashMap: JVM heap, быстрый, ограничен размером heap (~10 GB). Full snapshot каждый checkpoint.
- RocksDB: локальный диск + off-heap cache, медленнее (100-1000x latency), но scalable до TB. Incremental snapshot.
- Размер state — главный фактор. До 1 GB — HashMap. Больше 10 GB — RocksDB. Между — зависит.
- Latency: HashMap — 50 ns, RocksDB — 5-100 microseconds на операцию. Для high-throughput per-event ops это критично.
- Checkpoint: incremental RocksDB сильно лучше для большого state — только delta загружается.
- Переключение между backend’ами возможно через canonical savepoint.