Learning Platform
Глоссарий Troubleshooting
Урок 16.02 · 25 мин
Средний
UpgradeSavepointCheckpointLast-stateStatelessOperator

Upgrade modes: savepoint, last-state, stateless

Стримовый джоб не может быть просто перезапущен — в нём живёт стейт. Сотни мегабайт keyed state с агрегациями, миллионы записей в RocksDB, прогресс по offset-ам Kafka. Когда вы катите новую версию (баг-фикс, новая фича, обновление зависимостей) — нужно сохранить стейт, остановить старый джоб, поднять новый и подцепить тот же стейт.

Flink Kubernetes Operator предлагает три варианта, как это сделать: savepoint, last-state и stateless. У каждого свои гарантии, своя скорость, свои подводные камни. Этот урок — про то, как они работают и когда что выбирать.

Savepoint vs checkpoint: внутренние отличия форматов

Зачем вообще “upgrade modes”

Когда вы делаете kubectl apply -f flink-deployment.yaml с изменённым image или flinkConfiguration, оператор должен:

  1. Остановить старый джоб (так чтобы стейт сохранился где-то снаружи кластера).
  2. Снести старые поды JM и TM.
  3. Поднять новые с новым образом или конфигом.
  4. Запустить джоб, восстановив его из сохранённого стейта.

Главный вопрос — откуда восстанавливаемся. Можно из ручного savepoint (надёжно, но медленнее). Можно из последнего удачного чекпойнта (быстрее, но риск отката записанных данных). Можно вообще не восстанавливаться (новый джоб с нуля). Это и есть три upgradeMode.

NOTE

upgradeMode задаётся в spec.job.upgradeMode и применяется ко всем будущим изменениям. Можно менять между деплоями. По умолчанию — stateless, но в production почти всегда savepoint.


Сравнение режимов: одна таблица

Upgrade modes: что они делают
savepointПеред остановкой джоба оператор делает savepoint в внешнем хранилище (S3). Восстановление при старте — из этого savepoint.
НадёжностьСамый надёжный. Savepoint - явный консистентный снапшот, может быть использован для версионных миграций стейта.
last-stateИспользует последний удачный чекпойнт во время cancel джоба. Не делает явный savepoint - быстрее.
СкоростьБыстро. Но при сбое в момент cancel - может потерять записи между последним checkpoint и моментом cancel.
statelessПолностью новый джоб без восстановления. Старый стейт выбрасывается. Используется для джобов без стейта или для радикальных рефакторингов.
ПростотаСтейт теряется. Подходит только для stateless джобов или когда вы готовы пересчитать всё с нуля (replay из Kafka).

Режим 1: savepoint

