Checkpointing operator state и rescale
Operator state без checkpoint’ов бесполезен — после рестарта вся информация теряется. В этом уроке углубимся в lifecycle snapshotState и initializeState, разберём как Flink выполняет redistribution состояния при изменении parallelism, и поймём граничные случаи, которые ломают наивную реализацию.
Урок продолжает тему operator state (уроки 1-2), но фокусируется на тонкостях checkpointing’а: какие вызовы происходят и в каком порядке, как написать функцию, корректно работающую при rescale, и почему UnionListState нужно использовать с большой осторожностью.
Lifecycle: open, initializeState, snapshotState, close
При запуске job каждый параллельный subtask вашей CheckpointedFunction проходит через цепочку вызовов:
1. initializeState(context) - один раз при старте
- либо первый запуск (context.isRestored() == false)
- либо restore из snapshot (context.isRestored() == true)
2. open(parameters) - один раз после initializeState
- инициализация транзиентных ресурсов (соединения, threads)
3. invoke / processElement - на каждое событие
4. snapshotState(context) - на каждый checkpoint
5. close() - один раз при завершении job
- initializeState(context)
- open(parameters)
- processElement
- snapshotState(context)
- close()
Важное правило: snapshotState — это синхронизация локального буфера с ListState, а initializeState — это инициализация локального буфера из ListState (или с нуля, если первый запуск). Локальные структуры (Java ArrayList, HashMap) не снапшотятся напрямую — только ListState переживает рестарт.
Anti-pattern: писать в ListState в processElement
Соблазн — записывать в ListState при каждом событии:
// АНТИ-ПАТТЕРН — не делайте так
@Override
public void invoke(Event value, Context ctx) {
// КАЖДОЕ событие пишет в ListState
listState.add(value);
}
Это работает, но катастрофически медленно. ListState — это не обычный ArrayList, под ним state backend (HashMap или RocksDB). При большом потоке событий каждая запись становится bottleneck’ом.
Правильно: держим локальный ArrayList/HashMap для горячего пути, а в snapshotState синхронизируем с ListState:
private List<Event> localBuffer = new ArrayList<>();
private transient ListState<Event> checkpointedState;
@Override
public void invoke(Event value, Context ctx) {
// Быстрый локальный путь — O(1) на событие
localBuffer.add(value);
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// Раз в checkpoint — синхронизация с ListState (одна операция)
checkpointedState.update(localBuffer);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
checkpointedState = context.getOperatorStateStore()
.getListState(new ListStateDescriptor<>("buffer", Event.class));
if (context.isRestored()) {
for (Event e : checkpointedState.get()) {
localBuffer.add(e);
}
}
}
update полностью заменяет содержимое ListState на переданный список. add добавляет в конец. Для синхронизации буфера используйте update — иначе при каждом checkpoint список будет дублироваться.
Redistribution: even-split vs union
При rescale (изменении parallelism) Flink перераспределяет operator state. Алгоритм зависит от типа state:
Even-split (обычный ListState)
Flink собирает элементы со всех старых subtask’ов в один большой список и поровну раздаёт новым subtask’ам. Round-robin: subtask 0 получит первый, четвёртый, восьмой элемент; subtask 1 — второй, пятый, девятый; и т.д.
Старый state (parallelism = 2):
Subtask 0: [a, b, c]
Subtask 1: [d, e, f]
Все элементы: [a, b, c, d, e, f]
Новый state (parallelism = 3):
Subtask 0: [a, d] (индексы 0, 3)
Subtask 1: [b, e] (индексы 1, 4)
Subtask 2: [c, f] (индексы 2, 5)
Это работает для Kafka source: каждый элемент — (partition, offset), и Flink распределяет партиции по новым subtask’ам.
Union (UnionListState)
Каждый новый subtask получает полный union всех элементов со всех старых subtask’ов:
Старый state (parallelism = 2):
Subtask 0: [a, b, c]
Subtask 1: [d, e, f]
Все элементы: [a, b, c, d, e, f]
Новый state (parallelism = 3):
Subtask 0: [a, b, c, d, e, f]
Subtask 1: [a, b, c, d, e, f]
Subtask 2: [a, b, c, d, e, f]
Каждый subtask внутри сам решает, какие элементы оставить. Это полезно для динамического распределения, но дорого по памяти.
Rescale: что нужно учесть в коде
Если subtask написан в предположении «у меня всегда мой буфер только мой» и после rescale данные перетекли — логика может сломаться. Типичные грабли:
Грабли 1: дубликаты sink’а после rescale
Sink буферизует записи в ListState. При rescale от parallelism=2 до parallelism=4 каждая запись физически уходит к одному из 4 новых subtask’ов (even-split). Если sink дополнительно использует UnionListState — каждый из 4 subtask’ов получит полный буфер и попытается дозаписать все элементы. Дубликаты в БД.
Решение: для sink-буферизации использовать обычный ListState, не UnionListState. И сделать downstream идемпотентным (upsert по PK), на всякий случай.
Checkpoint coordinator: как Flink координирует снапшот состоянияГрабли 2: state становится огромным после union rescale
UnionListState с 10 000 элементов и parallelism 100 после union даёт 1 миллион элементов суммарно. При следующем checkpoint размер snapshot — 1 миллион записей. Если parallelism вырастет до 200, после следующего рестора будет уже 2 миллиона.
Решение: в initializeState для UnionListState добавьте логику фильтрации — каждый subtask оставляет только «свою долю» (например, по index % parallelism == subtaskIndex). Тогда дубликаты уйдут.
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
unionListState = context.getOperatorStateStore()
.getUnionListState(descriptor);
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
int idx = 0;
if (context.isRestored()) {
for (Element e : unionListState.get()) {
if (idx % parallelism == subtaskIndex) {
localBuffer.add(e);
}
idx++;
}
}
}
Что НЕ должно быть в snapshotState
snapshotState вызывается синхронно во время checkpoint barrier. Долгие операции здесь блокируют всю топологию и приводят к checkpoint timeout.
Никогда не делайте в snapshotState:
- Блокирующие I/O вызовы (HTTP, JDBC, GRPC).
- Чтение из больших файлов или баз данных.
- Тяжёлая сериализация (упаковка большого объекта в Avro/Protobuf на лету).
snapshotState — это просто listState.update(localBuffer). Всё, что тяжелее — выносите в фоновый поток с очередью.
Когда вообще не нужен operator state
Распространённая ошибка — использовать operator state там, где достаточно конфигурации или keyed state.
| Что нужно | Правильный инструмент |
|---|---|
| Запомнить «последний viewed item per user» | Keyed state (ValueState за userId) |
| Глобальный счётчик для метрик | Flink Metrics (Counter/Gauge), не state |
| Конфиг, известный при старте | Параметр конструктора функции |
| Динамические правила | Broadcast state |
| Source offset, sink buffer | Operator state (это его прямое назначение) |
Operator state дорогой для использования: требует ручной синхронизации с локальным буфером, имеет нетривиальный rescale, не имеет TTL. Если задача решается keyed state или broadcast state — берите их.
Попробуй сам
Расширь BufferingSink из урока 1:
- Добавь логирование в
initializeState: при первом старте печатай[subtask N] first start, при restore —[subtask N] restored M elements. - Запусти job с parallelism=2, отправь 20 событий, сделай savepoint, останови.
- Запусти job с parallelism=4 из того же savepoint. Посмотри логи — увидь, как 20 элементов распределились между 4 subtask’ами.
- Сделай ещё одно изменение: переключи
ListStateнаUnionListStateи повтори. Сравни логи (каждый из 4 должен получить полный список).
Ключевые выводы
initializeStateвызывается один раз — при старте или restore.isRestored()различает эти случаи.snapshotStateвызывается на каждый checkpoint — должен быть быстрым, без I/O. Локальный буфер копируется в ListState.- Не пишите в ListState на каждое событие — это медленно. Локальная коллекция плюс sync в
snapshotState. - Even-split (
ListState): при rescale Flink собирает элементы и раздаёт поровну. Идеально для source/sink буферов. - Union (
UnionListState): каждый subtask получает полный union — дорого по памяти, нужно вручную фильтровать. - Operator state — низкоуровневый инструмент. Для бизнес-логики чаще нужны keyed или broadcast state.