Конфигурация checkpoint’ов
Checkpoint — это автоматический snapshot всего состояния job, который Flink делает периодически в фоне. Он позволяет восстановить job после сбоя без потери данных. По умолчанию checkpoint’ы выключены — в production это значит, что любой crash сбрасывает state в ноль. Включение и тонкая настройка checkpoint’ов — обязательный шаг production deployment.
В этом уроке разберём ключевые параметры: enableCheckpointing, режимы EXACTLY_ONCE/AT_LEAST_ONCE, retention policy, externalized checkpoints, и компромиссы между частотой и накладными расходами.
Минимальная настройка
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // каждые 60 секунд
# PyFlink 2.x
env.enable_checkpointing(60_000)
Это минимум. Job будет делать snapshot всего state каждые 60 секунд и сохранять в configured state backend (file:///, s3://, hdfs://).
Что значит «каждые 60 секунд»:
interval — минимальное время между концом одного checkpoint и началом следующего. Если checkpoint занимает 30 секунд, следующий начнётся через 60 секунд после его завершения, то есть всего цикл 90 секунд. Это сделано, чтобы checkpoint’ы не накладывались друг на друга.
Mode: EXACTLY_ONCE vs AT_LEAST_ONCE
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// или
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
EXACTLY_ONCE (default)
Chandy-Lamport: детальный механизм барьеровCheckpoint barriers используют alignment: оператор, получив barrier из одного канала, ждёт barrier из всех остальных каналов, прежде чем сделать snapshot. Это гарантирует, что snapshot включает все события до barrier и ни одно после.
Цена: во время alignment пришедшие из «уже зачекпоинченных» каналов события буферизуются и обрабатываются после checkpoint. Это добавляет latency.
AT_LEAST_ONCE
Без alignment: каждый оператор делает snapshot сразу после получения первого barrier. События могут быть включены в snapshot дважды (если поступили после barrier на одном канале, но до barrier на другом).
Цена: при restore некоторые события могут быть обработаны повторно — нужна идемпотентность downstream’а.
Когда использовать что
- EXACTLY_ONCE: дефолт. Большинство jobs.
- AT_LEAST_ONCE: если latency критична и downstream tolerant к дубликатам (метрики, аналитика). Также для unaligned checkpoints, которые мы изучим в модуле 11.
Interval: сколько данных можно потерять
interval — главный trade-off:
| Interval | При crash теряется | Накладные расходы |
|---|---|---|
| 5 секунд | До ~5 секунд работы | Высокие — постоянный snapshot |
| 30 секунд | До ~30 секунд | Средние |
| 60 секунд (типично) | До ~1 минуты | Средние |
| 5 минут | До ~5 минут | Низкие |
| 30 минут | До ~30 минут | Минимальные |
«При crash теряется» = время между последним успешным checkpoint и моментом crash. После crash job стартует с last checkpoint, повторно обрабатывает все события из source с этого момента (если source поддерживает replay — Kafka, Kinesis).
Не делайте interval слишком маленьким (меньше 5 секунд) — checkpoint занимает доли секунды на small state, секунды на medium state, минуты на RocksDB с десятками GB. Если interval меньше времени checkpoint — checkpoints будут накладываться и job замедлится.
Min pause between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
Минимальное время между окончанием одного checkpoint и началом следующего. Гарантирует, что между checkpoint’ами есть «нормальное» время обработки.
Если у вас interval = 60s и minPause = 30s:
- Сначала checkpoint занял 45 секунд. После него ждём
max(60 - 45, 30) = 30секунд. - Следующий начнётся через 75 секунд от старта первого.
Это защита от ситуации «checkpoint занимает 90% времени», когда interval меньше длительности checkpoint.
Timeout: когда отменить зависший checkpoint
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000); // 10 минут
Если checkpoint занимает дольше timeout, Flink отменяет его. По умолчанию 10 минут. Для больших state (несколько TB) может понадобиться 30+ минут.
Часто отменяющиеся checkpoint’ы (numCheckpointsFailed растёт) — сигнал, что state backend или storage не справляются. Симптомы:
- Slow S3/HDFS uploads.
- Slow RocksDB compactions.
- Slow async snapshots (slow disk).
Tolerable failed checkpoints
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
Сколько подряд провальных checkpoint’ов допустимо до того, как job упадёт. По умолчанию 0 — любой fail валит job. В production обычно 3-5, чтобы пережить транзиентные сбои S3 или сетевые джиттеры.
Retention: что хранится после job termination
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
Три варианта:
DELETE_ON_CANCELLATION(default до некоторых версий) — checkpoint удаляется когда job отменён вручную (flink cancel). При crash сохраняется.RETAIN_ON_CANCELLATION— checkpoint сохраняется и после ручной отмены. Можно restore.NO_EXTERNALIZED_CHECKPOINTS— checkpoint’ы внутренние, удаляются всегда. Не для production.
В production почти всегда нужен RETAIN_ON_CANCELLATION — даёт возможность откатиться при «упс, отменил не тот job».
Number of retained checkpoints
# flink-conf.yaml
state.checkpoints.num-retained: 3
Сколько последних checkpoint’ов хранить. По умолчанию 1 — Flink держит только последний. При получении нового — удаляет предыдущий.
Для production обычно 3-5: даёт окно отката при найденной багулине, не раздувая место в state backend.
Diagram: checkpoint lifecycle
Trigger checkpoint
JobManager инициирует checkpoint. Координатор посылает barrier во все source operators.Operators snapshot state
Каждый оператор получает barrier, делает snapshot своего state в state backend (S3, HDFS, local disk).Checkpoint complete
После завершения всех snapshot'ов координатор объявляет checkpoint complete. Metadata записывается в external path.Stored at s3 chk-42
State в S3: chk-42/ contains все snapshot файлы. Готово к восстановлению.Crash detected
TaskManager упал (OOM, network partition). Job обнаружил failure.Restore from chk-42
Job рестартует с last successful checkpoint chk-42. State восстановлен, source replays события с last committed offset.Production setup: рекомендуемый конфиг
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint каждые 60 секунд в EXACTLY_ONCE режиме
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig cfg = env.getCheckpointConfig();
// Минимум 30 секунд между checkpoint'ами
cfg.setMinPauseBetweenCheckpoints(30_000);
// Timeout 10 минут
cfg.setCheckpointTimeout(10 * 60 * 1000);
// Терпим 3 подряд fail'а
cfg.setTolerableCheckpointFailureNumber(3);
// Сохранять при ручной отмене
cfg.setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// Только 1 concurrent checkpoint (не накладываются)
cfg.setMaxConcurrentCheckpoints(1);
# flink-conf.yaml — кластерный конфиг
state.checkpoints.dir: s3://flink-state/checkpoints/
state.savepoints.dir: s3://flink-state/savepoints/
state.checkpoints.num-retained: 3
# PyFlink: тот же конфиг
env.enable_checkpointing(60000, CheckpointingMode.EXACTLY_ONCE)
cfg = env.get_checkpoint_config()
cfg.set_min_pause_between_checkpoints(30000)
cfg.set_checkpoint_timeout(10 * 60 * 1000)
cfg.set_tolerable_checkpoint_failure_number(3)
cfg.enable_externalized_checkpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
)
cfg.set_max_concurrent_checkpoints(1)
Unaligned checkpoints (briefly)
Flink 1.11+ поддерживает unaligned checkpoints — оператор делает snapshot без ожидания barrier’ов из всех каналов. Это убирает alignment latency при backpressure.
cfg.enableUnalignedCheckpoints();
Trade-off: больше данных в snapshot (in-flight данные между операторами), но мгновенный checkpoint при backpressure. Полезно когда latency критична и есть периодические backpressure spikes.
Подробнее — в модуле 11.
Метрики, которые надо мониторить
В Flink UI и Prometheus:
| Метрика | Что значит | Когда alerting |
|---|---|---|
lastCheckpointDuration | Время последнего checkpoint | >50% от interval |
lastCheckpointSize | Размер snapshot | Растёт линейно — leak в state |
numberOfCompletedCheckpoints | Total successful | Должен расти |
numberOfFailedCheckpoints | Total failed | Растёт быстрее 1/час |
alignmentTime (per task) | Время ожидания barrier’ов | >5s — backpressure |
Попробуй сам
- Создай простой job с
enableCheckpointing(10000)— каждые 10 секунд. - Запусти и понаблюдай Flink UI -> Checkpoints — смотри как создаются.
- Поставь
interval = 100ms(искусственно мало). Запусти. Что происходит? (Спойлер: checkpoints начинают накладываться, job просядет в throughput.) - Верни нормально, сделай искусственный crash (
kill -9 taskmanager). Посмотри в UI как job рестартует с last checkpoint. - Поменяй на
EXTERNALIZED RETAIN_ON_CANCELLATION, отмени job, найди checkpoint в S3, рестартуй из него вручную черезflink run -s s3://.../chk-42 my-job.jar.
Ключевые выводы
- Checkpoint’ы выключены по умолчанию — нужно явно
enableCheckpointing(interval). - EXACTLY_ONCE (default) — с alignment, дороже но безопаснее. AT_LEAST_ONCE — быстрее, но возможны дубликаты.
intervalопределяет, сколько данных теряется при crash. Обычно 30-60 секунд.minPauseBetweenCheckpointsзащищает от наложения.timeoutотменяет зависшие.RETAIN_ON_CANCELLATIONобязателен в production — иначе ручная отмена удалит state.num-retained: 3даёт окно отката. МониторьтеnumberOfFailedCheckpointsиlastCheckpointDuration.