Allowed lateness и late events
В идеальном мире события приходят строго по порядку их event time. В реальном — Kafka партиция отстаёт, сеть лагает, мобильный клиент офлайн два часа и потом высылает накопленное. Эти late events (события, чей timestamp меньше текущего watermark) — нормальное явление, и Flink даёт инструменты для их обработки.
В этом уроке разбираем allowedLateness, side output для late, и как окно ведёт себя при поздних повторных срабатываниях.
Что считается late
Late event — событие, чей timestamp меньше текущего watermark на момент обработки. После того как watermark прошёл за конец окна и окно эмитилось, любое событие, попадающее в этот временной интервал, считается late.
Window [10:00, 11:00)
Watermark 11:05 -> окно эмитилось (timer на window.maxTimestamp = 11:00 сработал)
Event с timestamp 10:45 приходит сейчас -> LATE
Без allowedLateness — event дропается.
С allowedLateness(15min) — event обработается, окно ре-эмиттит.
Настройка allowedLateness
events.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
.allowedLateness(Duration.ofMinutes(15))
.reduce((a, b) -> a.merge(b));
Параметр allowedLateness определяет, как долго окно живёт после своего нормального закрытия. В этот период late events:
- Принимаются окном.
- Вызывают повторный FIRE.
- Обновлённый результат идёт downstream.
После истечения window.end + allowedLateness окно окончательно closing — state очищается, late events дропаются (или идут в side output, см. ниже).
events \
.key_by(lambda e: e.key) \
.window(TumblingEventTimeWindows.of(Duration.of_minutes(5))) \
.allowed_lateness(Duration.of_minutes(15)) \
.reduce(lambda a, b: a.merge(b))
Side output для дропнутых late events
События, которые слишком поздно даже для allowedLateness, по умолчанию молча дропаются. В production это плохо — вы не узнаете, что часть данных потеряна. Side output даёт способ их собрать:
OutputTag<Event> lateTag = new OutputTag<Event>("late-events"){};
SingleOutputStreamOperator<Result> windowed = events
.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
.allowedLateness(Duration.ofMinutes(15))
.sideOutputLateData(lateTag)
.reduce(...);
DataStream<Event> lateEvents = windowed.getSideOutput(lateTag);
lateEvents
.map(e -> new LateEventAlert(e))
.sinkTo(kafkaLateSink); // отправить в DLQ
OutputTag — это типизированный handle для побочного потока. sideOutputLateData сообщает оператору окна: “вместо drop отправь в этот tag”. Затем через getSideOutput(tag) можно получить отдельный DataStream и обработать его (например, отправить в alerting или специальный backfill topic).
Всегда используйте sideOutputLateData в production. Это даёт observability: вы видите количество дропнутых событий в метриках и можете отреагировать (поднять allowedLateness, разобраться с upstream-задержками). Без side output потеря данных тиха и долго остаётся незамеченной.
Cost of allowed lateness
Большой allowedLateness означает, что окно дольше живёт в state:
| allowedLateness | Window state lifetime |
|---|---|
| 0 (по умолчанию) | window length only |
| 15 min | window length + 15 min |
| 1 hour | window length + 1 hour |
| 24 hours | window length + 24 hours |
Для tumbling-окна 1 час с allowedLateness=24h:
- В любой момент в памяти живут 25 часов окон одновременно (24 уже закрытых + 1 текущее).
- Размер state per ключ умножается на 25.
Для sliding-окон эффект ещё хуже: каждое из перекрывающихся окон живёт дополнительное время.
Установка allowedLateness=Duration.ofDays(7) — это конкретно плохая идея для активных потоков. Размер state быстро становится неуправляемым.
Правило большого пальца: allowedLateness должно быть на порядок меньше windowLength. Для часового окна — 5-15 минут. Для дневного — до 4-6 часов. Для месячного — до 1-2 дней.
Поведение разных функций при re-emit
При повторных FIRE из-за late events результат окна обновляется. Что это значит для downstream:
ReduceFunction / AggregateFunction. Reducer/aggregator обновляет accumulator с late event и эмиттит новый результат.
High watermark в Kafka: схожая концепция прогресса Downstream получает поток (window_key, result_v1), (window_key, result_v2) для одного и того же окна. Sink должен поддерживать upsert-семантику — replace по window_key, не append.
ProcessWindowFunction. При re-emit функция вызывается заново со всем Iterable окна (включая late events). Это значит, что повторная обработка может быть медленной для больших окон. Лучше использовать combination AggregateFunction + ProcessWindowFunction, где первая обновляет accumulator инкрементально.
Window output for SQL/Table API. В Table API late updates автоматически генерируют retract/append-сообщения для downstream changelog. Это позволяет sinks вроде upsert-kafka корректно обработать обновления.
Late events vs early events
Симметричный сценарий — события из будущего (timestamp > current_watermark + значительный gap). Flink их принимает — для окна они “обычные”, но они могут двигать watermark в большое будущее, что закроет много окон преждевременно.
Это не late events, это “outliers” по событийному времени. Обработка:
- Фильтровать в source — отбрасывать события с подозрительно большим future timestamp.
- Не использовать как источник watermark —
withTimestampAssignerотдельно отforBoundedOutOfOrderness. forMonotonousTimestampsстроже — не подходит для unordered входных данных, но фильтрует extreme outliers.
Allowed lateness vs side output
Эти два механизма решают разные задачи:
Allowed lateness: “позволяем окну принимать late events до этого момента и обновлять output”.
Side output для late: “когда даже allowedLateness не помогло — не теряй данные, собери в отдельный поток”.
Они комбинируются. Полная production-конфигурация:
OutputTag<Event> lateTag = new OutputTag<Event>("late-events"){};
events
.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
.allowedLateness(Duration.ofMinutes(15)) // re-emit для умеренно поздних
.sideOutputLateData(lateTag) // капчер супер-поздних
.aggregate(new MyAggregator(), new MyProcessFn());
Result:
- События до
window.end— нормальная обработка. - События от
window.endдоwindow.end + 15min— re-emit окна. - События после
window.end + 15min— вlateTagside output для отдельной обработки.
Downstream и upserts
При re-emit окно посылает обновления для одного и того же window_key (и timestamp). Sink должен это правильно обработать.
Kafka upsert-sink:
KafkaSink<Result> sink = KafkaSink.<Result>builder()
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("results")
.setKeySerializationSchema(r -> r.windowKey.getBytes()) // ключ = (window_key, window_start)
.setValueSerializationSchema(new ResultSerializer())
.build())
.build();
Compacted topic с правильным ключом сохранит только последнее значение для каждого окна — это даёт корректный upsert downstream.
Database upsert-sink:
INSERT INTO window_results (window_key, window_start, result_value)
VALUES (?, ?, ?)
ON CONFLICT (window_key, window_start) DO UPDATE SET
result_value = EXCLUDED.result_value;
Без upsert downstream накопит несколько записей для одного окна — придётся фильтровать “по последней”.
Append-only sinks (например, file sink без compaction, или Kafka topic с RETAIN_ALL) при re-emit получат несколько записей для одного окна. Downstream analytics увидит дублирующиеся данные — суммы будут завышены, count неверный. Всегда проектируйте sink с учётом возможных re-emit, если allowedLateness > 0.
Сценарии: когда какое allowedLateness
Internal microservices, низкая задержка партиций. allowedLateness = 0 — данные приходят почти строго по порядку. Side output для отлова единичных аномалий.
Public Kafka с partitions из разных DC. allowedLateness = 5-15 минут — учитывает обычную сетевую задержку. Side output для catastrophic delays.
Mobile event collection. allowedLateness = 1-4 часа — мобильные клиенты могут быть офлайн. Бизнес-решение: насколько важна “недавняя” аналитика vs полнота.
CDC из historical sources. allowedLateness в обычном смысле не нужен — лучше использовать withWatermarkAlignment (см. урок 6.4). При backfill события могут “опережать” друг друга в плане event time.
Production-чеклист
- Установите явный
allowedLatenessдля всех event-time окон в production. Дефолт 0 — обычно слишком строгий. - Добавьте
sideOutputLateDataс маршрутизацией в DLQ-топик или alerting. Мониторинг: количество late events per minute. - Sink должен быть upsert-aware. Если sink append-only — рассмотрите ProcessWindowFunction с явной “первый или последний emit?” логикой через windowState.
- Замерьте дополнительный размер state от allowedLateness. Если он критичен — уменьшите длительность или используйте более sparse окна.
Попробуй сам
-
Re-emit observation. Запустите job с 5-минутным tumbling-окном и allowedLateness(10min). Отправьте сначала нормальный поток, потом задержанное событие. Посмотрите downstream — должно прийти 2 результата для того же окна.
-
Side output behavior. Установите allowedLateness(0) и sideOutputLateData. Отправьте late event. Убедитесь, что оно появилось в side output stream, а не в основном.
-
Memory cost of long allowedLateness. Создайте job с tumbling-окном 5 минут и allowedLateness 24 часа. Через несколько часов работы посмотрите размер state и количество live окон в памяти.