Learning Platform
Глоссарий Troubleshooting
Урок 07.04 · 18 мин
Средний
Watermark AlignmentBackfillHistoric KafkaSkewBootstrap

Watermark alignment

Стандартный механизм watermarks работает хорошо для real-time потоков, где задержки между партициями измеряются секундами. Но при backfill — переигрывании исторических данных из Kafka с начала — partitions могут расходиться по event time на часы или дни. Это приводит к огромному window state и memory pressure.

withWatermarkAlignment (FLIP-217, доступен с Flink 1.15) решает эту проблему: координирует watermark между партициями, замедляя те, что слишком ушли вперёд.


Проблема: skew между партициями при backfill

Представьте Kafka topic с 10 партициями, каждая содержит события за последние 30 дней. Вы стартуете Flink-job с самого начала (earliest offsets). Что происходит:

  • Партиция 0 — Flink consumer быстро обрабатывает первый файл сегментов (например, 1 GB) — event time дошёл до дня 5.
  • Партиция 1 — consumer задерживается на partition 0, ещё на дне 1.
  • Глобальный watermark = min всех партиций = день 1.

Что делает window-оператор:

  • Получает события от partition 0 с event time день 5.
  • Получает события от partition 1 с event time день 1.
  • Глобальный watermark — день 1, поэтому окна за дни 2-5 (получившие данные от partition 0) ещё не закрыты.
  • Эти окна копят данные в state.

Для часовых окон за 5 дней это означает 24*5 = 120 одновременно активных окон per ключ. Для миллионов ключей — десятки гигабайт state. Часто это приводит к OOM или резкому замедлению backfill.

Skew между партициями при backfill
Consumer успел обработать много данных. Event time = day 5 (5 дней истории processed). Watermark per partition = day 5.
Consumer отстаёт. Event time = day 1. Watermark per partition = day 1.
min(day 5, day 1) = day 1. Окна за дни 2-5 не закроются, пока watermark не пройдёт. Партиция 0 продолжает накапливать события в неоткрытые окна. State растёт катастрофически.

Решение: withWatermarkAlignment

Идея: ограничить разрыв (skew) между watermarks разных партиций/источников. Партиция, чей watermark слишком ушёл вперёд, приостанавливается — Flink не читает новых событий из неё, пока медленные партиции не догонят.

WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withWatermarkAlignment("alignment-group-1", Duration.ofMinutes(20), Duration.ofSeconds(1))
    .withTimestampAssigner((e, ts) -> e.timestamp);

Параметры:

  • alignment group — строковый ID. Все источники с тем же ID синхронизируются между собой. Можно иметь несколько independent групп (например, для двух разных Kafka-кластеров).
  • max allowed watermark drift — максимальный разрыв между самым “быстрым” и “медленным” watermark в группе. В примере — 20 минут.
  • update interval — как часто Flink перепроверяет alignment. В примере — 1 секунда.

С этой настройкой:

  • Если partition 0 ушла за min_watermark + 20 минут, она паузится.
  • Partition 1 продолжает читать, её watermark догоняет.
  • Когда разрыв уменьшается ниже 20 минут, partition 0 разпаузивается.

Результат: skew ограничен, state не растёт неконтролируемо.


Когда нужен alignment

Backfill / replay с начала Kafka. Самый частый кейс — обработать накопленные данные.

Multiple sources с разной скоростью. Например, два Kafka-кластера: один current, другой historic. Без alignment current будет уходить далеко вперёд.

Asymmetric partition load. Если partition assignment в Kafka неравномерный (skew по keys в producer), некоторые partitions имеют больше данных и обрабатываются медленнее.

Source с переменной throughput. Например, S3 source — некоторые файлы обрабатываются быстрее других.


Когда alignment не нужен

Realtime steady-state pipeline. Если все партиции обрабатываются примерно с одинаковой скоростью, alignment overhead не оправдан.

Single source, single partition. Alignment не имеет смысла — нечего синхронизировать.

Pipeline с очень коротким lookback. Если allowedLateness 30 секунд, и максимальный skew в norm 5 секунд, alignment ничего не даёт.

TIP

Хорошая практика: включить withWatermarkAlignment с большим drift (например, 30 минут или 1 час) для production-job, который будет использоваться для backfill. В steady-state он не активируется (partitions работают синхронно). При backfill он автоматически защищает от ухода state в неконтролируемый рост.


Trade-offs alignment

Pros:

  • Контролируемый размер state при backfill.
  • Корректные результаты — все партиции обработаны до того же event time.
  • Защита от OOM при cold start.

Cons:

  • Снижение throughput. Если alignment активирован, “быстрые” партиции работают только так же быстро, как самая медленная. Backfill в целом замедляется.
  • Дополнительная coordination overhead. Flink периодически синхронизирует watermarks через JobManager.
  • Не все sources поддерживают. Source должен реализовать SourceSplit API с поддержкой pausing. Kafka, FileSystem source поддерживают; некоторые legacy sources — нет.

Конфигурация для разных сценариев

Backfill all historic Kafka (5+ дней истории):

