Learning Platform
Глоссарий Troubleshooting
Урок 09.02 · 24 мин
Продвинутый
Watermark PropagationMin WatermarkWatermarkOutputMulti-input OperatorsBroadcast

Watermark propagation через DAG

В прошлом уроке мы разобрали, как watermark создаётся в source operator. Но это только начало пути — watermark должен пройти через весь DAG до sink-операторов, чтобы window operations и timer-ы могли triggered correctly. По пути watermark проходит через множество операторов, у каждого может быть несколько входов, и логика propagation нетривиальна: оператор не может просто forward watermark, он должен дождаться watermark со всех входов и emit minimum.

Watermark alignment в production Late events: что происходит с опоздавшими данными

В этом уроке разбираем full propagation pipeline: как watermark идёт через operators, что происходит с multiple inputs (keyBy, join, union), как broadcast меняет картину, и где это реализовано в коде.


Базовое правило: emit min от всех inputs

Для оператора с N input channels (например, после shuffle keyBy parallelism=4, у каждой downstream subtask 4 input channels), watermark логика такова:

  1. Для каждого input channel оператор maintains “current watermark received по этому каналу”.
  2. Effective operator watermark = min(channel1_wm, channel2_wm, …, channelN_wm).
  3. При получении watermark по любому channel, если new min > old min, оператор emit new effective watermark.

Это правило строгое: оператор не emit watermark до того, как каждый из его inputs has sent at least one watermark. Иначе мы могли бы emit watermark, who оставит позади события из тех каналов, которые ещё не отчитались — нарушение событийного времени.

Min watermark propagation в multi-input operator
Оператор имеет 4 input channels, каждый с current watermark
Channel 1Watermark 1000ms. Канал от source subtask 0 после shuffle.
Channel 2Watermark 950ms. Канал от source subtask 1.
Channel 3Watermark 900ms — самая старая. Этот канал отстаёт, и оператор берёт его как effective.
Channel 4Watermark 1100ms — самая новая. Не важна, потому что effective = min.
Operator WM = 900Operator effective watermark = min всех channel watermarks = 900ms. Это значение emitted downstream.
emit WM = 900Эмиссия watermark = 900 во все downstream channels. Все timers и windows с timestamp <= 900 могут firing.
Затем Channel 3 присылает WM = 1050 -> new min = 950 -> emit new WM = 950

Из этой картины ясно: slowest channel определяет операторскую watermark. Это создаёт практическую проблему: если одна Kafka partition consistently slower (или idle), вся pipeline behind её watermark.


StatusWatermarkValve: реализация min-logic

В коде Flink min watermark logic implemented в StatusWatermarkValve. Это класс-helper, который инкапсулирует логику multi-input watermark merging.

public class StatusWatermarkValve {
    private final InputChannelStatus[] channelStatuses;
    private long lastOutputWatermark;
    private boolean lastOutputStreamStatus;
    
    public void inputWatermark(Watermark wm, int channelIndex, DataOutput output) {
        InputChannelStatus ch = channelStatuses[channelIndex];
        if (wm.getTimestamp() > ch.watermark) {
            ch.watermark = wm.getTimestamp();
            ch.isWatermarkAligned = true;
            findAndOutputNewMinWatermark(output);
        }
    }
    
    private void findAndOutputNewMinWatermark(DataOutput output) {
        long minWm = Long.MAX_VALUE;
        for (InputChannelStatus ch : channelStatuses) {
            if (ch.isActive() && ch.isWatermarkAligned) {
                minWm = Math.min(minWm, ch.watermark);
            }
        }
        if (minWm > lastOutputWatermark) {
            lastOutputWatermark = minWm;
            output.emitWatermark(new Watermark(minWm));
        }
    }
}

Каждый input в operator-таске имеет свой StatusWatermarkValve, который трекит per-channel watermark и вычисляет min. Это просто, но эффективно — O(N) на channel, где N = number of channels.

Дополнительная сложность: StreamStatus. Каналы могут быть IDLE (см. предыдущий урок про idleness). IDLE channels исключаются из min computation. Когда canal становится active снова, его watermark снова participates в min.


Через broadcast: broadcast не emit watermark

