Watermark alignment — внутренности withWatermarkAlignment
В прошлом уроке мы видели, что watermark в multi-input операторе — это min от всех входов. Это даёт correctness, но создаёт проблему performance: если один из inputs сильно опережает остальные (event time во многом больше), он генерирует огромный amount работы downstream. Window operators накапливают state для будущих окон, которые ещё не trigger-нутся (потому что min watermark behind). State growing, memory pressure растёт, OOM possible.
Watermark alignment (FLIP-182, Flink 1.15+) решает эту проблему. Идея: если source watermark убежал больше чем на maxDrift от группового min, Flink паузит этот source — temporarily prevents reading new data from него.
Consumer groups и lag в Kafka Так он “ждёт” пока медленные sources подтянутся. Это особенно критично при bootstrap из historic Kafka, где разные partitions могут иметь разные начальные offsets и event times.
Mотивация: bootstrap из historic Kafka
Сценарий: вы запускаете new Flink job, который читает Kafka topic с retention 7 дней. Topic имеет 16 partitions. При startup Flink начинает читать с earliest offsets во всех partitions.
Проблема: data в partitions распределена unevenly. Partition 0 содержит events за последние 3 дня (3 GB), partition 1 — за последние 5 дней (8 GB), partition 7 — за последние 7 дней (12 GB). При startup partition 0 catch up быстро (3 GB обработано за 5 минут), partition 7 продолжает работать (12 GB обработано за 25 минут).
Без alignment: partition 0 finishes early, его watermark advance до “now”. Но downstream watermark = min = watermark partition 7 = 7 days ago. Window operators начинают buffering events partition 0 для windows, которые will trigger only когда watermark catch up — то есть через 25 минут. За это время накапливается 25 minutes worth of windowed state — потенциально гигабайты в state backend, OOM на TaskManager.
С alignment: when partition 0 watermark убегает > 5 минут от min, source partition 0 паузится (offset не advance, no new events read). Other partitions catch up. Когда min watermark advance, partition 0 resumes. Profile state backend становится smoother.
API: withWatermarkAlignment
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withWatermarkAlignment(
"alignment-group", // group name
Duration.ofMinutes(5), // maxDrift
Duration.ofSeconds(2) // update interval
)
Параметры:
- group name — sources с одним и тем же group name aligned вместе. Это позволяет alignment multiple sources в jobе.
- maxDrift — максимальная разница между источникам watermark и групповым min. Если превышено, source паузится.
- update interval — как часто Flink проверяет alignment status. Slower = less overhead, faster = more responsive.
Group name полезен, когда у вас multiple sources, но не все должны align together. Например, main events source и control plane source — у control plane low rate, его не нужно align с main.
Внутренности: alignment protocol
Alignment работает через RPC interaction между TaskManager-ами и JobMaster-ом.
Key details:
Paused source ничего не reads. Не calls poll() на underlying connector. Кафka consumer не делает next-fetch, file reader не читает next bytes. Источник эффективно spinning idle, что использует minimal CPU.
Paused state is sticky. После pause source остаётся paused до next alignment check. Это даёт buffer — pause/unpause не происходит на каждом event, только на check intervals.
Per-source granularity. Alignment работает на уровне source subtask, не split. Если SubТаск читает 4 splits, и все они opережают, ВСЕ paused. Для finer-grained — split-level alignment в Flink 2.1+ (см. следующий урок).
Алгоритм pause/resume
# Pseudo-code в AlignmentAgent
def check_alignment(source_subtask):
current_wm = source_subtask.get_current_watermark()
group_min = jm.get_group_min(source_subtask.group_id)
if current_wm - group_min > maxDrift:
if not source_subtask.paused:
source_subtask.pause()
else:
if source_subtask.paused:
source_subtask.resume()
Pause/resume cycles повторяются каждые update interval (default 2s). На production update interval = 1-5s — достаточно responsive, без overhead.
Гибкость: per-source maxDrift
В FLIP-182 API specified что different sources могут иметь different maxDrift по обещанию same group name. Это даёт гибкость: например, main source может иметь maxDrift = 5 минут, secondary source = 30 минут. Group min computed normally.
В практике это полезно для:
- Mixed-velocity sources (Kafka главный + slow file source для enrichment).
- Tiered correctness — main pipeline strict, debug pipeline loose.
Trade-offs
Pros:
- Smoother state backend size during bootstrap.
- Predictable memory pressure.
- Easier diagnosis “почему OOM” — alignment явно отображается в Web UI.
Cons:
- Slower bootstrap (paused sources не process events).
- Coordination overhead (RPC каждые update interval).
- Не помогает если ВСЕ sources slow — alignment просто ensures parity, не accelerates progress.
Когда не использовать:
- Streaming-only workload без replay (если sources всегда near-realtime, alignment не triggered).
- Multiple unconnected pipelines (нет need to align).
- Small state, OOM не риск.
Diagnostics
Метрики (per source subtask):
currentWatermark— current watermark, который source имеет.alignmentDriftFromGroupMin— diff между current и group min.numberOfAlignmentPauseEvents— count of pauses.accumulatedPauseDuration— total time paused.
Если accumulatedPauseDuration > 50% of running time, source heavily throttled — bootstrap может занять 2x времени без alignment. Это trade-off для memory safety.
В Web UI Source -> SubtaskMetrics -> Filter by “alignment” — увидите relevant metrics.
Для bootstrap из historic Kafka типичный setup: WatermarkStrategy.forBoundedOutOfOrderness(15s).withWatermarkAlignment(“kafka-group”, Duration.ofMinutes(2), Duration.ofSeconds(1)). 2-minute maxDrift даёт reasonable bootstrap speed без excessive state growth. 1-second update interval достаточно responsive.
Limitations и edge cases
Limitation 1: alignment overhead на high-rate sources. Каждые update interval source должен report watermark и check alignment. Для very high rate sources (millions events/sec) overhead noticeable — 1-2% throughput.
Limitation 2: paused source delays cause backpressure upstream. Если source paused, no events forwarded. Upstream queue (например, in Kafka consumer-side prefetch) может fill up, причинить consumer-side issues. Не affect Kafka cluster, но может affect application-level behavior.
Limitation 3: alignment не помогает при skewed processing. Если slow source actually slow because downstream operator не успевает обрабатывать, pause только усугубит проблему — source оградит больше времени в paused state, не делая больше progress. Alignment предполагает, что slow source slow at SOURCE level, не downstream.
Limitation 4: не interactives с recovery from savepoint. При recovery savepoint имеет конкретный watermark per source. Если он imbalanced, alignment immediately начнёт pause-resume cycle до catch up. Это может занять время.
Чтение source
org.apache.flink.api.common.eventtime.WatermarkAlignmentParams— config object для alignment.org.apache.flink.runtime.source.coordinator.SourceCoordinator— JM-side coordinator.org.apache.flink.runtime.source.coordinator.WatermarkAlignmentManager— actual coordination logic.org.apache.flink.connector.base.source.reader.SourceReaderBase#pauseOrResumeSplits— где source actually pauses/resumes.- FLIP-182 в Apache Flink Wiki — original design.
Попробуй сам
-
Симулируйте bootstrap. На test cluster с 4 source subtasks и Kafka topic с huge backlog, run job без alignment vs с alignment. Compare: max state size during bootstrap, total time to catch up. На large state difference в state size может быть 10x.
-
Tune maxDrift. На production-like job попробуйте maxDrift = 30s, 5min, 1hour. Меньше — better memory, медленнее bootstrap. Больше — fast bootstrap, more memory. Найдите sweet spot.
-
Monitoring dashboard. Добавьте
alignmentDriftFromGroupMinper source в Grafana. Это даст visibility, какие sources frequently throttled. Полезно для long-term tuning.