Watermark propagation через DAG
В прошлом уроке мы разобрали, как watermark создаётся в source operator. Но это только начало пути — watermark должен пройти через весь DAG до sink-операторов, чтобы window operations и timer-ы могли triggered correctly. По пути watermark проходит через множество операторов, у каждого может быть несколько входов, и логика propagation нетривиальна: оператор не может просто forward watermark, он должен дождаться watermark со всех входов и emit minimum.
Watermark alignment в production Late events: что происходит с опоздавшими даннымиВ этом уроке разбираем full propagation pipeline: как watermark идёт через operators, что происходит с multiple inputs (keyBy, join, union), как broadcast меняет картину, и где это реализовано в коде.
Базовое правило: emit min от всех inputs
Для оператора с N input channels (например, после shuffle keyBy parallelism=4, у каждой downstream subtask 4 input channels), watermark логика такова:
- Для каждого input channel оператор maintains “current watermark received по этому каналу”.
- Effective operator watermark = min(channel1_wm, channel2_wm, …, channelN_wm).
- При получении watermark по любому channel, если new min > old min, оператор emit new effective watermark.
Это правило строгое: оператор не emit watermark до того, как каждый из его inputs has sent at least one watermark. Иначе мы могли бы emit watermark, who оставит позади события из тех каналов, которые ещё не отчитались — нарушение событийного времени.
Из этой картины ясно: slowest channel определяет операторскую watermark. Это создаёт практическую проблему: если одна Kafka partition consistently slower (или idle), вся pipeline behind её watermark.
StatusWatermarkValve: реализация min-logic
В коде Flink min watermark logic implemented в StatusWatermarkValve. Это класс-helper, который инкапсулирует логику multi-input watermark merging.
public class StatusWatermarkValve {
private final InputChannelStatus[] channelStatuses;
private long lastOutputWatermark;
private boolean lastOutputStreamStatus;
public void inputWatermark(Watermark wm, int channelIndex, DataOutput output) {
InputChannelStatus ch = channelStatuses[channelIndex];
if (wm.getTimestamp() > ch.watermark) {
ch.watermark = wm.getTimestamp();
ch.isWatermarkAligned = true;
findAndOutputNewMinWatermark(output);
}
}
private void findAndOutputNewMinWatermark(DataOutput output) {
long minWm = Long.MAX_VALUE;
for (InputChannelStatus ch : channelStatuses) {
if (ch.isActive() && ch.isWatermarkAligned) {
minWm = Math.min(minWm, ch.watermark);
}
}
if (minWm > lastOutputWatermark) {
lastOutputWatermark = minWm;
output.emitWatermark(new Watermark(minWm));
}
}
}
Каждый input в operator-таске имеет свой StatusWatermarkValve, который трекит per-channel watermark и вычисляет min. Это просто, но эффективно — O(N) на channel, где N = number of channels.
Дополнительная сложность: StreamStatus. Каналы могут быть IDLE (см. предыдущий урок про idleness). IDLE channels исключаются из min computation. Когда canal становится active снова, его watermark снова participates в min.
Через broadcast: broadcast не emit watermark
Broadcast streams (.broadcast(), BroadcastState pattern) — special case. Когда вы делаете broadcast, каждый downstream subtask получает копию всех elements. Но broadcast НЕ emit watermarks — это design.
Why: broadcast обычно используется для rule streams, config updates, etc., которые не имеют strong event time semantics. Если мы бы пускали watermarks через broadcast, ones из broadcast стало бы блокирующим для downstream watermark advancement (как min от broadcast WM и main WM).
В практике это означает: downstream operator после .connect(broadcastStream) использует watermark только от non-broadcast input. Broadcast watermark ignored.
DataStream<Event> events = ...;
BroadcastStream<Rule> rules = ruleStream.broadcast(ruleDescriptor);
events
.connect(rules)
.process(new KeyedBroadcastProcessFunction<>())
// watermark = events watermark, не affected by rules
Это удобно, но требует понимания: если ваш rules stream имеет важные event-time semantics (например, time-varying rules), broadcast — не правильный mechanism. Используйте обычный union или join.
Через keyBy: shuffle и min propagation
keyBy() — shuffle operation, она перераспределяет events between subtasks по hash key. Что происходит с watermarks?
Каждый upstream subtask отправляет свой текущий watermark во все downstream subtasks (broadcast watermark, не data!). Это означает: после keyBy, downstream subtask имеет N input channels (где N = upstream parallelism), и в каждом канал ones получает копию того же watermark от upstream subtask.
Down subtask применяет min logic. Effective downstream watermark = min всех upstream watermarks.
Это даёт интересный side effect: downstream watermark = min over all upstream subtasks, независимо от того, сколько events идёт от каждого. Один slow upstream subtask блокирует watermark всех downstream subtasks.
Это критично для health monitoring: если один source partition slow, downstream watermark gone. Window operations не triggered. В Web UI это выглядит как “WM stuck at X” — диагностика требует understanding, что один из upstream sub-task’ов тормозит.
Multi-input operators: connect, union, join
Union (stream1.union(stream2)) — combines two streams of the same type. У downstream operator появляются input channels от обоих streams. Effective watermark = min от обоих.
Connect (stream1.connect(stream2)) — combines two streams разных types. Аналогично union, но с CoFunction-логикой. Effective watermark = min(stream1 watermark, stream2 watermark).
Window join (stream1.join(stream2).where(...).equalTo(...).window(...)) — joins по equality keys и within window. Watermark logic: same min over both inputs.
В каждом случае правило одно: min watermark = bottleneck. Если один из streams (или sources) consistently slow, joined output throttled.
Production pattern: partition-level watermarks (см. следующий урок про split-level). Это allows finer-grained tracking, и при one slow partition downstream watermark не stalls completely.
WatermarkOutputMultiplexer для multi-source emission
В Source API (FLIP-27), один Source subtask может читать с multiple “splits” (Kafka partitions, file ranges, Kinesis shards). Каждый split potentially имеет свой own watermark progression.
WatermarkOutputMultiplexer — это класс, который позволяет multiple watermark generators (один per split) emit watermarks через один WatermarkOutput, with proper min logic.
public class WatermarkOutputMultiplexer {
private final Map<String, OutputState> outputs;
private long currentCombined;
public WatermarkOutput createOutput(String splitId) {
outputs.put(splitId, new OutputState());
return new SplitWatermarkOutput(splitId);
}
private void updateCombinedWatermark() {
long minWm = Long.MAX_VALUE;
for (OutputState state : outputs.values()) {
if (!state.isIdle) {
minWm = Math.min(minWm, state.watermark);
}
}
if (minWm > currentCombined) {
currentCombined = minWm;
actualOutput.emitWatermark(new Watermark(minWm));
}
}
}
Каждый split generator (например, Kafka partition consumer) emit-ит watermark через свой SplitWatermarkOutput. Multiplexer комбинирует их в combined output. Idle splits (no events for withIdleness duration) исключаются — combined watermark может advance даже если одна partition idle.
Это решает проблему “Kafka topic с unevenly loaded partitions, где watermark stuck because one partition idle”. До Multiplexer-а каждая partition watermark была обязательная.
Hot path: watermark в operator
SubInputProcessor.processInput()
-> StreamMultipleInputProcessor (если multi-input)
-> StatusWatermarkValve.inputWatermark(wm, channelIdx, output)
-> updates channelStatuses[channelIdx]
-> findAndOutputNewMinWatermark(output)
-> if new min > lastOutput, output.emitWatermark(new min)
-> operator.processWatermark(wm)
-> [optional: trigger timers]
-> output.emitWatermark(wm) // forward to downstream
В каждом stateful operator, processWatermark может trigger timer firing (для onTimer callbacks). Window operators используют processWatermark to detect when windows should close.
После processing внутренней логики, operator forward-ит watermark downstream (если applicable). Это рекурсивно через весь DAG.
Чтение source
org.apache.flink.streaming.runtime.io.StatusWatermarkValve— multi-input min logic.org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer— для multi-split sources.org.apache.flink.streaming.api.operators.AbstractStreamOperator#processWatermark— operator-level handler.org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput— где watermark приходит из network.org.apache.flink.streaming.runtime.streamstatus.StreamStatus— IDLE/ACTIVE state канала.
Попробуй сам
-
Trace watermark через DAG. В Web UI зайдите Source -> SubtaskMetrics. Найдите
lastWatermark. Сделайте то же для downstream operators. Сравните lag между source и sink — это actual end-to-end latency event time. -
Симулируйте slow partition. На test job с Kafka source с 3 partitions, producer-те events с разной rate в каждую. Наблюдайте, что watermark всех downstream subtasks = min от source-ов. Slow partition блокирует windows.
-
Реализуйте custom WatermarkOutput. Создайте wrapper, который логирует каждый emit. Это даёт visibility, как часто и какой watermark emitted в hot path. Полезно для debugging tuning issues.