Learning Platform
Глоссарий Troubleshooting
Урок 06.02 · 24 мин
Продвинутый
HashMapStateBackendHeap stateSync snapshotLow-latencyCopy-on-write

HashMapStateBackend — единственный state backend в Flink, который не сериализует данные в hot path.

Иерархия памяти: L1/L2/L3 и RAM Это даёт ему фантастическую скорость доступа: твой state.value() возвращает прямой Java-объект, без копирования и десериализации. Это идеально для low-latency задач с маленьким state’ом, но создаёт уникальные ограничения, которые отличают его от RocksDB.

В этом уроке — как HashMapStateBackend устроен внутри, почему сериализация всё-таки нужна (но в холодном пути), что такое CopyOnWriteStateMap и почему snapshot останавливает задачу.

Структура: вложенные HashMap

Внутри HeapKeyedStateBackend (это внутреннее имя HashMapStateBackend) state представлен трёхуровневой иерархией Java коллекций:

Map<String,                           // stateName ("counter", "window-contents")
    StateTable<K,
        Map<N,                        // namespace (window, VoidNamespace)
            Map<K, V>>>>              // key -> value

StateTable — это не просто HashMap<K, V>, а специальная реализация CopyOnWriteStateMap<K, V> (для most cases). О copy-on-write — ниже.

Когда userCode зовёт state.value():

  1. Backend знает current key (через setCurrentKey()).
  2. Знает текущий namespace (через operator-side context).
  3. Находит соответствующий StateTable по stateName.
  4. В StateTable делает get(namespace, key).
  5. Возвращает прямой объект.

Ноль сериализации, ноль аллокаций. Если ты делаешь миллион state.value() в секунду на простом keyBy + count — каждый занимает ~100 наносекунд.

Зачем тогда сериализация?

Парадокс: если в hot path нет сериализации, то откуда вообще требование иметь TypeSerializer в ValueStateDescriptor? Ответ: сериализация нужна для snapshot и restore.

При checkpoint бэкенд проходит весь StateTable, сериализует каждое значение через TypeSerializer, и пишет байты в FsCheckpointStreamFactory (HDFS/S3 stream). Это не в hot path, но если сериализатор медленный — checkpoint длится дольше.

При restore — обратная процедура: backend читает байты из checkpoint и десериализует обратно в Java объекты.

Это важно понимать: даже если у тебя «нет сериализации в runtime», тип сериализатора всё равно влияет на performance — через скорость и размер checkpoint’ов.

CopyOnWriteStateMap: lock-free snapshot

Главная техническая фишка HashMapStateBackend — CopyOnWriteStateMap. Это специальная hash map, написанная Flink специально для snapshot semantics.

Проблема, которую она решает: оператор продолжает обрабатывать записи, а нам нужно сделать консистентный snapshot этой же hash map. Обычно для этого пришлось бы либо:

  • Полностью lock’ить map на время snapshot (плохо — оператор простаивает).
  • Делать defensive deep copy (плохо — память х2 на момент копирования).

Flink выбрал третий путь — copy-on-write. Logical:

  1. При начале snapshot увеличивается номер «version» state map.
  2. После этого каждая модификация (put/remove) создаёт новую копию только модифицируемого bucket’а, а старая остаётся как часть snapshot version.
  3. Snapshot потом неспешно проходит по старой version, сериализует и отправляет в storage.

Это похоже на immutable persistent data structures из Clojure/Haskell. Памяти временно расходуется столько же, сколько изменений происходит между началом и концом snapshot. Если за это время изменилось 10% bucket’ов — overhead 10%, не 100%.

CopyOnWriteStateMap: версии при snapshot

При snapshot v++. Новые модификации делают копию bucket, оставляя версию snapshot нетронутой. Snapshot async проходит по snapshot version, оригинальная version продолжает работу.

Steady state (v=5)StateMap: один набор bucket'ов, все модификации in-place
Snapshot start: v=6 (но snapshot читает v=5)новые put'ы делают copy bucket'а до модификации
bucket #1 (не менялся)v=5 и v=6 ссылаются на один экземпляр
bucket #2 (был put)v=5 — старая копия, v=6 — новая копия
bucket #3 (не менялся)shared
Snapshot readerитерирует bucket'ы версии 5, сериализует, шлёт в S3Snapshot не блокирует продакшен. Оператор продолжает put/get в версию 6.
Producer (оператор)put/get на версии 6
Snapshot complete -> v=5 garbage collectedbucket'ы версии 5, которые были скопированы, больше не нужны

Эта схема даёт non-blocking snapshot, но добавляет overhead на каждый put в районе 30-50 ns. На большинстве workload это незаметно.

Snapshot: что именно отправляется в S3

При checkpoint бэкенд создаёт KeyedStateHandle, который состоит из:

  • Метаданные: KeyGroupRange (диапазон ключевых групп), список stateNames и их serializers.
  • Данные: для каждого state primitive — последовательность сериализованных entries (key, namespace, value).

Формат — Flink-internal, оптимизированный для линейного read’а. Один файл в S3 на каждый KeyGroup’у — это даёт parallel restore.

Размер checkpoint = sum size of all serialized values. Для in-memory state 1 GiB — checkpoint в S3 тоже ~1 GiB (плюс metadata ~1%). Если у тебя 10 минут interval и job хранит 1 GiB — это 6 GiB новых данных в S3 каждый час. Учитывай при расчёте storage costs.

HashMapStateBackend не поддерживает incremental checkpoint. Каждый snapshot — полный. Это вторая принципиальная разница с RocksDB, где можно отправлять только новые SST файлы.

Когда HashMapStateBackend идеален

