Learning Platform
Глоссарий Troubleshooting
Урок 07.02 · 28 мин
Продвинутый
Aligned CheckpointUnaligned CheckpointFLIP-76Barrier AlignmentIn-flight Data

Aligned vs unaligned checkpoints

В прошлом уроке мы разобрали, как Asynchronous Barrier Snapshotting гарантирует consistent snapshot через barrier injection и alignment. Aligned checkpoints — это классический подход, описанный в оригинальной Chandy-Lamport adaptation: оператор ждёт barrier-ы со всех входных каналов перед тем как snapshot-ить state. Этот подход работает безотказно на здоровых job-ах, но катастрофически плох при backpressure: время alignment может превысить checkpoint interval, и checkpoint-ы перестают завершаться.

Конфигурация aligned/unaligned checkpoint в production

Чтобы это решить, в Flink 1.11 был введён unaligned mode (FLIP-76), стабилизирован в 1.13. Идея: разрешить barrier-у обогнать данные, а in-flight данные включить в snapshot как часть state. Этот компромисс меняет profile checkpoint-а — он становится быстрее, но больше по размеру и сложнее в реализации. В этом уроке разбираем оба режима, их внутренности и когда использовать какой.


Aligned mode: барьер ждёт всех

В aligned режиме оператор-приёмник с несколькими входными каналами работает по простому правилу: барьер чекпойнта X нельзя пересылать в downstream, пока он не получен по ВСЕМ входным каналам. Это правило реализовано через специальный компонент — CheckpointBarrierTracker (или CheckpointBarrierAligner в исторических версиях).

Когда первый barrier приходит по каналу C1, tracker помечает C1 как “barrier received”. События, продолжающие приходить по C1, буферизуются в очередь “for next checkpoint” — они логически принадлежат уже следующему snapshot и не должны быть обработаны до завершения текущего. События с других каналов (C2, C3 — те, по которым barrier ещё не пришёл) обрабатываются нормально.

input channels: C1, C2, C3
event sequence per channel:
  C1: e1, e2, BARRIER(42), e3, e4   <- after BARRIER e3,e4 in buffer
  C2: e5, BARRIER(42), e6           <- after BARRIER e6 in buffer
  C3: e7, e8, e9, BARRIER(42)       <- all 3 processed, then alignment done

Когда barrier пришёл по всем трём каналам, alignment complete. Оператор: (1) snapshot-ит свой state в state backend, (2) эмитит CheckpointBarrier(42) во все outgoing каналы, (3) начинает обрабатывать буферизованные события из C1/C2, (4) продолжает читать новые события со всех каналов нормально.

Aligned mode: barrier ждёт alignment перед propagation
Состояние оператора с 3 входами в момент частичного alignment
Input C1Barrier пришёл первым. Все события после barrier буферизуются в input buffer и не обрабатываются. Каждое такое событие занимает место в network buffer pool.
e3 e4 [BUFFER]Буферизованные события: e3, e4 пришли по C1 после barrier. Будут обработаны после завершения checkpoint.
Input C2Barrier пришёл вторым. Аналогично C1 — события после barrier буферизуются.
e6 [BUFFER]Буферизованное событие: e6 после barrier по C2.
Input C3Barrier ещё не пришёл. Subtask продолжает обрабатывать события из C3 нормально.
e7 e8 e9 -> processАктивная обработка событий с C3 продолжается, пока ждём barrier-а по C3.
После barrier по C3: snapshot, emit barrier, drain buffered
snapshot stateSnapshot state в state backend: RocksDB incremental или full. Async write в DFS.
then emit
emit barrier 42Эмиссия barrier 42 во все downstream channels. После этого начинается обработка буферизованных событий.

Что плохо в aligned режиме? Буферизация. Если события приходят по C1 быстрее, чем по C3 (skew или backpressure), буфер C1 растёт. У network buffer pool ограниченный размер (taskmanager.memory.network.fraction * heap). Когда буфер заполнится, upstream-оператор на C1 не сможет отправлять данные — backpressure распространяется обратно. В результате не только этот оператор тормозит, но и весь upstream.

Самая частая патология aligned mode — alignment timeout. Если barrier приходит по C3 через 10 минут после C1 (потому что upstream-task сильно тормозит), taskmanager.network.memory.buffer-debloat-target буферы переполнены, и checkpoint timeout-ится на execution.checkpointing.timeout (default 10 минут). В Web UI вы видите “Alignment Duration: 9m 30s” и failed checkpoint.


Unaligned mode: барьер обгоняет данные

Unaligned checkpoints (FLIP-76, 1.11+, stable 1.13+) ломают alignment-ожидание. Идея: вместо ожидания barrier-а по всем каналам, оператор немедленно эмитит barrier в downstream сразу после получения по первому каналу. Но это нарушает корректность Chandy-Lamport — что насчёт событий, которые пришли по другим каналам ДО barrier-а на этом канале?

