HashMapStateBackend — единственный state backend в Flink, который не сериализует данные в hot path.
Иерархия памяти: L1/L2/L3 и RAM Это даёт ему фантастическую скорость доступа: твой state.value() возвращает прямой Java-объект, без копирования и десериализации. Это идеально для low-latency задач с маленьким state’ом, но создаёт уникальные ограничения, которые отличают его от RocksDB.
В этом уроке — как HashMapStateBackend устроен внутри, почему сериализация всё-таки нужна (но в холодном пути), что такое CopyOnWriteStateMap и почему snapshot останавливает задачу.
Структура: вложенные HashMap
Внутри HeapKeyedStateBackend (это внутреннее имя HashMapStateBackend) state представлен трёхуровневой иерархией Java коллекций:
Map<String, // stateName ("counter", "window-contents")
StateTable<K,
Map<N, // namespace (window, VoidNamespace)
Map<K, V>>>> // key -> value
StateTable — это не просто HashMap<K, V>, а специальная реализация CopyOnWriteStateMap<K, V> (для most cases). О copy-on-write — ниже.
Когда userCode зовёт state.value():
- Backend знает current key (через
setCurrentKey()). - Знает текущий namespace (через operator-side context).
- Находит соответствующий
StateTableпо stateName. - В StateTable делает
get(namespace, key). - Возвращает прямой объект.
Ноль сериализации, ноль аллокаций. Если ты делаешь миллион state.value() в секунду на простом keyBy + count — каждый занимает ~100 наносекунд.
Зачем тогда сериализация?
Парадокс: если в hot path нет сериализации, то откуда вообще требование иметь TypeSerializer в ValueStateDescriptor? Ответ: сериализация нужна для snapshot и restore.
При checkpoint бэкенд проходит весь StateTable, сериализует каждое значение через TypeSerializer, и пишет байты в FsCheckpointStreamFactory (HDFS/S3 stream). Это не в hot path, но если сериализатор медленный — checkpoint длится дольше.
При restore — обратная процедура: backend читает байты из checkpoint и десериализует обратно в Java объекты.
Это важно понимать: даже если у тебя «нет сериализации в runtime», тип сериализатора всё равно влияет на performance — через скорость и размер checkpoint’ов.
CopyOnWriteStateMap: lock-free snapshot
Главная техническая фишка HashMapStateBackend — CopyOnWriteStateMap. Это специальная hash map, написанная Flink специально для snapshot semantics.
Проблема, которую она решает: оператор продолжает обрабатывать записи, а нам нужно сделать консистентный snapshot этой же hash map. Обычно для этого пришлось бы либо:
- Полностью lock’ить map на время snapshot (плохо — оператор простаивает).
- Делать defensive deep copy (плохо — память х2 на момент копирования).
Flink выбрал третий путь — copy-on-write. Logical:
- При начале snapshot увеличивается номер «version» state map.
- После этого каждая модификация (put/remove) создаёт новую копию только модифицируемого bucket’а, а старая остаётся как часть snapshot version.
- Snapshot потом неспешно проходит по старой version, сериализует и отправляет в storage.
Это похоже на immutable persistent data structures из Clojure/Haskell. Памяти временно расходуется столько же, сколько изменений происходит между началом и концом snapshot. Если за это время изменилось 10% bucket’ов — overhead 10%, не 100%.
При snapshot v++. Новые модификации делают копию bucket, оставляя версию snapshot нетронутой. Snapshot async проходит по snapshot version, оригинальная version продолжает работу.
Эта схема даёт non-blocking snapshot, но добавляет overhead на каждый put в районе 30-50 ns. На большинстве workload это незаметно.
Snapshot: что именно отправляется в S3
При checkpoint бэкенд создаёт KeyedStateHandle, который состоит из:
- Метаданные: KeyGroupRange (диапазон ключевых групп), список stateNames и их serializers.
- Данные: для каждого state primitive — последовательность сериализованных entries (key, namespace, value).
Формат — Flink-internal, оптимизированный для линейного read’а. Один файл в S3 на каждый KeyGroup’у — это даёт parallel restore.
Размер checkpoint = sum size of all serialized values. Для in-memory state 1 GiB — checkpoint в S3 тоже ~1 GiB (плюс metadata ~1%). Если у тебя 10 минут interval и job хранит 1 GiB — это 6 GiB новых данных в S3 каждый час. Учитывай при расчёте storage costs.
HashMapStateBackend не поддерживает incremental checkpoint. Каждый snapshot — полный. Это вторая принципиальная разница с RocksDB, где можно отправлять только новые SST файлы.
Когда HashMapStateBackend идеален
Конкретные характеристики job, для которой это лучший выбор:
- State помещается в heap — ориентировочно до 1-2 GiB на slot (с учётом overhead для JVM и GC).
- Latency критична — каждые 100 наносекунд на доступ имеют значение (например, low-latency trading).
- Простой тип значений — Long counter, набор Long’ов, маленькие POJO. Когда (де)сериализация значений тяжела, RocksDB убивает performance.
- Не нужны incremental checkpoint’ы — checkpoint interval больше 5 минут, размер до 1 GiB, network в S3 не bottleneck.
Производственные примеры из реальной практики:
- Real-time fraud detection с маленьким per-user state (счётчики, баллы).
- Sessionization с относительно короткими session’ами (window per user state).
- Aggregation для metrics с десятками тысяч ключей.
Когда НЕ использовать HashMapStateBackend
- State > 1-2 GiB на slot — heap не выдержит, GC pause будут невыносимыми (1 секунда+ на каждый Old GC).
- Очень дорогой type serializer — Kryo / POJO с десятками полей. Каждый checkpoint станет страданием.
- Желательны incremental checkpoint’ы — стрим с быстрым state ростом, инкремент сэкономит часы (например, IoT events с миллиардами unique device_id).
- Long-running job с уверенной утечкой state — например, unbounded broadcast или session’ы без TTL. RocksDB переживёт лучше.
TTL: как это работает в HashMap backend
StateTtlConfig поддерживается, но реализован через lazy cleanup:
- Когда state записывается, сохраняется timestamp последней модификации.
- При чтении значения backend проверяет: если
now - last_access > ttl, значение считается мёртвым и возвращается null. - Реальное удаление происходит при следующем snapshot — там пройдёт
incremental cleanup(FLINK-9938), который удалит истекшие entry.
Параметры:
StateTtlConfig.newBuilder(Time.minutes(60))
.cleanupIncrementally(10, false) // удалять 10 entries за visit
.cleanupFullSnapshot() // при full snapshot удалить все expired
.build();
Без TTL config истекшие entry никогда не удаляются — HashMap будет расти, пока не упадёт OOM. Это самая частая утечка memory в jobs, использующих HashMap backend.
TTL cleanupIncrementally(N, false) — параметр runCleanupForEveryRecord имеет значение. Если true, очистка проходит при каждой записи — overhead на read path. Если false (рекомендуется), очистка проходит только при доступе к state, что менее агрессивно. При больших state’ах с редкими доступами false может означать, что cleanup никогда не пройдёт, и стоит дополнительно полагаться на cleanupFullSnapshot.
Конфигурация
В flink-conf.yaml:
state.backend.type: hashmap
state.checkpoints.dir: s3://my-checkpoints/
state.savepoints.dir: s3://my-savepoints/
В коде задаётся параметрами job:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(
new FileSystemCheckpointStorage("s3://my-checkpoints/"));
FileSystemCheckpointStorage нужен явно — backend сам по себе только хранит state в памяти, для checkpoint нужно отдельно указать persistent storage.
Чтение source
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java— основной класс. МетодgetOrCreateKeyedState()— где создаются конкретные state primitive.flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java— самый интересный код в backend. Алгоритм бакетов с версионированием.flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java,HeapListState.java, и т.д. — реализации state primitive над StateTable.flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMapTest.java— отличные тесты, показывающие как версионирование работает на конкретных сценариях.
Чек-лист
- HashMapStateBackend хранит state в JVM heap, не сериализует в hot path (~100 ns на доступ).
- Структура:
Map<stateName, StateTable<key, Map<namespace, Map<key, value>>>>. CopyOnWriteStateMap— lock-free snapshot через copy-on-write на bucket level.- Сериализация нужна для snapshot/restore, не для runtime.
- Snapshot — sync deep copy через COW (non-blocking для остальных операций, но overhead на put).
- НЕ поддерживает incremental checkpoint — каждый снимок полный.
- Идеален при: state до 1-2 GiB на slot, low-latency требования, простые типы.
- НЕ использовать: state > 2 GiB, тяжёлые сериализаторы, нужен incremental.
- TTL через lazy cleanup; обязательно настраивай
cleanupIncrementallyилиcleanupFullSnapshot, иначе протечёт.