WatermarkStrategy
Watermark в Flink — это формальная гарантия о порядке событий. Watermark W(t) говорит: “я обещаю, что событий с event_time меньше или равно t больше не будет”. На основе этой гарантии операторы решают, когда закрывать окна, когда срабатывать триггерам, когда считать состояние финальным.
API WatermarkStrategy объединяет два компонента: timestamp assigner (откуда брать event time из события) и watermark generator (как генерировать watermarks из потока). От правильной их настройки зависит вся корректность event-time pipeline.
Базовая структура
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, recordTimestamp) -> event.timestamp);
DataStream<Event> withWatermarks = events.assignTimestampsAndWatermarks(strategy);
Что здесь происходит:
forBoundedOutOfOrderness(10s)— выбор готового watermark generator: “события могут опаздывать максимум на 10 секунд”.withTimestampAssigner(...)— функция, извлекающая event time из события.assignTimestampsAndWatermarks(strategy)— применить стратегию к потоку.
После этого вызова event_time и watermarks доступны во всех downstream-операторах: window operations, timers в KeyedProcessFunction, side outputs для late data.
Python (PyFlink):
from pyflink.common import WatermarkStrategy, Duration
from pyflink.common.watermark_strategy import TimestampAssigner
class EventTimestampAssigner(TimestampAssigner):
def extract_timestamp(self, event, record_timestamp):
return event.timestamp
strategy = WatermarkStrategy \
.for_bounded_out_of_orderness(Duration.of_seconds(10)) \
.with_timestamp_assigner(EventTimestampAssigner())
events_with_watermarks = events.assign_timestamps_and_watermarks(strategy)
forBoundedOutOfOrderness: дефолт для production
Самая популярная стратегия. Параметр — максимально допустимая задержка событий.
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
Алгоритм:
- На каждое событие Flink запоминает максимум event time, увиденный до сих пор:
maxEventTime. - Watermark генерируется как
maxEventTime - boundedOutOfOrderness. То есть watermark всегда отстаёт от максимального события на заданную длительность. - При новом событии maxEventTime обновляется (если больше предыдущего); watermark пересчитывается.
Пример с boundedOutOfOrderness = 10s:
| Time | Event arrives | Event time | maxEventTime | Watermark |
|---|---|---|---|---|
| t1 | e1 | 10:00:00 | 10:00:00 | 09:59:50 |
| t2 | e2 | 10:00:05 | 10:00:05 | 09:59:55 |
| t3 | e3 | 10:00:03 (out-of-order) | 10:00:05 | 09:59:55 |
| t4 | e4 | 10:00:20 | 10:00:20 | 10:00:10 |
Watermark 10:00:10 означает: окна с end <= 10:00:10 закрываются. Если e3 пришёл с timestamp 10:00:03 после того как watermark уже был 10:00:10 — это late event.
Как выбрать значение?
- Замерьте реальную задержку событий в production: 99-percentile latency между event creation и Flink ingestion.
- Используйте это значение плюс небольшой запас (например, p99 * 1.5).
- Альтернатива: статистика по самым late events за последние 7-30 дней.
Слишком большое значение = большая latency window emission. Слишком маленькое = много late events, dropped или re-emit.
Хорошее стартовое значение для большинства Kafka-based pipelines — 5-30 секунд. Для cross-region Kafka с MirrorMaker — 1-2 минуты. Для mobile event collection — 5-15 минут. Замеряйте, не угадывайте.
forMonotonousTimestamps: строгий вариант
Если события приходят строго по порядку (никаких out-of-order), используйте forMonotonousTimestamps:
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner((e, ts) -> e.timestamp);
Watermark = текущему event time (без задержки). Это более агрессивно — окна закрываются сразу при прохождении timestamp.
Когда применимо:
- Один источник, один partition, события генерируются последовательно (например, файлы из FileSource).
- Pre-sorted batch data.
- Тесты.
В production Kafka с многими partitions это почти никогда не подходит — даже единичные out-of-order events будут late, и любая задержка одной партиции приведёт к проблемам.
withIdleness: лечение зависающих watermarks
Watermark — это глобальный минимум по всем upstream-источникам. Если одна партиция Kafka idle (не получает событий), её watermark не продвигается, и весь downstream watermark зависает.
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withIdleness(Duration.ofMinutes(1))
.withTimestampAssigner((e, ts) -> e.timestamp);
С withIdleness(1min): партиция, не получившая событий 1 минуту, помечается idle. При вычислении общего watermark idle-партиции игнорируются — watermark двигается по активным источникам.
Это критично для:
- Ночных периодов низкой нагрузки.
- Topics с неравномерной нагрузкой по партициям.
- Pipeline с несколькими источниками, где один может временно стихнуть.
Без withIdleness одна молчащая партиция Kafka блокирует весь job. Окна перестают эмитить, никаких ошибок — просто ‘тишина’. Это одна из самых частых причин ‘job работает, но output остановился’. Всегда включайте withIdleness в production.
Композиция и порядок вызовов
WatermarkStrategy — fluent builder. Порядок методов важен:
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10)) // 1. watermark generator
.withIdleness(Duration.ofMinutes(1)) // 2. idle handling
.withTimestampAssigner((e, ts) -> e.timestamp); // 3. timestamp assigner
withIdleness должен идти после forBoundedOutOfOrderness. withTimestampAssigner обычно последний — это assigner, не generator, и его поведение независимо.
Альтернативный паттерн с готовым TimestampAssigner-классом:
public class EventTimestampExtractor implements SerializableTimestampAssigner<Event> {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner(new EventTimestampExtractor());
Класс полезен, если нужна dependency injection или сложная логика извлечения timestamp.
Custom WatermarkGenerator
Внутренности WatermarkGenerator: как работает onPeriodicEmitДля специфической логики реализуйте собственный generator:
public class CustomWatermarkGenerator implements WatermarkGenerator<Event> {
private long maxTimestamp = Long.MIN_VALUE;
private final long maxOutOfOrderness = Duration.ofSeconds(10).toMillis();
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
// Можно эмитить watermark на каждое событие
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));
}
}
WatermarkStrategy<Event> strategy = WatermarkStrategy
.forGenerator(ctx -> new CustomWatermarkGenerator())
.withTimestampAssigner((e, ts) -> e.timestamp);
onEvent — вызывается на каждое событие. onPeriodicEmit — вызывается периодически (по умолчанию каждые 200ms, настраивается через pipeline.auto-watermark-interval).
Custom generator пригождается для:
- Адаптивного boundedOutOfOrderness, меняющегося в зависимости от нагрузки.
- Бизнес-логики типа “ждать события ‘session_end’ перед emit watermark”.
- Дебаг-режима с логированием watermarks.
Per-partition watermarks
В Flink Kafka connector watermarks могут вычисляться per partition независимо. Это даёт более точные watermarks для idle-handling:
KafkaSource<Event> source = KafkaSource.<Event>builder()
.setBootstrapServers("localhost:9092")
.setTopics("events")
.setDeserializer(...)
.build();
DataStream<Event> stream = env.fromSource(
source,
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withIdleness(Duration.ofMinutes(1))
.withTimestampAssigner((e, ts) -> e.timestamp),
"Kafka Source"
);
В этой схеме WatermarkStrategy применяется в source-операторе, и Flink вычисляет watermark per Kafka партиция. Глобальный watermark в downstream — это min() по активным (не-idle) партициям.
Альтернатива — assignTimestampsAndWatermarks после источника — менее эффективна: Flink не знает о структуре партиций, и watermark — глобальный по всему потоку.
Что НЕ делать с watermarks
1. assignTimestampsAndWatermarks дважды. Не вызывайте strategy несколько раз — Flink использует последнюю и игнорирует первые. Это редко даёт ожидаемый результат.
// ПЛОХО
events.assignTimestampsAndWatermarks(strategy1)
.keyBy(...)
.assignTimestampsAndWatermarks(strategy2); // переопределяет
2. Слишком оптимистичный boundedOutOfOrderness. Если ставите 1 секунду, а реальная latency 30 секунд — каждое второе событие будет late. Не угадывайте, замеряйте.
3. Игнорировать withIdleness. Симптомы ‘job работает, output отсутствует’ почти всегда — idle partition. См. warning выше.
4. Использовать processing time как timestamp. Если withTimestampAssigner((e, ts) -> System.currentTimeMillis()) — вы получили processing time с лишним overhead, и потеряли всю выгоду event time.
Production-чеклист
- Используйте
forBoundedOutOfOrdernessсо значением, основанным на p99 реальной задержки. - Всегда добавляйте
withIdlenessдля job-ов с Kafka источниками или несколькими параллельными источниками. - Применяйте WatermarkStrategy в источнике (через
env.fromSource(source, strategy, ...)) для per-partition watermarks. - Мониторьте watermark в Web UI: должно быть не более 1-2x boundedOutOfOrderness отставание от current processing time.
- Метрика
numLateRecordsDroppedper operator — мониторить и алертить.
Попробуй сам
-
Watermark observability. Запустите job с WatermarkStrategy и зайдите в Flink Web UI -> Job -> Operator -> Watermarks. Наблюдайте, как watermark двигается. Сравните с current processing time — отставание должно быть около boundedOutOfOrderness.
-
Idle partition test. Создайте Kafka topic с 4 партициями. Отправляйте события только в партицию 0. Без withIdleness watermark зависает. Добавьте withIdleness(30s) — после 30 секунд тишины на других партициях watermark продолжит продвигаться.
-
Custom generator. Реализуйте generator с адаптивным maxOutOfOrderness: при низкой нагрузке (мало событий) — 30 секунд, при высокой — 5 секунд. Сравните latency emission окон с фиксированным значением.