State в Flink — это не «одна большая HashMap», как многие представляют. Это система с двумя видами backend’ов, шестью видами primitive’ов, namespace’ами для каждого primitive, lifecycle от создания до snapshot/restore, и абстракцией над реальным storage layer (HashMap или RocksDB). Понимание этой архитектуры — фундамент для любого глубокого тюнинга state и checkpoint’ов.
Value, List и Map state в практикеВ этом уроке мы посмотрим на state backend сверху — какие интерфейсы есть, как они связаны, и кто кого вызывает в lifecycle от старта оператора до checkpoint completion. В оставшихся 4 уроках модуля разберём конкретные реализации (HashMap, RocksDB) и их тюнинг.
Два типа state: keyed vs operator
Flink различает два принципиально разных типа state по тому, как они партиционируются между параллельными subtask’ами.
Keyed state — привязан к ключу. Если оператор работает в keyBy(...)-режиме, то у него есть keyed state, и каждая subtask владеет state’ом для подмножества ключей (определяется KeyGroup’ой). Это основной state в большинстве задач: window state, aggregations, joins.
Operator state — привязан к subtask’у целиком, без концепта ключа. Используется в основном source/sink connectors: например, Kafka source хранит offset’ы партиций как operator state, не привязывая их к ключу.
С точки зрения API:
// Keyed state — внутри keyed operator
ValueState<MyType> state = getRuntimeContext().getState(
new ValueStateDescriptor<>("my-state", MyType.class));
// Operator state — внутри функции, реализующей CheckpointedFunction
ListState<MyType> state = context.getOperatorStateStore()
.getListState(new ListStateDescriptor<>("my-state", MyType.class));
Эти два state’а живут в разных backend’ах внутри одного и того же оператора: keyedStateBackend и operatorStateBackend. На уровне snapshot они тоже сериализуются отдельно.
KeyedStateBackend: интерфейс и реализации
Интерфейс — KeyedStateBackend<K> (flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java). Главные методы:
public interface KeyedStateBackend<K> {
void setCurrentKey(K key);
K getCurrentKey();
int getCurrentKeyGroupIndex();
KeyGroupRange getKeyGroupRange();
<N, S extends State, V> S getOrCreateKeyedState(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, V> stateDescriptor) throws Exception;
<N> Stream<K> getKeys(String state, N namespace);
}
Главное здесь — stateful API с current key. Перед каждым обращением к state ты должен установить текущий ключ через setCurrentKey(...), и тогда любой state.value() или state.update(x) будет применён именно для этого ключа.
Это не явная map[key->value]! С точки зрения userCode у тебя есть один объект ValueState<MyType>, который ты вызываешь, и он сам внутри знает, для какого ключа сейчас писать/читать.
Две основные реализации:
HeapKeyedStateBackend(aliasHashMapStateBackendв публичном API) — всё state в Java HashMap в JVM heap. Быстро, но размер ограничен heap’ом.EmbeddedRocksDBStateBackend— state в RocksDB native engine, на диске + memory кэш. Большие state, но overhead на (де)сериализацию каждого доступа.
Третья реализация (новая в 2.0) — ForStStateBackend для disaggregated state (см. модуль 10), но архитектурно она наследует ту же модель.
OperatorStateBackend: интерфейс и одна реализация
Интерфейс — OperatorStateBackend. Реально используется только одна реализация: DefaultOperatorStateBackend, которая всегда in-memory (нет RocksDB-варианта для operator state).
Почему? Потому что operator state по дизайну маленький. Kafka source хранит десятки offset’ов на partition — десятки байт суммарно. Sinks хранят transactional state — тоже мало. Нет смысла усложнять интерфейс ради 1 KiB state на task’е.
Главные state primitive operator state’а:
ListState<T>— список элементов на subtask.BroadcastState<K,V>— read-only state, реплицируется на все subtask’и.UnionListState<T>— особый ListState с union redistribution mode (см. ниже).
Снимки operator state делаются простой Java сериализацией всего списка.
State primitives: что предоставляет backend
И keyed, и operator backend поддерживают несколько типов state. Для keyed:
ValueState<T>— одно значение на ключ.ListState<T>— список значений на ключ.MapState<K,V>— вложенная map на ключ.ReducingState<T>— combinable аккумулятор.AggregatingState<IN,OUT>— то же, но с разными типами input/output.
Каждый primitive — это в реализации отдельная структура данных в backend’е. В HashMap — отдельная вложенная HashMap. В RocksDB — отдельный column family (см. урок 4 модуля 05).
Namespace: третья ось
Помимо ключа, у state есть namespace. Это «второе измерение» состояния, обычно используется для окон. Когда у тебя keyBy(userId).window(TumblingEventTimeWindow.of(Time.minutes(5))), для одного userId на одном operator instance накапливается N разных «вёдер» — по одному на каждое window. Это и есть namespace.
Внутри backend’а доступ к state идёт по трём ключам:
(key, namespace, stateName) -> value
Где stateName — это имя из StateDescriptor (например, "window-contents").
Для большинства non-window state’ов namespace = VoidNamespace (синглтон), но для оконных операторов в namespace кодируется само окно (start, end time).
Внутри backend каждое значение адресуется тройкой. Это позволяет одному оператору иметь несколько типов state и window'ов, не путая их.
В HashMap backend это вложенная Map<Namespace, Map<Key, V>>, в RocksDB — composite key (serialize(namespace) + serialize(key)).
Lifecycle: от создания до checkpoint
Когда задача стартует, цепочка такая:
- StateBackend.createKeyedStateBackend(parameters) — фабрика создаёт backend. Параметры включают KeyGroupRange (какую часть ключевого пространства этот subtask обслуживает), TypeSerializer для ключа, ttlConfig, MetricGroup.
- Restore — если есть checkpoint/savepoint, backend заполняется из него. HashMap-backend десериализует все данные в heap, RocksDB восстанавливает SST файлы.
- Runtime: userCode вызывает
getRuntimeContext().getState(...), что транслируется вbackend.getOrCreateKeyedState(...). Backend создаёт state primitive и кэширует. - Per-record: при поступлении каждой записи Flink делает
backend.setCurrentKey(extractKey(record)), затем userCode вызываетstate.value(),state.update(...). Backend знает текущий ключ и направляет операции туда. - Snapshot: при checkpoint coordinator triggers,
backend.snapshot(checkpointId, ...)— собирает текущее состояние вKeyedStateHandle. - Restore on failure: при failover backend перечитывает state из последнего успешного checkpoint’а.
- Dispose: при остановке задачи
backend.dispose()— освобождает ресурсы (HashMap GC’ится, RocksDB закрывается).
От старта оператора до checkpoint completion. Snapshot — снимок текущего состояния, асинхронно отправляется в state storage (HDFS/S3).
Snapshot strategies: sync vs async
Главная разница между HashMap и RocksDB backend’ами лежит в как они делают snapshot.
- HashMap (sync copy): при checkpoint backend делает synchronous deep copy всей структуры. Пока копирование идёт, оператор стоит. Потом копию async отправляют в HDFS/S3, а оператор продолжает работу с оригиналом. Минус: stop-the-world на длительность копии (для 1 GiB state — секунды).
- RocksDB (async incremental): RocksDB поддерживает MVCC snapshot’ы нативно. Backend получает «snapshot handle», и операции на оригинале продолжаются без блокировки. Snapshot потом сериализуется (как набор SST файлов) async. На incremental — только новые SST с прошлого checkpoint’а копируются. Минус: накладные расходы на per-access сериализацию.
Это главный trade-off между двумя backend’ами:
| HashMap | RocksDB | |
|---|---|---|
| Access latency | ~100 ns | ~10 μs (100x медленнее) |
| Max state size | ~heap size | TB+ |
| Snapshot blocking | sync deep copy | async snapshot |
| Memory model | JVM heap (GC pressure) | native off-heap (managed) |
| TTL support | да | да |
| Incremental checkpoint | нет | да |
В уроках 2 и 3 этого модуля мы посмотрим внутрь каждой реализации детально.
Где смотреть в коде
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java— фабрика.flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java— главный интерфейс.flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointableKeyedStateBackend.java— добавляет snapshot методы.flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java— HashMap реализация.flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java— RocksDB реализация.flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java— operator state, единственная реализация.
Чтение source
Лучший entry-point — посмотреть тест AbstractKeyedStateBackendTest в flink-runtime/src/test/java/org/apache/flink/runtime/state/. Он параметризован для всех backend’ов и проходит lifecycle: создание, заполнение, snapshot, restore, проверка. Один тест читать быстрее, чем весь код backend’а.
Чек-лист
- В Flink два state: keyed (per-key, через
keyBy) и operator (per-task, для source/sink). - Каждый оператор имеет два backend’а:
keyedStateBackendиoperatorStateBackend. KeyedStateBackend<K>— главный интерфейс. Реализации:HeapKeyedStateBackend,EmbeddedRocksDBStateBackend,ForStStateBackend.- State primitive: ValueState, ListState, MapState, ReducingState, AggregatingState.
- Доступ по тройке
(key, namespace, stateName). Namespace = окно для оконных операторов,VoidNamespaceдля остальных. - Backend имеет stateful API:
setCurrentKey()перед каждым обращением, внутри userCode current key неявный. - Lifecycle: create -> restore -> runtime (setCurrentKey + state.X) -> snapshot -> restore on failure -> dispose.
- Snapshot: HashMap = sync copy (block задачи), RocksDB = async incremental.