ValueState, ListState, MapState
После keyBy() оператор может хранить состояние per-key. Flink предлагает три базовых типа keyed state, и каждый из них оптимизирован под свой паттерн доступа. Выбор правильного типа влияет на производительность чекпойнтов, RocksDB-storage и сериализацию.
В этом уроке разбираем ValueState, ListState, MapState — что они умеют, как их использовать в Java и Python, и где границы.
Контекст: как state объявляется
Все типы keyed state объявляются одинаково — через state descriptor в методе open() функции, расширяющей RichFunction. Доступ к state работает только внутри методов вроде processElement(), и только когда есть текущий ключ (его выставляет Flink автоматически при вызове из keyed-оператора).
public class FraudDetector extends RichFlatMapFunction<Transaction, Alert> {
private transient ValueState<Long> lastTxTimestamp;
@Override
public void open(OpenContext openContext) {
ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
"last-tx-timestamp", // имя — должно быть уникальным в пределах оператора
Types.LONG // тип
);
lastTxTimestamp = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(Transaction tx, Collector<Alert> out) throws Exception {
Long previous = lastTxTimestamp.value(); // null если для текущего ключа state не существует
if (previous != null && tx.timestamp - previous < 1000) {
out.collect(new Alert(tx.accountId, "Too frequent"));
}
lastTxTimestamp.update(tx.timestamp);
}
}
Несколько ключевых правил:
- Поле должно быть
transient. Flink сам создаёт state вopen()— сам объектValueStateне сериализуется как часть функции. - State идентифицируется по имени. При rescale или restart Flink находит state по имени из дескриптора. Менять имя без savepoint миграции — терять данные.
- Тип фиксируется при первом запуске. Изменение типа сериализатора требует миграции состояния через State Processor API или savepoint с conversion.
В Python (PyFlink) синтаксис аналогичный:
from pyflink.datastream.functions import RichFlatMapFunction
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.common.typeinfo import Types
class FraudDetector(RichFlatMapFunction):
def __init__(self):
self.last_tx_timestamp = None
def open(self, runtime_context):
descriptor = ValueStateDescriptor("last-tx-timestamp", Types.LONG())
self.last_tx_timestamp = runtime_context.get_state(descriptor)
def flat_map(self, tx):
previous = self.last_tx_timestamp.value()
if previous is not None and tx.timestamp - previous < 1000:
yield Alert(tx.account_id, "Too frequent")
self.last_tx_timestamp.update(tx.timestamp)
ValueState: одно значение на ключ
ValueState<T> хранит одно значение типа T на каждый ключ. Это самый простой и быстрый тип state — две операции:
value()— прочитать текущее значение (илиnull, если ещё не былоupdate).update(T value)— заменить значение.clear()— удалить значение (полезно для cleanup, см. урок про TTL).
Типичные применения:
- Последний таймстамп события для дедупликации.
- Накопленный счётчик/сумма.
- Текущее состояние FSM (например, статус пользователя).
- Последний “снэпшот” объекта пользователя.
Если ваш T — это сложный POJO или большой массив, и вы обновляете только малую часть — рассмотрите MapState. ValueState всегда сериализует/десериализует значение целиком. Для RocksDB-backend это означает чтение и запись полного blob на каждое обращение.
ListState: упорядоченный список на ключ
ListState<T> хранит список элементов на ключ. Семантика — append-only в основном использовании:
add(T element)— добавить элемент в конец.addAll(List<T>)— добавить пачку.get()— вернутьIterable<T>со всеми элементами.update(List<T>)— полностью заменить список.clear()— очистить.
Типичные применения:
- Буфер событий для пакетной обработки.
- История последних N событий (с явной обрезкой в коде).
- Накопление элементов окна сессии.
- Список pending tasks/orders для пользователя.
public class EventBuffer extends RichFlatMapFunction<Event, Batch> {
private transient ListState<Event> buffer;
@Override
public void open(OpenContext ctx) {
ListStateDescriptor<Event> desc = new ListStateDescriptor<>(
"event-buffer", TypeInformation.of(Event.class)
);
buffer = getRuntimeContext().getListState(desc);
}
@Override
public void flatMap(Event event, Collector<Batch> out) throws Exception {
buffer.add(event);
// Считаем накопленное количество (тяжёлая операция — Iterable)
long count = 0;
for (Event ignored : buffer.get()) count++;
if (count >= 100) {
List<Event> snapshot = new ArrayList<>();
buffer.get().forEach(snapshot::add);
out.collect(new Batch(snapshot));
buffer.clear();
}
}
}
ListState.get() возвращает Iterable, и итерация по нему в RocksDB — это последовательное чтение всех байтов из state-backend. Для больших списков это медленно. Если вам нужно знать только размер списка — храните счётчик в отдельном ValueState; не итерируйте.
В Python:
from pyflink.datastream.state import ListStateDescriptor
class EventBuffer(RichFlatMapFunction):
def open(self, runtime_context):
desc = ListStateDescriptor("event-buffer", Types.ROW([Types.STRING(), Types.LONG()]))
self.buffer = runtime_context.get_list_state(desc)
def flat_map(self, event):
self.buffer.add(event)
items = list(self.buffer.get())
if len(items) >= 100:
yield Batch(items)
self.buffer.clear()
MapState: словарь на ключ
MapState<UK, UV> хранит ассоциативный массив на каждый ключ. Это самый гибкий тип, и для RocksDB-backend он часто эффективнее ValueState с POJO-словарём внутри: Flink сериализует и хранит каждую (UK, UV)-пару отдельно.
API:
put(UK key, UV value)— добавить/обновить.get(UK key)— прочитать значение (или null).contains(UK key)— проверка существования.remove(UK key)— удалить пару.keys(),values(),entries()— итераторы.isEmpty()— пустой ли.clear()— удалить всё.
Типичные применения:
- Last-seen timestamp per device_id внутри пользователя.
- Счётчики per category per user.
- Кэш ответов внешнего сервиса.
- Sessions per user (sessionId -> SessionData).
public class PerDeviceCounter extends RichFlatMapFunction<Event, Stat> {
private transient MapState<String, Long> deviceCounts;
@Override
public void open(OpenContext ctx) {
MapStateDescriptor<String, Long> desc = new MapStateDescriptor<>(
"device-counts", Types.STRING, Types.LONG
);
deviceCounts = getRuntimeContext().getMapState(desc);
}
@Override
public void flatMap(Event ev, Collector<Stat> out) throws Exception {
Long current = deviceCounts.get(ev.deviceId);
long next = (current == null ? 0 : current) + 1;
deviceCounts.put(ev.deviceId, next);
if (next % 100 == 0) {
out.collect(new Stat(ev.userId, ev.deviceId, next));
}
}
}
Python:
from pyflink.datastream.state import MapStateDescriptor
class PerDeviceCounter(RichFlatMapFunction):
def open(self, runtime_context):
desc = MapStateDescriptor(
"device-counts",
Types.STRING(),
Types.LONG()
)
self.device_counts = runtime_context.get_map_state(desc)
def flat_map(self, ev):
current = self.device_counts.get(ev.device_id) or 0
next_val = current + 1
self.device_counts.put(ev.device_id, next_val)
if next_val % 100 == 0:
yield Stat(ev.user_id, ev.device_id, next_val)
MapState — это правильный выбор, когда у вас есть второй уровень группировки внутри ключа. Например, ключ keyBy — user_id, а внутри per user вы хотите хранить state per device_id. Альтернатива ValueState<HashMap> заставит Flink сериализовать всю мапу при каждом обращении, что катастрофически плохо для больших мап на RocksDB.
Выбор: ValueState vs ListState vs MapState
Решение зависит от паттерна доступа и storage-backend:
Эмпирическое правило при работе с RocksDB-backend:
- ValueState — O(value_size) на каждое чтение/запись. Хорошо для маленьких значений (
< 1 KB). - ListState — O(list_size) на каждое чтение через
get(). Запись черезadd()— O(element_size). Не используйтеget()в hot path, если список большой. - MapState — O(uv_size) для
get/putконкретного UK. O(map_size) для итерацииentries(). Идеально для random access.
Сериализация: type info и performance
Flink использует typed serializers — производительность зависит от выбранного сериализатора.
Хорошие сериализаторы (в порядке убывания скорости):
- Примитивы и String (
Types.STRING,Types.LONG) — встроенные. - POJO с правильно объявленными полями (public + setters/getters или public fields).
- Tuple-типы — Tuple1, Tuple2, …, Tuple25.
Медленные сериализаторы (Kryo fallback):
- Сложные generic-типы, которые Flink не может проанализировать.
- Классы без default-конструктора.
- Иммутабельные коллекции из стороних библиотек.
При запуске с pipeline.generic-types = false Flink упадёт, если попадётся Kryo-fallback — это полезный strict-mode для production. Альтернатива: Apache Avro (TypeInformation.of(Order.class) + Avro generation) — стабильная сериализация с поддержкой schema evolution.
Не сохраняйте в state объекты, чей класс может измениться при следующем релизе. После checkpoint Flink хранит данные с конкретным сериализатором — если в новой версии вы переименовали поле или удалили его без миграции, restore упадёт. Для долгоживущего state используйте Avro или explicit schema migration через State Processor API.
State per ключ — не per parallelism
Частое заблуждение: “если parallelism=4, то у меня 4 копии state”. Это неверно. State существует per ключ, и количество ключей не ограничено parallelism. Один subtask может хранить миллионы ключей, каждый со своим набором state-значений.
Что определяется parallelism:
- Количество subtasks, которые параллельно обрабатывают разные ключи.
- Распределение key groups между subtasks.
Что определяется ключами:
- Размер state — растёт линейно с количеством уникальных ключей.
- Cardinality issue: если ключ unbounded (например
event_id, который никогда не повторяется), state будет расти бесконечно. Это первый кандидат на проблемы (см. урок про TTL и анти-паттерны).
Попробуй сам
-
ValueState vs MapState бенчмарк. Напишите два варианта одной задачи: подсчёт событий per (user_id, device_id). Первый — keyBy(user_id) + ValueState
<HashMap<DeviceId, Long>>. Второй — keyBy(user_id) + MapState<DeviceId, Long>. Прогоните по 1M событий с 10K устройств на пользователя. Сравните размер чекпойнтов и время full snapshot. -
ListState size cliff. Накапливайте в ListState события до 10, 100, 1000, 10000 элементов и замеряйте время полной итерации (
for (Event e : state.get())). Зависимость должна быть линейной — это нормально, но иллюстрирует, почему get() в hot path плохая идея. -
State migration. Запустите job с
ValueState<UserSnapshot v1>, накопите state, сделайте savepoint. Добавьте поле вUserSnapshot v2(без удаления старых полей), restore. Проверьте, что старое state читается корректно — Flink использует встроенный POJO-serializer с поддержкой добавления полей.