Решение: in-flight данные становятся частью snapshot. Оператор snapshot-ит не только свой state (как в aligned mode), но и:

  • In-flight данные в input buffers — события, которые пришли по C1/C2/C3, но ещё не обработаны.
  • In-flight данные в output buffers — события, эмитнутые оператором, но ещё не отправленные через сеть.

При восстановлении из такого snapshot эти in-flight данные переотправляются в input/output buffers, и обработка продолжается с того же логического момента.

input channels: C1, C2, C3
event sequence per channel:
  C1: e1, e2, BARRIER(42)            <- barrier received, immediately forward
  C2: e5 [in network buffer]         <- e5 still in input buffer, NOT processed yet
  C3: e7 [in network buffer]         <- e7 still in input buffer, NOT processed yet
                                        + barrier 42 still in transit on C3

operator action:
  1. emit BARRIER(42) downstream immediately
  2. snapshot operator state
  3. snapshot in-flight buffers: [e5 on C2, e7 on C3, ... + any in output]
  4. continue processing
Unaligned mode: barrier overtakes data
Barrier пришёл по C1 и сразу пересылается downstream
C1Barrier пришёл первым. В отличие от aligned, оператор не буферизует e3, e4 — он немедленно эмитит barrier в downstream.
C2Barrier ещё не пришёл по C2. Событие e5 находится в network buffer C2, не обработано.
C3Аналогично C2 — событие e7 ещё в input buffer.
emit barrier 42 NOWШаг 1: эмиссия barrier во все downstream channels без ожидания. Скорость пропагации не зависит от backpressure.
then
snapshot state + in-flightШаг 2-3: snapshot operator state + snapshot in-flight данных в input/output буферах. Это и есть стоимость unaligned: больше данных в snapshot.
Snapshot включает { state, e5 on C2, e7 on C3, ... }

Корректность сохраняется: события e5 и e7 не теряются, потому что они есть в snapshot. При restore они будут восстановлены в input buffer-ы и обработаны “как будто только что пришли”. Семантика та же — событие обработано ровно один раз.


Trade-offs: скорость vs размер

Aligned и unaligned дают разные профили checkpoint-а:

ПараметрAlignedUnaligned
Alignment durationМожет быть очень большой (до timeout)Минимальный, не зависит от backpressure
Sync phase durationКороткий (только snapshot state)Длиннее (snapshot state + in-flight)
Async phase durationЗависит от size stateЗависит от size state + in-flight buffers
Checkpoint sizeState onlyState + in-flight buffers (может быть в 2-3x больше)
End-to-end latency under backpressureБольшая, может timeoutСтабильная
Recovery timeState restore + Kafka replayState restore + in-flight restore (быстрее в Kafka replay)
Memory pressure при checkpointВысокий (буферы заполнены)Средний (только sync phase)

Когда использовать unaligned:

  • Backpressure persistent. Если вы видите Alignment Duration > 100ms стабильно, unaligned почти всегда выгоден.
  • Очень асимметричный fan-in. Один input быстрый, другой медленный — alignment всегда будет долгим.
  • Strict checkpoint SLA. Регуляторные требования к checkpoint duration (например, max 30 секунд для exactly-once с external systems).

Когда оставить aligned:

  • Низкий backpressure. State уже большой, добавление in-flight данных удвоит cost.
  • Low parallelism, simple DAG. Один input на оператор — alignment мгновенный, нечего оптимизировать.
  • Большие network buffers. Если у вас 8 GB на TaskManager network memory, in-flight данные могут весить десятки гигабайт.

В Flink 1.13+ есть гибридный режим: aligned до alignment timeout (например 1 секунда), потом автоматический switch на unaligned для этого конкретного checkpoint-а. Включается через execution.checkpointing.aligned-checkpoint-timeout: 1s. Это компромисс: на здоровых checkpoint-ах работает aligned (быстрее, меньше), на проблемных автоматически активируется unaligned.

WARNING

Unaligned mode НЕ совместим с одним важным сценарием: rescale (изменение parallelism) из unaligned snapshot до Flink 1.14 был запрещён. Начиная с 1.14 поддержан, но требует full state rebalance — медленный restart. Если вы планируете частый rescale (Reactive mode, Kubernetes autoscaling), используйте aligned. Также unaligned не совместим с broadcast state одновременно — нужны aligned для broadcast operators.


Внутренности unaligned: как snapshot in-flight

Самая нетривиальная часть unaligned — как atomically snapshot буферы, через которые непрерывно идут данные. Стандартный Java MPSC queue не позволяет “fork copy” без блокировки writer-а.

Flink решает это через buffer reference counting. Когда наступает момент snapshot in-flight данных, текущие network buffers получают +1 reference и помечаются “owned by checkpoint”. Они не освобождаются обратно в pool, даже когда reader их прочитал. Async writer (отдельный thread) сериализует их в DFS вместе с metadata о том, какой канал и какой position они занимают. После завершения write reference счётчик уменьшается, и буферы возвращаются в pool.

