Restore и rescale: изменение parallelism
В предыдущем уроке мы научились создавать savepoint’ы. Теперь разберём обратную сторону — restore: как восстановить job из snapshot, как изменить parallelism при restore, и какие гарантии Flink даёт при rescale разных типов state (keyed, operator, broadcast).
Главный gotcha rescale — параметр maxParallelism, который задаётся при первом запуске job и не может быть изменён при rescale. Если вы не подумали о нём заранее, через год вам придётся делать миграцию через canonical savepoint, что больно.
Базовый restore
flink run -s s3://savepoints/savepoint-abc123 my-job.jar
Flink:
- Скачивает savepoint metadata.
- Для каждого оператора в новом job ищет соответствующий state в savepoint по UID.
- Для keyed state — распределяет keyGroups между параллельными subtask’ами.
- Для operator state — распределяет элементы списка (even-split или union).
- Для broadcast state — каждый subtask получает полную копию.
- Запускает job, и source начинает читать с offset’а из savepoint.
Restore с другим parallelism
flink run -s s3://savepoints/sp-xyz -p 16 my-job.jar
Флаг -p 16 устанавливает parallelism = 16 для нового запуска. Если savepoint был сделан с parallelism = 8 — Flink перераспределит state.
Как это работает для разных типов state:
Key groups internals: как rescale перераспределяет stateKeyed state: keyGroups
Keyed state хранится не «per key», а «per keyGroup». KeyGroup — это логический shard, в который Flink распределяет ключи через хеш-функцию. Каждый subtask отвечает за подмножество keyGroups.
При rescale Flink перераспределяет keyGroups между новыми subtask’ами без перемещения данных внутри keyGroup’ы.
Operator state (ListState)
Even-split: Flink собирает все элементы со всех старых subtask’ов и поровну раздаёт новым subtask’ам. См. модуль 08, урок 3.
Operator state (UnionListState)
Каждый новый subtask получает полный union всех элементов. См. модуль 08, урок 3.
Broadcast state
Каждый новый subtask получает полную копию state. (Как broadcast делает для main потока.)
maxParallelism: критичный параметр
maxParallelism — максимальное число параллельных subtask’ов, до которого job может масштабироваться. Это жёсткий потолок rescale.
Default: Flink вычисляет автоматически на основе initial parallelism:
parallelism < 32->maxParallelism = 128parallelism = 32-128->maxParallelism = parallelism * 1.5(округлено)parallelism >= 128->maxParallelism = parallelism * 1.5
Минимум 128, максимум 32768.
env.setParallelism(8);
env.setMaxParallelism(256); // Явно задать
env.set_parallelism(8)
env.set_max_parallelism(256)
Почему maxParallelism критичен
maxParallelism определяет количество keyGroups для keyed state. KeyGroup — атомарная единица rescale: Flink может распределить keyGroups между subtask’ами, но не может разбить одну keyGroup на две.
Правило: maxParallelism определяет максимум, до которого можно увеличить parallelism при rescale. Если maxParallelism = 128, нельзя сделать parallelism = 256.
Start parallelism 4 max 128
Job стартует с parallelism=4, Flink установил maxParallelism=128 (default).128 keyGroups across 4 subtasks
Flink создал 128 keyGroups. Распределил между 4 subtask: subtask 0 владеет keyGroup 0-31, subtask 1 — 32-63, и т.д.Rescale to 64
Rescale до parallelism=64. maxParallelism=128 позволяет.128 keyGroups across 64 subtasks
Каждый subtask получает 2 keyGroups: subtask 0 = keyGroup 0,1; subtask 1 = keyGroup 2,3; и т.д. Без переноса данных внутри keyGroup.Rescale to 256 FAILS
Rescale до parallelism=256. maxParallelism=128 НЕ позволяет — нельзя разбить keyGroup.Error
Job не запустится — parallelism > maxParallelism. Нужна миграция через canonical savepoint.Что делать, если упёрлись в maxParallelism
Если ваш job стал получать в 10 раз больше нагрузки и default maxParallelism = 128 мал — нужна миграция:
- Создать canonical savepoint с текущим job.
- Изменить
maxParallelismв коде (например, на 1024). - Запустить новую версию без
-sфлага — то есть с пустым state. - Backfill state из savepoint через State Processor API (
StateBackendAPI для чтения savepoint’а и записи в новый формат с другим maxParallelism).
State Processor API — отдельная тема (краткое введение в модуле 16). Это нетривиальная процедура с downtime.
Задавайте maxParallelism явно и с большим запасом при первом запуске production job. Например, maxParallelism = 1024 для job, который сейчас работает с parallelism = 8. Это даёт огромное окно для будущего rescale без миграции. Cost: чуть больше memory overhead для tracking keyGroups, но это копейки.
Restore checkpoint vs savepoint
Можно восстанавливать из обоих:
# Из savepoint
flink run -s s3://savepoints/savepoint-xyz my-job.jar
# Из checkpoint
flink run -s s3://checkpoints/.../chk-42 my-job.jar
В Flink 1.15+ оба используют один и тот же restore механизм. Но:
- Checkpoint обычно native format (backend-specific). Не работает при смене backend.
- Savepoint обычно canonical format. Работает при смене backend и upgrade Flink.
Для emergency restore (crash) — checkpoint (свежее). Для planned upgrade — savepoint.
Allow non-restored state
flink run -s s3://savepoints/sp \
--allowNonRestoredState \
my-job.jar
Когда вы удаляете оператор из job или меняете его UID, в savepoint остаётся orphan state. Без --allowNonRestoredState restore падает.
С флагом orphan state просто игнорируется (выкидывается). Job стартует, но state удалённого оператора потерян.
Когда использовать:
- Намеренно удалили оператор (например, убрали enrichment стадию).
- Refactor код, изменили UID (плохая практика, но бывает).
- Миграция: некоторые операторы заменены полностью.
Когда не использовать:
- Случайно изменили UID — заметите потерю state через час. Лучше pad UID и не использовать флаг.
Restore без savepoint: фактический рестарт
Если запустить job без -s — он стартует с пустым state. Для stateful job это значит:
- Counter per user = 0.
- Window contents = пустые.
- Kafka source начинает с
auto.offset.reset(latest или earliest по конфигу).
В production это почти всегда не то, что вы хотите. Двойная проверка: всегда указывайте -s при restore.
Изменение topology: что переживает rescale, что нет
| Изменение в коде | Restore проходит? |
|---|---|
| Добавить новый оператор без state | Да |
| Удалить оператор без state | Да |
| Добавить новый stateful оператор (новый UID) | Да |
| Удалить stateful оператор (UID был) | Нужен --allowNonRestoredState |
| Изменить UID оператора со state | Нужен --allowNonRestoredState (state теряется) |
| Изменить тип state (Integer -> Long) | Нет (StateMigrationException) |
| Изменить тип POJO в state с миграцией схемы | Иногда (через TypeSerializer migration) |
| Изменить parallelism | Да (в пределах maxParallelism) |
| Изменить maxParallelism | Нет (нужна migration через State Processor API) |
| Сменить state backend (через canonical savepoint) | Да |
Production check-list при upgrade
- Каждый оператор со state имеет explicit
.uid(). maxParallelismзадан явно с большим запасом (1024+).state.backendиstate.checkpoints.dirвflink-conf.yaml.- Перед upgrade —
flink stop --savepointPath .... - Запускать с
flink run -s .... - После запуска проверить в Flink UI: state size, lastCheckpointDuration, no error.
- Сохранить старый savepoint минимум 7 дней — для rollback.
Попробуй сам
- Запусти job с
setParallelism(4), setMaxParallelism(128). - Сделай savepoint, рестартуй с
-p 8. State должен сохраниться. - Попробуй
-p 256. Получи ошибку («parallelism cannot exceed maxParallelism»). - Сделай canonical savepoint, перезапусти с
setMaxParallelism(1024)и без-s(пустой state). - Подними
-p 256— теперь работает.
Ключевые выводы
- Restore через
flink run -s <path>— для savepoint и checkpoint. -p Nменяет parallelism при restore. Flink перераспределяет state.maxParallelism— жёсткий потолок rescale. Задаётся при первом запуске, не меняется без миграции.- KeyGroups — атомарная единица rescale для keyed state. Их количество =
maxParallelism. - Задавайте
maxParallelismявно с запасом: 1024+ даже для маленьких job’ов. --allowNonRestoredState— игнорировать orphan state при изменении topology. Использовать осторожно.