Learning Platform
Глоссарий Troubleshooting
Урок 09.03 · 24 мин
Продвинутый
Watermark AlignmentwithWatermarkAlignmentmaxDriftBootstrap KafkaSource Pause

Watermark alignment — внутренности withWatermarkAlignment

В прошлом уроке мы видели, что watermark в multi-input операторе — это min от всех входов. Это даёт correctness, но создаёт проблему performance: если один из inputs сильно опережает остальные (event time во многом больше), он генерирует огромный amount работы downstream. Window operators накапливают state для будущих окон, которые ещё не trigger-нутся (потому что min watermark behind). State growing, memory pressure растёт, OOM possible.

Watermark alignment (FLIP-182, Flink 1.15+) решает эту проблему. Идея: если source watermark убежал больше чем на maxDrift от группового min, Flink паузит этот source — temporarily prevents reading new data from него.

Consumer groups и lag в Kafka Так он “ждёт” пока медленные sources подтянутся. Это особенно критично при bootstrap из historic Kafka, где разные partitions могут иметь разные начальные offsets и event times.


Mотивация: bootstrap из historic Kafka

Сценарий: вы запускаете new Flink job, который читает Kafka topic с retention 7 дней. Topic имеет 16 partitions. При startup Flink начинает читать с earliest offsets во всех partitions.

Проблема: data в partitions распределена unevenly. Partition 0 содержит events за последние 3 дня (3 GB), partition 1 — за последние 5 дней (8 GB), partition 7 — за последние 7 дней (12 GB). При startup partition 0 catch up быстро (3 GB обработано за 5 минут), partition 7 продолжает работать (12 GB обработано за 25 минут).

Без alignment: partition 0 finishes early, его watermark advance до “now”. Но downstream watermark = min = watermark partition 7 = 7 days ago. Window operators начинают buffering events partition 0 для windows, которые will trigger only когда watermark catch up — то есть через 25 минут. За это время накапливается 25 minutes worth of windowed state — потенциально гигабайты в state backend, OOM на TaskManager.

С alignment: when partition 0 watermark убегает > 5 минут от min, source partition 0 паузится (offset не advance, no new events read). Other partitions catch up. Когда min watermark advance, partition 0 resumes. Profile state backend становится smoother.


API: withWatermarkAlignment

WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withWatermarkAlignment(
        "alignment-group",        // group name
        Duration.ofMinutes(5),    // maxDrift
        Duration.ofSeconds(2)     // update interval
    )

Параметры:

  • group name — sources с одним и тем же group name aligned вместе. Это позволяет alignment multiple sources в jobе.
  • maxDrift — максимальная разница между источникам watermark и групповым min. Если превышено, source паузится.
  • update interval — как часто Flink проверяет alignment status. Slower = less overhead, faster = more responsive.

Group name полезен, когда у вас multiple sources, но не все должны align together. Например, main events source и control plane source — у control plane low rate, его не нужно align с main.


Внутренности: alignment protocol

Alignment работает через RPC interaction между TaskManager-ами и JobMaster-ом.

Watermark alignment protocol
Source subtask 1 (fast)
Source subtask 2 (slow)
AlignmentAgent (TM)
JobMaster
current WM = 1700000000current WM = 1699999700report group WMscompute group min = 1699999700broadcast group minpause (WM 1700000000 > min 1699999700 + maxDrift)continue (WM `< min` + maxDrift)pause-resume cycle

Key details:

Paused source ничего не reads. Не calls poll() на underlying connector. Кафka consumer не делает next-fetch, file reader не читает next bytes. Источник эффективно spinning idle, что использует minimal CPU.

Paused state is sticky. После pause source остаётся paused до next alignment check. Это даёт buffer — pause/unpause не происходит на каждом event, только на check intervals.

Per-source granularity. Alignment работает на уровне source subtask, не split. Если SubТаск читает 4 splits, и все они opережают, ВСЕ paused. Для finer-grained — split-level alignment в Flink 2.1+ (см. следующий урок).


Алгоритм pause/resume

# Pseudo-code в AlignmentAgent
def check_alignment(source_subtask):
    current_wm = source_subtask.get_current_watermark()
    group_min = jm.get_group_min(source_subtask.group_id)
    
    if current_wm - group_min > maxDrift:
        if not source_subtask.paused:
            source_subtask.pause()
    else:
        if source_subtask.paused:
            source_subtask.resume()

Pause/resume cycles повторяются каждые update interval (default 2s). На production update interval = 1-5s — достаточно responsive, без overhead.


Гибкость: per-source maxDrift

В FLIP-182 API specified что different sources могут иметь different maxDrift по обещанию same group name. Это даёт гибкость: например, main source может иметь maxDrift = 5 минут, secondary source = 30 минут. Group min computed normally.

В практике это полезно для:

  • Mixed-velocity sources (Kafka главный + slow file source для enrichment).
  • Tiered correctness — main pipeline strict, debug pipeline loose.

Trade-offs

Pros:

  • Smoother state backend size during bootstrap.
  • Predictable memory pressure.
  • Easier diagnosis “почему OOM” — alignment явно отображается в Web UI.

Cons:

  • Slower bootstrap (paused sources не process events).
  • Coordination overhead (RPC каждые update interval).
  • Не помогает если ВСЕ sources slow — alignment просто ensures parity, не accelerates progress.

