Learning Platform
Глоссарий Troubleshooting
Урок 12.02 · 22 мин
Средний
Chandy LamportCheckpoint BarriersAlignmentUnaligned

Chandy-Lamport барьеры: интуиция

В предыдущем уроке мы установили, что Flink даёт exactly-once внутри job. Механизм, который это обеспечивает — алгоритм Chandy-Lamport, опубликованный ещё в 1985 году для distributed snapshots. Flink адаптировал его под streaming, добавив барьеры — специальные маркеры в потоке данных.

В этом уроке разберём принцип работы барьеров на интуитивном уровне (без формальной теории), поймём механизм alignment, и кратко обсудим разницу aligned vs unaligned checkpoints. Глубокая теория — в курсе flink-internals; здесь — production-уровень понимания.


Проблема: consistent snapshot распределённого state

Представьте, что у нас Flink job с 3 операторами и parallelism = 4 у каждого. Это 12 параллельных subtask’ов, каждый с собственным локальным state. Cluster running.

Как сделать «фото» (snapshot) всего state в один логический момент времени?

Наивный подход: остановить все subtask’и в один и тот же момент, скопировать state, запустить обратно. Это stop-the-world, который убивает latency. Любая распределённая synchronization дорогая.

Лучший подход (Chandy-Lamport): не останавливать ничего. Использовать маркеры в потоке данных, которые разделят «уже зачекпоинченные» события от «будущих».

Chandy-Lamport internals: полный разбор Каждый оператор делает snapshot когда видит маркер. Поскольку маркеры идут в потоке вместе с событиями, естественно образуется consistent boundary.


Барьеры в потоке

Барьер — это специальная record в потоке данных, которая «протекает» через DAG операторов так же, как обычные события.

Барьер протекает через operator

Source events e1 e2 BARRIER e3 e4

Source. JobManager инициирует checkpoint — посылает барьер N в source. Барьер появляется в потоке между событиями.

Operator processes

Оператор обрабатывает события до барьера. Когда видит барьер — делает snapshot своего state.

Snapshot done forward barrier

После snapshot оператор отправляет барьер дальше. Snapshot записан в state backend. Оператор продолжает обработку e3, e4.

Downstream operator

Барьер пропагируется через всю DAG. Каждый оператор повторяет: обработать всё до барьера, snapshot state, переслать барьер.

Гарантия: snapshot одного оператора содержит state после обработки всех событий до барьера и до обработки событий после барьера. Это означает: snapshot всех операторов вместе образует consistent global snapshot.


Сценарий с parallelism

У реальных операторов несколько входных каналов:

                 e1, BAR, e3  -> channel A
   Operator <
                 e2, BAR, e4  -> channel B

Что делать оператору, который получил BAR из channel A, но из channel B ещё не получил?


Alignment: подождать всех

В aligned checkpoint оператор делает следующее:

  1. Получил BAR из channel A -> начинает alignment phase.
  2. События после BAR из channel A — буферизуются в памяти (не обрабатываются).
  3. События из channel B (всё до BAR) — продолжают обрабатываться нормально.
  4. Получил BAR из channel B -> alignment closed.
  5. Snapshot state.
  6. Forward barrier downstream.
  7. Обрабатывает буферизованные события (после BAR).
Aligned checkpoint: ожидание barrier из всех каналов

Channel A e1 BAR e3

Channel A: события e1, BAR, e3. Барьер пришёл первым.

Channel B e2 e5 BAR e4

Channel B: события e2, e5, BAR, e4. Барьер ещё в пути.
оператор получил BAR из A

Alignment phase

Оператор включает alignment: события с channel A после BAR (e3) буферизуются. Channel B продолжает течь.
buffer e3

Buffer e3 process e5

Буфер: e3 ждёт. Тем временем оператор обрабатывает e5 из channel B.

BAR from B snapshot then process e3 e4

Получили BAR из channel B. Alignment завершён. Делаем snapshot и обрабатываем буфер e3, потом e4.

Зачем alignment: snapshot должен содержать состояние после обработки всех событий до барьера и до событий после барьера. Если бы мы делали snapshot сразу после первого BAR, событие e5 из channel B (которое логически до барьера) могло бы попасть в snapshot или не попасть — нарушается consistency.

Цена alignment: буферизация и задержка. При backpressure (медленный channel B) барьеру из A приходится долго ждать. Latency растёт.


Unaligned checkpoints: skip alignment

Flink 1.11+ поддерживает unaligned checkpoints:

  1. Получил BAR из channel A -> сразу делает snapshot.
  2. События, находящиеся «в полёте» между операторами (in-flight buffers), включаются в snapshot.
  3. Не ждём BAR из других каналов.
env.getCheckpointConfig().enableUnalignedCheckpoints();

Преимущество: мгновенный checkpoint даже при backpressure. Alignment latency = 0.

Минус: snapshot больше (включает in-flight buffers), recovery медленнее (нужно восстановить in-flight данные).

AlignedUnaligned
Snapshot sizeМеньшеБольше (in-flight включены)
Checkpoint durationРастёт при backpressureПостоянная
Recovery speedБыстрееМедленнее
DefaultДаНет (нужно enable)
Хорошо дляСтабильная нагрузкаBackpressure spikes

В production обычно начинают с aligned (default), включают unaligned если есть периодические backpressure и checkpoint timeouts.


Что внутри барьера

Барьер — это маленькая структура: CheckpointBarrier(checkpoint_id, timestamp, type). Размер ~50 байт. Передаётся через network channels как обычная запись, но имеет особый тип, чтобы операторы её распознавали.

Несколько checkpoint’ов могут быть «в полёте» одновременно (есть maxConcurrentCheckpoints). Каждый имеет уникальный checkpoint_id. Оператор может держать alignment buffers для нескольких checkpoint’ов параллельно — но обычно конфигурят maxConcurrentCheckpoints = 1, чтобы не накладывались.


