Learning Platform
Глоссарий Troubleshooting
Урок 11.01 · 22 мин
Средний
CheckpointConfigurationExactly OnceRetention

Конфигурация checkpoint’ов

Checkpoint — это автоматический snapshot всего состояния job, который Flink делает периодически в фоне. Он позволяет восстановить job после сбоя без потери данных. По умолчанию checkpoint’ы выключены — в production это значит, что любой crash сбрасывает state в ноль. Включение и тонкая настройка checkpoint’ов — обязательный шаг production deployment.

В этом уроке разберём ключевые параметры: enableCheckpointing, режимы EXACTLY_ONCE/AT_LEAST_ONCE, retention policy, externalized checkpoints, и компромиссы между частотой и накладными расходами.


Минимальная настройка

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);  // каждые 60 секунд
# PyFlink 2.x
env.enable_checkpointing(60_000)

Это минимум. Job будет делать snapshot всего state каждые 60 секунд и сохранять в configured state backend (file:///, s3://, hdfs://).

Что значит «каждые 60 секунд»:

interval — минимальное время между концом одного checkpoint и началом следующего. Если checkpoint занимает 30 секунд, следующий начнётся через 60 секунд после его завершения, то есть всего цикл 90 секунд. Это сделано, чтобы checkpoint’ы не накладывались друг на друга.


Mode: EXACTLY_ONCE vs AT_LEAST_ONCE

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// или
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

EXACTLY_ONCE (default)

Chandy-Lamport: детальный механизм барьеров

Checkpoint barriers используют alignment: оператор, получив barrier из одного канала, ждёт barrier из всех остальных каналов, прежде чем сделать snapshot. Это гарантирует, что snapshot включает все события до barrier и ни одно после.

Цена: во время alignment пришедшие из «уже зачекпоинченных» каналов события буферизуются и обрабатываются после checkpoint. Это добавляет latency.

AT_LEAST_ONCE

Без alignment: каждый оператор делает snapshot сразу после получения первого barrier. События могут быть включены в snapshot дважды (если поступили после barrier на одном канале, но до barrier на другом).

Цена: при restore некоторые события могут быть обработаны повторно — нужна идемпотентность downstream’а.

Когда использовать что

  • EXACTLY_ONCE: дефолт. Большинство jobs.
  • AT_LEAST_ONCE: если latency критична и downstream tolerant к дубликатам (метрики, аналитика). Также для unaligned checkpoints, которые мы изучим в модуле 11.

Interval: сколько данных можно потерять

interval — главный trade-off:

IntervalПри crash теряетсяНакладные расходы
5 секундДо ~5 секунд работыВысокие — постоянный snapshot
30 секундДо ~30 секундСредние
60 секунд (типично)До ~1 минутыСредние
5 минутДо ~5 минутНизкие
30 минутДо ~30 минутМинимальные

«При crash теряется» = время между последним успешным checkpoint и моментом crash. После crash job стартует с last checkpoint, повторно обрабатывает все события из source с этого момента (если source поддерживает replay — Kafka, Kinesis).

TIP

Не делайте interval слишком маленьким (меньше 5 секунд) — checkpoint занимает доли секунды на small state, секунды на medium state, минуты на RocksDB с десятками GB. Если interval меньше времени checkpoint — checkpoints будут накладываться и job замедлится.


Min pause between checkpoints

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);

Минимальное время между окончанием одного checkpoint и началом следующего. Гарантирует, что между checkpoint’ами есть «нормальное» время обработки.

Если у вас interval = 60s и minPause = 30s:

  • Сначала checkpoint занял 45 секунд. После него ждём max(60 - 45, 30) = 30 секунд.
  • Следующий начнётся через 75 секунд от старта первого.

Это защита от ситуации «checkpoint занимает 90% времени», когда interval меньше длительности checkpoint.


Timeout: когда отменить зависший checkpoint

env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);  // 10 минут

Если checkpoint занимает дольше timeout, Flink отменяет его. По умолчанию 10 минут. Для больших state (несколько TB) может понадобиться 30+ минут.

Часто отменяющиеся checkpoint’ы (numCheckpointsFailed растёт) — сигнал, что state backend или storage не справляются. Симптомы:

  • Slow S3/HDFS uploads.
  • Slow RocksDB compactions.
  • Slow async snapshots (slow disk).

Tolerable failed checkpoints

env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

Сколько подряд провальных checkpoint’ов допустимо до того, как job упадёт. По умолчанию 0 — любой fail валит job. В production обычно 3-5, чтобы пережить транзиентные сбои S3 или сетевые джиттеры.


Retention: что хранится после job termination

env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

Три варианта:

  1. DELETE_ON_CANCELLATION (default до некоторых версий) — checkpoint удаляется когда job отменён вручную (flink cancel). При crash сохраняется.
  2. RETAIN_ON_CANCELLATION — checkpoint сохраняется и после ручной отмены. Можно restore.
  3. NO_EXTERNALIZED_CHECKPOINTS — checkpoint’ы внутренние, удаляются всегда. Не для production.

В production почти всегда нужен RETAIN_ON_CANCELLATION — даёт возможность откатиться при «упс, отменил не тот job».


Number of retained checkpoints

# flink-conf.yaml
state.checkpoints.num-retained: 3

Сколько последних checkpoint’ов хранить. По умолчанию 1 — Flink держит только последний. При получении нового — удаляет предыдущий.

Для production обычно 3-5: даёт окно отката при найденной багулине, не раздувая место в state backend.


Diagram: checkpoint lifecycle

Checkpoint lifecycle при normal operation и crash

Trigger checkpoint