Это рекомендованный режим для всех stateful production джобов. Перед любым изменением (новый образ, новый parallelism, новый конфиг) оператор:

  1. Вызывает Flink REST API stop-with-savepoint — джоб делает синхронный финальный savepoint в state.savepoints.dir (например, s3://flink-state/orders/savepoints/).
  2. После того, как savepoint успешно записан — джоб корректно останавливается (все источники остановлены, чекпойнт-барьер дошёл до всех sink-ов, EXACTLY_ONCE-гарантии соблюдены).
  3. Оператор сносит поды и поднимает новые.
  4. При старте джоба передаёт флаг --fromSavepoint s3://.../savepoint-xxx и --allowNonRestoredState false.
  5. Новый джоб восстанавливает стейт из savepoint, источники продолжают с тех же offset-ов.

Гарантии:

  • EXACTLY_ONCE сохраняется: ни одна запись не теряется и не дублируется при правильно настроенных connector-ах.
  • Savepoint версионно совместим между разными версиями Flink (1.18 -> 2.2) и переживает изменения parallelism (для keyed state с key groups).
  • Если новый код несовместим со старым стейтом (например, удалили field из POJO) — джоб упадёт при старте с понятной ошибкой, а старый savepoint останется как fallback для ручного rollback.

Когда использовать:

  • ВСЕ stateful production джобы с EXACTLY_ONCE.
  • Любые изменения, где могут поменяться сериализаторы (POJO, Avro schema).
  • При повышении parallelism (нужно key group redistribution из savepoint).

Цена:

  • Время на savepoint: от 30 секунд до нескольких минут в зависимости от размера стейта и скорости S3.
  • Доводнительная нагрузка на джоб при savepoint (это синхронная операция).
  • Простой джоба на это время.
WARNING

Если savepoint не успевает за timeout (по умолчанию 10 минут), оператор может зависнуть в UPGRADING-фазе. Для джобов с десятками гигабайт стейта увеличьте kubernetes.operator.savepoint.trigger.timeout в конфиге оператора и используйте инкрементальные savepoint (Flink 2.0+: state.savepoints.format: native — savepoint в нативном формате backend-а, быстрее).


Режим 2: last-state

Этот режим — компромисс между скоростью и надёжностью. Оператор использует последний удачный checkpoint вместо явного savepoint. Логика:

  1. Оператор вызывает cancel (НЕ stop-with-savepoint). Это менее аккуратно — Flink просто прибивает джоб.
  2. Из метаданных в HA ConfigMap оператор знает путь к последнему успешному чекпойнту.
  3. Поды сносятся, новые поднимаются.
  4. Джоб стартует с флагом --fromSavepoint s3://.../chk-XXXX (даже хотя это checkpoint, но Flink его принимает так же).

Гарантии:

  • При EXACTLY_ONCE-сетапе с транзакционным sink-ом (Kafka TX, Iceberg) — нет потери и нет дубликатов, как в savepoint режиме. НО есть откат: записи между последним checkpoint-ом и моментом cancel будут переиграны (это нормально для EXACTLY_ONCE, sink-ы их подавят/откатят).
  • При AT_LEAST_ONCE-setup-ах будут дубликаты в записях за период между checkpoint и cancel.

Когда использовать:

  • Когда savepoint занимает слишком много времени и downtime критичен.
  • Для джобов с маленьким интервалом между checkpoint-ами (например, 10 секунд) — потерь почти нет.
  • Когда у sink-а есть idempotency или транзакции.

Цена:

  • Откат на checkpoint означает повторную обработку небольшого окна данных. Для агрегаций — это норм, для side-эффектов (write to external DB без idempotency) — может привести к дублям.
  • Если последний checkpoint был давно (например, при крупном back-pressure событии) — откат может быть значительным.

Когда НЕ использовать:

  • Без EXACTLY_ONCE-настройки — дубли в downstream системах.
  • При радикальных изменениях схемы стейта — checkpoint в native format может быть несовместим (savepoint в canonical format безопаснее).
  • При увеличении parallelism — Flink 2.0+ позволяет это из checkpoint, но это менее тестируемая ветвь, чем savepoint.
NOTE

В Flink K8s Operator 1.14 last-state — это upgradeMode: last-state. Этот режим требует, чтобы у вас был HA включён (high-availability.type: kubernetes), иначе оператор не знает, где последний checkpoint.


Режим 3: stateless

Самый простой режим — никакого восстановления. Оператор просто прибивает джоб и поднимает новый с нуля.

  1. cancel без savepoint.
  2. Стейт в state.checkpoints.dir остаётся, но не используется.
  3. Новый джоб стартует с пустыми state backend-ами.

Когда использовать:

  • Полностью stateless джобы: простой mapping Kafka -> Flink -> Kafka без агрегаций, окон, joins. Стейт = только offset-ы Kafka, и они хранятся в Kafka внутри потребительской группы (если включено).
  • Радикальные рефакторинги — поменяли структуру стейта так сильно, что старый savepoint всё равно невозможно использовать. Тогда честнее сделать stateless apply и пересчитать из Kafka (если retention позволяет).
  • Dev-окружения для быстрых итераций.

НИКОГДА не использовать:

  • Stateful production джобы — потеряете часы или дни накопленных агрегаций.
  • Любые джобы, где источник не позволяет полный replay (например, CDC от Debezium с истекшим binlog).

Реальный пример: смена parallelism

Допустим, у вас джоб orders-to-iceberg с parallelism: 4. Нужно поднять до 8 (новые TPS требуют). Применяете изменение:

spec:
  job:
    parallelism: 8  # было 4
    upgradeMode: savepoint

Что произойдёт с разными режимами:

savepoint:

  1. Оператор вызывает stop-with-savepoint. ~60 сек на savepoint к S3.
  2. Поды JM и TM удаляются.
  3. Поднимаются новые с обновлённой spec (TM count удваивается, slots соответствуют).
  4. Джоб стартует, читает savepoint, перераспределяет keyed state по key groups между новыми оператор-инстансами.
  5. Старт занимает ~90 сек (загрузка savepoint в RocksDB).
  6. Готово. Стейт сохранён, exactly-once соблюдён.

last-state:

  1. Оператор cancel. ~5 сек.
  2. Поды пересоздаются.
  3. Джоб стартует с последнего checkpoint, перераспределяет state.
  4. Записи между cancel и last checkpoint — переигрываются.
  5. Старт занимает ~30 сек. Выигрыш в скорости — около минуты.

stateless:

  1. cancel, новый запуск. Все агрегации сброшены. Окна потеряны. Не делайте так для stateful джоба.
TIP

Для срочных hotfix-ов в проде savepoint режим может быть слишком медленным. Тогда last-state — приемлемый компромисс, если sink-ы транзакционные. Но дефолт всегда savepoint.


Откат через initialSavepointPath

Что если новый деплой сломался? Скажем, новый код упал на каком-то edge-case, и джоб в CrashLoopBackOff. Оператор перевёл FlinkDeployment в состояние ERROR.

В savepoint-режиме у вас всегда есть откатный savepoint — тот, который сделал оператор перед upgrade-ом. Откат:

  1. Возвращаете старый образ в spec.image.
  2. В spec.job добавляете initialSavepointPath: s3://.../savepoint-pre-upgrade-xxx.
  3. kubectl apply -f.

Оператор увидит изменение spec, применит upgrade с этим savepoint-ом как стартовая точка. Джоб поднимется с состоянием, какое было ДО неудачного деплоя.

spec:
  image: my-registry/orders-to-iceberg:v0.4.1  # старый
  job:
    initialSavepointPath: s3://flink-state/orders/savepoints/savepoint-9a3b2-2c1d4e
    upgradeMode: savepoint
WARNING

Пути к savepoint-ам сохраняются в status FlinkDeployment-а (kubectl get flinkdeployment -o yaml). При успешном upgrade-е там видно lastSavepointTriggerNonce и savepointTriggerLocation. Это и есть путь для отката. В CI/CD держите автоматическую регистрацию этих путей в external store (S3 bucket с тегами или DynamoDB) — чтобы при необходимости отката не лезть в kubectl.


Что выбирать: дерево решений

Какой upgradeMode выбрать
Q1: джоб stateful?Stateful = есть keyed state, window state, joins, aggregations. Stateless = только map/filter.
нет
stateless OKДля stateless джобов восстанавливать нечего. Stateless mode - самый простой.
Q2: критичен downtime?Если 1-2 минуты простоя приемлемы - savepoint. Если каждая секунда дорога - рассмотрите last-state.
нет
savepoint (default)Самый надёжный. Используйте всегда, когда downtime не блокирует.
Q3: sink идемпотентный?Idempotent sink = Kafka TX, Iceberg, JDBC с upsert. Non-idempotent = JDBC INSERT, HTTP POST.
да
last-state OKС idempotent sink last-state режим безопасен и быстр. Иначе - savepoint.

Простое правило: по умолчанию savepoint. Переключайтесь на last-state только если downtime критичен И sink-ы идемпотентны. Stateless — только для джобов без стейта или для редких ситуаций с полным сбросом.


Ключевые выводы

  1. upgradeMode управляет тем, как оператор переключает джоб при изменении spec. Три варианта: savepoint, last-state, stateless.

  2. savepoint — рекомендованный default для всех stateful production джобов. Делает явный consistent savepoint, поддерживает изменение parallelism и версионные миграции стейта. Медленнее (30s — 5min).

  3. last-state — компромисс: использует последний checkpoint вместо savepoint. Быстро, но риск отката на checkpoint. Безопасно только с idempotent / EXACTLY_ONCE sink-ами.

  4. stateless — без восстановления. Только для stateless джобов или радикальных рефакторингов.

  5. Откат через initialSavepointPath — стандартный механизм отката, savepoint от предыдущей версии хранится в S3 и используется как стартовая точка для старого образа.

  6. Native vs canonical savepoint (Flink 2.0+): native быстрее, но привязан к backend. Canonical — slower, but portable. Для регулярных апгрейдов в проде — native. Для миграций (RocksDB -> HashMap) — canonical.

Проверка знанийKnowledge check
У вас Flink-джоб с агрегацией ежечасных сумм по 50M ключам в RocksDB (~40 ГБ state). Sink — Apache Iceberg с EXACTLY_ONCE. Каждую неделю катите минорные багфиксы. Что использовать как upgradeMode и почему? Какие риски?
ОтветAnswer
Используйте upgradeMode: savepoint с native savepoint format (state.savepoints.format: native). Обоснование: - 40 ГБ state не подходит для last-state, потому что savepoint в native format будет быстрее и инкрементальный, а для canonical format на таком объёме savepoint занимал бы минуты. С native — секунды-минуты. - savepoint надёжнее last-state для крупного state - явный консистентный снапшот, гарантированно записан в S3 до cancel джоба. - Iceberg sink с EXACTLY_ONCE: last-state с её откатом на checkpoint тоже бы сработал, но savepoint безопаснее, так как стейт критичный. Риски при savepoint на 40 ГБ: 1. Время savepoint: даже с native может занять минуты при медленном S3. Увеличьте kubernetes.operator.savepoint.trigger.timeout в operator config. 2. Backpressure на момент savepoint: джоб может временно отставать от Kafka. Используйте unaligned checkpoints (execution.checkpointing.unaligned: true) - они применимы и к savepoint в Flink 2.x. 3. Disk pressure на TaskManager: при создании savepoint RocksDB делает snapshot файлов. Нужен запас места. Альтернатива: для очень срочных hotfix-ов разово переключиться на last-state, но дефолт - savepoint.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Production Flink-джоб с RocksDB state ~20 ГБ, sink в Kafka с EXACTLY_ONCE. Команда регулярно деплоит багфиксы (раз в неделю). downtime до 60 секунд приемлем. Какой upgradeMode оптимален?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5