Learning Platform
Глоссарий Troubleshooting
Урок 05.02 · 22 мин
Средний
ValueStateListStateMapStateStateDescriptorRichFunctionKeyedState

ValueState, ListState, MapState

После keyBy() оператор может хранить состояние per-key. Flink предлагает три базовых типа keyed state, и каждый из них оптимизирован под свой паттерн доступа. Выбор правильного типа влияет на производительность чекпойнтов, RocksDB-storage и сериализацию.

В этом уроке разбираем ValueState, ListState, MapState — что они умеют, как их использовать в Java и Python, и где границы.


Контекст: как state объявляется

Все типы keyed state объявляются одинаково — через state descriptor в методе open() функции, расширяющей RichFunction. Доступ к state работает только внутри методов вроде processElement(), и только когда есть текущий ключ (его выставляет Flink автоматически при вызове из keyed-оператора).

public class FraudDetector extends RichFlatMapFunction<Transaction, Alert> {
    private transient ValueState<Long> lastTxTimestamp;

    @Override
    public void open(OpenContext openContext) {
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
            "last-tx-timestamp",   // имя — должно быть уникальным в пределах оператора
            Types.LONG              // тип
        );
        lastTxTimestamp = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Transaction tx, Collector<Alert> out) throws Exception {
        Long previous = lastTxTimestamp.value();  // null если для текущего ключа state не существует
        if (previous != null && tx.timestamp - previous < 1000) {
            out.collect(new Alert(tx.accountId, "Too frequent"));
        }
        lastTxTimestamp.update(tx.timestamp);
    }
}

Несколько ключевых правил:

  • Поле должно быть transient. Flink сам создаёт state в open() — сам объект ValueState не сериализуется как часть функции.
  • State идентифицируется по имени. При rescale или restart Flink находит state по имени из дескриптора. Менять имя без savepoint миграции — терять данные.
  • Тип фиксируется при первом запуске. Изменение типа сериализатора требует миграции состояния через State Processor API или savepoint с conversion.
Сериализация state: как Flink хранит TypeInformation

В Python (PyFlink) синтаксис аналогичный:

from pyflink.datastream.functions import RichFlatMapFunction
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.common.typeinfo import Types

class FraudDetector(RichFlatMapFunction):
    def __init__(self):
        self.last_tx_timestamp = None

    def open(self, runtime_context):
        descriptor = ValueStateDescriptor("last-tx-timestamp", Types.LONG())
        self.last_tx_timestamp = runtime_context.get_state(descriptor)

    def flat_map(self, tx):
        previous = self.last_tx_timestamp.value()
        if previous is not None and tx.timestamp - previous < 1000:
            yield Alert(tx.account_id, "Too frequent")
        self.last_tx_timestamp.update(tx.timestamp)

ValueState: одно значение на ключ

ValueState<T> хранит одно значение типа T на каждый ключ. Это самый простой и быстрый тип state — две операции:

  • value() — прочитать текущее значение (или null, если ещё не было update).
  • update(T value) — заменить значение.
  • clear() — удалить значение (полезно для cleanup, см. урок про TTL).

Типичные применения:

  • Последний таймстамп события для дедупликации.
  • Накопленный счётчик/сумма.
  • Текущее состояние FSM (например, статус пользователя).
  • Последний “снэпшот” объекта пользователя.
ValueState: один слот на ключ
Subtask 1 хранит ValueState для каждого ключа, попавшего в его key groups
Один пользователь, один слот: ValueState`<Long>` хранит ровно одно значение. update() заменяет существующее, value() возвращает null если не было записано.
Другой ключ — независимый слот. Состояние строго изолировано: оператор не видит state другого ключа без явного обращения.
Каждый новый ключ получает свой пустой слот. value() вернёт null до первого update().
TIP

Если ваш T — это сложный POJO или большой массив, и вы обновляете только малую часть — рассмотрите MapState. ValueState всегда сериализует/десериализует значение целиком. Для RocksDB-backend это означает чтение и запись полного blob на каждое обращение.


ListState: упорядоченный список на ключ

ListState<T> хранит список элементов на ключ. Семантика — append-only в основном использовании:

  • add(T element) — добавить элемент в конец.
  • addAll(List<T>) — добавить пачку.
  • get() — вернуть Iterable<T> со всеми элементами.
  • update(List<T>) — полностью заменить список.
  • clear() — очистить.

Типичные применения:

  • Буфер событий для пакетной обработки.
  • История последних N событий (с явной обрезкой в коде).
  • Накопление элементов окна сессии.
  • Список pending tasks/orders для пользователя.