Crash и restore: атомарность snapshot

Что если crash случился во время checkpoint’а?

Сценарий: оператор делает snapshot, частично записал в state backend, и crash.

Что Flink делает:

  1. JobManager не получил confirmation от всех операторов -> checkpoint не помечается как complete.
  2. Незавершённый snapshot не используется для restore.
  3. Restore происходит из предыдущего complete checkpoint.

Это даёт атомарность checkpoint’а: либо ВСЕ операторы сохранили state, либо checkpoint считается failed и не используется. Нет partial checkpoint.

NOTE

JobManager хранит lastSuccessfulCheckpoint в его metadata. При restore читает именно его, игнорируя partial state записи от незавершённых checkpoint’ов. Старые checkpoint’ы хранятся согласно state.checkpoints.num-retained — Flink не удаляет их сразу.


Координация: как JobManager управляет

JobManager — координатор checkpoint’ов:

  1. Каждый interval секунд -> инициирует новый checkpoint.
  2. Посылает TriggerCheckpoint каждому source operator.
  3. Source operator посылает barrier в потоки.
  4. Каждый operator, завершив snapshot, посылает AcknowledgeCheckpoint в JobManager с указанием снапшота state location.
  5. Когда все операторы подтвердили — JobManager помечает checkpoint complete, обновляет metadata.

Если хоть один operator не подтвердил за checkpointTimeout — checkpoint failed, дальше зависит от tolerableCheckpointFailureNumber.


Diagram: barrier flow в DAG

Barrier пропагируется через DAG

JobManager triggers N

JobManager инициирует checkpoint N. Посылает TriggerCheckpoint каждому source.

Source 1 emits BAR N

Source 1 subtask: получает Trigger, эмитит BARRIER N в downstream channel.

Source 2 emits BAR N

Source 2 subtask: эмитит BARRIER N тоже.

Op 1 snapshot forward

Operator 1: получает BAR N из всех входных каналов, alignment finished, snapshot его state, forward BAR N.

Op 2 snapshot forward

Operator 2: тоже получает, snapshot, forward.

Sink ACK to JM

Sink operator: получает BAR N, snapshot, ACK в JobManager.

Checkpoint N complete

JobManager получил ACK от всех operators. Checkpoint N помечен complete. Metadata записана.

Что барьер не решает

Chandy-Lamport барьеры обеспечивают Flink-internal exactly-once. Они НЕ решают:

  • Source dedup: если source отправил событие дважды (например, Kafka producer без idempotence), барьеры не дедуплицируют.
  • Sink exactly-once delivery: барьеры дают consistent snapshot, но sink должен дополнительно делать transactional commit.
  • External system consistency: если ваш Flink job обновляет state и пишет в Redis, Redis может видеть обновлённый Redis state, но snapshot ещё не commit’ит. Restore приведёт к рассинхрону.

Барьеры — это necessary but not sufficient для end-to-end exactly-once. Без них ничего не работает; с ними плюс transactional sink — работает.


Попробуй сам

Включи Flink UI и наблюдай checkpoints в реальном времени:

  1. Запусти job со средним state (например, keyed aggregation).
  2. В UI -> Checkpoints -> Subtasks View — увидь Sync Duration, Async Duration, Alignment Duration.
  3. Создай backpressure: добавь искусственный Thread.sleep(100) в один из operators. Посмотри, как Alignment Duration подскочит.
  4. Включи unaligned checkpoints. Сравни alignment time — должен упасть до 0.

Ключевые выводы

  1. Барьер — специальная record в потоке, маркирует границу checkpoint’а. Протекает через DAG как обычное событие.
  2. Alignment: оператор ждёт барьер из всех input каналов, прежде чем snapshot. Гарантирует consistent snapshot.
  3. Alignment latency растёт при backpressure (медленный канал заставляет всех ждать).
  4. Unaligned checkpoints: пропускают alignment, включают in-flight buffers в snapshot. Быстрее при backpressure, дороже recovery.
  5. Атомарность: либо все операторы завершили snapshot, либо checkpoint failed. Нет partial state.
  6. JobManager координирует: trigger, ACK collection, complete metadata.
  7. Барьеры дают Flink-internal exactly-once, не end-to-end. Sink должен дополнить transactional commit.
Проверка знанийKnowledge check
Ваш Flink job имеет aligned checkpoints. В UI вы видите: Alignment Duration = 8 секунд (растёт), Sync Duration = 100ms, Async Duration = 200ms. Что это значит и какие три действия имеет смысл предпринять?
ОтветAnswer
Что это значит: операторы тратят 8 секунд на ожидание barrier из всех input каналов (alignment). Это в 40 раз дольше, чем сама работа snapshot (300ms total). Огромный alignment time = backpressure: какой-то input канал намного медленнее остальных. Channel с быстрым потоком эмитит барьер быстро, медленный — задерживается, и быстрые каналы буферизуют все события после барьера, ожидая медленного. Действия: (1) Включить unaligned checkpoints: env.getCheckpointConfig().enableUnalignedCheckpoints() — снимет alignment latency полностью. Trade-off: snapshot будет больше (in-flight buffers включены), но checkpoint станет мгновенным. (2) Найти источник backpressure: Flink UI -> Subtasks View -> Backpressure column покажет, какой оператор медленный. Обычно это sink с медленным external system или async I/O с малым capacity. (3) Решить root cause: увеличить parallelism медленного оператора, или async I/O capacity, или сменить sink на batch-friendly. Если backpressure временный (peak hours) — unaligned checkpoint снимает симптом без решения причины.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что такое checkpoint barrier в Flink и как он обеспечивает consistent snapshot?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4