Learning Platform
Глоссарий Troubleshooting
Урок 11.04 · 20 мин
Средний
RestoreRescaleMax ParallelismKeyGroup

Restore и rescale: изменение parallelism

В предыдущем уроке мы научились создавать savepoint’ы. Теперь разберём обратную сторону — restore: как восстановить job из snapshot, как изменить parallelism при restore, и какие гарантии Flink даёт при rescale разных типов state (keyed, operator, broadcast).

Главный gotcha rescale — параметр maxParallelism, который задаётся при первом запуске job и не может быть изменён при rescale. Если вы не подумали о нём заранее, через год вам придётся делать миграцию через canonical savepoint, что больно.


Базовый restore

flink run -s s3://savepoints/savepoint-abc123 my-job.jar

Flink:

  1. Скачивает savepoint metadata.
  2. Для каждого оператора в новом job ищет соответствующий state в savepoint по UID.
  3. Для keyed state — распределяет keyGroups между параллельными subtask’ами.
  4. Для operator state — распределяет элементы списка (even-split или union).
  5. Для broadcast state — каждый subtask получает полную копию.
  6. Запускает job, и source начинает читать с offset’а из savepoint.

Restore с другим parallelism

flink run -s s3://savepoints/sp-xyz -p 16 my-job.jar

Флаг -p 16 устанавливает parallelism = 16 для нового запуска. Если savepoint был сделан с parallelism = 8 — Flink перераспределит state.

Как это работает для разных типов state:

Key groups internals: как rescale перераспределяет state

Keyed state: keyGroups

Keyed state хранится не «per key», а «per keyGroup». KeyGroup — это логический shard, в который Flink распределяет ключи через хеш-функцию. Каждый subtask отвечает за подмножество keyGroups.

При rescale Flink перераспределяет keyGroups между новыми subtask’ами без перемещения данных внутри keyGroup’ы.

Operator state (ListState)

Even-split: Flink собирает все элементы со всех старых subtask’ов и поровну раздаёт новым subtask’ам. См. модуль 08, урок 3.

Operator state (UnionListState)

Каждый новый subtask получает полный union всех элементов. См. модуль 08, урок 3.

Broadcast state

Каждый новый subtask получает полную копию state. (Как broadcast делает для main потока.)


maxParallelism: критичный параметр

maxParallelism — максимальное число параллельных subtask’ов, до которого job может масштабироваться. Это жёсткий потолок rescale.

Default: Flink вычисляет автоматически на основе initial parallelism:

  • parallelism < 32 -> maxParallelism = 128
  • parallelism = 32-128 -> maxParallelism = parallelism * 1.5 (округлено)
  • parallelism >= 128 -> maxParallelism = parallelism * 1.5

Минимум 128, максимум 32768.

env.setParallelism(8);
env.setMaxParallelism(256);  // Явно задать
env.set_parallelism(8)
env.set_max_parallelism(256)

Почему maxParallelism критичен

maxParallelism определяет количество keyGroups для keyed state. KeyGroup — атомарная единица rescale: Flink может распределить keyGroups между subtask’ами, но не может разбить одну keyGroup на две.

Правило: maxParallelism определяет максимум, до которого можно увеличить parallelism при rescale. Если maxParallelism = 128, нельзя сделать parallelism = 256.

KeyGroups и rescale

Start parallelism 4 max 128

Job стартует с parallelism=4, Flink установил maxParallelism=128 (default).

128 keyGroups across 4 subtasks

Flink создал 128 keyGroups. Распределил между 4 subtask: subtask 0 владеет keyGroup 0-31, subtask 1 — 32-63, и т.д.

Rescale to 64

Rescale до parallelism=64. maxParallelism=128 позволяет.

128 keyGroups across 64 subtasks

Каждый subtask получает 2 keyGroups: subtask 0 = keyGroup 0,1; subtask 1 = keyGroup 2,3; и т.д. Без переноса данных внутри keyGroup.

Rescale to 256 FAILS

Rescale до parallelism=256. maxParallelism=128 НЕ позволяет — нельзя разбить keyGroup.

Error

Job не запустится — parallelism > maxParallelism. Нужна миграция через canonical savepoint.

Что делать, если упёрлись в maxParallelism

Если ваш job стал получать в 10 раз больше нагрузки и default maxParallelism = 128 мал — нужна миграция:

  1. Создать canonical savepoint с текущим job.
  2. Изменить maxParallelism в коде (например, на 1024).
  3. Запустить новую версию без -s флага — то есть с пустым state.
  4. Backfill state из savepoint через State Processor API (StateBackend API для чтения savepoint’а и записи в новый формат с другим maxParallelism).

State Processor API — отдельная тема (краткое введение в модуле 16). Это нетривиальная процедура с downtime.

WARNING

Задавайте maxParallelism явно и с большим запасом при первом запуске production job. Например, maxParallelism = 1024 для job, который сейчас работает с parallelism = 8. Это даёт огромное окно для будущего rescale без миграции. Cost: чуть больше memory overhead для tracking keyGroups, но это копейки.


Restore checkpoint vs savepoint

Можно восстанавливать из обоих:

