Chandy-Lamport барьеры: интуиция
В предыдущем уроке мы установили, что Flink даёт exactly-once внутри job. Механизм, который это обеспечивает — алгоритм Chandy-Lamport, опубликованный ещё в 1985 году для distributed snapshots. Flink адаптировал его под streaming, добавив барьеры — специальные маркеры в потоке данных.
В этом уроке разберём принцип работы барьеров на интуитивном уровне (без формальной теории), поймём механизм alignment, и кратко обсудим разницу aligned vs unaligned checkpoints. Глубокая теория — в курсе flink-internals; здесь — production-уровень понимания.
Проблема: consistent snapshot распределённого state
Представьте, что у нас Flink job с 3 операторами и parallelism = 4 у каждого. Это 12 параллельных subtask’ов, каждый с собственным локальным state. Cluster running.
Как сделать «фото» (snapshot) всего state в один логический момент времени?
Наивный подход: остановить все subtask’и в один и тот же момент, скопировать state, запустить обратно. Это stop-the-world, который убивает latency. Любая распределённая synchronization дорогая.
Лучший подход (Chandy-Lamport): не останавливать ничего. Использовать маркеры в потоке данных, которые разделят «уже зачекпоинченные» события от «будущих».
Chandy-Lamport internals: полный разбор Каждый оператор делает snapshot когда видит маркер. Поскольку маркеры идут в потоке вместе с событиями, естественно образуется consistent boundary.
Барьеры в потоке
Барьер — это специальная record в потоке данных, которая «протекает» через DAG операторов так же, как обычные события.
Source events e1 e2 BARRIER e3 e4
Source. JobManager инициирует checkpoint — посылает барьер N в source. Барьер появляется в потоке между событиями.Operator processes
Оператор обрабатывает события до барьера. Когда видит барьер — делает snapshot своего state.Snapshot done forward barrier
После snapshot оператор отправляет барьер дальше. Snapshot записан в state backend. Оператор продолжает обработку e3, e4.Downstream operator
Барьер пропагируется через всю DAG. Каждый оператор повторяет: обработать всё до барьера, snapshot state, переслать барьер.Гарантия: snapshot одного оператора содержит state после обработки всех событий до барьера и до обработки событий после барьера. Это означает: snapshot всех операторов вместе образует consistent global snapshot.
Сценарий с parallelism
У реальных операторов несколько входных каналов:
e1, BAR, e3 -> channel A
Operator <
e2, BAR, e4 -> channel B
Что делать оператору, который получил BAR из channel A, но из channel B ещё не получил?
Alignment: подождать всех
В aligned checkpoint оператор делает следующее:
- Получил BAR из channel A -> начинает alignment phase.
- События после BAR из channel A — буферизуются в памяти (не обрабатываются).
- События из channel B (всё до BAR) — продолжают обрабатываться нормально.
- Получил BAR из channel B -> alignment closed.
- Snapshot state.
- Forward barrier downstream.
- Обрабатывает буферизованные события (после BAR).
Channel A e1 BAR e3
Channel A: события e1, BAR, e3. Барьер пришёл первым.Channel B e2 e5 BAR e4
Channel B: события e2, e5, BAR, e4. Барьер ещё в пути.Alignment phase
Оператор включает alignment: события с channel A после BAR (e3) буферизуются. Channel B продолжает течь.Buffer e3 process e5
Буфер: e3 ждёт. Тем временем оператор обрабатывает e5 из channel B.BAR from B snapshot then process e3 e4
Получили BAR из channel B. Alignment завершён. Делаем snapshot и обрабатываем буфер e3, потом e4.Зачем alignment: snapshot должен содержать состояние после обработки всех событий до барьера и до событий после барьера. Если бы мы делали snapshot сразу после первого BAR, событие e5 из channel B (которое логически до барьера) могло бы попасть в snapshot или не попасть — нарушается consistency.
Цена alignment: буферизация и задержка. При backpressure (медленный channel B) барьеру из A приходится долго ждать. Latency растёт.
Unaligned checkpoints: skip alignment
Flink 1.11+ поддерживает unaligned checkpoints:
- Получил BAR из channel A -> сразу делает snapshot.
- События, находящиеся «в полёте» между операторами (in-flight buffers), включаются в snapshot.
- Не ждём BAR из других каналов.
env.getCheckpointConfig().enableUnalignedCheckpoints();
Преимущество: мгновенный checkpoint даже при backpressure. Alignment latency = 0.
Минус: snapshot больше (включает in-flight buffers), recovery медленнее (нужно восстановить in-flight данные).
| Aligned | Unaligned | |
|---|---|---|
| Snapshot size | Меньше | Больше (in-flight включены) |
| Checkpoint duration | Растёт при backpressure | Постоянная |
| Recovery speed | Быстрее | Медленнее |
| Default | Да | Нет (нужно enable) |
| Хорошо для | Стабильная нагрузка | Backpressure spikes |
В production обычно начинают с aligned (default), включают unaligned если есть периодические backpressure и checkpoint timeouts.
Что внутри барьера
Барьер — это маленькая структура: CheckpointBarrier(checkpoint_id, timestamp, type). Размер ~50 байт. Передаётся через network channels как обычная запись, но имеет особый тип, чтобы операторы её распознавали.
Несколько checkpoint’ов могут быть «в полёте» одновременно (есть maxConcurrentCheckpoints). Каждый имеет уникальный checkpoint_id. Оператор может держать alignment buffers для нескольких checkpoint’ов параллельно — но обычно конфигурят maxConcurrentCheckpoints = 1, чтобы не накладывались.
Crash и restore: атомарность snapshot
Что если crash случился во время checkpoint’а?
Сценарий: оператор делает snapshot, частично записал в state backend, и crash.
Что Flink делает:
- JobManager не получил confirmation от всех операторов -> checkpoint не помечается как complete.
- Незавершённый snapshot не используется для restore.
- Restore происходит из предыдущего complete checkpoint.
Это даёт атомарность checkpoint’а: либо ВСЕ операторы сохранили state, либо checkpoint считается failed и не используется. Нет partial checkpoint.
JobManager хранит lastSuccessfulCheckpoint в его metadata. При restore читает именно его, игнорируя partial state записи от незавершённых checkpoint’ов. Старые checkpoint’ы хранятся согласно state.checkpoints.num-retained — Flink не удаляет их сразу.
Координация: как JobManager управляет
JobManager — координатор checkpoint’ов:
- Каждый interval секунд -> инициирует новый checkpoint.
- Посылает
TriggerCheckpointкаждому source operator. - Source operator посылает barrier в потоки.
- Каждый operator, завершив snapshot, посылает
AcknowledgeCheckpointв JobManager с указанием снапшота state location. - Когда все операторы подтвердили — JobManager помечает checkpoint complete, обновляет metadata.
Если хоть один operator не подтвердил за checkpointTimeout — checkpoint failed, дальше зависит от tolerableCheckpointFailureNumber.
Diagram: barrier flow в DAG
JobManager triggers N
JobManager инициирует checkpoint N. Посылает TriggerCheckpoint каждому source.Source 1 emits BAR N
Source 1 subtask: получает Trigger, эмитит BARRIER N в downstream channel.Source 2 emits BAR N
Source 2 subtask: эмитит BARRIER N тоже.Op 1 snapshot forward
Operator 1: получает BAR N из всех входных каналов, alignment finished, snapshot его state, forward BAR N.Op 2 snapshot forward
Operator 2: тоже получает, snapshot, forward.Sink ACK to JM
Sink operator: получает BAR N, snapshot, ACK в JobManager.Checkpoint N complete
JobManager получил ACK от всех operators. Checkpoint N помечен complete. Metadata записана.Что барьер не решает
Chandy-Lamport барьеры обеспечивают Flink-internal exactly-once. Они НЕ решают:
- Source dedup: если source отправил событие дважды (например, Kafka producer без idempotence), барьеры не дедуплицируют.
- Sink exactly-once delivery: барьеры дают consistent snapshot, но sink должен дополнительно делать transactional commit.
- External system consistency: если ваш Flink job обновляет state и пишет в Redis, Redis может видеть обновлённый Redis state, но snapshot ещё не commit’ит. Restore приведёт к рассинхрону.
Барьеры — это necessary but not sufficient для end-to-end exactly-once. Без них ничего не работает; с ними плюс transactional sink — работает.
Попробуй сам
Включи Flink UI и наблюдай checkpoints в реальном времени:
- Запусти job со средним state (например, keyed aggregation).
- В UI -> Checkpoints -> Subtasks View — увидь
Sync Duration,Async Duration,Alignment Duration. - Создай backpressure: добавь искусственный
Thread.sleep(100)в один из operators. Посмотри, какAlignment Durationподскочит. - Включи unaligned checkpoints. Сравни alignment time — должен упасть до 0.
Ключевые выводы
- Барьер — специальная record в потоке, маркирует границу checkpoint’а. Протекает через DAG как обычное событие.
- Alignment: оператор ждёт барьер из всех input каналов, прежде чем snapshot. Гарантирует consistent snapshot.
- Alignment latency растёт при backpressure (медленный канал заставляет всех ждать).
- Unaligned checkpoints: пропускают alignment, включают in-flight buffers в snapshot. Быстрее при backpressure, дороже recovery.
- Атомарность: либо все операторы завершили snapshot, либо checkpoint failed. Нет partial state.
- JobManager координирует: trigger, ACK collection, complete metadata.
- Барьеры дают Flink-internal exactly-once, не end-to-end. Sink должен дополнить transactional commit.