JobManager инициирует checkpoint. Координатор посылает barrier во все source operators.

Operators snapshot state

Каждый оператор получает barrier, делает snapshot своего state в state backend (S3, HDFS, local disk).

Checkpoint complete

После завершения всех snapshot'ов координатор объявляет checkpoint complete. Metadata записывается в external path.

Stored at s3 chk-42

State в S3: chk-42/ contains все snapshot файлы. Готово к восстановлению.

Crash detected

TaskManager упал (OOM, network partition). Job обнаружил failure.

Restore from chk-42

Job рестартует с last successful checkpoint chk-42. State восстановлен, source replays события с last committed offset.

Production setup: рекомендуемый конфиг

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint каждые 60 секунд в EXACTLY_ONCE режиме
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

CheckpointConfig cfg = env.getCheckpointConfig();

// Минимум 30 секунд между checkpoint'ами
cfg.setMinPauseBetweenCheckpoints(30_000);

// Timeout 10 минут
cfg.setCheckpointTimeout(10 * 60 * 1000);

// Терпим 3 подряд fail'а
cfg.setTolerableCheckpointFailureNumber(3);

// Сохранять при ручной отмене
cfg.setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// Только 1 concurrent checkpoint (не накладываются)
cfg.setMaxConcurrentCheckpoints(1);
# flink-conf.yaml — кластерный конфиг
state.checkpoints.dir: s3://flink-state/checkpoints/
state.savepoints.dir: s3://flink-state/savepoints/
state.checkpoints.num-retained: 3
# PyFlink: тот же конфиг
env.enable_checkpointing(60000, CheckpointingMode.EXACTLY_ONCE)
cfg = env.get_checkpoint_config()
cfg.set_min_pause_between_checkpoints(30000)
cfg.set_checkpoint_timeout(10 * 60 * 1000)
cfg.set_tolerable_checkpoint_failure_number(3)
cfg.enable_externalized_checkpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
)
cfg.set_max_concurrent_checkpoints(1)

Unaligned checkpoints (briefly)

Flink 1.11+ поддерживает unaligned checkpoints — оператор делает snapshot без ожидания barrier’ов из всех каналов. Это убирает alignment latency при backpressure.

cfg.enableUnalignedCheckpoints();

Trade-off: больше данных в snapshot (in-flight данные между операторами), но мгновенный checkpoint при backpressure. Полезно когда latency критична и есть периодические backpressure spikes.

Подробнее — в модуле 11.


Метрики, которые надо мониторить

В Flink UI и Prometheus:

МетрикаЧто значитКогда alerting
lastCheckpointDurationВремя последнего checkpoint>50% от interval
lastCheckpointSizeРазмер snapshotРастёт линейно — leak в state
numberOfCompletedCheckpointsTotal successfulДолжен расти
numberOfFailedCheckpointsTotal failedРастёт быстрее 1/час
alignmentTime (per task)Время ожидания barrier’ов>5s — backpressure

Попробуй сам

  1. Создай простой job с enableCheckpointing(10000) — каждые 10 секунд.
  2. Запусти и понаблюдай Flink UI -> Checkpoints — смотри как создаются.
  3. Поставь interval = 100ms (искусственно мало). Запусти. Что происходит? (Спойлер: checkpoints начинают накладываться, job просядет в throughput.)
  4. Верни нормально, сделай искусственный crash (kill -9 taskmanager). Посмотри в UI как job рестартует с last checkpoint.
  5. Поменяй на EXTERNALIZED RETAIN_ON_CANCELLATION, отмени job, найди checkpoint в S3, рестартуй из него вручную через flink run -s s3://.../chk-42 my-job.jar.

Ключевые выводы

  1. Checkpoint’ы выключены по умолчанию — нужно явно enableCheckpointing(interval).
  2. EXACTLY_ONCE (default) — с alignment, дороже но безопаснее. AT_LEAST_ONCE — быстрее, но возможны дубликаты.
  3. interval определяет, сколько данных теряется при crash. Обычно 30-60 секунд.
  4. minPauseBetweenCheckpoints защищает от наложения. timeout отменяет зависшие.
  5. RETAIN_ON_CANCELLATION обязателен в production — иначе ручная отмена удалит state.
  6. num-retained: 3 даёт окно отката. Мониторьте numberOfFailedCheckpoints и lastCheckpointDuration.
Проверка знанийKnowledge check
У вас job с EXACTLY_ONCE checkpoint каждые 30 секунд. lastCheckpointDuration в Flink UI = 25 секунд, начало расти со временем. alignmentTime > 10 секунд. Что происходит, какие три действия предпринять?
ОтветAnswer
Происходит: state растёт (lastCheckpointDuration увеличивается с течением времени — больше state, дольше snapshot). Большой alignmentTime говорит о backpressure: операторы ждут barrier'ы из медленных каналов. С interval=30s и duration=25s — мало времени остаётся на нормальную обработку, скоро checkpoint займёт >30s и начнут накладываться. Действия: (1) Увеличить interval до 60-120 секунд: уменьшит частоту, даст больше room для обработки. (2) Включить unaligned checkpoints: cfg.enableUnalignedCheckpoints() — снимет alignment time при backpressure. (3) Найти источник backpressure через Flink UI Subtasks View: возможно, нужна оптимизация конкретного оператора (async I/O вместо sync, увеличение parallelism). Также проверить state size: если state растёт линейно — где-то leak (например, ValueState без TTL), нужен StateTtlConfig или manual cleanup.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В чём ключевое различие EXACTLY_ONCE и AT_LEAST_ONCE checkpoint mode?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4