Learning Platform
Глоссарий Troubleshooting
Урок 09.01 · 24 мин
Продвинутый
WatermarkGeneratorWatermarkStrategyPunctuated WatermarksPeriodic WatermarksEvent Time

WatermarkGenerator internals — onEvent и onPeriodicEmit

Watermarks — это центральный механизм event time processing в Flink. Они сигнализируют downstream-операторам “событий с timestamp <= W больше не будет (с высокой вероятностью)”. Это позволяет window operators знать, когда закрывать окна, и triggers — когда firing. Без правильно настроенных watermarks ваш job либо не emit-ит результаты (watermark стоит), либо допускает out-of-order events за пределами window (watermark убежал слишком далеко).

WatermarkStrategy: практическое использование Watermarks в Spark Structured Streaming

В этом уроке разбираем WatermarkGenerator API на уровне internals: что именно происходит в onEvent и onPeriodicEmit, разница между punctuated и periodic стратегиями, и где Flink вызывает эти методы в hot path source-оператора.


WatermarkStrategy и его компоненты

В user-facing API вы пишете:

DataStream<Event> events = env.fromSource(
    kafkaSource,
    WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, ts) -> event.getEventTime())
        .withIdleness(Duration.ofMinutes(1)),
    "events"
);

WatermarkStrategy — это factory, который создаёт три вещи на каждый source subtask:

  1. TimestampAssigner — функция (T, long) -> long, экстрактит event time из элемента.
  2. WatermarkGenerator — объект с двумя методами: onEvent и onPeriodicEmit. Решает, когда emit watermark и какой.
  3. WatermarkOutput — interface, который generator использует для emission watermark.

Под капотом эти три объекта склеены в WatermarkEmitter (или BoundedWatermarkStrategy для wrapper) и подключаются к source’s SourceReader в новой Source API (FLIP-27).


WatermarkGenerator API

public interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

Два метода — это два разных моментов вызова:

onEvent вызывается на КАЖДОЕ событие. Generator обычно обновляет внутреннее состояние (максимальный seen timestamp), может опционально emit watermark немедленно.

onPeriodicEmit вызывается периодически — по timer, с интервалом pipeline.auto-watermark-interval (default 200ms). Generator обычно emit watermark здесь, основанный на накопленном state.

Разделение этих методов даёт два важных свойства:

  • Bounded latency emission: даже если события не приходят (idle source), periodic emit может продолжать продвигать watermark.
  • Event-aware logic: onEvent видит каждый element и может реагировать на специальные markers.

Periodic strategy: BoundedOutOfOrdernessWatermarks

Самая частая стратегия — forBoundedOutOfOrderness(Duration). Это упаковка BoundedOutOfOrdernessWatermarks generator:

public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;
    
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }
    
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }
    
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

Простая логика: запоминаем max timestamp seen, и каждые 200ms emit watermark maxTimestamp - outOfOrderness - 1. Минус один потому, что watermark семантически означает “no more events with timestamp <= W” — если max был 1000, и out-of-orderness 5s, то watermark должна быть 995 (события до 995 включительно завершены).

В onEvent НЕ происходит emit — только обновление state. Это periodic strategy: emission редкая и предсказуемая.

Это подходит для большинства cases. Pros:

  • Минимальный overhead на event (просто max update).
  • Контролируемая частота emission (200ms — sensible default).
  • Predictable latency.

Cons:

  • Если события не приходят, watermark не продвигается (см. idleness ниже).
  • Latency между событием и watermark — minimum 200ms.

Punctuated strategy: emit on every event

Для low-latency requirements можно использовать punctuated strategy — emit watermark немедленно после event, базируясь на содержимом event:

public class PunctuatedAssigner implements WatermarkGenerator<Event> {
    private long lastEmittedWatermark = Long.MIN_VALUE;
    
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        if (event.isHighWatermarkMarker()) {
            // Special marker event signals watermark advance
            long newWatermark = event.getWatermarkValue();
            if (newWatermark > lastEmittedWatermark) {
                output.emitWatermark(new Watermark(newWatermark));
                lastEmittedWatermark = newWatermark;
            }
        }
    }
    
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // no-op для pure punctuated strategy
    }
}

Punctuated стратегия useful, когда source provides explicit watermark signals — например, in-band markers в Kafka topic, или transactions commit timestamps в CDC stream. Например, Debezium emit-ит heartbeat events с известным timestamp, которые можно использовать как punctuated watermarks.

Pros:

  • Immediate watermark advancement.
  • Strong correctness guarantees (watermark only after explicit confirmation).

Cons:

  • Overhead в onEvent на каждое событие.
  • Если markers редкие — watermark не продвигается.

В Flink 2.x чисто punctuated strategies встречаются реже, чем гибридные (punctuated + periodic как fallback).


Hot path: где это вызывается

С FLIP-27 Source API, hot path выглядит так:

Hot path watermark emission в Source operator
SplitReader (Kafka client)
SourceReaderBase
TimestampAssigner
WatermarkGenerator
WatermarkOutput
Periodic timer (200ms)
fetch batchextractTimestamp(event)ts = 1700000000000onEvent(event, ts, out)collect(event, ts)onPeriodicEmit(out) tickemitWatermark(W)

Key insight: timer onPeriodicEmit вызывается на главном thread task-а, не на отдельном thread. Это значит, что между двумя tick-ами полностью обрабатывается batch event-ов. Если ваш onEvent медленный (например, делает heavy parsing), watermark emission задерживается.