.withWatermarkAlignment("backfill", Duration.ofMinutes(30), Duration.ofSeconds(5))

30-минутный drift даёт партициям пространство, чтобы не паузиться слишком часто. Update interval 5 секунд — низкий overhead.

Bootstrap из historic + live стрим:

.withWatermarkAlignment("hybrid", Duration.ofMinutes(5), Duration.ofSeconds(1))

Меньший drift — live стрим не должен сильно отставать от historic catchup.

Multi-source cross-region:

// Kafka eu-west
.withWatermarkAlignment("global", Duration.ofMinutes(10), Duration.ofSeconds(2))

// Kafka us-east, same alignment group
.withWatermarkAlignment("global", Duration.ofMinutes(10), Duration.ofSeconds(2))

Используется одна alignment group для обоих источников — они синхронизируются между собой.


Мониторинг alignment

Flink публикует метрики, относящиеся к alignment:

  • Source__SplitAlignmentSnapshot_currentMinWatermark — текущий минимум по источнику.
  • Source__SplitAlignmentSnapshot_currentMaxWatermark — максимум.
  • numSplitsBeingPaused — сколько splits сейчас паузнуто.

Если numSplitsBeingPaused > 0 стабильно, значит alignment активирован, и какие-то splits работают медленнее. Это нормально для backfill, но в steady-state может означать неравномерную нагрузку (стоит проверить producer-side distribution).

Watermark alignment в действии
Partition 0 (day 5) и partition 1 (day 1): глобальный watermark = day 1. Окна за 4 дня копятся в state. OOM рисk.
apply alignment
Partition 0 паузится при drift > 20min. Партиции работают в темпе самой медленной. Skew ограничен. State controlled.

Альтернатива alignment для extreme backfill

Для очень большого backfill (терабайты, недели данных) даже alignment может быть недостаточен — workload concept “не успеваем real-time + 30 дней истории одновременно”. Альтернативы:

Lambda-архитектура: batch + stream. Batch-job (Spark) обрабатывает историю, stream-job (Flink) — только real-time + последние X часов. Результаты объединяются в data warehouse.

Lambda и Kappa архитектуры

Bootstrapping через savepoint. Использовать batch для генерации Flink savepoint, потом Flink стартует уже с готовым state. Сложно, но эффективно для one-time bootstrap.

Сценарий “always live.” Не делать backfill вообще — Flink-job работает только на актуальные данные. Историческая аналитика — отдельный pipeline.


Production-чеклист

  • В job-ах, которые могут использоваться для backfill, всегда включайте withWatermarkAlignment с разумным drift (15-60 минут).
  • Замерьте throughput с alignment и без — иногда overhead значителен (10-30%).
  • Мониторьте numSplitsBeingPaused — если стабильно > 0 в steady-state, расследуйте причину неравномерной нагрузки.
  • Для multi-source pipelines используйте общую alignment group — без этого они расходятся независимо.

Попробуй сам

  1. Backfill без alignment. Запустите job с tumbling 1h окном на Kafka с 10 партициями и 5 днями исторических данных. Без alignment — наблюдайте, как state стремительно растёт. Job может OOM.

  2. С alignment. Тот же job с .withWatermarkAlignment("test", Duration.ofMinutes(30), Duration.ofSeconds(2)). State должен оставаться контролируемым, backfill — медленнее но стабильнее.

  3. Throughput cost. Замерьте throughput с alignment и без на realtime workload (без backfill). Должен быть пренебрежимо малая разница (alignment активируется только при drift).

Проверка знанийKnowledge check
Команда стартует Flink-job на Kafka topic с 20 партициями для backfill с начала (earliest offsets). История — 30 дней. Job использует tumbling 1-hour окна с allowedLateness 1 час. После 30 минут работы TaskManager падают по OOM. Что произошло и как починить?
ОтветAnswer
Проблема: 'partition skew' при cold start backfill. Часть партиций обрабатывается быстрее других, и event time для них уходит далеко вперёд. Глобальный watermark = min по партициям, поэтому он 'застывает' на самых медленных. Окна, получившие данные от быстрых партиций, не могут закрыться — они копятся в state. Для 1-hour окна с разрывом в дни между партициями это десятки или сотни одновременно активных окон per ключ. Дополнительно allowedLateness(1h) удлиняет lifecycle окон. Результат — катастрофический рост state и OOM. Решение: добавить .withWatermarkAlignment('backfill', Duration.ofMinutes(30), Duration.ofSeconds(5)) к WatermarkStrategy. Это ограничит разрыв между самой быстрой и самой медленной партицией 30 минутами; быстрые партиции будут паузиться, пока медленные догоняют. Размер state станет предсказуемым (максимум 30 минут окон в state одновременно). Дополнительно: уменьшить allowedLateness для backfill (например, до 5 минут), потому что late events при replay редки — данные приходят в более-менее правильном порядке относительно event time.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. Команда стартует Flink-job на Kafka topic с 20 партициями и 30-дневной историей. Через 30 минут TaskManager-ы падают по OOM. Используются tumbling 1-hour окна с allowedLateness(1h). Что произошло?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4