Learning Platform
Глоссарий Troubleshooting
Урок 07.02 · 22 мин
Средний
WatermarkStrategyWatermarksBounded Out-of-OrdernessMonotonous TimestampsIdlenessTimestampAssigner

WatermarkStrategy

Watermark в Flink — это формальная гарантия о порядке событий. Watermark W(t) говорит: “я обещаю, что событий с event_time меньше или равно t больше не будет”. На основе этой гарантии операторы решают, когда закрывать окна, когда срабатывать триггерам, когда считать состояние финальным.

API WatermarkStrategy объединяет два компонента: timestamp assigner (откуда брать event time из события) и watermark generator (как генерировать watermarks из потока). От правильной их настройки зависит вся корректность event-time pipeline.


Базовая структура

WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((event, recordTimestamp) -> event.timestamp);

DataStream<Event> withWatermarks = events.assignTimestampsAndWatermarks(strategy);

Что здесь происходит:

  1. forBoundedOutOfOrderness(10s) — выбор готового watermark generator: “события могут опаздывать максимум на 10 секунд”.
  2. withTimestampAssigner(...) — функция, извлекающая event time из события.
  3. assignTimestampsAndWatermarks(strategy) — применить стратегию к потоку.

После этого вызова event_time и watermarks доступны во всех downstream-операторах: window operations, timers в KeyedProcessFunction, side outputs для late data.

Python (PyFlink):

from pyflink.common import WatermarkStrategy, Duration
from pyflink.common.watermark_strategy import TimestampAssigner

class EventTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, event, record_timestamp):
        return event.timestamp

strategy = WatermarkStrategy \
    .for_bounded_out_of_orderness(Duration.of_seconds(10)) \
    .with_timestamp_assigner(EventTimestampAssigner())

events_with_watermarks = events.assign_timestamps_and_watermarks(strategy)

forBoundedOutOfOrderness: дефолт для production

Самая популярная стратегия. Параметр — максимально допустимая задержка событий.

WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))

Алгоритм:

  • На каждое событие Flink запоминает максимум event time, увиденный до сих пор: maxEventTime.
  • Watermark генерируется как maxEventTime - boundedOutOfOrderness. То есть watermark всегда отстаёт от максимального события на заданную длительность.
  • При новом событии maxEventTime обновляется (если больше предыдущего); watermark пересчитывается.

Пример с boundedOutOfOrderness = 10s:

TimeEvent arrivesEvent timemaxEventTimeWatermark
t1e110:00:0010:00:0009:59:50
t2e210:00:0510:00:0509:59:55
t3e310:00:03 (out-of-order)10:00:0509:59:55
t4e410:00:2010:00:2010:00:10

Watermark 10:00:10 означает: окна с end <= 10:00:10 закрываются. Если e3 пришёл с timestamp 10:00:03 после того как watermark уже был 10:00:10 — это late event.

Как выбрать значение?

  • Замерьте реальную задержку событий в production: 99-percentile latency между event creation и Flink ingestion.
  • Используйте это значение плюс небольшой запас (например, p99 * 1.5).
  • Альтернатива: статистика по самым late events за последние 7-30 дней.

Слишком большое значение = большая latency window emission. Слишком маленькое = много late events, dropped или re-emit.

TIP

Хорошее стартовое значение для большинства Kafka-based pipelines — 5-30 секунд. Для cross-region Kafka с MirrorMaker — 1-2 минуты. Для mobile event collection — 5-15 минут. Замеряйте, не угадывайте.


forMonotonousTimestamps: строгий вариант

Если события приходят строго по порядку (никаких out-of-order), используйте forMonotonousTimestamps:

WatermarkStrategy.<Event>forMonotonousTimestamps()
    .withTimestampAssigner((e, ts) -> e.timestamp);

Watermark = текущему event time (без задержки). Это более агрессивно — окна закрываются сразу при прохождении timestamp.

Когда применимо:

  • Один источник, один partition, события генерируются последовательно (например, файлы из FileSource).
  • Pre-sorted batch data.
  • Тесты.

