Upgrade modes: savepoint, last-state, stateless
Стримовый джоб не может быть просто перезапущен — в нём живёт стейт. Сотни мегабайт keyed state с агрегациями, миллионы записей в RocksDB, прогресс по offset-ам Kafka. Когда вы катите новую версию (баг-фикс, новая фича, обновление зависимостей) — нужно сохранить стейт, остановить старый джоб, поднять новый и подцепить тот же стейт.
Flink Kubernetes Operator предлагает три варианта, как это сделать: savepoint, last-state и stateless. У каждого свои гарантии, своя скорость, свои подводные камни. Этот урок — про то, как они работают и когда что выбирать.
Зачем вообще “upgrade modes”
Когда вы делаете kubectl apply -f flink-deployment.yaml с изменённым image или flinkConfiguration, оператор должен:
- Остановить старый джоб (так чтобы стейт сохранился где-то снаружи кластера).
- Снести старые поды JM и TM.
- Поднять новые с новым образом или конфигом.
- Запустить джоб, восстановив его из сохранённого стейта.
Главный вопрос — откуда восстанавливаемся. Можно из ручного savepoint (надёжно, но медленнее). Можно из последнего удачного чекпойнта (быстрее, но риск отката записанных данных). Можно вообще не восстанавливаться (новый джоб с нуля). Это и есть три upgradeMode.
upgradeMode задаётся в spec.job.upgradeMode и применяется ко всем будущим изменениям. Можно менять между деплоями. По умолчанию — stateless, но в production почти всегда savepoint.
Сравнение режимов: одна таблица
Режим 1: savepoint
Это рекомендованный режим для всех stateful production джобов. Перед любым изменением (новый образ, новый parallelism, новый конфиг) оператор:
- Вызывает Flink REST API
stop-with-savepoint— джоб делает синхронный финальный savepoint вstate.savepoints.dir(например,s3://flink-state/orders/savepoints/). - После того, как savepoint успешно записан — джоб корректно останавливается (все источники остановлены, чекпойнт-барьер дошёл до всех sink-ов, EXACTLY_ONCE-гарантии соблюдены).
- Оператор сносит поды и поднимает новые.
- При старте джоба передаёт флаг
--fromSavepoint s3://.../savepoint-xxxи--allowNonRestoredState false. - Новый джоб восстанавливает стейт из savepoint, источники продолжают с тех же offset-ов.
Гарантии:
- EXACTLY_ONCE сохраняется: ни одна запись не теряется и не дублируется при правильно настроенных connector-ах.
- Savepoint версионно совместим между разными версиями Flink (1.18 -> 2.2) и переживает изменения parallelism (для keyed state с key groups).
- Если новый код несовместим со старым стейтом (например, удалили field из POJO) — джоб упадёт при старте с понятной ошибкой, а старый savepoint останется как fallback для ручного rollback.
Когда использовать:
- ВСЕ stateful production джобы с EXACTLY_ONCE.
- Любые изменения, где могут поменяться сериализаторы (POJO, Avro schema).
- При повышении parallelism (нужно key group redistribution из savepoint).
Цена:
- Время на savepoint: от 30 секунд до нескольких минут в зависимости от размера стейта и скорости S3.
- Доводнительная нагрузка на джоб при savepoint (это синхронная операция).
- Простой джоба на это время.
Если savepoint не успевает за timeout (по умолчанию 10 минут), оператор может зависнуть в UPGRADING-фазе. Для джобов с десятками гигабайт стейта увеличьте kubernetes.operator.savepoint.trigger.timeout в конфиге оператора и используйте инкрементальные savepoint (Flink 2.0+: state.savepoints.format: native — savepoint в нативном формате backend-а, быстрее).
Режим 2: last-state
Этот режим — компромисс между скоростью и надёжностью. Оператор использует последний удачный checkpoint вместо явного savepoint. Логика:
- Оператор вызывает
cancel(НЕstop-with-savepoint). Это менее аккуратно — Flink просто прибивает джоб. - Из метаданных в HA ConfigMap оператор знает путь к последнему успешному чекпойнту.
- Поды сносятся, новые поднимаются.
- Джоб стартует с флагом
--fromSavepoint s3://.../chk-XXXX(даже хотя это checkpoint, но Flink его принимает так же).
Гарантии:
- При EXACTLY_ONCE-сетапе с транзакционным sink-ом (Kafka TX, Iceberg) — нет потери и нет дубликатов, как в savepoint режиме. НО есть откат: записи между последним checkpoint-ом и моментом cancel будут переиграны (это нормально для EXACTLY_ONCE, sink-ы их подавят/откатят).
- При AT_LEAST_ONCE-setup-ах будут дубликаты в записях за период между checkpoint и cancel.
Когда использовать:
- Когда savepoint занимает слишком много времени и downtime критичен.
- Для джобов с маленьким интервалом между checkpoint-ами (например, 10 секунд) — потерь почти нет.
- Когда у sink-а есть idempotency или транзакции.
Цена:
- Откат на checkpoint означает повторную обработку небольшого окна данных. Для агрегаций — это норм, для side-эффектов (write to external DB без idempotency) — может привести к дублям.
- Если последний checkpoint был давно (например, при крупном back-pressure событии) — откат может быть значительным.
Когда НЕ использовать:
- Без EXACTLY_ONCE-настройки — дубли в downstream системах.
- При радикальных изменениях схемы стейта — checkpoint в native format может быть несовместим (savepoint в canonical format безопаснее).
- При увеличении parallelism — Flink 2.0+ позволяет это из checkpoint, но это менее тестируемая ветвь, чем savepoint.
В Flink K8s Operator 1.14 last-state — это upgradeMode: last-state. Этот режим требует, чтобы у вас был HA включён (high-availability.type: kubernetes), иначе оператор не знает, где последний checkpoint.
Режим 3: stateless
Самый простой режим — никакого восстановления. Оператор просто прибивает джоб и поднимает новый с нуля.
cancelбез savepoint.- Стейт в
state.checkpoints.dirостаётся, но не используется. - Новый джоб стартует с пустыми state backend-ами.
Когда использовать:
- Полностью stateless джобы: простой mapping
Kafka -> Flink -> Kafkaбез агрегаций, окон, joins. Стейт = только offset-ы Kafka, и они хранятся в Kafka внутри потребительской группы (если включено). - Радикальные рефакторинги — поменяли структуру стейта так сильно, что старый savepoint всё равно невозможно использовать. Тогда честнее сделать stateless apply и пересчитать из Kafka (если retention позволяет).
- Dev-окружения для быстрых итераций.
НИКОГДА не использовать:
- Stateful production джобы — потеряете часы или дни накопленных агрегаций.
- Любые джобы, где источник не позволяет полный replay (например, CDC от Debezium с истекшим binlog).
Реальный пример: смена parallelism
Допустим, у вас джоб orders-to-iceberg с parallelism: 4. Нужно поднять до 8 (новые TPS требуют). Применяете изменение:
spec:
job:
parallelism: 8 # было 4
upgradeMode: savepoint
Что произойдёт с разными режимами:
savepoint:
- Оператор вызывает
stop-with-savepoint. ~60 сек на savepoint к S3. - Поды JM и TM удаляются.
- Поднимаются новые с обновлённой spec (TM count удваивается, slots соответствуют).
- Джоб стартует, читает savepoint, перераспределяет keyed state по key groups между новыми оператор-инстансами.
- Старт занимает ~90 сек (загрузка savepoint в RocksDB).
- Готово. Стейт сохранён, exactly-once соблюдён.
last-state:
- Оператор cancel. ~5 сек.
- Поды пересоздаются.
- Джоб стартует с последнего checkpoint, перераспределяет state.
- Записи между cancel и last checkpoint — переигрываются.
- Старт занимает ~30 сек. Выигрыш в скорости — около минуты.
stateless:
- cancel, новый запуск. Все агрегации сброшены. Окна потеряны. Не делайте так для stateful джоба.
Для срочных hotfix-ов в проде savepoint режим может быть слишком медленным. Тогда last-state — приемлемый компромисс, если sink-ы транзакционные. Но дефолт всегда savepoint.
Откат через initialSavepointPath
Что если новый деплой сломался? Скажем, новый код упал на каком-то edge-case, и джоб в CrashLoopBackOff. Оператор перевёл FlinkDeployment в состояние ERROR.
В savepoint-режиме у вас всегда есть откатный savepoint — тот, который сделал оператор перед upgrade-ом. Откат:
- Возвращаете старый образ в
spec.image. - В
spec.jobдобавляетеinitialSavepointPath: s3://.../savepoint-pre-upgrade-xxx. kubectl apply -f.
Оператор увидит изменение spec, применит upgrade с этим savepoint-ом как стартовая точка. Джоб поднимется с состоянием, какое было ДО неудачного деплоя.
spec:
image: my-registry/orders-to-iceberg:v0.4.1 # старый
job:
initialSavepointPath: s3://flink-state/orders/savepoints/savepoint-9a3b2-2c1d4e
upgradeMode: savepoint
Пути к savepoint-ам сохраняются в status FlinkDeployment-а (kubectl get flinkdeployment -o yaml). При успешном upgrade-е там видно lastSavepointTriggerNonce и savepointTriggerLocation. Это и есть путь для отката. В CI/CD держите автоматическую регистрацию этих путей в external store (S3 bucket с тегами или DynamoDB) — чтобы при необходимости отката не лезть в kubectl.
Что выбирать: дерево решений
Простое правило: по умолчанию savepoint. Переключайтесь на last-state только если downtime критичен И sink-ы идемпотентны. Stateless — только для джобов без стейта или для редких ситуаций с полным сбросом.
Ключевые выводы
-
upgradeMode управляет тем, как оператор переключает джоб при изменении spec. Три варианта: savepoint, last-state, stateless.
-
savepoint — рекомендованный default для всех stateful production джобов. Делает явный consistent savepoint, поддерживает изменение parallelism и версионные миграции стейта. Медленнее (30s — 5min).
-
last-state — компромисс: использует последний checkpoint вместо savepoint. Быстро, но риск отката на checkpoint. Безопасно только с idempotent / EXACTLY_ONCE sink-ами.
-
stateless — без восстановления. Только для stateless джобов или радикальных рефакторингов.
-
Откат через initialSavepointPath — стандартный механизм отката, savepoint от предыдущей версии хранится в S3 и используется как стартовая точка для старого образа.
-
Native vs canonical savepoint (Flink 2.0+): native быстрее, но привязан к backend. Canonical — slower, but portable. Для регулярных апгрейдов в проде — native. Для миграций (RocksDB -> HashMap) — canonical.