Конкретные характеристики job, для которой это лучший выбор:

  1. State помещается в heap — ориентировочно до 1-2 GiB на slot (с учётом overhead для JVM и GC).
  2. Latency критична — каждые 100 наносекунд на доступ имеют значение (например, low-latency trading).
  3. Простой тип значений — Long counter, набор Long’ов, маленькие POJO. Когда (де)сериализация значений тяжела, RocksDB убивает performance.
  4. Не нужны incremental checkpoint’ы — checkpoint interval больше 5 минут, размер до 1 GiB, network в S3 не bottleneck.

Производственные примеры из реальной практики:

  • Real-time fraud detection с маленьким per-user state (счётчики, баллы).
  • Sessionization с относительно короткими session’ами (window per user state).
  • Aggregation для metrics с десятками тысяч ключей.

Когда НЕ использовать HashMapStateBackend

  1. State > 1-2 GiB на slot — heap не выдержит, GC pause будут невыносимыми (1 секунда+ на каждый Old GC).
  2. Очень дорогой type serializer — Kryo / POJO с десятками полей. Каждый checkpoint станет страданием.
  3. Желательны incremental checkpoint’ы — стрим с быстрым state ростом, инкремент сэкономит часы (например, IoT events с миллиардами unique device_id).
  4. Long-running job с уверенной утечкой state — например, unbounded broadcast или session’ы без TTL. RocksDB переживёт лучше.

TTL: как это работает в HashMap backend

StateTtlConfig поддерживается, но реализован через lazy cleanup:

  1. Когда state записывается, сохраняется timestamp последней модификации.
  2. При чтении значения backend проверяет: если now - last_access > ttl, значение считается мёртвым и возвращается null.
  3. Реальное удаление происходит при следующем snapshot — там пройдёт incremental cleanup (FLINK-9938), который удалит истекшие entry.

Параметры:

StateTtlConfig.newBuilder(Time.minutes(60))
    .cleanupIncrementally(10, false)  // удалять 10 entries за visit
    .cleanupFullSnapshot()             // при full snapshot удалить все expired
    .build();

Без TTL config истекшие entry никогда не удаляются — HashMap будет расти, пока не упадёт OOM. Это самая частая утечка memory в jobs, использующих HashMap backend.

WARNING

TTL cleanupIncrementally(N, false) — параметр runCleanupForEveryRecord имеет значение. Если true, очистка проходит при каждой записи — overhead на read path. Если false (рекомендуется), очистка проходит только при доступе к state, что менее агрессивно. При больших state’ах с редкими доступами false может означать, что cleanup никогда не пройдёт, и стоит дополнительно полагаться на cleanupFullSnapshot.

Конфигурация

В flink-conf.yaml:

state.backend.type: hashmap
state.checkpoints.dir: s3://my-checkpoints/
state.savepoints.dir: s3://my-savepoints/

В коде задаётся параметрами job:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(
    new FileSystemCheckpointStorage("s3://my-checkpoints/"));

FileSystemCheckpointStorage нужен явно — backend сам по себе только хранит state в памяти, для checkpoint нужно отдельно указать persistent storage.

Чтение source

  • flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java — основной класс. Метод getOrCreateKeyedState() — где создаются конкретные state primitive.
  • flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java — самый интересный код в backend. Алгоритм бакетов с версионированием.
  • flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java, HeapListState.java, и т.д. — реализации state primitive над StateTable.
  • flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMapTest.java — отличные тесты, показывающие как версионирование работает на конкретных сценариях.

Чек-лист

  • HashMapStateBackend хранит state в JVM heap, не сериализует в hot path (~100 ns на доступ).
  • Структура: Map<stateName, StateTable<key, Map<namespace, Map<key, value>>>>.
  • CopyOnWriteStateMap — lock-free snapshot через copy-on-write на bucket level.
  • Сериализация нужна для snapshot/restore, не для runtime.
  • Snapshot — sync deep copy через COW (non-blocking для остальных операций, но overhead на put).
  • НЕ поддерживает incremental checkpoint — каждый снимок полный.
  • Идеален при: state до 1-2 GiB на slot, low-latency требования, простые типы.
  • НЕ использовать: state > 2 GiB, тяжёлые сериализаторы, нужен incremental.
  • TTL через lazy cleanup; обязательно настраивай cleanupIncrementally или cleanupFullSnapshot, иначе протечёт.
Проверка знанийKnowledge check
Job использует HashMapStateBackend, у каждого user'а копится session state (ListState<Event>). State растёт ~10 MiB/мин, heap = 4 GiB, checkpoint interval = 10 минут. Через сколько времени job начнёт сильно тормозить и почему?
ОтветAnswer
Через ~30-60 минут начнутся явные проблемы. Math: 10 MiB/мин × 30 мин = 300 MiB чистого state. Реальное потребление heap из-за overhead Java объектов и COW (на каждый bucket с put — копия) будет 2-3x. То есть к 30-й минуте heap уже на 1+ GiB занят только state'ом. К 60-й минуте — ~2 GiB. Дальше начинается каскад: (1) Young GC всё чаще — больше short-lived объектов из COW. (2) Old GC всё длиннее — большие state объекты не вмещаются в Young space, перетекают в Old. На 2 GiB Old space — Old GC может занять 1-3 секунды. (3) snapshot копирование занимает всё больше времени. (4) В конце концов G1 не справится — Full GC, который может остановить job на 10+ секунд, что обычно превышает heartbeat timeout и приведёт к failover. Решения: либо включить TTL (с cleanupIncrementally), либо переехать на RocksDBStateBackend, либо ограничить session window максимальной длиной.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Главное преимущество HashMapStateBackend перед RocksDB — это access latency. Какое примерно соотношение?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5