Learning Platform
Глоссарий Troubleshooting
Урок 07.01 · 28 мин
Продвинутый
Chandy-LamportABSCheckpoint BarriersAsynchronous SnapshottingDistributed Snapshots

Chandy-Lamport и Asynchronous Barrier Snapshotting

Когда Flink-job работает в production, тысячи событий в секунду текут через DAG операторов, каждый оператор накапливает состояние в RocksDB, и где-то на S3 лежат checkpoint-ы, которые позволяют восстановить job после падения TaskManager или kernel panic. Это базовая обещанная гарантия Flink — exactly-once семантика на bounded и unbounded потоках. Но как именно она реализована? Как вы делаете консистентный snapshot распределённой системы из десятков операторов, между которыми летят события, без остановки всего пайплайна?

Ответ — это адаптация алгоритма Chandy-Lamport 1985 года, который в Flink называется Asynchronous Barrier Snapshotting (ABS).

Chandy-Lamport барьеры: интуиция (практический курс)

Конфигурация checkpoint-ов в Flink Понимание этого алгоритма — фундамент для всего, что мы будем разбирать дальше: aligned/unaligned checkpoints, savepoints, schema evolution, exactly-once с 2PC. Если вы не понимаете ABS, вы не понимаете Flink.


Проблема: snapshot распределённой системы

Представьте job из четырёх операторов, выполняющийся на трёх TaskManager-ах. У каждого оператора своё in-memory state, между ними летят события по network buffer-ам. Вы хотите сделать snapshot — такое состояние всех операторов и каналов, которое можно восстановить, если что-то упадёт.

Наивный подход — остановить мир. Послать команду “стоп” всем операторам, дождаться, пока все они закончат обработку текущего события, дождаться, пока сетевые буферы опустеют, и тогда дампить состояние. Это работает, но catastrophic для latency: при checkpoint interval 30 секунд и snapshot duration 5 секунд вы теряете 1/6 throughput.

Второй подход — каждый оператор snapshot-ит независимо в произвольный момент. Тоже не работает: snapshots разных операторов будут сделаны в разное логическое время, и при восстановлении вы получите inconsistent state. Событие, которое уже обработал downstream-оператор, может ещё не быть отражено в snapshot upstream — после restore оно будет обработано второй раз.

Правильный подход — алгоритм Chandy-Lamport, опубликованный в 1985 году. Идея: в каждый из каналов системы инжектируется специальное сообщение — marker (в Flink — barrier). Каждый процесс, получив marker по первому каналу, делает snapshot своего состояния, потом пересылает marker по всем outgoing-каналам. Каждый процесс, получив marker по каналу, который уже не первый, фиксирует все события, пришедшие до marker как часть snapshot канала.

В Flink эта идея реализована с одним важным упрощением: DAG операторов — это направленный ациклический граф (с одним исключением — iterations, о которых отдельно). Это позволяет упростить логику: barrier-ы инжектируются только в источниках, и каждый оператор просто ждёт barrier-ов от всех своих входных каналов.


Анатомия barrier

CheckpointBarrier в Flink — это специальный StreamElement, который течёт через DAG вместе с обычными данными. У него три ключевых поля:

public class CheckpointBarrier extends RuntimeEvent {
    private final long id;                       // ID checkpoint-а, монотонно растёт
    private final long timestamp;                // Время инъекции в source
    private final CheckpointOptions options;     // aligned/unaligned, savepoint, etc
}

Barrier инжектируется CheckpointCoordinator-ом в JobManager-е. Coordinator отправляет RPC-вызов triggerCheckpoint(checkpointId, timestamp) всем source-операторам job-а. Source-оператор, получив этот вызов, делает три вещи в строгом порядке:

  1. Snapshot своего собственного state (для Kafka source — это commit offset-ов в state backend, для file source — текущая позиция в файле).
  2. Создаёт CheckpointBarrier и отправляет его во все outgoing-каналы (для каждой downstream-subtask).
  3. Продолжает читать данные из внешней системы и пересылать их в downstream.

Этот момент — момент создания barrier-а в source — определяет point-in-time consistency snapshot-а. Все события, отправленные source-ом до barrier, должны быть включены в snapshot. Все события после barrier — в следующий snapshot (или ещё позже).

Дальше barrier течёт через DAG. Когда оператор-приёмник получает barrier по каналу C1, у него обычно ещё несколько входных каналов C2, C3 — по ним barrier ещё не пришёл. Что делать с событиями, которые приходят по C2/C3? Здесь начинается развилка: aligned mode (классический Chandy-Lamport) или unaligned mode (Flink-специфичный).

В aligned режиме оператор начинает буферизовать события из C2/C3 — они логически “относятся к следующему checkpoint-у”. Когда barrier пришёл по всем каналам, оператор snapshot-ит свой state и пересылает barrier дальше. Буферизованные события начинают обрабатываться.

