Watermark coordination на уровне source
Watermark — это самое сложное в event-time streaming. Pre-FLIP-27 sources имели простую но проблематичную модель: один watermark на subtask. Если у тебя Kafka source читает 10 partitions, и одна partition отстаёт (или временно мертва), watermark всего subtask-а stuck. FLIP-27 ввёл per-split watermarks + механизм их агрегации.
В этом уроке: как watermarks реально генерируются на уровне source, что значит idle, как работает watermark alignment для slow partitions, и почему всё это критично для production-readyiness.
Late events: allowedLateness и side outputsГде раньше всё ломалось
Old API: SourceFunction.run(SourceContext). SourceContext имел метод emitWatermark(Watermark). Что это значит?
Если ты эмитишь Watermark(t=1000), ты говоришь pipeline-у: “не будет event с timestamp меньше 1000”. Это global guarantee для всего subtask-а.
Но если subtask читает 10 partitions, и одна partition отстала (consumer lag огромный, или вообще не пушит events), что делать?
Вариант A: emit watermark = min(всех partitions). Это safe, но если одна partition мертва, watermark всего subtask-а stuck на её last event time forever. Window-ы не закрываются.
Вариант B: emit watermark = max(всех partitions). Это даёт прогресс, но breaks correctness — события из отставшей partition будут late.
Вариант C: ignore slow partitions, but how decide?
В old API не было встроенного решения. Каждый коннектор писал свою логику, обычно с ошибками. FlinkKafkaConsumer (old) имел кучу багов и edge cases с watermarks.
FLIP-27 решение: per-split watermarks
В новом API watermark — это per-split state. Каждый split имеет свой собственный watermark generator. Source-operator агрегирует их.
output.createOutputForSplit(splitId) в pollNext возвращает SourceOutput, который scoped к одному split-у. Когда reader делает splitOutput.collect(record, timestamp), watermark generator для этого split-а update-ится. Когда reader делает splitOutput.emitWatermark(wm), watermark этого split-а становится wm.
Source-operator агрегирует watermarks. Стратегия по умолчанию: min(active splits) ignoring idle. Это даёт нам и progress (idle splits не блокируют), и correctness (active splits всегда учитываются).
Watermark strategy: per-split assignment
WatermarkStrategy задаётся при создании source-а:
WatermarkStrategy<MyRecord> strategy = WatermarkStrategy
.<MyRecord>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getEventTime())
.withIdleness(Duration.ofMinutes(1));
DataStream<MyRecord> stream = env.fromSource(
kafkaSource,
strategy,
"kafka-source"
);
WatermarkStrategy — это фабрика, которая создаёт WatermarkGenerator и TimestampAssigner. Когда source-operator получает split, он вызывает strategy чтобы создать per-split instance generator-а и assigner-а.
Это значит: если у тебя 10 splits на subtask, у source-operator-а 10 generator instances и 10 assigner instances. Каждый видит только records своего split-а.
BoundedOutOfOrdernessGenerator (one of the standard generators):
public class BoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {
private final long maxOutOfOrderness; // 5000 ms
private long maxTimestampSeen = Long.MIN_VALUE;
@Override
public void onEvent(T event, long timestamp, WatermarkOutput output) {
maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestampSeen - maxOutOfOrderness - 1));
}
}
Per-split instance. Каждые pipeline.auto-watermark-interval (default 200ms) Flink вызывает onPeriodicEmit на каждом generator-е и pushes watermark в split-output.
Если в каком-то split-е давно не было records, его maxTimestampSeen не обновляется, его watermark stuck. Это плохо для общего progress. Поэтому есть idleness.
Idleness: когда split становится idle
.withIdleness(Duration.ofMinutes(1)) — если у split-а не было events за минуту, он помечается idle.
Идея: idle split не контрибутит в aggregated watermark. Если все остальные splits progress-ят — общий watermark прогрессирует, idle split не блокирует.
Под капотом это обёртывает WatermarkGenerator в IdlenessAwareWatermarkGenerator:
public class IdlenessAwareWatermarkGenerator<T> implements WatermarkGenerator<T> {
private final WatermarkGenerator<T> underlying;
private final long idleTimeout; // 60_000 ms
private long lastActivityTime;
private boolean isIdle = false;
@Override
public void onEvent(T event, long timestamp, WatermarkOutput output) {
if (isIdle) {
output.markActive(); // Сообщить pipeline: я снова active
isIdle = false;
}
lastActivityTime = currentSystemTime();
underlying.onEvent(event, timestamp, output);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
long now = currentSystemTime();
if (!isIdle && now - lastActivityTime > idleTimeout) {
output.markIdle();
isIdle = true;
return; // не emit-ить watermark
}
if (!isIdle) {
underlying.onPeriodicEmit(output);
}
}
}
output.markIdle() сигналит в source-aggregator: этот split idle. Aggregator excludes его. output.markActive() сигналит: split active again, включай его watermark в aggregation.
Это работает только если другие splits продолжают progress-ить. Если ВСЕ splits idle, aggregated watermark тоже stuck (нет inputs для агрегации). В этом случае ничего не сделаешь без withIdleness на уровне всего pipeline (то же самое, на operator side).
Idleness — это balance correctness vs progress. Если ты выставишь idleTimeout = 5 секунд, и у тебя есть partition, где events с интервалом 10 секунд (рідко, но регулярно), Flink будет mark её idle каждый раз. Это значит, что её events будут эмититься, но к этому моменту watermark уже advance-нулся выше — events становятся late. Idleness не должен быть короче чем максимальный inter-event interval в твоих partitions.
Watermark alignment: новый механизм
В Flink 1.15 появился watermark alignment (FLIP-217) — механизм, который ограничивает разброс watermarks между splits/subtasks. До этого был только idleness, а alignment решает другую проблему.
Проблема alignment: у тебя 100 Kafka partitions, на 50 из них traffic нормальный (recent data), на 50 — replay старых данных (lag 24 часа). Без alignment, fast partitions будут сыпать данные быстро (recent), а slow partitions медленно (24-часовой lag). Pipeline-state будет расти, потому что:
- Fast partition emit-ит event с t=2025-05-19T10:00.
- Slow partition emit-ит event с t=2024-05-19T10:00 (год назад).
- Window-агрегация в downstream имеет state для 365 дней.
- Memory blow up.
Alignment решает это, “придерживая” fast splits, пока slow не догонят.
WatermarkStrategy<MyRecord> strategy = WatermarkStrategy
.<MyRecord>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withWatermarkAlignment("source-group", Duration.ofMinutes(30), Duration.ofSeconds(1));
Параметры:
- alignmentGroup (“source-group”): identifier, к которому относится alignment. Несколько sources могут быть в одной alignment-group и aligned вместе.
- maxAllowedWatermarkDrift (30 min): максимальная разница между fastest и slowest split. Если fast split emit watermark на 30 минут впереди slow, fast пауза.
- updateInterval (1 sec): как часто координатор синхронизирует watermarks (более частое — больше overhead, более точно).
Реализация: каждый source-reader periodically reports его watermark в OperatorCoordinator. Coordinator вычисляет minWatermark across all readers + maxDrift. Этот alignedWatermark рассылается всем readers. Reader смотрит свой текущий per-split watermark vs alignedWatermark: если split going past alignedWatermark, reader пытается pause этот split.
pauseOrResumeSplits(toPause, toResume) — API SourceReader, который коннектор должен реализовать. Для Kafka: consumer.pause(partitions) / consumer.resume(partitions). Для files: skip new file open. Для DB: skip new query batch.
Alignment работает только с FLIP-27 sources, которые реализуют pauseOrResumeSplits. Kafka source (modern) поддерживает. Legacy SourceFunction — нет, не работает с alignment. Если у тебя critical alignment-zависимая job, проверь что все source-ы новые.
Watermark на координаторе vs на subtask
Не путай два разных уровня alignment:
-
Per-split, in-subtask aggregation (всегда). Каждый subtask agg-ирует watermarks своих splits через
min(active). Не нужна конфигурация. -
Cross-subtask alignment (опционально, FLIP-217). Coordinator на JM agg-ирует watermarks всех subtasks, рассылает aligned, и subtask pauses splits drift-нувшие вперёд. Requires
withWatermarkAlignment.
Уровень 2 имеет network overhead — каждый subtask reports каждую секунду в JM, JM рассылает. Для job-а с 1024 subtasks это 1024 reports + 1024 responses в секунду — managable для JM. Но если у тебя 10 source-ов с alignment, это 10240 messages/sec на JM — может быть hot path.
pipeline.watermark-alignment.timeout — если subtask не отвечает на alignment update в этот timeout, coordinator считает его dead и excludes. По умолчанию — same как RPC timeout (10 секунд).
Debugging: watermark не движется
Top-3 причины и как diagnose:
1. Все splits idle / нет данных.
Проверь numRecordsInPerSecond на source. Если 0 — нет данных, watermark не двинется. Если > 0, копать дальше.
2. Idleness не настроен, slow split блокирует.
Проверь: используй Flink Web UI -> Source -> Watermarks view. Если ты видишь, что разные subtasks имеют сильно разные watermarks (один subtask -infinity, другие моложе) — у этого subtask-а probably idle split без markIdle.
Решение: добавь .withIdleness(Duration.of...) в WatermarkStrategy.
3. Alignment блокирует fast splits.
Если у тебя withWatermarkAlignment и один split действительно очень медленный (e.g. dead partition), он будет вечно anchor — fast splits будут paused forever.
Решение: убедись что dead partitions handled через idleness первым (idle splits исключаются из alignment). Или используй больший maxDrift.
# Метрики для debugging
flink_taskmanager_job_task_operator_currentInputWatermark # WM на input
flink_taskmanager_job_task_operator_currentOutputWatermark # WM на output (после processing)
flink_taskmanager_job_task_operator_watermarkAlignmentDrift # для aligned sources
Custom WatermarkGenerator
Если ни одна из standard strategy не подходит, custom generator:
public class MyHybridWatermarkGenerator<T> implements WatermarkGenerator<T> {
private long maxTimestamp = Long.MIN_VALUE;
private final long allowedLatenessMs;
private long lastEmittedWatermark = Long.MIN_VALUE;
@Override
public void onEvent(T event, long timestamp, WatermarkOutput output) {
if (timestamp > maxTimestamp) {
maxTimestamp = timestamp;
}
// Custom logic: emit watermark сразу после "special" events
if (event.isCheckpoint()) {
long wm = timestamp - allowedLatenessMs;
if (wm > lastEmittedWatermark) {
output.emitWatermark(new Watermark(wm));
lastEmittedWatermark = wm;
}
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
long wm = maxTimestamp - allowedLatenessMs;
if (wm > lastEmittedWatermark) {
output.emitWatermark(new Watermark(wm));
lastEmittedWatermark = wm;
}
}
}
Pattern: maintain max timestamp, emit watermarks via онEvent (eager) или onPeriodicEmit (periodic, default). Никогда не emit watermark меньше previous (это broken contract).
Внимание: generator вызывается per-split. Если ты сохраняешь state в instance fields, он per-split. Если хочешь global state — используй SourceCoordinator и custom events.