Learning Platform
Глоссарий Troubleshooting
Урок 09.01 · 18 мин
Средний
Operator StateListStateUnionListStateCheckpointedFunction

Operator state: ListState и UnionListState

Keyed state живёт «за ключом» — ValueState, MapState, ListState доступны только из KeyedStream и автоматически партиционируются по ключу. Но в production регулярно нужна другая разновидность состояния: operator state — состояние, привязанное к конкретному параллельному инстансу оператора, без ключа. Source-коннекторы хранят offset’ы в operator state. Sink’и буферизуют записи перед commit’ом в operator state. Custom-функции, которым нужно «помнить что-то локально», тоже идут в operator state.

В этом уроке разберём API operator state (ListState, UnionListState), интерфейс CheckpointedFunction и научимся отличать сценарии, когда оператора достаточно, а когда нужно идти в keyed state.


Operator state vs keyed state: где граница

Keyed state работает только после keyBy(...) — Flink гарантирует, что все записи с одним ключом обрабатываются одним и тем же subtask’ом, и состояние «прибито гвоздями» к ключу. Operator state работает на уровне subtask’а: каждый параллельный инстанс оператора имеет свою копию состояния.

Keyed state vs operator state

Keyed state: per-key

Keyed state доступен только после keyBy. Flink хеширует ключ и направляет запись в keyGroup. Все записи одного ключа попадают в один subtask. ValueState/MapState/ListState — все per-key.
keyBy(userId)

ValueState за userId=42

State backend хранит state per keyGroup. На subtask приходится подмножество keyGroups. При rescale keyGroups перераспределяются между subtask'ами — но без расщепления per-key state.

Operator state: per subtask

Operator state живёт per subtask. Доступен из любой функции (не только keyed). Реализуется через интерфейс CheckpointedFunction.
parallelism = 4

4 копии ListState

Каждый из 4 параллельных инстансов оператора имеет свою копию ListState. При checkpoint Flink собирает все ListState и сохраняет в snapshot. При restore — раздаёт обратно.

Когда нужен operator state:

  • Source-коннектор: каждый subtask читает свою партицию (Kafka, Kinesis) и хранит offset как operator state.
  • Sink-коннектор: буферизация записей для batch commit в external system (например, JDBC sink с pre-aggregation).
  • Custom функция в flatMap/map, которой нужно состояние без ключа (например, локальный счётчик, кеш per subtask).

Когда нужен keyed state:

  • Везде, где «состояние за сущностью»: счётчики per user, агрегации per device, текущий статус per order.

Если стоит вопрос «использовать keyed или operator», в 95% случаев ответ — keyed. Operator state — низкоуровневый инструмент для коннекторов и редких custom-сценариев.

Offset management в Kafka: то, что хранится в operator state

API: ListState и UnionListState

Operator state в Flink — это всегда список элементов. Никакого ValueState или MapState — только ListState<T>. Список — это естественный контейнер для redistribution при rescale: Flink может «разрезать» список и раздать элементы новым subtask’ам.

Есть две разновидности operator state:

ListState<T> (also called even-split redistribution или просто split):

  • При rescale Flink собирает все элементы со всех subtask’ов и поровну распределяет между новым числом subtask’ов.
  • Используется в Kafka source — каждый element это offset для одной партиции. При rescale партиции перераспределяются между новыми subtask’ами.

UnionListState<T> (also called union redistribution):

  • При rescale каждый subtask получает полный список со всех старых subtask’ов.
  • Каждый новый subtask видит весь union — сам решает, какие элементы оставить.
  • Использовать осторожно: при больших списках это O(N * parallelism) памяти.
// Java: получение ListState и UnionListState
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    ListStateDescriptor<Long> descriptor =
        new ListStateDescriptor<>("offsets", Types.LONG);

    // Even-split (стандартный случай)
    this.listState = context.getOperatorStateStore()
        .getListState(descriptor);

    // Union redistribution (для специальных случаев)
    this.unionListState = context.getOperatorStateStore()
        .getUnionListState(descriptor);
}
# Python (PyFlink 2.x): operator state
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import ListStateDescriptor
from pyflink.datastream.functions import CheckpointedFunction

class StatefulCounter(CheckpointedFunction):
    def initialize_state(self, context):
        descriptor = ListStateDescriptor("counts", Types.LONG())
        self.list_state = context.get_operator_state_store() \
            .get_list_state(descriptor)

CheckpointedFunction: интерфейс для operator state

Чтобы использовать operator state, custom function должен реализовать CheckpointedFunction:

public interface CheckpointedFunction {
    // Вызывается ПРИ КАЖДОМ checkpoint
    // Здесь функция должна записать своё состояние в ListState
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    // Вызывается ОДИН РАЗ при старте и при restore из snapshot
    // Здесь функция получает доступ к ListState и читает из него
    void initializeState(FunctionInitializationContext context) throws Exception;
}

Типичная реализация — BufferingSink, который буферизует записи до checkpoint и эпизодически дампит их во внешнюю систему:

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    private final int threshold;
    private transient ListState<Tuple2<String, Integer>> checkpointedState;
    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            for (Tuple2<String, Integer> element : bufferedElements) {
                // отправка в external system (например, JDBC batch insert)
                sendToDatabase(element);
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Перед каждым checkpoint — переносим буфер в ListState
        // ListState сохраняется в snapshot и переживает рестарт
        checkpointedState.update(bufferedElements);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})
            );

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        // isRestored() == true ТОЛЬКО при restore из snapshot
        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}
WARNING

snapshotState вызывается под чекпоинт-барьером (см. модуль 11). Не делайте здесь блокирующих I/O операций — это задержит весь checkpoint. Все операции в snapshotState должны быть быстрыми: копирование локального буфера в ListState и не более.


