Aligned vs unaligned checkpoints
В прошлом уроке мы разобрали, как Asynchronous Barrier Snapshotting гарантирует consistent snapshot через barrier injection и alignment. Aligned checkpoints — это классический подход, описанный в оригинальной Chandy-Lamport adaptation: оператор ждёт barrier-ы со всех входных каналов перед тем как snapshot-ить state. Этот подход работает безотказно на здоровых job-ах, но катастрофически плох при backpressure: время alignment может превысить checkpoint interval, и checkpoint-ы перестают завершаться.
Конфигурация aligned/unaligned checkpoint в productionЧтобы это решить, в Flink 1.11 был введён unaligned mode (FLIP-76), стабилизирован в 1.13. Идея: разрешить barrier-у обогнать данные, а in-flight данные включить в snapshot как часть state. Этот компромисс меняет profile checkpoint-а — он становится быстрее, но больше по размеру и сложнее в реализации. В этом уроке разбираем оба режима, их внутренности и когда использовать какой.
Aligned mode: барьер ждёт всех
В aligned режиме оператор-приёмник с несколькими входными каналами работает по простому правилу: барьер чекпойнта X нельзя пересылать в downstream, пока он не получен по ВСЕМ входным каналам. Это правило реализовано через специальный компонент — CheckpointBarrierTracker (или CheckpointBarrierAligner в исторических версиях).
Когда первый barrier приходит по каналу C1, tracker помечает C1 как “barrier received”. События, продолжающие приходить по C1, буферизуются в очередь “for next checkpoint” — они логически принадлежат уже следующему snapshot и не должны быть обработаны до завершения текущего. События с других каналов (C2, C3 — те, по которым barrier ещё не пришёл) обрабатываются нормально.
input channels: C1, C2, C3
event sequence per channel:
C1: e1, e2, BARRIER(42), e3, e4 <- after BARRIER e3,e4 in buffer
C2: e5, BARRIER(42), e6 <- after BARRIER e6 in buffer
C3: e7, e8, e9, BARRIER(42) <- all 3 processed, then alignment done
Когда barrier пришёл по всем трём каналам, alignment complete. Оператор: (1) snapshot-ит свой state в state backend, (2) эмитит CheckpointBarrier(42) во все outgoing каналы, (3) начинает обрабатывать буферизованные события из C1/C2, (4) продолжает читать новые события со всех каналов нормально.
Что плохо в aligned режиме? Буферизация. Если события приходят по C1 быстрее, чем по C3 (skew или backpressure), буфер C1 растёт. У network buffer pool ограниченный размер (taskmanager.memory.network.fraction * heap). Когда буфер заполнится, upstream-оператор на C1 не сможет отправлять данные — backpressure распространяется обратно. В результате не только этот оператор тормозит, но и весь upstream.
Самая частая патология aligned mode — alignment timeout. Если barrier приходит по C3 через 10 минут после C1 (потому что upstream-task сильно тормозит), taskmanager.network.memory.buffer-debloat-target буферы переполнены, и checkpoint timeout-ится на execution.checkpointing.timeout (default 10 минут). В Web UI вы видите “Alignment Duration: 9m 30s” и failed checkpoint.
Unaligned mode: барьер обгоняет данные
Unaligned checkpoints (FLIP-76, 1.11+, stable 1.13+) ломают alignment-ожидание. Идея: вместо ожидания barrier-а по всем каналам, оператор немедленно эмитит barrier в downstream сразу после получения по первому каналу. Но это нарушает корректность Chandy-Lamport — что насчёт событий, которые пришли по другим каналам ДО barrier-а на этом канале?
Решение: in-flight данные становятся частью snapshot. Оператор snapshot-ит не только свой state (как в aligned mode), но и:
- In-flight данные в input buffers — события, которые пришли по C1/C2/C3, но ещё не обработаны.
- In-flight данные в output buffers — события, эмитнутые оператором, но ещё не отправленные через сеть.
При восстановлении из такого snapshot эти in-flight данные переотправляются в input/output buffers, и обработка продолжается с того же логического момента.
input channels: C1, C2, C3
event sequence per channel:
C1: e1, e2, BARRIER(42) <- barrier received, immediately forward
C2: e5 [in network buffer] <- e5 still in input buffer, NOT processed yet
C3: e7 [in network buffer] <- e7 still in input buffer, NOT processed yet
+ barrier 42 still in transit on C3
operator action:
1. emit BARRIER(42) downstream immediately
2. snapshot operator state
3. snapshot in-flight buffers: [e5 on C2, e7 on C3, ... + any in output]
4. continue processing
Корректность сохраняется: события e5 и e7 не теряются, потому что они есть в snapshot. При restore они будут восстановлены в input buffer-ы и обработаны “как будто только что пришли”. Семантика та же — событие обработано ровно один раз.
Trade-offs: скорость vs размер
Aligned и unaligned дают разные профили checkpoint-а:
| Параметр | Aligned | Unaligned |
|---|---|---|
| Alignment duration | Может быть очень большой (до timeout) | Минимальный, не зависит от backpressure |
| Sync phase duration | Короткий (только snapshot state) | Длиннее (snapshot state + in-flight) |
| Async phase duration | Зависит от size state | Зависит от size state + in-flight buffers |
| Checkpoint size | State only | State + in-flight buffers (может быть в 2-3x больше) |
| End-to-end latency under backpressure | Большая, может timeout | Стабильная |
| Recovery time | State restore + Kafka replay | State restore + in-flight restore (быстрее в Kafka replay) |
| Memory pressure при checkpoint | Высокий (буферы заполнены) | Средний (только sync phase) |
Когда использовать unaligned:
- Backpressure persistent. Если вы видите Alignment Duration > 100ms стабильно, unaligned почти всегда выгоден.
- Очень асимметричный fan-in. Один input быстрый, другой медленный — alignment всегда будет долгим.
- Strict checkpoint SLA. Регуляторные требования к checkpoint duration (например, max 30 секунд для exactly-once с external systems).
Когда оставить aligned:
- Низкий backpressure. State уже большой, добавление in-flight данных удвоит cost.
- Low parallelism, simple DAG. Один input на оператор — alignment мгновенный, нечего оптимизировать.
- Большие network buffers. Если у вас 8 GB на TaskManager network memory, in-flight данные могут весить десятки гигабайт.
В Flink 1.13+ есть гибридный режим: aligned до alignment timeout (например 1 секунда), потом автоматический switch на unaligned для этого конкретного checkpoint-а. Включается через execution.checkpointing.aligned-checkpoint-timeout: 1s. Это компромисс: на здоровых checkpoint-ах работает aligned (быстрее, меньше), на проблемных автоматически активируется unaligned.
Unaligned mode НЕ совместим с одним важным сценарием: rescale (изменение parallelism) из unaligned snapshot до Flink 1.14 был запрещён. Начиная с 1.14 поддержан, но требует full state rebalance — медленный restart. Если вы планируете частый rescale (Reactive mode, Kubernetes autoscaling), используйте aligned. Также unaligned не совместим с broadcast state одновременно — нужны aligned для broadcast operators.
Внутренности unaligned: как snapshot in-flight
Самая нетривиальная часть unaligned — как atomically snapshot буферы, через которые непрерывно идут данные. Стандартный Java MPSC queue не позволяет “fork copy” без блокировки writer-а.
Flink решает это через buffer reference counting. Когда наступает момент snapshot in-flight данных, текущие network buffers получают +1 reference и помечаются “owned by checkpoint”. Они не освобождаются обратно в pool, даже когда reader их прочитал. Async writer (отдельный thread) сериализует их в DFS вместе с metadata о том, какой канал и какой position они занимают. После завершения write reference счётчик уменьшается, и буферы возвращаются в pool.
Ключевые классы (Flink 2.2):
org.apache.flink.streaming.runtime.io.checkpointing.UnalignedBarrierHandler— barrier handler для unaligned mode.org.apache.flink.runtime.io.network.partition.consumer.ChannelStateWriter— async writer для in-flight данных.org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutor— executor, который пишет channel state в DFS.org.apache.flink.runtime.io.network.buffer.NetworkBuffer— буфер с reference counting.
При restore картина обратная. JobMaster получает CompletedCheckpoint, читает metadata, понимает, какой канал какие в-flight данные должен получить. ChannelStateReader восстанавливает буферы и кладёт их в input/output gates перед началом обработки. Только после этого оператор начинает обычную обработку.
Production patterns
Pattern 1: симметричный switch. При первом включении unaligned для job-а делайте сначала savepoint в aligned, потом restart с execution.checkpointing.unaligned: true. Это даёт rollback path: если unaligned ведёт себя странно (out-of-memory, slow restore), можно откатиться на aligned savepoint.
Pattern 2: hybrid режим. В production обычно правильный default: aligned-checkpoint-timeout: 1s, unaligned: true. Большую часть времени работает aligned (быстрый), но при спайках нагрузки автоматически переключается на unaligned.
Pattern 3: исключение для broadcast operators. Если ваш job использует BroadcastState (например, rule streams), эти операторы должны быть в aligned mode даже когда остальные в unaligned. Flink делает это автоматически на основе topology analysis — broadcast inputs forcefully aligned.
Pattern 4: monitoring. Метрика lastCheckpointDuration показывает общее время. Для anomaly detection полезнее отдельные:
lastCheckpointAlignmentBuffered(bytes) — сколько данных буферизовано во время alignment.lastCheckpointPersistedData— размер state на диске.lastCheckpointPersistedSize— общий размер snapshot включая in-flight.
Большая разница между PersistedData и PersistedSize — индикатор того, что unaligned добавляет много in-flight cost.
Чтение source
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler— base class для aligned/unaligned логики.org.apache.flink.streaming.runtime.io.checkpointing.AlternatingController— реализация hybrid режима с автоматическим switch.org.apache.flink.runtime.io.network.partition.consumer.CheckpointedInputGate— оборачивает обычный InputGate с barrier handling.- FLIP-76 в Apache Flink Wiki — дизайн-документ unaligned checkpoints со всеми deliberations.
Попробуй сам
-
Измерьте Alignment Duration. На production job-е откройте Checkpoints -> History -> Details -> Subtask. Посмотрите столбец Alignment Duration. Если у вас стабильно > 100ms — кандидат на unaligned.
-
Сравните режимы. На staging-job-е снимите бенчмарк: средняя checkpoint duration aligned vs unaligned за час нагрузки. Разница в throughput, разница в checkpoint size, разница в memory usage на TaskManager. Используйте Flink Web UI metrics.
-
Спровоцируйте alignment timeout. Добавьте искусственный backpressure:
Thread.sleep(1000)в одном из операторов. Запустите job в aligned mode — увидите failed checkpoint-ы с timeout. Включите unaligned — checkpoint начнёт проходить.