public class EventBuffer extends RichFlatMapFunction<Event, Batch> {
    private transient ListState<Event> buffer;

    @Override
    public void open(OpenContext ctx) {
        ListStateDescriptor<Event> desc = new ListStateDescriptor<>(
            "event-buffer", TypeInformation.of(Event.class)
        );
        buffer = getRuntimeContext().getListState(desc);
    }

    @Override
    public void flatMap(Event event, Collector<Batch> out) throws Exception {
        buffer.add(event);

        // Считаем накопленное количество (тяжёлая операция — Iterable)
        long count = 0;
        for (Event ignored : buffer.get()) count++;

        if (count >= 100) {
            List<Event> snapshot = new ArrayList<>();
            buffer.get().forEach(snapshot::add);
            out.collect(new Batch(snapshot));
            buffer.clear();
        }
    }
}
WARNING

ListState.get() возвращает Iterable, и итерация по нему в RocksDB — это последовательное чтение всех байтов из state-backend. Для больших списков это медленно. Если вам нужно знать только размер списка — храните счётчик в отдельном ValueState; не итерируйте.

В Python:

from pyflink.datastream.state import ListStateDescriptor

class EventBuffer(RichFlatMapFunction):
    def open(self, runtime_context):
        desc = ListStateDescriptor("event-buffer", Types.ROW([Types.STRING(), Types.LONG()]))
        self.buffer = runtime_context.get_list_state(desc)

    def flat_map(self, event):
        self.buffer.add(event)
        items = list(self.buffer.get())
        if len(items) >= 100:
            yield Batch(items)
            self.buffer.clear()

MapState: словарь на ключ

MapState<UK, UV> хранит ассоциативный массив на каждый ключ. Это самый гибкий тип, и для RocksDB-backend он часто эффективнее ValueState с POJO-словарём внутри: Flink сериализует и хранит каждую (UK, UV)-пару отдельно.

API:

  • put(UK key, UV value) — добавить/обновить.
  • get(UK key) — прочитать значение (или null).
  • contains(UK key) — проверка существования.
  • remove(UK key) — удалить пару.
  • keys(), values(), entries() — итераторы.
  • isEmpty() — пустой ли.
  • clear() — удалить всё.

Типичные применения:

  • Last-seen timestamp per device_id внутри пользователя.
  • Счётчики per category per user.
  • Кэш ответов внешнего сервиса.
  • Sessions per user (sessionId -> SessionData).
public class PerDeviceCounter extends RichFlatMapFunction<Event, Stat> {
    private transient MapState<String, Long> deviceCounts;

    @Override
    public void open(OpenContext ctx) {
        MapStateDescriptor<String, Long> desc = new MapStateDescriptor<>(
            "device-counts", Types.STRING, Types.LONG
        );
        deviceCounts = getRuntimeContext().getMapState(desc);
    }

    @Override
    public void flatMap(Event ev, Collector<Stat> out) throws Exception {
        Long current = deviceCounts.get(ev.deviceId);
        long next = (current == null ? 0 : current) + 1;
        deviceCounts.put(ev.deviceId, next);

        if (next % 100 == 0) {
            out.collect(new Stat(ev.userId, ev.deviceId, next));
        }
    }
}

Python:

from pyflink.datastream.state import MapStateDescriptor

class PerDeviceCounter(RichFlatMapFunction):
    def open(self, runtime_context):
        desc = MapStateDescriptor(
            "device-counts",
            Types.STRING(),
            Types.LONG()
        )
        self.device_counts = runtime_context.get_map_state(desc)

    def flat_map(self, ev):
        current = self.device_counts.get(ev.device_id) or 0
        next_val = current + 1
        self.device_counts.put(ev.device_id, next_val)
        if next_val % 100 == 0:
            yield Stat(ev.user_id, ev.device_id, next_val)
TIP

MapState — это правильный выбор, когда у вас есть второй уровень группировки внутри ключа. Например, ключ keyBy — user_id, а внутри per user вы хотите хранить state per device_id. Альтернатива ValueState<HashMap> заставит Flink сериализовать всю мапу при каждом обращении, что катастрофически плохо для больших мап на RocksDB.


Выбор: ValueState vs ListState vs MapState

Решение зависит от паттерна доступа и storage-backend:

Выбор типа state
Используй когда: храните одно скалярное или небольшое POJO-значение. Часто обновляете целиком. Не нужно искать по подключу.
Используй когда: накапливаете коллекцию для дальнейшей пакетной обработки. Чаще всего append-only с периодической очисткой. Размер списка ограничен (десятки-сотни элементов, не миллионы).
Используй когда: есть второй уровень группировки внутри основного ключа. Нужен random access по подключу. Размер может быть большим — каждая пара хранится отдельно в RocksDB.

