Learning Platform
Глоссарий Troubleshooting
Урок 11.02 · 25 мин
Средний
State BackendRocksDBHashMapPerformance

HashMap vs RocksDB: выбор state backend

State backend в Flink — это компонент, который хранит state операторов в runtime и упаковывает его в checkpoint. Выбор между HashMapStateBackend и EmbeddedRocksDBStateBackend — одно из самых важных архитектурных решений production deployment. Неправильный выбор приводит к OOM (если выбрали HashMap при большом state) или к медленным операциям (если выбрали RocksDB при маленьком state).

В этом уроке разберём, как устроен каждый backend, какие у них характеристики по памяти/latency/checkpoint speed, и научимся выбирать правильный для конкретной задачи.


Два backend’а: где хранится state

Flink 1.13+ предлагает два production-grade backend’а (third — MemoryStateBackend — deprecated для production):

HashMapStateBackend

State хранится в обычной Java heap memory в виде HashMap. Быстрый доступ (O(1) или O(log n)), но размер ограничен JVM heap.

env.setStateBackend(new HashMapStateBackend());

EmbeddedRocksDBStateBackend

State хранится в RocksDB — embedded LSM-tree key-value store, который пишет на локальный диск TaskManager’а. Размер ограничен только диском (десятки TB на современном NVMe). Доступ медленнее (off-heap + serialization).

RocksDB как LSM-tree: MemTable, SST, compaction
env.setStateBackend(new EmbeddedRocksDBStateBackend());

Что в памяти, что на диске

HashMap vs RocksDB layout памяти

HashMap backend

HashMapStateBackend: всё state — объекты Java в JVM heap. Быстро, но размер ограничен heap.

JVM heap with state objects

JVM heap — например 4GB. Каждое значение state — отдельный Java объект. Mutation = прямая модификация ссылки.

RocksDB backend

RocksDB backend: state хранится в RocksDB на локальном диске. Off-heap memory для cache.

Local SSD with RocksDB files

State files на NVMe SSD: SST файлы, WAL, MemTable in off-heap memory. Mutation = serialize + write.

Snapshot HashMap

HashMap: snapshot = serialize всех Java объектов в bytes. Полный snapshot каждый checkpoint.

S3 full snapshot

Full snapshot to S3. Размер snapshot = размер state. Долго для больших state.

Snapshot RocksDB

RocksDB: incremental checkpoint — только новые SST файлы загружаются в S3. Старые остаются связанными.

S3 incremental snapshot

Incremental snapshot to S3. Только delta с предыдущего checkpoint. Быстро даже при больших state.

Латентность операций: характеристики

ОперацияHashMapRocksDB
value.value() (read)~50 ns~5-50 microseconds
value.update(v)~100 ns~10-100 microseconds
map.put(k, v)~100 ns~20-100 microseconds
map.iterator() over 1000 keys~10 microseconds~1-10 ms

Разница в порядке 100-1000x на каждую операцию. Для job, который делает много per-event state operations, это критично.

С другой стороны, RocksDB:

  • Не ограничен heap — можно держать терабайт state.
  • Incremental checkpoint = быстрый snapshot даже на large state.
  • Compaction в фоне держит данные компактно.

Размер state: главный фактор выбора

Размер stateBackendПочему
Меньше 100 MBHashMapНа heap легко помещается, latency на порядки ниже
100 MB - 1 GBHashMap (если есть heap)Граничный случай, надо считать
1 GB - 10 GBRocksDBHeap размером 16-32GB — это уже не «лёгкий»
10 GB - 100 GBRocksDB обязательноНа heap невозможно — OOM
Больше 100 GBRocksDB + tuningВозможно, но требует серьёзного config

Как оценить размер state:

state_size ~ N_keys x (key_size + state_per_key_size)

Пример: счётчик за user_id, 10M users, по 100 байт состояния = 1 GB. Это уже RocksDB-зона.


Checkpoint speed: incremental vs full

Full snapshot (HashMap):

  • Каждый checkpoint сериализует весь state и загружает в S3.
  • Размер snapshot = размер state.
  • Для 1 GB state с upload speed 100 MB/s — 10 секунд только на upload.
  • При большом state checkpoint становится узким местом.

Incremental snapshot (RocksDB):

  • RocksDB хранит state в immutable SST файлах. Когда state меняется — создаются новые файлы, старые остаются.
  • При checkpoint Flink загружает только новые SST файлы.
  • Если в interval изменился 1% state — snapshot тоже только 1%.
  • Для 100 GB state с 1% mutation per checkpoint — upload 1 GB ~ 10 секунд.

Это значит, что RocksDB сильно лучше масштабируется по checkpoint time с ростом state.


Recovery time: HashMap быстрее

При restore из checkpoint:

HashMap:

  • Скачиваем full snapshot из S3.
  • Десериализуем все Java объекты в heap.
  • Готовы к работе.
  • Time = state_size / network_speed + deserialization.

RocksDB:

  • Скачиваем все SST файлы из S3.
  • Загружаем RocksDB instance с этими файлами.
  • Возможно, нужна compaction.
  • Time = state_size / network_speed + RocksDB startup.

RocksDB recovery немного медленнее из-за RocksDB startup, но не существенно. Для огромных state (100+ GB) — задержка десятки секунд.


Tuning RocksDB: ключевые параметры

# flink-conf.yaml
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.memory.fixed-per-slot: 256mb
state.backend.rocksdb.compaction.style: LEVEL
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.writebuffer.size: 64mb
state.backend.rocksdb.writebuffer.count: 4