Broadcast streams (.broadcast(), BroadcastState pattern) — special case. Когда вы делаете broadcast, каждый downstream subtask получает копию всех elements. Но broadcast НЕ emit watermarks — это design.

Why: broadcast обычно используется для rule streams, config updates, etc., которые не имеют strong event time semantics. Если мы бы пускали watermarks через broadcast, ones из broadcast стало бы блокирующим для downstream watermark advancement (как min от broadcast WM и main WM).

В практике это означает: downstream operator после .connect(broadcastStream) использует watermark только от non-broadcast input. Broadcast watermark ignored.

DataStream<Event> events = ...;
BroadcastStream<Rule> rules = ruleStream.broadcast(ruleDescriptor);

events
    .connect(rules)
    .process(new KeyedBroadcastProcessFunction<>())
    // watermark = events watermark, не affected by rules

Это удобно, но требует понимания: если ваш rules stream имеет важные event-time semantics (например, time-varying rules), broadcast — не правильный mechanism. Используйте обычный union или join.


Через keyBy: shuffle и min propagation

keyBy() — shuffle operation, она перераспределяет events between subtasks по hash key. Что происходит с watermarks?

Каждый upstream subtask отправляет свой текущий watermark во все downstream subtasks (broadcast watermark, не data!). Это означает: после keyBy, downstream subtask имеет N input channels (где N = upstream parallelism), и в каждом канал ones получает копию того же watermark от upstream subtask.

Down subtask применяет min logic. Effective downstream watermark = min всех upstream watermarks.

Это даёт интересный side effect: downstream watermark = min over all upstream subtasks, независимо от того, сколько events идёт от каждого. Один slow upstream subtask блокирует watermark всех downstream subtasks.

Watermark broadcast через keyBy shuffle
Source 0Source subtask 0 emit watermark = 1000.
Source 1Source subtask 1 emit watermark = 950.
Source 2Source subtask 2 emit watermark = 1100.
keyBy shufflekeyBy shuffle: каждый source отправляет watermark всем downstream subtasks (broadcast). Data распределяется по hash key.
Downstream 0Получает 3 watermarks: 1000, 950, 1100. Effective = min = 950.
Downstream 1То же — все downstream subtasks имеют одинаковый effective WM = 950.
Slow Source 1 (WM=950) блокирует все downstream

Это критично для health monitoring: если один source partition slow, downstream watermark gone. Window operations не triggered. В Web UI это выглядит как “WM stuck at X” — диагностика требует understanding, что один из upstream sub-task’ов тормозит.


Multi-input operators: connect, union, join

Union (stream1.union(stream2)) — combines two streams of the same type. У downstream operator появляются input channels от обоих streams. Effective watermark = min от обоих.

Connect (stream1.connect(stream2)) — combines two streams разных types. Аналогично union, но с CoFunction-логикой. Effective watermark = min(stream1 watermark, stream2 watermark).

Window join (stream1.join(stream2).where(...).equalTo(...).window(...)) — joins по equality keys и within window. Watermark logic: same min over both inputs.

В каждом случае правило одно: min watermark = bottleneck. Если один из streams (или sources) consistently slow, joined output throttled.

Production pattern: partition-level watermarks (см. следующий урок про split-level). Это allows finer-grained tracking, и при one slow partition downstream watermark не stalls completely.


WatermarkOutputMultiplexer для multi-source emission

В Source API (FLIP-27), один Source subtask может читать с multiple “splits” (Kafka partitions, file ranges, Kinesis shards). Каждый split potentially имеет свой own watermark progression.

WatermarkOutputMultiplexer — это класс, который позволяет multiple watermark generators (один per split) emit watermarks через один WatermarkOutput, with proper min logic.

public class WatermarkOutputMultiplexer {
    private final Map<String, OutputState> outputs;
    private long currentCombined;
    
    public WatermarkOutput createOutput(String splitId) {
        outputs.put(splitId, new OutputState());
        return new SplitWatermarkOutput(splitId);
    }
    
    private void updateCombinedWatermark() {
        long minWm = Long.MAX_VALUE;
        for (OutputState state : outputs.values()) {
            if (!state.isIdle) {
                minWm = Math.min(minWm, state.watermark);
            }
        }
        if (minWm > currentCombined) {
            currentCombined = minWm;
            actualOutput.emitWatermark(new Watermark(minWm));
        }
    }
}

