Learning Platform
Глоссарий Troubleshooting
Урок 09.03 · 15 мин
Средний
Operator StateCheckpointSnapshotRescaleRedistribution

Checkpointing operator state и rescale

Operator state без checkpoint’ов бесполезен — после рестарта вся информация теряется. В этом уроке углубимся в lifecycle snapshotState и initializeState, разберём как Flink выполняет redistribution состояния при изменении parallelism, и поймём граничные случаи, которые ломают наивную реализацию.

Урок продолжает тему operator state (уроки 1-2), но фокусируется на тонкостях checkpointing’а: какие вызовы происходят и в каком порядке, как написать функцию, корректно работающую при rescale, и почему UnionListState нужно использовать с большой осторожностью.


Lifecycle: open, initializeState, snapshotState, close

При запуске job каждый параллельный subtask вашей CheckpointedFunction проходит через цепочку вызовов:

1. initializeState(context)  - один раз при старте
   - либо первый запуск (context.isRestored() == false)
   - либо restore из snapshot (context.isRestored() == true)
2. open(parameters)           - один раз после initializeState
   - инициализация транзиентных ресурсов (соединения, threads)
3. invoke / processElement   - на каждое событие
4. snapshotState(context)    - на каждый checkpoint
5. close()                    - один раз при завершении job
Lifecycle CheckpointedFunction
  1. initializeState(context)
Subtask стартует. Flink вызывает initializeState ОДИН РАЗ. Внутри функция получает доступ к ListState и решает: первый запуск или restore.
один раз
  1. open(parameters)
Flink вызывает open. Здесь создаются JDBC pool, HTTP клиенты, локальные thread pools. Эти ресурсы НЕ снапшотятся.
один раз
  1. processElement
На каждое входящее событие. Здесь вы можете читать/изменять локальное состояние, накапливать в буфер, и т.д.
на каждое событие
  1. snapshotState(context)
При каждом checkpoint Flink вызывает snapshotState. Здесь функция должна перенести локальное состояние в ListState — синхронизировать.
на каждый checkpoint
  1. close()
При остановке job Flink вызывает close. Закрытие соединений и thread pools. После close subtask убит.

Важное правило: snapshotState — это синхронизация локального буфера с ListState, а initializeState — это инициализация локального буфера из ListState (или с нуля, если первый запуск). Локальные структуры (Java ArrayList, HashMap) не снапшотятся напрямую — только ListState переживает рестарт.


Anti-pattern: писать в ListState в processElement

Соблазн — записывать в ListState при каждом событии:

// АНТИ-ПАТТЕРН — не делайте так
@Override
public void invoke(Event value, Context ctx) {
    // КАЖДОЕ событие пишет в ListState
    listState.add(value);
}

Это работает, но катастрофически медленно. ListState — это не обычный ArrayList, под ним state backend (HashMap или RocksDB). При большом потоке событий каждая запись становится bottleneck’ом.

Правильно: держим локальный ArrayList/HashMap для горячего пути, а в snapshotState синхронизируем с ListState:

private List<Event> localBuffer = new ArrayList<>();
private transient ListState<Event> checkpointedState;

@Override
public void invoke(Event value, Context ctx) {
    // Быстрый локальный путь — O(1) на событие
    localBuffer.add(value);
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Раз в checkpoint — синхронизация с ListState (одна операция)
    checkpointedState.update(localBuffer);
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    checkpointedState = context.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("buffer", Event.class));

    if (context.isRestored()) {
        for (Event e : checkpointedState.get()) {
            localBuffer.add(e);
        }
    }
}
WARNING

update полностью заменяет содержимое ListState на переданный список. add добавляет в конец. Для синхронизации буфера используйте update — иначе при каждом checkpoint список будет дублироваться.


Redistribution: even-split vs union

При rescale (изменении parallelism) Flink перераспределяет operator state. Алгоритм зависит от типа state:

Even-split (обычный ListState)

Flink собирает элементы со всех старых subtask’ов в один большой список и поровну раздаёт новым subtask’ам. Round-robin: subtask 0 получит первый, четвёртый, восьмой элемент; subtask 1 — второй, пятый, девятый; и т.д.

Старый state (parallelism = 2):
  Subtask 0: [a, b, c]
  Subtask 1: [d, e, f]

Все элементы: [a, b, c, d, e, f]

Новый state (parallelism = 3):
  Subtask 0: [a, d]   (индексы 0, 3)
  Subtask 1: [b, e]   (индексы 1, 4)
  Subtask 2: [c, f]   (индексы 2, 5)

Это работает для Kafka source: каждый элемент — (partition, offset), и Flink распределяет партиции по новым subtask’ам.

Union (UnionListState)

Каждый новый subtask получает полный union всех элементов со всех старых subtask’ов:

Старый state (parallelism = 2):
  Subtask 0: [a, b, c]
  Subtask 1: [d, e, f]

Все элементы: [a, b, c, d, e, f]

Новый state (parallelism = 3):
  Subtask 0: [a, b, c, d, e, f]
  Subtask 1: [a, b, c, d, e, f]
  Subtask 2: [a, b, c, d, e, f]

Каждый subtask внутри сам решает, какие элементы оставить. Это полезно для динамического распределения, но дорого по памяти.


Rescale: что нужно учесть в коде

Если subtask написан в предположении «у меня всегда мой буфер только мой» и после rescale данные перетекли — логика может сломаться. Типичные грабли:

Грабли 1: дубликаты sink’а после rescale

Sink буферизует записи в ListState. При rescale от parallelism=2 до parallelism=4 каждая запись физически уходит к одному из 4 новых subtask’ов (even-split). Если sink дополнительно использует UnionListState — каждый из 4 subtask’ов получит полный буфер и попытается дозаписать все элементы. Дубликаты в БД.

Решение: для sink-буферизации использовать обычный ListState, не UnionListState. И сделать downstream идемпотентным (upsert по PK), на всякий случай.

Checkpoint coordinator: как Flink координирует снапшот состояния

Грабли 2: state становится огромным после union rescale

UnionListState с 10 000 элементов и parallelism 100 после union даёт 1 миллион элементов суммарно. При следующем checkpoint размер snapshot — 1 миллион записей. Если parallelism вырастет до 200, после следующего рестора будет уже 2 миллиона.

Решение: в initializeState для UnionListState добавьте логику фильтрации — каждый subtask оставляет только «свою долю» (например, по index % parallelism == subtaskIndex). Тогда дубликаты уйдут.

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    unionListState = context.getOperatorStateStore()
        .getUnionListState(descriptor);

    int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
    int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
    int idx = 0;

    if (context.isRestored()) {
        for (Element e : unionListState.get()) {
            if (idx % parallelism == subtaskIndex) {
                localBuffer.add(e);
            }
            idx++;
        }
    }
}

Что НЕ должно быть в snapshotState

snapshotState вызывается синхронно во время checkpoint barrier. Долгие операции здесь блокируют всю топологию и приводят к checkpoint timeout.

DANGER

Никогда не делайте в snapshotState:

  • Блокирующие I/O вызовы (HTTP, JDBC, GRPC).
  • Чтение из больших файлов или баз данных.
  • Тяжёлая сериализация (упаковка большого объекта в Avro/Protobuf на лету).

snapshotState — это просто listState.update(localBuffer). Всё, что тяжелее — выносите в фоновый поток с очередью.


Когда вообще не нужен operator state

Распространённая ошибка — использовать operator state там, где достаточно конфигурации или keyed state.

Что нужноПравильный инструмент
Запомнить «последний viewed item per user»Keyed state (ValueState за userId)
Глобальный счётчик для метрикFlink Metrics (Counter/Gauge), не state
Конфиг, известный при стартеПараметр конструктора функции
Динамические правилаBroadcast state
Source offset, sink bufferOperator state (это его прямое назначение)

Operator state дорогой для использования: требует ручной синхронизации с локальным буфером, имеет нетривиальный rescale, не имеет TTL. Если задача решается keyed state или broadcast state — берите их.


Попробуй сам

Расширь BufferingSink из урока 1:

  1. Добавь логирование в initializeState: при первом старте печатай [subtask N] first start, при restore — [subtask N] restored M elements.
  2. Запусти job с parallelism=2, отправь 20 событий, сделай savepoint, останови.
  3. Запусти job с parallelism=4 из того же savepoint. Посмотри логи — увидь, как 20 элементов распределились между 4 subtask’ами.
  4. Сделай ещё одно изменение: переключи ListState на UnionListState и повтори. Сравни логи (каждый из 4 должен получить полный список).

Ключевые выводы

  1. initializeState вызывается один раз — при старте или restore. isRestored() различает эти случаи.
  2. snapshotState вызывается на каждый checkpoint — должен быть быстрым, без I/O. Локальный буфер копируется в ListState.
  3. Не пишите в ListState на каждое событие — это медленно. Локальная коллекция плюс sync в snapshotState.
  4. Even-split (ListState): при rescale Flink собирает элементы и раздаёт поровну. Идеально для source/sink буферов.
  5. Union (UnionListState): каждый subtask получает полный union — дорого по памяти, нужно вручную фильтровать.
  6. Operator state — низкоуровневый инструмент. Для бизнес-логики чаще нужны keyed или broadcast state.
Проверка знанийKnowledge check
Ваш sink использует UnionListState для буферизации не записанных в БД событий. При parallelism=4 каждый subtask имеет в среднем 1000 элементов в буфере. Что произойдёт с количеством элементов в каждом subtask после восстановления из savepoint при rescale до parallelism=8, если вы не сделаете фильтрацию в initializeState? К чему это приведёт?
ОтветAnswer
После union rescale каждый из 8 новых subtask получит ПОЛНЫЙ объединённый буфер: 4 x 1000 = 4000 элементов. Это значит, что каждый subtask будет пытаться записать в БД 4000 элементов. Суммарно 8 x 4000 = 32 000 записей в БД, тогда как реально уникальных элементов было 4 x 1000 = 4000. Это приведёт к 8x дубликатов в БД. Дополнительно, на следующем checkpoint размер state составит 32 000 элементов (вместо 4000) — state раздуется в 8 раз, последующий rescale ещё больше его раздует. Решение: в initializeState после получения UnionListState отфильтровать элементы по условию index % parallelism == subtaskIndex, оставив каждому subtask только свою долю. И сделать downstream идемпотентным (upsert по PK), чтобы дубликаты не ломали данные при ошибках.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. В CheckpointedFunction вызывается listState.add(value) в processElement при каждом событии. Throughput job упал в 50 раз по сравнению с in-memory ArrayList. Почему?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3