memory.managed: true — RocksDB использует Flink-managed memory (off-heap). По умолчанию выделяется ~30% TaskManager memory.

writebuffer.size — размер MemTable. Больше = реже flush на диск = больше memory.

thread.num — параллелизм для background compaction.

WARNING

RocksDB tuning — большая тема. Дефолты Flink хороши для большинства случаев. Не тюньте RocksDB без чёткого понимания, что вы делаете — можно сделать сильно хуже.


Сравнительная таблица

ХарактеристикаHashMapRocksDB
ХранилищеJVM heapЛокальный диск + off-heap cache
Max state size10-20 GB (heap limit)TB (disk limit)
Per-op latency50-100 ns5-100 microseconds
Checkpoint typeFullIncremental
Checkpoint speed (large state)Медленный (full upload)Быстрый (delta upload)
Recovery speedБыстрыйЧуть медленнее (RocksDB startup)
Memory pressureВысокая (всё на heap)Низкая (cache off-heap)
GC pauses (heap)Чувствителен — long pausesНе страдает
TTL supportДаДа
Хорошо дляМаленький state, low latencyБольшой state, scalability

Когда какой выбирать

Кейс 1: real-time fraud detection

  • State: keyed state per user_id, ~50M пользователей, ~200 байт per user.
  • Total state: ~10 GB.
  • Latency требование: per-event < 50 ms.

Выбор: RocksDB. State слишком большой для HashMap. Latency 50 ms terpit RocksDB overhead.

Кейс 2: event aggregation

  • State: ~10 ключей (per event type), большие коллекции в каждом (тысячи элементов).
  • Total state: ~100 MB.
  • Latency требование: per-event < 10 ms.

Выбор: HashMap. State умещается в heap, low latency requirements.

Кейс 3: stateful enrichment

  • State: side input lookup, ~500K entries, ~1 KB каждая.
  • Total state: ~500 MB.
  • Latency требование: per-event < 5 ms.

Выбор: HashMap (если heap позволяет). RocksDB добавит 100-1000x latency.

Кейс 4: session windows за месяц

  • State: per user_id, потенциально миллионы активных сессий.
  • Total state: 50+ GB.
  • Latency: per-event < 100 ms.

Выбор: RocksDB. Heap столько не вместит.


Изменение state backend в runtime

flink-conf.yaml:

state.backend: rocksdb
# или
state.backend: hashmap

Можно переопределить per-job в коде:

env.setStateBackend(new EmbeddedRocksDBStateBackend(true));  // true = incremental

Можно ли переключить с одного backend на другой?

Да, через savepoint:

  1. Сделать savepoint текущего job (flink savepoint <job-id>).
  2. Остановить job.
  3. Поменять state.backend в conf или коде.
  4. Восстановить из savepoint (flink run -s <savepoint-path>).

Savepoint format — backend-agnostic (canonical format), поэтому переключение работает. Не работает с native format (см. урок 3).


Попробуй сам

  1. Создай job, который агрегирует counter per userId (keyed state ValueState<Long>).
  2. Сгенерируй искусственный поток с 1M уникальных user_id.
  3. Запусти с HashMapStateBackend. Замерь throughput и lastCheckpointDuration.
  4. Запусти с EmbeddedRocksDBStateBackend. Замерь то же самое.
  5. Увеличь до 10M user_id. Какой backend выживет?

Ключевые выводы

  1. HashMap: JVM heap, быстрый, ограничен размером heap (~10 GB). Full snapshot каждый checkpoint.
  2. RocksDB: локальный диск + off-heap cache, медленнее (100-1000x latency), но scalable до TB. Incremental snapshot.
  3. Размер state — главный фактор. До 1 GB — HashMap. Больше 10 GB — RocksDB. Между — зависит.
  4. Latency: HashMap — 50 ns, RocksDB — 5-100 microseconds на операцию. Для high-throughput per-event ops это критично.
  5. Checkpoint: incremental RocksDB сильно лучше для большого state — только delta загружается.
  6. Переключение между backend’ами возможно через canonical savepoint.
Проверка знанийKnowledge check
Вы запустили Flink job с HashMapStateBackend. Job обрабатывает 10K events/sec, keyed state per user (ValueState содержит JSON ~500 байт). После 12 часов работы Flink TaskManager падает с OutOfMemoryError. lastCheckpointSize в Flink UI = 8 GB и растёт. Что произошло, и какие конкретные шаги предпринять?
ОтветAnswer
Что произошло: state накапливается линейно, потому что новые user_id всё время добавляются (или ValueState не очищается через TTL). За 12 часов набрался state на 8 GB, что превысило размер JVM heap. HashMapStateBackend держит всё на heap — OOM закономерен. Шаги: (1) Переключиться на EmbeddedRocksDBStateBackend: env.setStateBackend(new EmbeddedRocksDBStateBackend(true)) с включённым incremental — RocksDB вынесет state на диск, snapshot станет инкрементальным. (2) Добавить state TTL через StateTtlConfig: если user не приходит N дней — state удаляется (например, ttl=Duration.ofDays(7), cleanupStrategies.cleanupInBackground()). Это ограничит размер state. (3) Сделать savepoint существующего state в canonical формате, перезапустить с RocksDB и TTL config — данные переедут в новое хранилище без потерь. После этого мониторить lastCheckpointDuration: incremental snapshot должен быть быстрым даже при большом state.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Главные характеристики, по которым отличаются HashMapStateBackend и EmbeddedRocksDBStateBackend?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4