Каждый split generator (например, Kafka partition consumer) emit-ит watermark через свой SplitWatermarkOutput. Multiplexer комбинирует их в combined output. Idle splits (no events for withIdleness duration) исключаются — combined watermark может advance даже если одна partition idle.

Это решает проблему “Kafka topic с unevenly loaded partitions, где watermark stuck because one partition idle”. До Multiplexer-а каждая partition watermark была обязательная.


Hot path: watermark в operator

SubInputProcessor.processInput()
  -> StreamMultipleInputProcessor (если multi-input)
  -> StatusWatermarkValve.inputWatermark(wm, channelIdx, output)
    -> updates channelStatuses[channelIdx]
    -> findAndOutputNewMinWatermark(output)
       -> if new min > lastOutput, output.emitWatermark(new min)
         -> operator.processWatermark(wm)
            -> [optional: trigger timers]
            -> output.emitWatermark(wm) // forward to downstream

В каждом stateful operator, processWatermark может trigger timer firing (для onTimer callbacks). Window operators используют processWatermark to detect when windows should close.

После processing внутренней логики, operator forward-ит watermark downstream (если applicable). Это рекурсивно через весь DAG.


Чтение source

  • org.apache.flink.streaming.runtime.io.StatusWatermarkValve — multi-input min logic.
  • org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer — для multi-split sources.
  • org.apache.flink.streaming.api.operators.AbstractStreamOperator#processWatermark — operator-level handler.
  • org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput — где watermark приходит из network.
  • org.apache.flink.streaming.runtime.streamstatus.StreamStatus — IDLE/ACTIVE state канала.

Попробуй сам

  1. Trace watermark через DAG. В Web UI зайдите Source -> SubtaskMetrics. Найдите lastWatermark. Сделайте то же для downstream operators. Сравните lag между source и sink — это actual end-to-end latency event time.

  2. Симулируйте slow partition. На test job с Kafka source с 3 partitions, producer-те events с разной rate в каждую. Наблюдайте, что watermark всех downstream subtasks = min от source-ов. Slow partition блокирует windows.

  3. Реализуйте custom WatermarkOutput. Создайте wrapper, который логирует каждый emit. Это даёт visibility, как часто и какой watermark emitted в hot path. Полезно для debugging tuning issues.

Проверка знанийKnowledge check
У вас Flink-job с keyBy и 1-minute tumbling window. Source — Kafka с 8 partitions, parallelism source = 4. Window emit-ит результаты с задержкой 3-4 минуты после end of window. Investigation показывает: 7 partitions имеют consistent traffic, partition 5 имеет sporadic gaps до 90 секунд. Что происходит и какие 2 решения?
ОтветAnswer
Partition 5 с 90s gaps — это slow channel в watermark propagation. Каждые 90s partition 5 не sends events -> its watermark не advance -> source subtask читающий partition 5 имеет slow watermark -> all downstream subtasks effective watermark = min всех source subtasks = behind on 90s. Window 1-min trigger-ится только когда WM > end_of_window, поэтому задержка 90s + processing overhead = 3-4 минуты. Решения: 1) Включить withIdleness(Duration.ofMinutes(2)). Когда partition 5 не sends events for 2 minutes, она помечается idle, watermark вычисляется без неё. Pros: window trigger normally. Cons: если partition 5 wakes up с late events (event time > 2 min ago), они dropped как late (или handled через allowedLateness). 2) Использовать SplitWatermarkOutputMultiplexer (built-in в KafkaSource). Это даёт per-partition watermark tracking — partition 5 watermark может lag behind, но other partitions watermarks valid и downstream watermark не блокирован полностью. Combined with withIdleness для polling-only partitions. 3) Альтернатива (less common): pre-aggregate partition 5 events upstream так, чтобы they always emit events even when 'effectively idle' — например, heartbeat events. Но это требует control над producer. Production best practice: combination 1 + 2: use KafkaSource (which has built-in multiplexer) + withIdleness as fallback for partition-level idleness. Это обычно даёт acceptable trade-off latency vs correctness.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Оператор имеет 4 input channels с watermarks: 1000, 950, 900, 1100. Какой effective operator watermark, и какой emit downstream?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4