Эмпирическое правило при работе с RocksDB-backend:

  • ValueState — O(value_size) на каждое чтение/запись. Хорошо для маленьких значений (< 1 KB).
  • ListState — O(list_size) на каждое чтение через get(). Запись через add() — O(element_size). Не используйте get() в hot path, если список большой.
  • MapState — O(uv_size) для get/put конкретного UK. O(map_size) для итерации entries(). Идеально для random access.

Сериализация: type info и performance

Flink использует typed serializers — производительность зависит от выбранного сериализатора.

Хорошие сериализаторы (в порядке убывания скорости):

  • Примитивы и String (Types.STRING, Types.LONG) — встроенные.
  • POJO с правильно объявленными полями (public + setters/getters или public fields).
  • Tuple-типы — Tuple1, Tuple2, …, Tuple25.

Медленные сериализаторы (Kryo fallback):

  • Сложные generic-типы, которые Flink не может проанализировать.
  • Классы без default-конструктора.
  • Иммутабельные коллекции из стороних библиотек.

При запуске с pipeline.generic-types = false Flink упадёт, если попадётся Kryo-fallback — это полезный strict-mode для production. Альтернатива: Apache Avro (TypeInformation.of(Order.class) + Avro generation) — стабильная сериализация с поддержкой schema evolution.

WARNING

Не сохраняйте в state объекты, чей класс может измениться при следующем релизе. После checkpoint Flink хранит данные с конкретным сериализатором — если в новой версии вы переименовали поле или удалили его без миграции, restore упадёт. Для долгоживущего state используйте Avro или explicit schema migration через State Processor API.


State per ключ — не per parallelism

Частое заблуждение: “если parallelism=4, то у меня 4 копии state”. Это неверно. State существует per ключ, и количество ключей не ограничено parallelism. Один subtask может хранить миллионы ключей, каждый со своим набором state-значений.

Что определяется parallelism:

  • Количество subtasks, которые параллельно обрабатывают разные ключи.
  • Распределение key groups между subtasks.

Что определяется ключами:

  • Размер state — растёт линейно с количеством уникальных ключей.
  • Cardinality issue: если ключ unbounded (например event_id, который никогда не повторяется), state будет расти бесконечно. Это первый кандидат на проблемы (см. урок про TTL и анти-паттерны).

Попробуй сам

  1. ValueState vs MapState бенчмарк. Напишите два варианта одной задачи: подсчёт событий per (user_id, device_id). Первый — keyBy(user_id) + ValueState<HashMap<DeviceId, Long>>. Второй — keyBy(user_id) + MapState<DeviceId, Long>. Прогоните по 1M событий с 10K устройств на пользователя. Сравните размер чекпойнтов и время full snapshot.

  2. ListState size cliff. Накапливайте в ListState события до 10, 100, 1000, 10000 элементов и замеряйте время полной итерации (for (Event e : state.get())). Зависимость должна быть линейной — это нормально, но иллюстрирует, почему get() в hot path плохая идея.

  3. State migration. Запустите job с ValueState<UserSnapshot v1>, накопите state, сделайте savepoint. Добавьте поле в UserSnapshot v2 (без удаления старых полей), restore. Проверьте, что старое state читается корректно — Flink использует встроенный POJO-serializer с поддержкой добавления полей.

Проверка знанийKnowledge check
Вы пишете оператор, который для каждого user_id должен хранить мапу session_id -> List<EventId> (история событий в каждой активной сессии). Какой тип state выбрать и почему?
ОтветAnswer
Лучший выбор — MapState<String, List<String>>, где UK = session_id, UV = список event_id. Альтернатива через ValueState<HashMap<String, List<String>>> работает, но для RocksDB-backend заставит Flink при каждом обращении сериализовать и десериализовать всю мапу целиком. С MapState каждая пара хранится отдельно — добавление нового event в существующую сессию читает и пишет только одну строчку RocksDB. ListState не подходит, потому что отсутствует второй уровень индексации (по session_id). Дополнительно: для очень больших списков EventId внутри сессии стоит рассмотреть вложенный MapState<Tuple2<String, Long>, EventId> с композитным ключом session_id + sequence_number, но это уже про производительность конкретного workload-а.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Команда оптимизирует Flink-job: для каждого user_id хранится словарь sessionId -> SessionData, где словарь может содержать до 1000 активных сессий, и при обработке каждого события обновляется одна конкретная сессия. Текущая реализация использует ValueState<HashMap<String, SessionData>>. Какая реализация будет работать значительно быстрее на RocksDB-backend?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4