Learning Platform
Глоссарий Troubleshooting
Урок 06.01 · 26 мин
Продвинутый
State backendKeyedStateBackendOperatorStateBackendSnapshotRestoreLifecycle

State в Flink — это не «одна большая HashMap», как многие представляют. Это система с двумя видами backend’ов, шестью видами primitive’ов, namespace’ами для каждого primitive, lifecycle от создания до snapshot/restore, и абстракцией над реальным storage layer (HashMap или RocksDB). Понимание этой архитектуры — фундамент для любого глубокого тюнинга state и checkpoint’ов.

Value, List и Map state в практике

В этом уроке мы посмотрим на state backend сверху — какие интерфейсы есть, как они связаны, и кто кого вызывает в lifecycle от старта оператора до checkpoint completion. В оставшихся 4 уроках модуля разберём конкретные реализации (HashMap, RocksDB) и их тюнинг.

Два типа state: keyed vs operator

Flink различает два принципиально разных типа state по тому, как они партиционируются между параллельными subtask’ами.

Keyed state — привязан к ключу. Если оператор работает в keyBy(...)-режиме, то у него есть keyed state, и каждая subtask владеет state’ом для подмножества ключей (определяется KeyGroup’ой). Это основной state в большинстве задач: window state, aggregations, joins.

Operator state — привязан к subtask’у целиком, без концепта ключа. Используется в основном source/sink connectors: например, Kafka source хранит offset’ы партиций как operator state, не привязывая их к ключу.

С точки зрения API:

// Keyed state — внутри keyed operator
ValueState<MyType> state = getRuntimeContext().getState(
    new ValueStateDescriptor<>("my-state", MyType.class));

// Operator state — внутри функции, реализующей CheckpointedFunction
ListState<MyType> state = context.getOperatorStateStore()
    .getListState(new ListStateDescriptor<>("my-state", MyType.class));

Эти два state’а живут в разных backend’ах внутри одного и того же оператора: keyedStateBackend и operatorStateBackend. На уровне snapshot они тоже сериализуются отдельно.

KeyedStateBackend: интерфейс и реализации

Интерфейс — KeyedStateBackend<K> (flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java). Главные методы:

public interface KeyedStateBackend<K> {
    void setCurrentKey(K key);
    K getCurrentKey();
    int getCurrentKeyGroupIndex();
    KeyGroupRange getKeyGroupRange();
    
    <N, S extends State, V> S getOrCreateKeyedState(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, V> stateDescriptor) throws Exception;
    
    <N> Stream<K> getKeys(String state, N namespace);
}

Главное здесь — stateful API с current key. Перед каждым обращением к state ты должен установить текущий ключ через setCurrentKey(...), и тогда любой state.value() или state.update(x) будет применён именно для этого ключа.

Это не явная map[key->value]! С точки зрения userCode у тебя есть один объект ValueState<MyType>, который ты вызываешь, и он сам внутри знает, для какого ключа сейчас писать/читать.

Две основные реализации:

  • HeapKeyedStateBackend (alias HashMapStateBackend в публичном API) — всё state в Java HashMap в JVM heap. Быстро, но размер ограничен heap’ом.
  • EmbeddedRocksDBStateBackend — state в RocksDB native engine, на диске + memory кэш. Большие state, но overhead на (де)сериализацию каждого доступа.

Третья реализация (новая в 2.0) — ForStStateBackend для disaggregated state (см. модуль 10), но архитектурно она наследует ту же модель.

OperatorStateBackend: интерфейс и одна реализация

Интерфейс — OperatorStateBackend. Реально используется только одна реализация: DefaultOperatorStateBackend, которая всегда in-memory (нет RocksDB-варианта для operator state).

Почему? Потому что operator state по дизайну маленький. Kafka source хранит десятки offset’ов на partition — десятки байт суммарно. Sinks хранят transactional state — тоже мало. Нет смысла усложнять интерфейс ради 1 KiB state на task’е.

Главные state primitive operator state’а:

  • ListState<T> — список элементов на subtask.
  • BroadcastState<K,V> — read-only state, реплицируется на все subtask’и.
  • UnionListState<T> — особый ListState с union redistribution mode (см. ниже).