В production Kafka с многими partitions это почти никогда не подходит — даже единичные out-of-order events будут late, и любая задержка одной партиции приведёт к проблемам.


withIdleness: лечение зависающих watermarks

Watermark — это глобальный минимум по всем upstream-источникам. Если одна партиция Kafka idle (не получает событий), её watermark не продвигается, и весь downstream watermark зависает.

WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withIdleness(Duration.ofMinutes(1))
    .withTimestampAssigner((e, ts) -> e.timestamp);

С withIdleness(1min): партиция, не получившая событий 1 минуту, помечается idle. При вычислении общего watermark idle-партиции игнорируются — watermark двигается по активным источникам.

Это критично для:

  • Ночных периодов низкой нагрузки.
  • Topics с неравномерной нагрузкой по партициям.
  • Pipeline с несколькими источниками, где один может временно стихнуть.
WARNING

Без withIdleness одна молчащая партиция Kafka блокирует весь job. Окна перестают эмитить, никаких ошибок — просто ‘тишина’. Это одна из самых частых причин ‘job работает, но output остановился’. Всегда включайте withIdleness в production.


Композиция и порядок вызовов

WatermarkStrategy — fluent builder. Порядок методов важен:

WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))  // 1. watermark generator
    .withIdleness(Duration.ofMinutes(1))                       // 2. idle handling
    .withTimestampAssigner((e, ts) -> e.timestamp);            // 3. timestamp assigner

withIdleness должен идти после forBoundedOutOfOrderness. withTimestampAssigner обычно последний — это assigner, не generator, и его поведение независимо.

Альтернативный паттерн с готовым TimestampAssigner-классом:

public class EventTimestampExtractor implements SerializableTimestampAssigner<Event> {
    @Override
    public long extractTimestamp(Event element, long recordTimestamp) {
        return element.timestamp;
    }
}

WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner(new EventTimestampExtractor());

Класс полезен, если нужна dependency injection или сложная логика извлечения timestamp.


Custom WatermarkGenerator

Внутренности WatermarkGenerator: как работает onPeriodicEmit

Для специфической логики реализуйте собственный generator:

public class CustomWatermarkGenerator implements WatermarkGenerator<Event> {
    private long maxTimestamp = Long.MIN_VALUE;
    private final long maxOutOfOrderness = Duration.ofSeconds(10).toMillis();

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        // Можно эмитить watermark на каждое событие
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));
    }
}

WatermarkStrategy<Event> strategy = WatermarkStrategy
    .forGenerator(ctx -> new CustomWatermarkGenerator())
    .withTimestampAssigner((e, ts) -> e.timestamp);

onEvent — вызывается на каждое событие. onPeriodicEmit — вызывается периодически (по умолчанию каждые 200ms, настраивается через pipeline.auto-watermark-interval).

Custom generator пригождается для:

  • Адаптивного boundedOutOfOrderness, меняющегося в зависимости от нагрузки.
  • Бизнес-логики типа “ждать события ‘session_end’ перед emit watermark”.
  • Дебаг-режима с логированием watermarks.

Per-partition watermarks

В Flink Kafka connector watermarks могут вычисляться per partition независимо. Это даёт более точные watermarks для idle-handling:

KafkaSource<Event> source = KafkaSource.<Event>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("events")
    .setDeserializer(...)
    .build();

DataStream<Event> stream = env.fromSource(
    source,
    WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withIdleness(Duration.ofMinutes(1))
        .withTimestampAssigner((e, ts) -> e.timestamp),
    "Kafka Source"
);

В этой схеме WatermarkStrategy применяется в source-операторе, и Flink вычисляет watermark per Kafka партиция. Глобальный watermark в downstream — это min() по активным (не-idle) партициям.

Альтернатива — assignTimestampsAndWatermarks после источника — менее эффективна: Flink не знает о структуре партиций, и watermark — глобальный по всему потоку.


Что НЕ делать с watermarks

1. assignTimestampsAndWatermarks дважды. Не вызывайте strategy несколько раз — Flink использует последнюю и игнорирует первые. Это редко даёт ожидаемый результат.