WithIdleness и source disengagement

Periodic strategies имеют проблему: если события не приходят, watermark не продвигается. Это может происходить, когда:

  • Kafka partition пустой в данный момент.
  • Source subtask читает с одной partition, которая медленная.
  • Job restart-нулся, и пройдёт время до первого event.

В этих cases downstream watermarks (min of all source watermarks) застывают, и window operations не triggered.

Решение — withIdleness(Duration):

WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withIdleness(Duration.ofMinutes(1))

После 1 minute без events эта source subtask помечается “idle”, и downstream operators игнорируют её watermark при вычислении min. Activity resumed (first event) — subtask снова active.

Реализация: внутри generator wrapper отслеживается timestamp последнего event. При onPeriodicEmit, если elapsed > idleness, вызывается output.markIdle(). Watermark из этой subtask временно not propagated downstream.

WARNING

withIdleness — это double-edged sword. Если ваши Kafka partitions могут быть idle часами (например, weekend traffic), idleness может вызвать significantly advanced downstream watermark — события, пришедшие после wake-up, могут быть already late. На strict event-time semantics это даёт data loss (по умолчанию late events dropped в window operators). Используйте только когда понимаете trade-off.


Common errors

Error 1: timestamp в millis vs seconds. TimestampAssigner expects timestamp in milliseconds. Если ваш event имеет timestamp в seconds (Unix epoch seconds), нужно умножить на 1000. Распространённая ошибка — забыть это, и тогда все timestamps в “будущем” (или в 1970-х, в зависимости от scale).

Error 2: timestamp from clock vs event. Если TimestampAssigner возвращает System.currentTimeMillis() вместо event time, вы фактически делаете processing time не event time. Watermark будет advanced как processing time, поздние events будут late. Это distorts business semantics.

Error 3: out-of-orderness слишком маленький. Если real out-of-orderness в данных — minutes (например, mobile clients sending delayed events), а вы указали forBoundedOutOfOrderness(Duration.ofSeconds(5)), многие events будут late. Метрика numLateRecordsDropped (per window operator) показывает это.

Error 4: out-of-orderness слишком большой. forBoundedOutOfOrderness(Duration.ofHours(1)) — windows закроются спустя час после конца. Latency обработки = 1 hour. Это часто не желаемо.

Правило: out-of-orderness = p99.9 фактической задержки от event time до arrival. Измерьте в production через (processing_time - event_time) distribution.


Чтение source

  • org.apache.flink.api.common.eventtime.WatermarkStrategy — top-level factory.
  • org.apache.flink.api.common.eventtime.WatermarkGenerator — interface для generators.
  • org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks — самая частая impl.
  • org.apache.flink.api.common.eventtime.WatermarkOutput — interface для emission.
  • org.apache.flink.streaming.api.operators.source.NoMoreSplitsEvent — связано с idle detection.
  • org.apache.flink.connector.base.source.reader.SourceReaderBase — где watermark hot path.

Попробуй сам

  1. Implement custom punctuated generator. Создайте generator, который emit-ит watermark только при event с specific marker (например, event.type == "WATERMARK"). Сравните latency с periodic.

  2. Measure watermark lag. В Web UI Metrics есть lastWatermark per source. Сравните с lastReceivedTimestamp. Difference — это actual out-of-orderness в production. Это даст ground truth для тюнинга.

  3. Простимулируйте idle source. На test job с Kafka source с несколькими partitions, prepare scenario где одна partition idle (никаких новых producer). Включайте/выключайте withIdleness и наблюдайте, как window operations triggered (или не triggered).

Проверка знанийKnowledge check
Ваш Flink-job с Kafka source, BoundedOutOfOrderness(5s), withIdleness(1min). 95% events arrive с lag < 2 сек, но 5% — с lag 30-60 сек из mobile clients с плохой connectivity. Window aggregation 1-minute tumbling. Видите high numLateRecordsDropped. Какие три варианта решения, и какие trade-offs?
ОтветAnswer
Проблема в том, что 5% events lag > 5s, поэтому они классифицируются как late и dropped из window aggregation. Решения: 1) Увеличить out-of-orderness до 60s (forBoundedOutOfOrderness(60s)). Pros: large fraction events будет accommodated в window. Cons: windows закрываются спустя 60s после end + latency 200ms emit interval — каждый result delayed ~1 min. Для real-time use case это может быть unacceptable. 2) Использовать allowedLateness в window operator (.allowedLateness(Duration.ofMinutes(2))). Pros: out-of-orderness остаётся 5s (low latency для majority events), но late events до 2 мин включены через secondary window update. Cons: window emit-ит result дважды (initial + updated), downstream должен handle deduplication через sink primary key. State window держится дольше — больше memory. 3) Использовать side output для late events (.sideOutputLateData(lateTag)). Pros: main pipeline остаётся low-latency, late events captured separately для аудита или alternative processing (например, write в audit log + dashboard). Cons: split logic, downstream должен union main + side output, более complex code. Выбор зависит от business: если result correctness важнее latency — solution 1. Если real-time важен, но точность тоже — solution 2 с allowedLateness. Если поздние events надо investigate, но они не должны влиять на main aggregation — solution 3. В production часто комбинируют 2 + 3 для visibility + correctness.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какие два метода имеет WatermarkGenerator interface, и когда они вызываются?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4