Ключевые классы (Flink 2.2):

  • org.apache.flink.streaming.runtime.io.checkpointing.UnalignedBarrierHandler — barrier handler для unaligned mode.
  • org.apache.flink.runtime.io.network.partition.consumer.ChannelStateWriter — async writer для in-flight данных.
  • org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutor — executor, который пишет channel state в DFS.
  • org.apache.flink.runtime.io.network.buffer.NetworkBuffer — буфер с reference counting.

При restore картина обратная. JobMaster получает CompletedCheckpoint, читает metadata, понимает, какой канал какие в-flight данные должен получить. ChannelStateReader восстанавливает буферы и кладёт их в input/output gates перед началом обработки. Только после этого оператор начинает обычную обработку.


Production patterns

Pattern 1: симметричный switch. При первом включении unaligned для job-а делайте сначала savepoint в aligned, потом restart с execution.checkpointing.unaligned: true. Это даёт rollback path: если unaligned ведёт себя странно (out-of-memory, slow restore), можно откатиться на aligned savepoint.

Pattern 2: hybrid режим. В production обычно правильный default: aligned-checkpoint-timeout: 1s, unaligned: true. Большую часть времени работает aligned (быстрый), но при спайках нагрузки автоматически переключается на unaligned.

Pattern 3: исключение для broadcast operators. Если ваш job использует BroadcastState (например, rule streams), эти операторы должны быть в aligned mode даже когда остальные в unaligned. Flink делает это автоматически на основе topology analysis — broadcast inputs forcefully aligned.

Pattern 4: monitoring. Метрика lastCheckpointDuration показывает общее время. Для anomaly detection полезнее отдельные:

  • lastCheckpointAlignmentBuffered (bytes) — сколько данных буферизовано во время alignment.
  • lastCheckpointPersistedData — размер state на диске.
  • lastCheckpointPersistedSize — общий размер snapshot включая in-flight.

Большая разница между PersistedData и PersistedSize — индикатор того, что unaligned добавляет много in-flight cost.


Чтение source

  • org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler — base class для aligned/unaligned логики.
  • org.apache.flink.streaming.runtime.io.checkpointing.AlternatingController — реализация hybrid режима с автоматическим switch.
  • org.apache.flink.runtime.io.network.partition.consumer.CheckpointedInputGate — оборачивает обычный InputGate с barrier handling.
  • FLIP-76 в Apache Flink Wiki — дизайн-документ unaligned checkpoints со всеми deliberations.

Попробуй сам

  1. Измерьте Alignment Duration. На production job-е откройте Checkpoints -> History -> Details -> Subtask. Посмотрите столбец Alignment Duration. Если у вас стабильно > 100ms — кандидат на unaligned.

  2. Сравните режимы. На staging-job-е снимите бенчмарк: средняя checkpoint duration aligned vs unaligned за час нагрузки. Разница в throughput, разница в checkpoint size, разница в memory usage на TaskManager. Используйте Flink Web UI metrics.

  3. Спровоцируйте alignment timeout. Добавьте искусственный backpressure: Thread.sleep(1000) в одном из операторов. Запустите job в aligned mode — увидите failed checkpoint-ы с timeout. Включите unaligned — checkpoint начнёт проходить.

Проверка знанийKnowledge check
Вы запустили job в aligned mode. Видите Alignment Duration: 4 минуты, а checkpoint timeout: 10 минут. Checkpoint-ы проходят. Через неделю нагрузка увеличилась, Alignment Duration выросла до 11 минут, checkpoint-ы стали failing. Решили включить unaligned mode. Через день обнаружили, что size checkpoint-ов вырос в 3 раза, S3 затраты на storage растут. Что происходит и как смягчить?
ОтветAnswer
In-flight данные — главный driver size unaligned checkpoint-ов. При сильном backpressure network buffers переполнены, и в момент snapshot все эти буферы попадают в checkpoint. На pipeline с parallelism 100 и network buffer size 8MB на subtask это может дать дополнительные 800MB на checkpoint. Решения: 1) Уменьшить network buffer size через taskmanager.memory.network.fraction до минимума, который не вызывает throttling. 2) Включить buffer debloat (taskmanager.network.memory.buffer-debloat-enabled: true) — динамически уменьшает buffer size при backpressure. 3) Решить root cause backpressure — оптимизировать slow operator, добавить parallelism для него. 4) Использовать hybrid режим (aligned-checkpoint-timeout: 30s, unaligned: true) — это даст aligned для здоровых checkpoint-ов и unaligned только когда aligned не успевает. Долгосрочное решение — устранить backpressure: unaligned помогает, но это симптоматическое лечение, не cure.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. В unaligned mode что включается в snapshot, чего НЕТ в aligned mode?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5