В unaligned режиме оператор пересылает barrier в downstream сразу, не дожидаясь остальных каналов, и snapshot включает не только state, но и in-flight данные в input/output буферах. Подробности — в следующем уроке.

NOTE

Barrier — это control message, который течёт ровно по тем же каналам, что и данные. Это критично: barrier не может “обогнать” события, отправленные source-ом до него, потому что Flink использует FIFO-каналы. Это свойство FIFO даёт алгоритму корректность: всё, что упало в канал до barrier, гарантированно обработано до barrier на приёмнике.


Sequence: жизнь одного checkpoint-а

Давайте проследим типичный checkpoint от триггера в JobManager до завершения в state backend.

Жизненный цикл checkpoint в Flink (aligned mode)
CheckpointCoordinator (JM)
Source operator
Stateful operator
Sink operator
DFS (S3/HDFS)
triggerCheckpoint(id=42, ts)snapshot source stateCheckpointBarrier(id=42)align: wait for barriers from all inputssnapshot keyed stateCheckpointBarrier(id=42)snapshot sink state (precommit)acknowledgeCheckpoint(id=42, stateHandles)complete checkpoint when all acks receivednotifyCheckpointComplete(id=42)

Несколько важных деталей в этом sequence:

triggerCheckpoint отправляется параллельно всем source-task-ам. Они не координируются между собой — barrier инжектируется независимо в каждом source-е. Это даёт горизонтальную масштабируемость: 1000 sources на разных TaskManager-ах одновременно инжектируют barrier без узких мест в JobManager.

Snapshot operator state происходит асинхронно. Sync-phase (короткая) — копирование state на heap. Async-phase (долгая) — flush на DFS. Это критично для performance: оператор может продолжать обрабатывать события, пока его state пишется на S3.

acknowledgeCheckpoint — каждый оператор отдельно ack-ит свой part snapshot. Coordinator собирает ack-и в TreeMap по checkpoint ID и понимает, когда checkpoint завершён. Если в течение checkpoint.timeout (default 10 минут) хоть один ack не пришёл — checkpoint abort-ится.

notifyCheckpointComplete — это второй RPC от Coordinator-а ко всем операторам после полного завершения. Большинство операторов игнорируют его. Но 2PC sink (Kafka transactional producer) использует это уведомление как сигнал к commit транзакций — это и есть точка, когда данные становятся видимыми в Kafka.


Где это всё живёт в коде

Главные классы (для Flink 2.2):

  • org.apache.flink.runtime.checkpoint.CheckpointCoordinator — singleton в JobManager, оркестрирует жизненный цикл.
  • org.apache.flink.runtime.checkpoint.PendingCheckpoint — состояние checkpoint-а до завершения. Хранит набор pending ack-ов.
  • org.apache.flink.runtime.checkpoint.CompletedCheckpoint — финальный объект с metadata после успеха.
  • org.apache.flink.runtime.io.network.api.CheckpointBarrier — сам barrier event.
  • org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker — barrier alignment logic в оператор-приёмнике.
  • org.apache.flink.runtime.checkpoint.SubtaskState — snapshot state одной parallel subtask (keyed + operator + raw).

CheckpointCoordinator работает на отдельном thread-pool (Checkpoints pool в JobManager-е). Это важно: даже если RPC-вызовы к TaskManager-ам блокируются на десятки секунд, JobManager-овский main thread не страдает.


Где Chandy-Lamport отличается от ABS

Оригинальный Chandy-Lamport был сформулирован для общих distributed systems с unconstrained communication graph (произвольный граф, циклы, недетерминированная topology). ABS в Flink делает три упрощения:

Первое: DAG топология. Flink job — это DAG. Это означает, что barrier-ы текут только в одном направлении (от sources к sinks), и нет проблемы “barrier пришёл, когда я ещё не отправил свой”. Если есть iterations (DataStream.iterate), Flink использует специальную логику — checkpoint-ит часть state, относящуюся к iteration, через специальный broadcast канал. В 2.2 iterations считаются legacy, рекомендация — использовать stateful processing для итеративных алгоритмов.

Второе: in-channel state не нужен (в aligned mode). Оригинальный Chandy-Lamport требует записать состояние каналов — все события, пришедшие после marker по каналу-приёмнику до marker по каналу-источнику. В Flink aligned mode этого не нужно: оператор просто ждёт barrier-ов и буферизует данные. Эта буферизация эквивалентна “state канала” — но не записывается в snapshot.

Третье: ack-protocol через JobManager. Оригинальный алгоритм децентрализованный — каждый процесс сам решает, когда snapshot завершён. В Flink есть центральный координатор, который собирает ack-и и фиксирует checkpoint глобально. Это нужно для упрощения rollback: при restart Coordinator знает, какой последний consistent snapshot, и инструктирует всех операторов начать оттуда.