// ПЛОХО
events.assignTimestampsAndWatermarks(strategy1)
      .keyBy(...)
      .assignTimestampsAndWatermarks(strategy2);  // переопределяет

2. Слишком оптимистичный boundedOutOfOrderness. Если ставите 1 секунду, а реальная latency 30 секунд — каждое второе событие будет late. Не угадывайте, замеряйте.

3. Игнорировать withIdleness. Симптомы ‘job работает, output отсутствует’ почти всегда — idle partition. См. warning выше.

4. Использовать processing time как timestamp. Если withTimestampAssigner((e, ts) -> System.currentTimeMillis()) — вы получили processing time с лишним overhead, и потеряли всю выгоду event time.


Production-чеклист

  • Используйте forBoundedOutOfOrderness со значением, основанным на p99 реальной задержки.
  • Всегда добавляйте withIdleness для job-ов с Kafka источниками или несколькими параллельными источниками.
  • Применяйте WatermarkStrategy в источнике (через env.fromSource(source, strategy, ...)) для per-partition watermarks.
  • Мониторьте watermark в Web UI: должно быть не более 1-2x boundedOutOfOrderness отставание от current processing time.
  • Метрика numLateRecordsDropped per operator — мониторить и алертить.
WatermarkStrategy: компоненты
Извлекает event time из payload события. Поле order.created_at, Kafka header timestamp, или вычисляется логикой.
extracted
На основе видимых event times генерирует watermarks: forBoundedOutOfOrderness, forMonotonousTimestamps или custom.
Опциональный wrapper: если в N времени не было событий, помечать партицию/source как idle. Без этого молчащие partition блокируют watermark.
combined
Окна и таймеры используют watermarks для решений о закрытии. EventTimeTrigger возвращает FIRE_AND_PURGE при watermark >= window.end.

Попробуй сам

  1. Watermark observability. Запустите job с WatermarkStrategy и зайдите в Flink Web UI -> Job -> Operator -> Watermarks. Наблюдайте, как watermark двигается. Сравните с current processing time — отставание должно быть около boundedOutOfOrderness.

  2. Idle partition test. Создайте Kafka topic с 4 партициями. Отправляйте события только в партицию 0. Без withIdleness watermark зависает. Добавьте withIdleness(30s) — после 30 секунд тишины на других партициях watermark продолжит продвигаться.

  3. Custom generator. Реализуйте generator с адаптивным maxOutOfOrderness: при низкой нагрузке (мало событий) — 30 секунд, при высокой — 5 секунд. Сравните latency emission окон с фиксированным значением.

Проверка знанийKnowledge check
У Flink-job с источником Kafka (8 партиций) после midnight все окна перестали эмитить результаты. В Web UI watermark показан как очень старое значение. Backpressure 0%, ошибок в логах нет. Что вероятная причина и какое исправление?
ОтветAnswer
Скорее всего, одна или несколько Kafka партиций стали idle после полуночи — нагрузка упала, и часть партиций перестала получать новые события. Watermark в Flink — это глобальный минимум по всем upstream-источникам, включая каждую Kafka партицию (при per-partition watermarks). Молчащая партиция блокирует продвижение watermark для всего job-а. EventTimeTrigger не сработает, пока watermark не пройдёт за window.maxTimestamp(), поэтому окна копят данные и не emit. Backpressure 0% — нагрузки на job нет, всё "спокойно". Решение: добавить .withIdleness(Duration.ofMinutes(1)) к WatermarkStrategy. После этого партиции, не получавшие событий 1 минуту, будут помечаться idle и игнорироваться при вычислении глобального watermark. Окна снова начнут закрываться по watermark от активных партиций. Это одна из самых распространённых production-проблем event-time job-ов с переменной нагрузкой.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. У job-а в Flink Web UI watermark показан как очень старое значение (Long.MIN_VALUE или вчерашнее число), а текущие события поступают. Backpressure 0%, ошибок нет. WatermarkStrategy.forBoundedOutOfOrderness(10s) применён. Что наиболее вероятная причина?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4