Снимки operator state делаются простой Java сериализацией всего списка.

State primitives: что предоставляет backend

И keyed, и operator backend поддерживают несколько типов state. Для keyed:

  • ValueState<T> — одно значение на ключ.
  • ListState<T> — список значений на ключ.
  • MapState<K,V> — вложенная map на ключ.
  • ReducingState<T> — combinable аккумулятор.
  • AggregatingState<IN,OUT> — то же, но с разными типами input/output.

Каждый primitive — это в реализации отдельная структура данных в backend’е. В HashMap — отдельная вложенная HashMap. В RocksDB — отдельный column family (см. урок 4 модуля 05).

Namespace: третья ось

Помимо ключа, у state есть namespace. Это «второе измерение» состояния, обычно используется для окон. Когда у тебя keyBy(userId).window(TumblingEventTimeWindow.of(Time.minutes(5))), для одного userId на одном operator instance накапливается N разных «вёдер» — по одному на каждое window. Это и есть namespace.

Внутри backend’а доступ к state идёт по трём ключам:

(key, namespace, stateName) -> value

Где stateName — это имя из StateDescriptor (например, "window-contents").

Для большинства non-window state’ов namespace = VoidNamespace (синглтон), но для оконных операторов в namespace кодируется само окно (start, end time).

Доступ к state по тройке (key, namespace, stateName)

Внутри backend каждое значение адресуется тройкой. Это позволяет одному оператору иметь несколько типов state и window'ов, не путая их.

Оператор window-agg3 state primitive: window-contents (ListState), trigger-state (ValueState), late-counter (ValueState)
(userId=1, Window(0-300), window-contents)List`<Event>`
(userId=1, Window(0-300), trigger-state)long
(userId=1, VoidNamespace, late-counter)int
(userId=1, Window(300-600), window-contents)List`<Event>`То же state primitive, но другая window — другое значение.
(userId=2, Window(0-300), window-contents)List`<Event>`Другой ключ — независимая запись.

В HashMap backend это вложенная Map<Namespace, Map<Key, V>>, в RocksDB — composite key (serialize(namespace) + serialize(key)).

Lifecycle: от создания до checkpoint

Когда задача стартует, цепочка такая:

  1. StateBackend.createKeyedStateBackend(parameters) — фабрика создаёт backend. Параметры включают KeyGroupRange (какую часть ключевого пространства этот subtask обслуживает), TypeSerializer для ключа, ttlConfig, MetricGroup.
  2. Restore — если есть checkpoint/savepoint, backend заполняется из него. HashMap-backend десериализует все данные в heap, RocksDB восстанавливает SST файлы.
  3. Runtime: userCode вызывает getRuntimeContext().getState(...), что транслируется в backend.getOrCreateKeyedState(...). Backend создаёт state primitive и кэширует.
  4. Per-record: при поступлении каждой записи Flink делает backend.setCurrentKey(extractKey(record)), затем userCode вызывает state.value(), state.update(...). Backend знает текущий ключ и направляет операции туда.
  5. Snapshot: при checkpoint coordinator triggers, backend.snapshot(checkpointId, ...) — собирает текущее состояние в KeyedStateHandle.
  6. Restore on failure: при failover backend перечитывает state из последнего успешного checkpoint’а.
  7. Dispose: при остановке задачи backend.dispose() — освобождает ресурсы (HashMap GC’ится, RocksDB закрывается).
Lifecycle KeyedStateBackend

От старта оператора до checkpoint completion. Snapshot — снимок текущего состояния, асинхронно отправляется в state storage (HDFS/S3).

1. Task startup -> StateBackend.createKeyedStateBackend()фабрика создаёт backend instance
2. Restore from checkpoint (если есть)HashMap: deserialize -> heap. RocksDB: download SST -> open DB.
3. userCode: getRuntimeContext().getState(descriptor)getOrCreateKeyedState() -> возвращает ValueState/ListState/etc proxy
4. Per record: setCurrentKey(extractKey(record))backend помнит current key до следующего setCurrentKey
state.value() / update() / get() / put()операции применяются к (current_key, namespace, state_name)
5. Checkpoint trigger: snapshot(checkpointId, ...)async: backend копирует состояние в KeyedStateHandle, кладёт в S3/HDFS
6. On failure: восстановление из последнего успешного KeyedStateHandlejob restart, backend.restore()
7. Task shutdown: backend.dispose()release resources

