Learning Platform
Глоссарий Troubleshooting
Урок 14.03 · 24 мин
Продвинутый
WatermarkPer-split watermarkIdle splitsmarkIdleWatermark alignmentCoordinator

Watermark coordination на уровне source

Watermark — это самое сложное в event-time streaming. Pre-FLIP-27 sources имели простую но проблематичную модель: один watermark на subtask. Если у тебя Kafka source читает 10 partitions, и одна partition отстаёт (или временно мертва), watermark всего subtask-а stuck. FLIP-27 ввёл per-split watermarks + механизм их агрегации.

В этом уроке: как watermarks реально генерируются на уровне source, что значит idle, как работает watermark alignment для slow partitions, и почему всё это критично для production-readyiness.

Late events: allowedLateness и side outputs

Где раньше всё ломалось

Old API: SourceFunction.run(SourceContext). SourceContext имел метод emitWatermark(Watermark). Что это значит?

Если ты эмитишь Watermark(t=1000), ты говоришь pipeline-у: “не будет event с timestamp меньше 1000”. Это global guarantee для всего subtask-а.

Но если subtask читает 10 partitions, и одна partition отстала (consumer lag огромный, или вообще не пушит events), что делать?

Вариант A: emit watermark = min(всех partitions). Это safe, но если одна partition мертва, watermark всего subtask-а stuck на её last event time forever. Window-ы не закрываются.

Вариант B: emit watermark = max(всех partitions). Это даёт прогресс, но breaks correctness — события из отставшей partition будут late.

Вариант C: ignore slow partitions, but how decide?

В old API не было встроенного решения. Каждый коннектор писал свою логику, обычно с ошибками. FlinkKafkaConsumer (old) имел кучу багов и edge cases с watermarks.


FLIP-27 решение: per-split watermarks

В новом API watermark — это per-split state. Каждый split имеет свой собственный watermark generator. Source-operator агрегирует их.

Per-split watermarks: генерация и агрегация
Split 1 (kafka p0)WM = 1230Каждый split имеет свой watermark, обновляется при каждом emit. Generator (BoundedOutOfOrderness, AscendingTimestamps, custom) применяется per-split
Split 2 (kafka p1)WM = 1280Второй split. Это полностью независимый WatermarkGenerator instance. Не shared state с split 1
Split 3 (kafka p2)WM = 1150Третий split, отстаёт. Возможно потому что lag в этой partition, или events с меньшими timestamps
Split 4 (kafka p3)IDLEЧетвёртый split idle — нет events последние idleTimeout мс. Source помечает его markIdle(). Его watermark исключается из агрегации
Source operatoraggregated WMSource-operator берёт watermark = min(active splits). Idle splits исключены. В этом примере min(1230, 1280, 1150) = 1150. Этот WM эмитится в pipeline
emit WM = 1150
Downstreamreceives WM = 1150Downstream operator получает один watermark на input channel от source-task. Если есть keyBy, watermark снова агрегируется на receiver-side (min от всех channels)

output.createOutputForSplit(splitId) в pollNext возвращает SourceOutput, который scoped к одному split-у. Когда reader делает splitOutput.collect(record, timestamp), watermark generator для этого split-а update-ится. Когда reader делает splitOutput.emitWatermark(wm), watermark этого split-а становится wm.

Source-operator агрегирует watermarks. Стратегия по умолчанию: min(active splits) ignoring idle. Это даёт нам и progress (idle splits не блокируют), и correctness (active splits всегда учитываются).


Watermark strategy: per-split assignment

WatermarkStrategy задаётся при создании source-а:

WatermarkStrategy<MyRecord> strategy = WatermarkStrategy
    .<MyRecord>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, timestamp) -> event.getEventTime())
    .withIdleness(Duration.ofMinutes(1));

DataStream<MyRecord> stream = env.fromSource(
    kafkaSource,
    strategy,
    "kafka-source"
);

WatermarkStrategy — это фабрика, которая создаёт WatermarkGenerator и TimestampAssigner. Когда source-operator получает split, он вызывает strategy чтобы создать per-split instance generator-а и assigner-а.

Это значит: если у тебя 10 splits на subtask, у source-operator-а 10 generator instances и 10 assigner instances. Каждый видит только records своего split-а.

BoundedOutOfOrdernessGenerator (one of the standard generators):

public class BoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {

    private final long maxOutOfOrderness; // 5000 ms
    private long maxTimestampSeen = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long timestamp, WatermarkOutput output) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestampSeen - maxOutOfOrderness - 1));
    }
}

Per-split instance. Каждые pipeline.auto-watermark-interval (default 200ms) Flink вызывает onPeriodicEmit на каждом generator-е и pushes watermark в split-output.

Если в каком-то split-е давно не было records, его maxTimestampSeen не обновляется, его watermark stuck. Это плохо для общего progress. Поэтому есть idleness.


Idleness: когда split становится idle

.withIdleness(Duration.ofMinutes(1)) — если у split-а не было events за минуту, он помечается idle.

Идея: idle split не контрибутит в aggregated watermark. Если все остальные splits progress-ят — общий watermark прогрессирует, idle split не блокирует.