# Из savepoint
flink run -s s3://savepoints/savepoint-xyz my-job.jar

# Из checkpoint
flink run -s s3://checkpoints/.../chk-42 my-job.jar

В Flink 1.15+ оба используют один и тот же restore механизм. Но:

  • Checkpoint обычно native format (backend-specific). Не работает при смене backend.
  • Savepoint обычно canonical format. Работает при смене backend и upgrade Flink.

Для emergency restore (crash) — checkpoint (свежее). Для planned upgrade — savepoint.


Allow non-restored state

flink run -s s3://savepoints/sp \
    --allowNonRestoredState \
    my-job.jar

Когда вы удаляете оператор из job или меняете его UID, в savepoint остаётся orphan state. Без --allowNonRestoredState restore падает.

С флагом orphan state просто игнорируется (выкидывается). Job стартует, но state удалённого оператора потерян.

Когда использовать:

  • Намеренно удалили оператор (например, убрали enrichment стадию).
  • Refactor код, изменили UID (плохая практика, но бывает).
  • Миграция: некоторые операторы заменены полностью.

Когда не использовать:

  • Случайно изменили UID — заметите потерю state через час. Лучше pad UID и не использовать флаг.

Restore без savepoint: фактический рестарт

Если запустить job без -s — он стартует с пустым state. Для stateful job это значит:

  • Counter per user = 0.
  • Window contents = пустые.
  • Kafka source начинает с auto.offset.reset (latest или earliest по конфигу).

В production это почти всегда не то, что вы хотите. Двойная проверка: всегда указывайте -s при restore.


Изменение topology: что переживает rescale, что нет

Изменение в кодеRestore проходит?
Добавить новый оператор без stateДа
Удалить оператор без stateДа
Добавить новый stateful оператор (новый UID)Да
Удалить stateful оператор (UID был)Нужен --allowNonRestoredState
Изменить UID оператора со stateНужен --allowNonRestoredState (state теряется)
Изменить тип state (Integer -> Long)Нет (StateMigrationException)
Изменить тип POJO в state с миграцией схемыИногда (через TypeSerializer migration)
Изменить parallelismДа (в пределах maxParallelism)
Изменить maxParallelismНет (нужна migration через State Processor API)
Сменить state backend (через canonical savepoint)Да

Production check-list при upgrade

  1. Каждый оператор со state имеет explicit .uid().
  2. maxParallelism задан явно с большим запасом (1024+).
  3. state.backend и state.checkpoints.dir в flink-conf.yaml.
  4. Перед upgrade — flink stop --savepointPath ....
  5. Запускать с flink run -s ....
  6. После запуска проверить в Flink UI: state size, lastCheckpointDuration, no error.
  7. Сохранить старый savepoint минимум 7 дней — для rollback.

Попробуй сам

  1. Запусти job с setParallelism(4), setMaxParallelism(128).
  2. Сделай savepoint, рестартуй с -p 8. State должен сохраниться.
  3. Попробуй -p 256. Получи ошибку («parallelism cannot exceed maxParallelism»).
  4. Сделай canonical savepoint, перезапусти с setMaxParallelism(1024) и без -s (пустой state).
  5. Подними -p 256 — теперь работает.

Ключевые выводы

  1. Restore через flink run -s <path> — для savepoint и checkpoint.
  2. -p N меняет parallelism при restore. Flink перераспределяет state.
  3. maxParallelism — жёсткий потолок rescale. Задаётся при первом запуске, не меняется без миграции.
  4. KeyGroups — атомарная единица rescale для keyed state. Их количество = maxParallelism.
  5. Задавайте maxParallelism явно с запасом: 1024+ даже для маленьких job’ов.
  6. --allowNonRestoredState — игнорировать orphan state при изменении topology. Использовать осторожно.
Проверка знанийKnowledge check
Вы стартовали Flink job с setParallelism(8). maxParallelism не задавали явно. Через 6 месяцев нагрузка выросла в 50 раз, нужно сделать rescale до parallelism=400. При flink run -s savepoint -p 400 получаете ошибку 'parallelism cannot exceed maxParallelism'. Что произошло, и как решить?
ОтветAnswer
Произошло: при первом запуске Flink автоматически выставил maxParallelism = 128 (default для parallelism < 32). Это hard limit для всех будущих rescale. Сейчас попытка установить parallelism=400 > maxParallelism=128 отклоняется. Решение через State Processor API: (1) Сделать canonical savepoint текущей версии. (2) Написать утилиту через State Processor API: читать savepoint с maxParallelism=128, перепаковывать state в новый формат с maxParallelism=1024. (3) Изменить код job: explicit setMaxParallelism(1024). (4) Запустить новую версию из переписанного savepoint с -p 400. Это даунтайм процедура — отдельный план миграции с rollback стратегией. Урок для будущего: всегда задавать setMaxParallelism(1024+) при первом запуске production job, чтобы не оказаться в этой ситуации. Cost — копейки overhead на tracking keyGroups, benefit — гибкость rescale без миграции.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Что такое maxParallelism и почему его важно задавать явно с запасом при первом старте production job?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4