Snapshot strategies: sync vs async

Главная разница между HashMap и RocksDB backend’ами лежит в как они делают snapshot.

  • HashMap (sync copy): при checkpoint backend делает synchronous deep copy всей структуры. Пока копирование идёт, оператор стоит. Потом копию async отправляют в HDFS/S3, а оператор продолжает работу с оригиналом. Минус: stop-the-world на длительность копии (для 1 GiB state — секунды).
  • RocksDB (async incremental): RocksDB поддерживает MVCC snapshot’ы нативно. Backend получает «snapshot handle», и операции на оригинале продолжаются без блокировки. Snapshot потом сериализуется (как набор SST файлов) async. На incremental — только новые SST с прошлого checkpoint’а копируются. Минус: накладные расходы на per-access сериализацию.

Это главный trade-off между двумя backend’ами:

HashMapRocksDB
Access latency~100 ns~10 μs (100x медленнее)
Max state size~heap sizeTB+
Snapshot blockingsync deep copyasync snapshot
Memory modelJVM heap (GC pressure)native off-heap (managed)
TTL supportдада
Incremental checkpointнетда

В уроках 2 и 3 этого модуля мы посмотрим внутрь каждой реализации детально.

Где смотреть в коде

  • flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java — фабрика.
  • flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java — главный интерфейс.
  • flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointableKeyedStateBackend.java — добавляет snapshot методы.
  • flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java — HashMap реализация.
  • flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java — RocksDB реализация.
  • flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java — operator state, единственная реализация.

Чтение source

Лучший entry-point — посмотреть тест AbstractKeyedStateBackendTest в flink-runtime/src/test/java/org/apache/flink/runtime/state/. Он параметризован для всех backend’ов и проходит lifecycle: создание, заполнение, snapshot, restore, проверка. Один тест читать быстрее, чем весь код backend’а.

Чек-лист

  • В Flink два state: keyed (per-key, через keyBy) и operator (per-task, для source/sink).
  • Каждый оператор имеет два backend’а: keyedStateBackend и operatorStateBackend.
  • KeyedStateBackend<K> — главный интерфейс. Реализации: HeapKeyedStateBackend, EmbeddedRocksDBStateBackend, ForStStateBackend.
  • State primitive: ValueState, ListState, MapState, ReducingState, AggregatingState.
  • Доступ по тройке (key, namespace, stateName). Namespace = окно для оконных операторов, VoidNamespace для остальных.
  • Backend имеет stateful API: setCurrentKey() перед каждым обращением, внутри userCode current key неявный.
  • Lifecycle: create -> restore -> runtime (setCurrentKey + state.X) -> snapshot -> restore on failure -> dispose.
  • Snapshot: HashMap = sync copy (block задачи), RocksDB = async incremental.
Проверка знанийKnowledge check
Оператор делает keyBy(userId), window(5 min tumbling), и внутри окна вызывает ValueState<Integer> counter. Какие тройки (key, namespace, stateName) backend хранит для трёх user-id и двух последовательных окон, и сколько entries это даёт суммарно?
ОтветAnswer
Для каждой пары (userId, window) — одна запись counter. Окно представлено namespace = TimeWindow(start, end). Тройки: (1, Window(0,300), counter), (1, Window(300,600), counter), (2, Window(0,300), counter), (2, Window(300,600), counter), (3, Window(0,300), counter), (3, Window(300,600), counter). Итого 6 entries в state backend. Если оператор хранит ещё trigger-state или window-contents — это дополнительные state primitive, каждый со своим именем, и количество entries умножается на число state primitive. Например, при window-contents (ListState) + counter (ValueState) для тех же 3 users × 2 windows — 12 entries (6 per state primitive × 2 primitives).

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какая принципиальная разница между keyed state и operator state в Flink?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5