Chandy-Lamport и Asynchronous Barrier Snapshotting
Когда Flink-job работает в production, тысячи событий в секунду текут через DAG операторов, каждый оператор накапливает состояние в RocksDB, и где-то на S3 лежат checkpoint-ы, которые позволяют восстановить job после падения TaskManager или kernel panic. Это базовая обещанная гарантия Flink — exactly-once семантика на bounded и unbounded потоках. Но как именно она реализована? Как вы делаете консистентный snapshot распределённой системы из десятков операторов, между которыми летят события, без остановки всего пайплайна?
Ответ — это адаптация алгоритма Chandy-Lamport 1985 года, который в Flink называется Asynchronous Barrier Snapshotting (ABS).
Chandy-Lamport барьеры: интуиция (практический курс)Конфигурация checkpoint-ов в Flink Понимание этого алгоритма — фундамент для всего, что мы будем разбирать дальше: aligned/unaligned checkpoints, savepoints, schema evolution, exactly-once с 2PC. Если вы не понимаете ABS, вы не понимаете Flink.
Проблема: snapshot распределённой системы
Представьте job из четырёх операторов, выполняющийся на трёх TaskManager-ах. У каждого оператора своё in-memory state, между ними летят события по network buffer-ам. Вы хотите сделать snapshot — такое состояние всех операторов и каналов, которое можно восстановить, если что-то упадёт.
Наивный подход — остановить мир. Послать команду “стоп” всем операторам, дождаться, пока все они закончат обработку текущего события, дождаться, пока сетевые буферы опустеют, и тогда дампить состояние. Это работает, но catastrophic для latency: при checkpoint interval 30 секунд и snapshot duration 5 секунд вы теряете 1/6 throughput.
Второй подход — каждый оператор snapshot-ит независимо в произвольный момент. Тоже не работает: snapshots разных операторов будут сделаны в разное логическое время, и при восстановлении вы получите inconsistent state. Событие, которое уже обработал downstream-оператор, может ещё не быть отражено в snapshot upstream — после restore оно будет обработано второй раз.
Правильный подход — алгоритм Chandy-Lamport, опубликованный в 1985 году. Идея: в каждый из каналов системы инжектируется специальное сообщение — marker (в Flink — barrier). Каждый процесс, получив marker по первому каналу, делает snapshot своего состояния, потом пересылает marker по всем outgoing-каналам. Каждый процесс, получив marker по каналу, который уже не первый, фиксирует все события, пришедшие до marker как часть snapshot канала.
В Flink эта идея реализована с одним важным упрощением: DAG операторов — это направленный ациклический граф (с одним исключением — iterations, о которых отдельно). Это позволяет упростить логику: barrier-ы инжектируются только в источниках, и каждый оператор просто ждёт barrier-ов от всех своих входных каналов.
Анатомия barrier
CheckpointBarrier в Flink — это специальный StreamElement, который течёт через DAG вместе с обычными данными. У него три ключевых поля:
public class CheckpointBarrier extends RuntimeEvent {
private final long id; // ID checkpoint-а, монотонно растёт
private final long timestamp; // Время инъекции в source
private final CheckpointOptions options; // aligned/unaligned, savepoint, etc
}
Barrier инжектируется CheckpointCoordinator-ом в JobManager-е. Coordinator отправляет RPC-вызов triggerCheckpoint(checkpointId, timestamp) всем source-операторам job-а. Source-оператор, получив этот вызов, делает три вещи в строгом порядке:
- Snapshot своего собственного state (для Kafka source — это commit offset-ов в state backend, для file source — текущая позиция в файле).
- Создаёт CheckpointBarrier и отправляет его во все outgoing-каналы (для каждой downstream-subtask).
- Продолжает читать данные из внешней системы и пересылать их в downstream.
Этот момент — момент создания barrier-а в source — определяет point-in-time consistency snapshot-а. Все события, отправленные source-ом до barrier, должны быть включены в snapshot. Все события после barrier — в следующий snapshot (или ещё позже).
Дальше barrier течёт через DAG. Когда оператор-приёмник получает barrier по каналу C1, у него обычно ещё несколько входных каналов C2, C3 — по ним barrier ещё не пришёл. Что делать с событиями, которые приходят по C2/C3? Здесь начинается развилка: aligned mode (классический Chandy-Lamport) или unaligned mode (Flink-специфичный).
В aligned режиме оператор начинает буферизовать события из C2/C3 — они логически “относятся к следующему checkpoint-у”. Когда barrier пришёл по всем каналам, оператор snapshot-ит свой state и пересылает barrier дальше. Буферизованные события начинают обрабатываться.
В unaligned режиме оператор пересылает barrier в downstream сразу, не дожидаясь остальных каналов, и snapshot включает не только state, но и in-flight данные в input/output буферах. Подробности — в следующем уроке.
Barrier — это control message, который течёт ровно по тем же каналам, что и данные. Это критично: barrier не может “обогнать” события, отправленные source-ом до него, потому что Flink использует FIFO-каналы. Это свойство FIFO даёт алгоритму корректность: всё, что упало в канал до barrier, гарантированно обработано до barrier на приёмнике.
Sequence: жизнь одного checkpoint-а
Давайте проследим типичный checkpoint от триггера в JobManager до завершения в state backend.
Несколько важных деталей в этом sequence:
triggerCheckpoint отправляется параллельно всем source-task-ам. Они не координируются между собой — barrier инжектируется независимо в каждом source-е. Это даёт горизонтальную масштабируемость: 1000 sources на разных TaskManager-ах одновременно инжектируют barrier без узких мест в JobManager.
Snapshot operator state происходит асинхронно. Sync-phase (короткая) — копирование state на heap. Async-phase (долгая) — flush на DFS. Это критично для performance: оператор может продолжать обрабатывать события, пока его state пишется на S3.
acknowledgeCheckpoint — каждый оператор отдельно ack-ит свой part snapshot. Coordinator собирает ack-и в TreeMap по checkpoint ID и понимает, когда checkpoint завершён. Если в течение checkpoint.timeout (default 10 минут) хоть один ack не пришёл — checkpoint abort-ится.
notifyCheckpointComplete — это второй RPC от Coordinator-а ко всем операторам после полного завершения. Большинство операторов игнорируют его. Но 2PC sink (Kafka transactional producer) использует это уведомление как сигнал к commit транзакций — это и есть точка, когда данные становятся видимыми в Kafka.
Где это всё живёт в коде
Главные классы (для Flink 2.2):
org.apache.flink.runtime.checkpoint.CheckpointCoordinator— singleton в JobManager, оркестрирует жизненный цикл.org.apache.flink.runtime.checkpoint.PendingCheckpoint— состояние checkpoint-а до завершения. Хранит набор pending ack-ов.org.apache.flink.runtime.checkpoint.CompletedCheckpoint— финальный объект с metadata после успеха.org.apache.flink.runtime.io.network.api.CheckpointBarrier— сам barrier event.org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker— barrier alignment logic в оператор-приёмнике.org.apache.flink.runtime.checkpoint.SubtaskState— snapshot state одной parallel subtask (keyed + operator + raw).
CheckpointCoordinator работает на отдельном thread-pool (Checkpoints pool в JobManager-е). Это важно: даже если RPC-вызовы к TaskManager-ам блокируются на десятки секунд, JobManager-овский main thread не страдает.
Где Chandy-Lamport отличается от ABS
Оригинальный Chandy-Lamport был сформулирован для общих distributed systems с unconstrained communication graph (произвольный граф, циклы, недетерминированная topology). ABS в Flink делает три упрощения:
Первое: DAG топология. Flink job — это DAG. Это означает, что barrier-ы текут только в одном направлении (от sources к sinks), и нет проблемы “barrier пришёл, когда я ещё не отправил свой”. Если есть iterations (DataStream.iterate), Flink использует специальную логику — checkpoint-ит часть state, относящуюся к iteration, через специальный broadcast канал. В 2.2 iterations считаются legacy, рекомендация — использовать stateful processing для итеративных алгоритмов.
Второе: in-channel state не нужен (в aligned mode). Оригинальный Chandy-Lamport требует записать состояние каналов — все события, пришедшие после marker по каналу-приёмнику до marker по каналу-источнику. В Flink aligned mode этого не нужно: оператор просто ждёт barrier-ов и буферизует данные. Эта буферизация эквивалентна “state канала” — но не записывается в snapshot.
Третье: ack-protocol через JobManager. Оригинальный алгоритм децентрализованный — каждый процесс сам решает, когда snapshot завершён. В Flink есть центральный координатор, который собирает ack-и и фиксирует checkpoint глобально. Это нужно для упрощения rollback: при restart Coordinator знает, какой последний consistent snapshot, и инструктирует всех операторов начать оттуда.
В unaligned mode Flink частично возвращается к оригинальному Chandy-Lamport: in-flight данные из input buffers становятся частью snapshot. Подробности — в следующем уроке.
Корректность ABS
Почему этот алгоритм даёт consistent snapshot? Доказательство опирается на два инварианта:
Инвариант FIFO. Каждый network channel в Flink (через BufferConsumer) гарантирует, что события доставляются в том порядке, в котором они отправлены. Barrier — обычный event в этом канале, не “out-of-band signal”.
Инвариант barrier alignment. Оператор начинает snapshot своего state только после получения barrier-ов со всех входов. До этого момента он мог обработать события, отправленные ДО barrier-ов на upstream-операторах, но не мог обработать события, отправленные ПОСЛЕ.
Вместе эти инварианты дают: snapshot оператора A “видит” в точности тот набор событий, который был отправлен ему до момента barrier injection в источниках. Это и есть consistent snapshot — все события, входящие в snapshot, отправлены и обработаны; никаких событий “из будущего” в snapshot нет.
При restart Flink восстанавливает state каждого оператора из последнего CompletedCheckpoint и продолжает обработку с offset-ов, зафиксированных в snapshot source-ов. Все события, которые были в полёте между операторами на момент snapshot, будут переотправлены — но это безопасно, потому что snapshot операторов “не знает” о них, и они будут обработаны как новые.
Понимание ABS объясняет, почему checkpoint duration зависит не от размера state, а от backpressure: если оператор не успевает обрабатывать данные, barrier долго стоит в очереди input buffer, alignment занимает минуты, checkpoint timeout-ится. На уровне симптомов вы видите “checkpoint failures”, но root cause — overloaded operator.
Чтение source
org.apache.flink.runtime.checkpoint.CheckpointCoordinator#triggerCheckpoint— entry point. Здесь логика выбора момента триггера и формирования PendingCheckpoint.org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl#initCheckpoint— что происходит в каждой subtask при получении barrier.org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#getNextBufferOrEvent— где barrier приходит в input gate и попадает в barrier handler.org.apache.flink.streaming.runtime.io.checkpointing.AlignedBarrierHandlerState— состояние alignment logic.
FLIP-76 (unaligned checkpoints) и FLIP-183 (changelog state backend) — proposals для дальнейших улучшений, читать в Apache Flink Wiki.
Попробуй сам
-
Найдите CheckpointBarrier в Flink Web UI. На запущенном job-е перейдите Checkpoints -> Details -> Subtasks. Колонка “Alignment Duration” показывает, сколько subtask ждала barrier-ов от всех входов. Если она > 1 секунды — у вас backpressure или skew.
-
Включите checkpoint logs. Добавьте в log4j2.xml:
<Logger name="org.apache.flink.runtime.checkpoint" level="DEBUG"/>. В JobManager-логах вы увидите triggerCheckpoint вызовы, ack-и от каждой subtask, completion events. -
Сравните overhead aligned vs unaligned. Включите
execution.checkpointing.unaligned: trueи сравните среднюю checkpoint duration на одном и том же job-е. На high-backpressure job-ах разница может быть 10x.