Когда не использовать:

  • Streaming-only workload без replay (если sources всегда near-realtime, alignment не triggered).
  • Multiple unconnected pipelines (нет need to align).
  • Small state, OOM не риск.

Diagnostics

Метрики (per source subtask):

  • currentWatermark — current watermark, который source имеет.
  • alignmentDriftFromGroupMin — diff между current и group min.
  • numberOfAlignmentPauseEvents — count of pauses.
  • accumulatedPauseDuration — total time paused.

Если accumulatedPauseDuration > 50% of running time, source heavily throttled — bootstrap может занять 2x времени без alignment. Это trade-off для memory safety.

В Web UI Source -> SubtaskMetrics -> Filter by “alignment” — увидите relevant metrics.

TIP

Для bootstrap из historic Kafka типичный setup: WatermarkStrategy.forBoundedOutOfOrderness(15s).withWatermarkAlignment(“kafka-group”, Duration.ofMinutes(2), Duration.ofSeconds(1)). 2-minute maxDrift даёт reasonable bootstrap speed без excessive state growth. 1-second update interval достаточно responsive.


Limitations и edge cases

Limitation 1: alignment overhead на high-rate sources. Каждые update interval source должен report watermark и check alignment. Для very high rate sources (millions events/sec) overhead noticeable — 1-2% throughput.

Limitation 2: paused source delays cause backpressure upstream. Если source paused, no events forwarded. Upstream queue (например, in Kafka consumer-side prefetch) может fill up, причинить consumer-side issues. Не affect Kafka cluster, но может affect application-level behavior.

Limitation 3: alignment не помогает при skewed processing. Если slow source actually slow because downstream operator не успевает обрабатывать, pause только усугубит проблему — source оградит больше времени в paused state, не делая больше progress. Alignment предполагает, что slow source slow at SOURCE level, не downstream.

Limitation 4: не interactives с recovery from savepoint. При recovery savepoint имеет конкретный watermark per source. Если он imbalanced, alignment immediately начнёт pause-resume cycle до catch up. Это может занять время.


Чтение source

  • org.apache.flink.api.common.eventtime.WatermarkAlignmentParams — config object для alignment.
  • org.apache.flink.runtime.source.coordinator.SourceCoordinator — JM-side coordinator.
  • org.apache.flink.runtime.source.coordinator.WatermarkAlignmentManager — actual coordination logic.
  • org.apache.flink.connector.base.source.reader.SourceReaderBase#pauseOrResumeSplits — где source actually pauses/resumes.
  • FLIP-182 в Apache Flink Wiki — original design.

Попробуй сам

  1. Симулируйте bootstrap. На test cluster с 4 source subtasks и Kafka topic с huge backlog, run job без alignment vs с alignment. Compare: max state size during bootstrap, total time to catch up. На large state difference в state size может быть 10x.

  2. Tune maxDrift. На production-like job попробуйте maxDrift = 30s, 5min, 1hour. Меньше — better memory, медленнее bootstrap. Больше — fast bootstrap, more memory. Найдите sweet spot.

  3. Monitoring dashboard. Добавьте alignmentDriftFromGroupMin per source в Grafana. Это даст visibility, какие sources frequently throttled. Полезно для long-term tuning.

Проверка знанийKnowledge check
Ваш job stuck в bootstrap mode 3 hours, OOM приходит на одном TaskManager. State backend size 25 GB на этом TM (RocksDB), вот ещё не закончил. Investigation: source partition 12 (один из 16) имеет 50 GB backlog vs 5 GB на других. Без watermark alignment. Window 1-hour tumbling. Решили включить alignment с maxDrift=30min. Через 2 hours: bootstrap всё ещё не закончился, hit OOM снова. Что происходит и какие шаги?
ОтветAnswer
maxDrift=30min слишком большой для этого scenario. Other partitions catch up быстро, advanced их watermark на 6+ hours (поскольку только 5 GB обработать). Их watermark уходит вперёд на 6 hours. Min watermark тоже advance на 6 hours (поскольку min от всех partitions). Partition 12 watermark отстаёт на 50 GB worth of events — может быть 3-4 hours. 3 hours - 6 hours = -3 hours, but maxDrift = 30 min: значит min = 6 hours catching up to 3 hours - проблема в обратном направлении: fast sources уже advanced слишком далеко относительно partition 12. Result: fast sources paused, slow source 12 продолжает работать, но window operator всё ещё накапливает state for windows которые not trigger (need min watermark > end of window for 1-hour windows). 25 GB state — это events для tens of windows которые буфферизованы. OOM. Решения: 1) Уменьшить maxDrift до 5 min или ниже — это keep group min closer to slow partition, reducing state size. Compute: 5 min worth of events at typical rate = X MB, vs current 25 GB. 2) Скейлить TM up — увеличить heap RocksDB cache, чтобы physical memory не была bottleneck. 3) Use smaller window OR pre-aggregate events upstream через .reduce() прежде чем window. 4) Skip historic data — start KafkaSource с timestamp filter (только events за последний day), что reduces total work. 5) Параллелизировать bootstrap — increase parallelism source-а, чтобы каждая subtask читала less per-partition data. Production lesson: alignment не silver bullet, нужно правильно tune maxDrift = expected difference между sources, не просто 'big enough number'. Тестируйте с realistic bootstrap scenarios.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Какую главную проблему решает withWatermarkAlignment?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4