В unaligned mode Flink частично возвращается к оригинальному Chandy-Lamport: in-flight данные из input buffers становятся частью snapshot. Подробности — в следующем уроке.


Корректность ABS

Почему этот алгоритм даёт consistent snapshot? Доказательство опирается на два инварианта:

Инвариант FIFO. Каждый network channel в Flink (через BufferConsumer) гарантирует, что события доставляются в том порядке, в котором они отправлены. Barrier — обычный event в этом канале, не “out-of-band signal”.

Инвариант barrier alignment. Оператор начинает snapshot своего state только после получения barrier-ов со всех входов. До этого момента он мог обработать события, отправленные ДО barrier-ов на upstream-операторах, но не мог обработать события, отправленные ПОСЛЕ.

Вместе эти инварианты дают: snapshot оператора A “видит” в точности тот набор событий, который был отправлен ему до момента barrier injection в источниках. Это и есть consistent snapshot — все события, входящие в snapshot, отправлены и обработаны; никаких событий “из будущего” в snapshot нет.

При restart Flink восстанавливает state каждого оператора из последнего CompletedCheckpoint и продолжает обработку с offset-ов, зафиксированных в snapshot source-ов. Все события, которые были в полёте между операторами на момент snapshot, будут переотправлены — но это безопасно, потому что snapshot операторов “не знает” о них, и они будут обработаны как новые.

TIP

Понимание ABS объясняет, почему checkpoint duration зависит не от размера state, а от backpressure: если оператор не успевает обрабатывать данные, barrier долго стоит в очереди input buffer, alignment занимает минуты, checkpoint timeout-ится. На уровне симптомов вы видите “checkpoint failures”, но root cause — overloaded operator.


Чтение source

  • org.apache.flink.runtime.checkpoint.CheckpointCoordinator#triggerCheckpoint — entry point. Здесь логика выбора момента триггера и формирования PendingCheckpoint.
  • org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl#initCheckpoint — что происходит в каждой subtask при получении barrier.
  • org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate#getNextBufferOrEvent — где barrier приходит в input gate и попадает в barrier handler.
  • org.apache.flink.streaming.runtime.io.checkpointing.AlignedBarrierHandlerState — состояние alignment logic.

FLIP-76 (unaligned checkpoints) и FLIP-183 (changelog state backend) — proposals для дальнейших улучшений, читать в Apache Flink Wiki.


Попробуй сам

  1. Найдите CheckpointBarrier в Flink Web UI. На запущенном job-е перейдите Checkpoints -> Details -> Subtasks. Колонка “Alignment Duration” показывает, сколько subtask ждала barrier-ов от всех входов. Если она > 1 секунды — у вас backpressure или skew.

  2. Включите checkpoint logs. Добавьте в log4j2.xml: <Logger name="org.apache.flink.runtime.checkpoint" level="DEBUG"/>. В JobManager-логах вы увидите triggerCheckpoint вызовы, ack-и от каждой subtask, completion events.

  3. Сравните overhead aligned vs unaligned. Включите execution.checkpointing.unaligned: true и сравните среднюю checkpoint duration на одном и том же job-е. На high-backpressure job-ах разница может быть 10x.

Проверка знанийKnowledge check
Source оператор в Kafka source-е получает triggerCheckpoint(id=100). Он snapshot-ит свои offsets и эмитит CheckpointBarrier(id=100) во все downstream каналы. Сразу после этого приходит новое событие из Kafka. Что Flink с ним сделает: 1) включит в snapshot id=100, 2) включит в snapshot id=101, 3) пропустит, 4) обработает и не включит ни в один snapshot?
ОтветAnswer
Ответ 2 — событие будет включено в следующий snapshot (id=101). Это критический момент алгоритма ABS: snapshot source-а делается СИНХРОННО с эмиссией barrier-а, до того как новые события прочитаны. После эмиссии barrier source продолжает читать из Kafka и эмитить события в downstream, но эти события "логически" принадлежат уже к следующему checkpoint-у — они придут после barrier-а в FIFO канале, и downstream-операторы будут их буферизовать (aligned mode) или обрабатывать как часть следующего checkpoint state. Когда придёт triggerCheckpoint(id=101), source снова snapshot-ит offsets (теперь уже сдвинутые) и эмитит CheckpointBarrier(id=101). Все события, эмитнутые между barrier 100 и barrier 101, относятся к snapshot 101. Это даёт exactly-once: при restore из snapshot 100 Kafka offsets откатятся, и эти события будут перечитаны и переэмитнуты — а downstream state восстановится из snapshot 100, в который они не входили.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. В Flink ABS (Asynchronous Barrier Snapshotting) барьеры инжектируются в каких операторах?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5