Под капотом это обёртывает WatermarkGenerator в IdlenessAwareWatermarkGenerator:

public class IdlenessAwareWatermarkGenerator<T> implements WatermarkGenerator<T> {

    private final WatermarkGenerator<T> underlying;
    private final long idleTimeout; // 60_000 ms
    private long lastActivityTime;
    private boolean isIdle = false;

    @Override
    public void onEvent(T event, long timestamp, WatermarkOutput output) {
        if (isIdle) {
            output.markActive(); // Сообщить pipeline: я снова active
            isIdle = false;
        }
        lastActivityTime = currentSystemTime();
        underlying.onEvent(event, timestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        long now = currentSystemTime();
        if (!isIdle && now - lastActivityTime > idleTimeout) {
            output.markIdle();
            isIdle = true;
            return; // не emit-ить watermark
        }
        if (!isIdle) {
            underlying.onPeriodicEmit(output);
        }
    }
}

output.markIdle() сигналит в source-aggregator: этот split idle. Aggregator excludes его. output.markActive() сигналит: split active again, включай его watermark в aggregation.

Это работает только если другие splits продолжают progress-ить. Если ВСЕ splits idle, aggregated watermark тоже stuck (нет inputs для агрегации). В этом случае ничего не сделаешь без withIdleness на уровне всего pipeline (то же самое, на operator side).

WARNING

Idleness — это balance correctness vs progress. Если ты выставишь idleTimeout = 5 секунд, и у тебя есть partition, где events с интервалом 10 секунд (рідко, но регулярно), Flink будет mark её idle каждый раз. Это значит, что её events будут эмититься, но к этому моменту watermark уже advance-нулся выше — events становятся late. Idleness не должен быть короче чем максимальный inter-event interval в твоих partitions.


Watermark alignment: новый механизм

В Flink 1.15 появился watermark alignment (FLIP-217) — механизм, который ограничивает разброс watermarks между splits/subtasks. До этого был только idleness, а alignment решает другую проблему.

Проблема alignment: у тебя 100 Kafka partitions, на 50 из них traffic нормальный (recent data), на 50 — replay старых данных (lag 24 часа). Без alignment, fast partitions будут сыпать данные быстро (recent), а slow partitions медленно (24-часовой lag). Pipeline-state будет расти, потому что:

  • Fast partition emit-ит event с t=2025-05-19T10:00.
  • Slow partition emit-ит event с t=2024-05-19T10:00 (год назад).
  • Window-агрегация в downstream имеет state для 365 дней.
  • Memory blow up.

Alignment решает это, “придерживая” fast splits, пока slow не догонят.

WatermarkStrategy<MyRecord> strategy = WatermarkStrategy
    .<MyRecord>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withWatermarkAlignment("source-group", Duration.ofMinutes(30), Duration.ofSeconds(1));

Параметры:

  • alignmentGroup (“source-group”): identifier, к которому относится alignment. Несколько sources могут быть в одной alignment-group и aligned вместе.
  • maxAllowedWatermarkDrift (30 min): максимальная разница между fastest и slowest split. Если fast split emit watermark на 30 минут впереди slow, fast пауза.
  • updateInterval (1 sec): как часто координатор синхронизирует watermarks (более частое — больше overhead, более точно).
Watermark alignment: координатор pauses fast splits
Reader 1WM = 10:00Subtask 1: его splits на live data. Watermark близок к real time
Reader 2WM = 09:55Subtask 2: тоже live data, но чуть отстаёт
Reader 3WM = 09:25Subtask 3: один из его splits в replay-моде. Watermark = 09:25
report
report
report
Coordinatoraligned WM = 09:25Coordinator на JM получает watermarks от всех readers. Вычисляет min, отправляет это как 'aligned watermark' обратно. drift threshold = 30 минут — Reader 1, 2 близко, Reader 3 — anchor
aligned = 09:25
aligned = 09:25
aligned = 09:25
Reader 1PAUSE splits emit > 09:55Reader 1 фильтрует свой read: только records с timestamp <= aligned + drift = 09:25 + 30min = 09:55. Splits, которые advanced бы дальше — pause через pauseOrResumeSplits()
Reader 2PAUSE splits emit > 09:55Reader 2 аналогично. Замедляется до уровня Reader 3, давая ему догнать
Reader 3продолжает replayReader 3 не aligned (он anchor), читает на полной скорости. Когда его watermark догонит Reader 1, 2 — все ressume

Реализация: каждый source-reader periodically reports его watermark в OperatorCoordinator. Coordinator вычисляет minWatermark across all readers + maxDrift. Этот alignedWatermark рассылается всем readers. Reader смотрит свой текущий per-split watermark vs alignedWatermark: если split going past alignedWatermark, reader пытается pause этот split.

pauseOrResumeSplits(toPause, toResume) — API SourceReader, который коннектор должен реализовать. Для Kafka: consumer.pause(partitions) / consumer.resume(partitions). Для files: skip new file open. Для DB: skip new query batch.

NOTE

Alignment работает только с FLIP-27 sources, которые реализуют pauseOrResumeSplits. Kafka source (modern) поддерживает. Legacy SourceFunction — нет, не работает с alignment. Если у тебя critical alignment-zависимая job, проверь что все source-ы новые.


Watermark на координаторе vs на subtask

Не путай два разных уровня alignment:

  1. Per-split, in-subtask aggregation (всегда). Каждый subtask agg-ирует watermarks своих splits через min(active). Не нужна конфигурация.

  2. Cross-subtask alignment (опционально, FLIP-217). Coordinator на JM agg-ирует watermarks всех subtasks, рассылает aligned, и subtask pauses splits drift-нувшие вперёд. Requires withWatermarkAlignment.

Уровень 2 имеет network overhead — каждый subtask reports каждую секунду в JM, JM рассылает. Для job-а с 1024 subtasks это 1024 reports + 1024 responses в секунду — managable для JM. Но если у тебя 10 source-ов с alignment, это 10240 messages/sec на JM — может быть hot path.

pipeline.watermark-alignment.timeout — если subtask не отвечает на alignment update в этот timeout, coordinator считает его dead и excludes. По умолчанию — same как RPC timeout (10 секунд).


Debugging: watermark не движется

Top-3 причины и как diagnose:

1. Все splits idle / нет данных.

Проверь numRecordsInPerSecond на source. Если 0 — нет данных, watermark не двинется. Если > 0, копать дальше.

2. Idleness не настроен, slow split блокирует.

Проверь: используй Flink Web UI -> Source -> Watermarks view. Если ты видишь, что разные subtasks имеют сильно разные watermarks (один subtask -infinity, другие моложе) — у этого subtask-а probably idle split без markIdle.

Решение: добавь .withIdleness(Duration.of...) в WatermarkStrategy.

3. Alignment блокирует fast splits.

Если у тебя withWatermarkAlignment и один split действительно очень медленный (e.g. dead partition), он будет вечно anchor — fast splits будут paused forever.

Решение: убедись что dead partitions handled через idleness первым (idle splits исключаются из alignment). Или используй больший maxDrift.

# Метрики для debugging
flink_taskmanager_job_task_operator_currentInputWatermark   # WM на input
flink_taskmanager_job_task_operator_currentOutputWatermark  # WM на output (после processing)
flink_taskmanager_job_task_operator_watermarkAlignmentDrift # для aligned sources

Custom WatermarkGenerator

Если ни одна из standard strategy не подходит, custom generator:

public class MyHybridWatermarkGenerator<T> implements WatermarkGenerator<T> {

    private long maxTimestamp = Long.MIN_VALUE;
    private final long allowedLatenessMs;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long timestamp, WatermarkOutput output) {
        if (timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
        }
        // Custom logic: emit watermark сразу после "special" events
        if (event.isCheckpoint()) {
            long wm = timestamp - allowedLatenessMs;
            if (wm > lastEmittedWatermark) {
                output.emitWatermark(new Watermark(wm));
                lastEmittedWatermark = wm;
            }
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        long wm = maxTimestamp - allowedLatenessMs;
        if (wm > lastEmittedWatermark) {
            output.emitWatermark(new Watermark(wm));
            lastEmittedWatermark = wm;
        }
    }
}

Pattern: maintain max timestamp, emit watermarks via онEvent (eager) или onPeriodicEmit (periodic, default). Никогда не emit watermark меньше previous (это broken contract).

Внимание: generator вызывается per-split. Если ты сохраняешь state в instance fields, он per-split. Если хочешь global state — используй SourceCoordinator и custom events.


Проверка знанийKnowledge check
Production job: Kafka source, 256 partitions, parallelism 16 (по 16 partitions на subtask). Pipeline имеет event-time window 1 час. Watermark не advance-ит и window-ы не закрываются — но только в weekends. Метрики: numRecordsInPerSecond на source ~10000, sourceIdleTime низкий, idleness не настроен. В чём проблема и как chitить?
ОтветAnswer
Гипотеза: в weekends на каких-то partitions traffic сильно падает (или вообще нулевой), но reader не воспринимает их как idle (idleness не настроен, и даже если бы был — записи есть, просто редкие). С 16 partitions per subtask: если одна имеет events с интервалом 2+ часов в weekend, её per-split watermark не двигается практически. min(всех partitions) в aggregator — этот partition блокирует. На weekdays traffic высокий, все partitions advance-ят watermark, проблемы нет. Решение варианты: (1) Включить .withIdleness(Duration.ofMinutes(5)) — slow partitions помечаются idle когда нет events 5 минут, исключаются из aggregation. Compromise: events from idle partition могут быть late если они приходят после window закрылся. (2) Использовать watermark alignment — фактически не решит, потому что alignment pauses fast, не accelerates slow. (3) Перестать иметь dead partitions (architecturally — может, нужно меньше partitions, или больше merge на producer side). Самое практичное — idleness 5 минут. Это типичная weekend-baga, которую ловят только в prod. Тест с заводом одной dead partition на staging должен ловить её до prod.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. FLIP-27 источник имеет per-split watermarks. Если subtask читает 5 splits, какой watermark эмитится source-operator-ом в downstream?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4