Operator state: ListState и UnionListState
Keyed state живёт «за ключом» — ValueState, MapState, ListState доступны только из KeyedStream и автоматически партиционируются по ключу. Но в production регулярно нужна другая разновидность состояния: operator state — состояние, привязанное к конкретному параллельному инстансу оператора, без ключа. Source-коннекторы хранят offset’ы в operator state. Sink’и буферизуют записи перед commit’ом в operator state. Custom-функции, которым нужно «помнить что-то локально», тоже идут в operator state.
В этом уроке разберём API operator state (ListState, UnionListState), интерфейс CheckpointedFunction и научимся отличать сценарии, когда оператора достаточно, а когда нужно идти в keyed state.
Operator state vs keyed state: где граница
Keyed state работает только после keyBy(...) — Flink гарантирует, что все записи с одним ключом обрабатываются одним и тем же subtask’ом, и состояние «прибито гвоздями» к ключу. Operator state работает на уровне subtask’а: каждый параллельный инстанс оператора имеет свою копию состояния.
Keyed state: per-key
Keyed state доступен только после keyBy. Flink хеширует ключ и направляет запись в keyGroup. Все записи одного ключа попадают в один subtask. ValueState/MapState/ListState — все per-key.ValueState за userId=42
State backend хранит state per keyGroup. На subtask приходится подмножество keyGroups. При rescale keyGroups перераспределяются между subtask'ами — но без расщепления per-key state.Operator state: per subtask
Operator state живёт per subtask. Доступен из любой функции (не только keyed). Реализуется через интерфейс CheckpointedFunction.4 копии ListState
Каждый из 4 параллельных инстансов оператора имеет свою копию ListState. При checkpoint Flink собирает все ListState и сохраняет в snapshot. При restore — раздаёт обратно.Когда нужен operator state:
- Source-коннектор: каждый subtask читает свою партицию (Kafka, Kinesis) и хранит offset как operator state.
- Sink-коннектор: буферизация записей для batch commit в external system (например, JDBC sink с pre-aggregation).
- Custom функция в
flatMap/map, которой нужно состояние без ключа (например, локальный счётчик, кеш per subtask).
Когда нужен keyed state:
- Везде, где «состояние за сущностью»: счётчики per user, агрегации per device, текущий статус per order.
Если стоит вопрос «использовать keyed или operator», в 95% случаев ответ — keyed. Operator state — низкоуровневый инструмент для коннекторов и редких custom-сценариев.
Offset management в Kafka: то, что хранится в operator stateAPI: ListState и UnionListState
Operator state в Flink — это всегда список элементов. Никакого ValueState или MapState — только ListState<T>. Список — это естественный контейнер для redistribution при rescale: Flink может «разрезать» список и раздать элементы новым subtask’ам.
Есть две разновидности operator state:
ListState<T> (also called even-split redistribution или просто split):
- При rescale Flink собирает все элементы со всех subtask’ов и поровну распределяет между новым числом subtask’ов.
- Используется в Kafka source — каждый element это offset для одной партиции. При rescale партиции перераспределяются между новыми subtask’ами.
UnionListState<T> (also called union redistribution):
- При rescale каждый subtask получает полный список со всех старых subtask’ов.
- Каждый новый subtask видит весь union — сам решает, какие элементы оставить.
- Использовать осторожно: при больших списках это O(N * parallelism) памяти.
// Java: получение ListState и UnionListState
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Long> descriptor =
new ListStateDescriptor<>("offsets", Types.LONG);
// Even-split (стандартный случай)
this.listState = context.getOperatorStateStore()
.getListState(descriptor);
// Union redistribution (для специальных случаев)
this.unionListState = context.getOperatorStateStore()
.getUnionListState(descriptor);
}
# Python (PyFlink 2.x): operator state
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import ListStateDescriptor
from pyflink.datastream.functions import CheckpointedFunction
class StatefulCounter(CheckpointedFunction):
def initialize_state(self, context):
descriptor = ListStateDescriptor("counts", Types.LONG())
self.list_state = context.get_operator_state_store() \
.get_list_state(descriptor)
CheckpointedFunction: интерфейс для operator state
Чтобы использовать operator state, custom function должен реализовать CheckpointedFunction:
public interface CheckpointedFunction {
// Вызывается ПРИ КАЖДОМ checkpoint
// Здесь функция должна записать своё состояние в ListState
void snapshotState(FunctionSnapshotContext context) throws Exception;
// Вызывается ОДИН РАЗ при старте и при restore из snapshot
// Здесь функция получает доступ к ListState и читает из него
void initializeState(FunctionInitializationContext context) throws Exception;
}
Типичная реализация — BufferingSink, который буферизует записи до checkpoint и эпизодически дампит их во внешнюю систему:
public class BufferingSink
implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value, Context context) {
bufferedElements.add(value);
if (bufferedElements.size() >= threshold) {
for (Tuple2<String, Integer> element : bufferedElements) {
// отправка в external system (например, JDBC batch insert)
sendToDatabase(element);
}
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// Перед каждым checkpoint — переносим буфер в ListState
// ListState сохраняется в snapshot и переживает рестарт
checkpointedState.update(bufferedElements);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})
);
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
// isRestored() == true ТОЛЬКО при restore из snapshot
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
}
snapshotState вызывается под чекпоинт-барьером (см. модуль 11). Не делайте здесь блокирующих I/O операций — это задержит весь checkpoint. Все операции в snapshotState должны быть быстрыми: копирование локального буфера в ListState и не более.
isRestored: первый старт vs restore
context.isRestored() — критичный флаг в initializeState:
isRestored() == false— функция стартует впервые (job только создан). ListState пуст.isRestored() == true— функция восстанавливается из savepoint или checkpoint. ListState содержит данные предыдущего запуска.
Без проверки isRestored() логика будет некорректной:
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
// Без isRestored() — будет читать пустой ListState при первом старте
// и инициализировать неправильно.
if (context.isRestored()) {
// Только при restore — переносим из ListState в локальный буфер
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
// При первом старте: bufferedElements остаётся пустым ArrayList
}
Redistribution при rescale: список разрезается
Главная фишка ListState — Flink умеет «разрезать» состояние при rescale. Если у вас было 3 параллельных инстанса с ListState [a, b, c, d, e, f] (по 2 элемента в каждом), и вы увеличиваете parallelism до 4, Flink:
- Соберёт все элементы со всех инстансов:
[a, b, c, d, e, f]. - Раздаст 6 элементов 4 новым инстансам: subtask 0 ->
[a, b], subtask 1 ->[c, d], subtask 2 ->[e], subtask 3 ->[f].
Subtask 0 ListState a b
Subtask 0 до rescale: ListState содержит элементы a, b.Subtask 1 ListState c d
Subtask 1 до rescale: ListState содержит элементы c, d.Subtask 2 ListState e f
Subtask 2 до rescale: ListState содержит элементы e, f.Union all elements a b c d e f
Flink собрал все элементы со всех subtask: a, b, c, d, e, f. Это происходит во время restore из savepoint при изменении parallelism.Subtask 0 a b
Subtask 0 после rescale: получил элементы a, b. Flink раздаёт 6 элементов на 4 subtask поровну (round-robin или even-split).Subtask 1 c d
Subtask 1 после rescale: получил элементы c, d.Subtask 2 e
Subtask 2 после rescale: получил элемент e.Subtask 3 f
Subtask 3 после rescale: получил элемент f. Новый subtask, появившийся после rescale.Для Kafka source это работает идеально: элемент ListState — это (partition, offset), и при rescale партиции перераспределяются между subtask’ами с сохранением offset’а.
UnionListState: каждому subtask полный список
UnionListState ведёт себя иначе: при rescale каждый subtask получает полный объединённый список. Это нужно когда сам subtask должен видеть весь global state и принимать решение, что оставить.
Классический пример — Kafka source в исторические эпохи, когда все subtask’и должны были видеть полный список assigned partitions, чтобы решить, не нужно ли подхватить новые. Современный KafkaSource использует обычный ListState, а UnionListState остался для редких сценариев.
UnionListState тяжёл по памяти. Если в state 10 000 элементов и parallelism 100, при restore каждый из 100 subtask’ов получит все 10 000 элементов — 1 миллион элементов в памяти. Используйте только когда без этого не обойтись.
Попробуй сам
Реализуй EvenIdLogger — SinkFunction, который:
- Принимает события
Event(id: Long, value: String). - Логирует только события с чётным
id. - Хранит в
ListState<Long>последние 10 виденныхid— для отладки выводит их при каждом 100-м событии. - При restore из savepoint буфер должен сохраняться.
Подсказка: переопредели CheckpointedFunction.snapshotState и initializeState. Локальный ArrayDeque<Long> для скорости, синхронизация с ListState только в snapshotState.
Затем измерь, что произойдёт при parallelism: 2 -> 4 через savepoint: посмотри, как 20 элементов из двух subtask распределятся между четырьмя.
Ключевые выводы
- Operator state — это всегда
ListState(других типов нет). Привязан к параллельному subtask’у, не к ключу. CheckpointedFunction— интерфейс сsnapshotState(вызывается при checkpoint) иinitializeState(вызывается при старте и restore).isRestored()— флаг, показывающий, инициализируется ли state с нуля или из snapshot.ListStatevsUnionListState: первый разрезает список при rescale (Kafka source), второй раздаёт каждому subtask полный union (редкий случай).- Где используется: source/sink-коннекторы, sink-буферизация, редкие custom функции. Везде «состояние за сущностью» — keyed state, не operator.