Watermark alignment
Стандартный механизм watermarks работает хорошо для real-time потоков, где задержки между партициями измеряются секундами. Но при backfill — переигрывании исторических данных из Kafka с начала — partitions могут расходиться по event time на часы или дни. Это приводит к огромному window state и memory pressure.
withWatermarkAlignment (FLIP-217, доступен с Flink 1.15) решает эту проблему: координирует watermark между партициями, замедляя те, что слишком ушли вперёд.
Проблема: skew между партициями при backfill
Представьте Kafka topic с 10 партициями, каждая содержит события за последние 30 дней. Вы стартуете Flink-job с самого начала (earliest offsets). Что происходит:
- Партиция 0 — Flink consumer быстро обрабатывает первый файл сегментов (например, 1 GB) — event time дошёл до дня 5.
- Партиция 1 — consumer задерживается на partition 0, ещё на дне 1.
- Глобальный watermark = min всех партиций = день 1.
Что делает window-оператор:
- Получает события от partition 0 с event time день 5.
- Получает события от partition 1 с event time день 1.
- Глобальный watermark — день 1, поэтому окна за дни 2-5 (получившие данные от partition 0) ещё не закрыты.
- Эти окна копят данные в state.
Для часовых окон за 5 дней это означает 24*5 = 120 одновременно активных окон per ключ. Для миллионов ключей — десятки гигабайт state. Часто это приводит к OOM или резкому замедлению backfill.
Решение: withWatermarkAlignment
Идея: ограничить разрыв (skew) между watermarks разных партиций/источников. Партиция, чей watermark слишком ушёл вперёд, приостанавливается — Flink не читает новых событий из неё, пока медленные партиции не догонят.
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withWatermarkAlignment("alignment-group-1", Duration.ofMinutes(20), Duration.ofSeconds(1))
.withTimestampAssigner((e, ts) -> e.timestamp);
Параметры:
- alignment group — строковый ID. Все источники с тем же ID синхронизируются между собой. Можно иметь несколько independent групп (например, для двух разных Kafka-кластеров).
- max allowed watermark drift — максимальный разрыв между самым “быстрым” и “медленным” watermark в группе. В примере — 20 минут.
- update interval — как часто Flink перепроверяет alignment. В примере — 1 секунда.
С этой настройкой:
- Если partition 0 ушла за
min_watermark + 20 минут, она паузится. - Partition 1 продолжает читать, её watermark догоняет.
- Когда разрыв уменьшается ниже 20 минут, partition 0 разпаузивается.
Результат: skew ограничен, state не растёт неконтролируемо.
Когда нужен alignment
Backfill / replay с начала Kafka. Самый частый кейс — обработать накопленные данные.
Multiple sources с разной скоростью. Например, два Kafka-кластера: один current, другой historic. Без alignment current будет уходить далеко вперёд.
Asymmetric partition load. Если partition assignment в Kafka неравномерный (skew по keys в producer), некоторые partitions имеют больше данных и обрабатываются медленнее.
Source с переменной throughput. Например, S3 source — некоторые файлы обрабатываются быстрее других.
Когда alignment не нужен
Realtime steady-state pipeline. Если все партиции обрабатываются примерно с одинаковой скоростью, alignment overhead не оправдан.
Single source, single partition. Alignment не имеет смысла — нечего синхронизировать.
Pipeline с очень коротким lookback. Если allowedLateness 30 секунд, и максимальный skew в norm 5 секунд, alignment ничего не даёт.
Хорошая практика: включить withWatermarkAlignment с большим drift (например, 30 минут или 1 час) для production-job, который будет использоваться для backfill. В steady-state он не активируется (partitions работают синхронно). При backfill он автоматически защищает от ухода state в неконтролируемый рост.
Trade-offs alignment
Pros:
- Контролируемый размер state при backfill.
- Корректные результаты — все партиции обработаны до того же event time.
- Защита от OOM при cold start.
Cons:
- Снижение throughput. Если alignment активирован, “быстрые” партиции работают только так же быстро, как самая медленная. Backfill в целом замедляется.
- Дополнительная coordination overhead. Flink периодически синхронизирует watermarks через JobManager.
- Не все sources поддерживают. Source должен реализовать
SourceSplitAPI с поддержкой pausing. Kafka, FileSystem source поддерживают; некоторые legacy sources — нет.
Конфигурация для разных сценариев
Backfill all historic Kafka (5+ дней истории):
.withWatermarkAlignment("backfill", Duration.ofMinutes(30), Duration.ofSeconds(5))
30-минутный drift даёт партициям пространство, чтобы не паузиться слишком часто. Update interval 5 секунд — низкий overhead.
Bootstrap из historic + live стрим:
.withWatermarkAlignment("hybrid", Duration.ofMinutes(5), Duration.ofSeconds(1))
Меньший drift — live стрим не должен сильно отставать от historic catchup.
Multi-source cross-region:
// Kafka eu-west
.withWatermarkAlignment("global", Duration.ofMinutes(10), Duration.ofSeconds(2))
// Kafka us-east, same alignment group
.withWatermarkAlignment("global", Duration.ofMinutes(10), Duration.ofSeconds(2))
Используется одна alignment group для обоих источников — они синхронизируются между собой.
Мониторинг alignment
Flink публикует метрики, относящиеся к alignment:
Source__SplitAlignmentSnapshot_currentMinWatermark— текущий минимум по источнику.Source__SplitAlignmentSnapshot_currentMaxWatermark— максимум.numSplitsBeingPaused— сколько splits сейчас паузнуто.
Если numSplitsBeingPaused > 0 стабильно, значит alignment активирован, и какие-то splits работают медленнее. Это нормально для backfill, но в steady-state может означать неравномерную нагрузку (стоит проверить producer-side distribution).
Альтернатива alignment для extreme backfill
Для очень большого backfill (терабайты, недели данных) даже alignment может быть недостаточен — workload concept “не успеваем real-time + 30 дней истории одновременно”. Альтернативы:
Lambda-архитектура: batch + stream. Batch-job (Spark) обрабатывает историю, stream-job (Flink) — только real-time + последние X часов. Результаты объединяются в data warehouse.
Lambda и Kappa архитектурыBootstrapping через savepoint. Использовать batch для генерации Flink savepoint, потом Flink стартует уже с готовым state. Сложно, но эффективно для one-time bootstrap.
Сценарий “always live.” Не делать backfill вообще — Flink-job работает только на актуальные данные. Историческая аналитика — отдельный pipeline.
Production-чеклист
- В job-ах, которые могут использоваться для backfill, всегда включайте withWatermarkAlignment с разумным drift (15-60 минут).
- Замерьте throughput с alignment и без — иногда overhead значителен (10-30%).
- Мониторьте
numSplitsBeingPaused— если стабильно > 0 в steady-state, расследуйте причину неравномерной нагрузки. - Для multi-source pipelines используйте общую alignment group — без этого они расходятся независимо.
Попробуй сам
-
Backfill без alignment. Запустите job с tumbling 1h окном на Kafka с 10 партициями и 5 днями исторических данных. Без alignment — наблюдайте, как state стремительно растёт. Job может OOM.
-
С alignment. Тот же job с
.withWatermarkAlignment("test", Duration.ofMinutes(30), Duration.ofSeconds(2)). State должен оставаться контролируемым, backfill — медленнее но стабильнее. -
Throughput cost. Замерьте throughput с alignment и без на realtime workload (без backfill). Должен быть пренебрежимо малая разница (alignment активируется только при drift).