isRestored: первый старт vs restore

context.isRestored() — критичный флаг в initializeState:

  • isRestored() == false — функция стартует впервые (job только создан). ListState пуст.
  • isRestored() == true — функция восстанавливается из savepoint или checkpoint. ListState содержит данные предыдущего запуска.

Без проверки isRestored() логика будет некорректной:

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    checkpointedState = context.getOperatorStateStore().getListState(descriptor);

    // Без isRestored() — будет читать пустой ListState при первом старте
    // и инициализировать неправильно.
    if (context.isRestored()) {
        // Только при restore — переносим из ListState в локальный буфер
        for (Tuple2<String, Integer> element : checkpointedState.get()) {
            bufferedElements.add(element);
        }
    }
    // При первом старте: bufferedElements остаётся пустым ArrayList
}

Redistribution при rescale: список разрезается

Главная фишка ListState — Flink умеет «разрезать» состояние при rescale. Если у вас было 3 параллельных инстанса с ListState [a, b, c, d, e, f] (по 2 элемента в каждом), и вы увеличиваете parallelism до 4, Flink:

  1. Соберёт все элементы со всех инстансов: [a, b, c, d, e, f].
  2. Раздаст 6 элементов 4 новым инстансам: subtask 0 -> [a, b], subtask 1 -> [c, d], subtask 2 -> [e], subtask 3 -> [f].
ListState redistribution при rescale 3 to 4

Subtask 0 ListState a b

Subtask 0 до rescale: ListState содержит элементы a, b.

Subtask 1 ListState c d

Subtask 1 до rescale: ListState содержит элементы c, d.

Subtask 2 ListState e f

Subtask 2 до rescale: ListState содержит элементы e, f.
rescale 3 to 4

Union all elements a b c d e f

Flink собрал все элементы со всех subtask: a, b, c, d, e, f. Это происходит во время restore из savepoint при изменении parallelism.
even split

Subtask 0 a b

Subtask 0 после rescale: получил элементы a, b. Flink раздаёт 6 элементов на 4 subtask поровну (round-robin или even-split).

Subtask 1 c d

Subtask 1 после rescale: получил элементы c, d.

Subtask 2 e

Subtask 2 после rescale: получил элемент e.

Subtask 3 f

Subtask 3 после rescale: получил элемент f. Новый subtask, появившийся после rescale.

Для Kafka source это работает идеально: элемент ListState — это (partition, offset), и при rescale партиции перераспределяются между subtask’ами с сохранением offset’а.


UnionListState: каждому subtask полный список

UnionListState ведёт себя иначе: при rescale каждый subtask получает полный объединённый список. Это нужно когда сам subtask должен видеть весь global state и принимать решение, что оставить.

Классический пример — Kafka source в исторические эпохи, когда все subtask’и должны были видеть полный список assigned partitions, чтобы решить, не нужно ли подхватить новые. Современный KafkaSource использует обычный ListState, а UnionListState остался для редких сценариев.

WARNING

UnionListState тяжёл по памяти. Если в state 10 000 элементов и parallelism 100, при restore каждый из 100 subtask’ов получит все 10 000 элементов — 1 миллион элементов в памяти. Используйте только когда без этого не обойтись.


Попробуй сам

Реализуй EvenIdLoggerSinkFunction, который:

  1. Принимает события Event(id: Long, value: String).
  2. Логирует только события с чётным id.
  3. Хранит в ListState<Long> последние 10 виденных id — для отладки выводит их при каждом 100-м событии.
  4. При restore из savepoint буфер должен сохраняться.

Подсказка: переопредели CheckpointedFunction.snapshotState и initializeState. Локальный ArrayDeque<Long> для скорости, синхронизация с ListState только в snapshotState.

Затем измерь, что произойдёт при parallelism: 2 -> 4 через savepoint: посмотри, как 20 элементов из двух subtask распределятся между четырьмя.


Ключевые выводы

  1. Operator state — это всегда ListState (других типов нет). Привязан к параллельному subtask’у, не к ключу.
  2. CheckpointedFunction — интерфейс с snapshotState (вызывается при checkpoint) и initializeState (вызывается при старте и restore).
  3. isRestored() — флаг, показывающий, инициализируется ли state с нуля или из snapshot.
  4. ListState vs UnionListState: первый разрезает список при rescale (Kafka source), второй раздаёт каждому subtask полный union (редкий случай).
  5. Где используется: source/sink-коннекторы, sink-буферизация, редкие custom функции. Везде «состояние за сущностью» — keyed state, не operator.
Проверка знанийKnowledge check
У вас sink, который буферизует записи в локальный List и каждые 100 записей делает batch insert в PostgreSQL. После crash вы хотите, чтобы буферизованные, но ещё не записанные в PostgreSQL записи сохранились. Что именно нужно сделать с этим списком в snapshotState и initializeState, чтобы это работало при rescale 4 to 8?
ОтветAnswer
В snapshotState: вызвать checkpointedState.update(bufferedList) — перенести содержимое локального ArrayList в ListState. Это снимок локального буфера, который Flink сохранит в snapshot. В initializeState: если context.isRestored() == true, прочитать checkpointedState.get() и сложить элементы в локальный ArrayList. При rescale 4 -> 8 Flink соберёт все элементы со всех 4 subtask и поровну раздаст 8 новым subtask — каждый получит ~1/8 общего буфера. Важно: чтобы это сработало, нужно использовать обычный ListState (не UnionListState), иначе каждый из 8 subtask получит ПОЛНЫЙ объединённый буфер и попытается записать его в PostgreSQL — будут дубликаты.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какой тип состояния доступен ТОЛЬКО как operator state в Flink (нет аналога в keyed state)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3