WatermarkGenerator internals — onEvent и onPeriodicEmit
Watermarks — это центральный механизм event time processing в Flink. Они сигнализируют downstream-операторам “событий с timestamp <= W больше не будет (с высокой вероятностью)”. Это позволяет window operators знать, когда закрывать окна, и triggers — когда firing. Без правильно настроенных watermarks ваш job либо не emit-ит результаты (watermark стоит), либо допускает out-of-order events за пределами window (watermark убежал слишком далеко).
В этом уроке разбираем WatermarkGenerator API на уровне internals: что именно происходит в onEvent и onPeriodicEmit, разница между punctuated и periodic стратегиями, и где Flink вызывает эти методы в hot path source-оператора.
WatermarkStrategy и его компоненты
В user-facing API вы пишете:
DataStream<Event> events = env.fromSource(
kafkaSource,
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getEventTime())
.withIdleness(Duration.ofMinutes(1)),
"events"
);
WatermarkStrategy — это factory, который создаёт три вещи на каждый source subtask:
- TimestampAssigner — функция
(T, long) -> long, экстрактит event time из элемента. - WatermarkGenerator — объект с двумя методами:
onEventиonPeriodicEmit. Решает, когда emit watermark и какой. - WatermarkOutput — interface, который generator использует для emission watermark.
Под капотом эти три объекта склеены в WatermarkEmitter (или BoundedWatermarkStrategy для wrapper) и подключаются к source’s SourceReader в новой Source API (FLIP-27).
WatermarkGenerator API
public interface WatermarkGenerator<T> {
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
void onPeriodicEmit(WatermarkOutput output);
}
Два метода — это два разных моментов вызова:
onEvent вызывается на КАЖДОЕ событие. Generator обычно обновляет внутреннее состояние (максимальный seen timestamp), может опционально emit watermark немедленно.
onPeriodicEmit вызывается периодически — по timer, с интервалом pipeline.auto-watermark-interval (default 200ms). Generator обычно emit watermark здесь, основанный на накопленном state.
Разделение этих методов даёт два важных свойства:
- Bounded latency emission: даже если события не приходят (idle source), periodic emit может продолжать продвигать watermark.
- Event-aware logic: onEvent видит каждый element и может реагировать на специальные markers.
Periodic strategy: BoundedOutOfOrdernessWatermarks
Самая частая стратегия — forBoundedOutOfOrderness(Duration). Это упаковка BoundedOutOfOrdernessWatermarks generator:
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
private final long outOfOrdernessMillis;
private long maxTimestamp;
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}
Простая логика: запоминаем max timestamp seen, и каждые 200ms emit watermark maxTimestamp - outOfOrderness - 1. Минус один потому, что watermark семантически означает “no more events with timestamp <= W” — если max был 1000, и out-of-orderness 5s, то watermark должна быть 995 (события до 995 включительно завершены).
В onEvent НЕ происходит emit — только обновление state. Это periodic strategy: emission редкая и предсказуемая.
Это подходит для большинства cases. Pros:
- Минимальный overhead на event (просто max update).
- Контролируемая частота emission (200ms — sensible default).
- Predictable latency.
Cons:
- Если события не приходят, watermark не продвигается (см. idleness ниже).
- Latency между событием и watermark — minimum 200ms.
Punctuated strategy: emit on every event
Для low-latency requirements можно использовать punctuated strategy — emit watermark немедленно после event, базируясь на содержимом event:
public class PunctuatedAssigner implements WatermarkGenerator<Event> {
private long lastEmittedWatermark = Long.MIN_VALUE;
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
if (event.isHighWatermarkMarker()) {
// Special marker event signals watermark advance
long newWatermark = event.getWatermarkValue();
if (newWatermark > lastEmittedWatermark) {
output.emitWatermark(new Watermark(newWatermark));
lastEmittedWatermark = newWatermark;
}
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// no-op для pure punctuated strategy
}
}
Punctuated стратегия useful, когда source provides explicit watermark signals — например, in-band markers в Kafka topic, или transactions commit timestamps в CDC stream. Например, Debezium emit-ит heartbeat events с известным timestamp, которые можно использовать как punctuated watermarks.
Pros:
- Immediate watermark advancement.
- Strong correctness guarantees (watermark only after explicit confirmation).
Cons:
- Overhead в onEvent на каждое событие.
- Если markers редкие — watermark не продвигается.
В Flink 2.x чисто punctuated strategies встречаются реже, чем гибридные (punctuated + periodic как fallback).
Hot path: где это вызывается
С FLIP-27 Source API, hot path выглядит так:
Key insight: timer onPeriodicEmit вызывается на главном thread task-а, не на отдельном thread. Это значит, что между двумя tick-ами полностью обрабатывается batch event-ов. Если ваш onEvent медленный (например, делает heavy parsing), watermark emission задерживается.
WithIdleness и source disengagement
Periodic strategies имеют проблему: если события не приходят, watermark не продвигается. Это может происходить, когда:
- Kafka partition пустой в данный момент.
- Source subtask читает с одной partition, которая медленная.
- Job restart-нулся, и пройдёт время до первого event.
В этих cases downstream watermarks (min of all source watermarks) застывают, и window operations не triggered.
Решение — withIdleness(Duration):
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withIdleness(Duration.ofMinutes(1))
После 1 minute без events эта source subtask помечается “idle”, и downstream operators игнорируют её watermark при вычислении min. Activity resumed (first event) — subtask снова active.
Реализация: внутри generator wrapper отслеживается timestamp последнего event. При onPeriodicEmit, если elapsed > idleness, вызывается output.markIdle(). Watermark из этой subtask временно not propagated downstream.
withIdleness — это double-edged sword. Если ваши Kafka partitions могут быть idle часами (например, weekend traffic), idleness может вызвать significantly advanced downstream watermark — события, пришедшие после wake-up, могут быть already late. На strict event-time semantics это даёт data loss (по умолчанию late events dropped в window operators). Используйте только когда понимаете trade-off.
Common errors
Error 1: timestamp в millis vs seconds. TimestampAssigner expects timestamp in milliseconds. Если ваш event имеет timestamp в seconds (Unix epoch seconds), нужно умножить на 1000. Распространённая ошибка — забыть это, и тогда все timestamps в “будущем” (или в 1970-х, в зависимости от scale).
Error 2: timestamp from clock vs event. Если TimestampAssigner возвращает System.currentTimeMillis() вместо event time, вы фактически делаете processing time не event time. Watermark будет advanced как processing time, поздние events будут late. Это distorts business semantics.
Error 3: out-of-orderness слишком маленький. Если real out-of-orderness в данных — minutes (например, mobile clients sending delayed events), а вы указали forBoundedOutOfOrderness(Duration.ofSeconds(5)), многие events будут late. Метрика numLateRecordsDropped (per window operator) показывает это.
Error 4: out-of-orderness слишком большой. forBoundedOutOfOrderness(Duration.ofHours(1)) — windows закроются спустя час после конца. Latency обработки = 1 hour. Это часто не желаемо.
Правило: out-of-orderness = p99.9 фактической задержки от event time до arrival. Измерьте в production через (processing_time - event_time) distribution.
Чтение source
org.apache.flink.api.common.eventtime.WatermarkStrategy— top-level factory.org.apache.flink.api.common.eventtime.WatermarkGenerator— interface для generators.org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks— самая частая impl.org.apache.flink.api.common.eventtime.WatermarkOutput— interface для emission.org.apache.flink.streaming.api.operators.source.NoMoreSplitsEvent— связано с idle detection.org.apache.flink.connector.base.source.reader.SourceReaderBase— где watermark hot path.
Попробуй сам
-
Implement custom punctuated generator. Создайте generator, который emit-ит watermark только при event с specific marker (например,
event.type == "WATERMARK"). Сравните latency с periodic. -
Measure watermark lag. В Web UI Metrics есть
lastWatermarkper source. Сравните сlastReceivedTimestamp. Difference — это actual out-of-orderness в production. Это даст ground truth для тюнинга. -
Простимулируйте idle source. На test job с Kafka source с несколькими partitions, prepare scenario где одна partition idle (никаких новых producer). Включайте/выключайте withIdleness и наблюдайте, как window operations triggered (или не triggered).