Learning Platform
Глоссарий Troubleshooting
Урок 06.05 · 16 мин
Средний
Allowed LatenessLate EventsSide OutputsRe-emitWindow Lifecycle

Allowed lateness и late events

В идеальном мире события приходят строго по порядку их event time. В реальном — Kafka партиция отстаёт, сеть лагает, мобильный клиент офлайн два часа и потом высылает накопленное. Эти late events (события, чей timestamp меньше текущего watermark) — нормальное явление, и Flink даёт инструменты для их обработки.

В этом уроке разбираем allowedLateness, side output для late, и как окно ведёт себя при поздних повторных срабатываниях.


Что считается late

Late event — событие, чей timestamp меньше текущего watermark на момент обработки. После того как watermark прошёл за конец окна и окно эмитилось, любое событие, попадающее в этот временной интервал, считается late.

Window [10:00, 11:00)
Watermark 11:05  -> окно эмитилось (timer на window.maxTimestamp = 11:00 сработал)
Event с timestamp 10:45 приходит сейчас -> LATE

Без allowedLateness — event дропается.
С allowedLateness(15min) — event обработается, окно ре-эмиттит.
Lifecycle окна с allowed lateness
Конец окна по event time. Окно содержит события, накопленные за час. Но trigger ещё не сработал, watermark = 10:55 (отстаёт на 5 минут).
Watermark прошёл за конец окна. EventTimeTrigger возвращает FIRE_AND_PURGE. Окно эмиттит результат. С allowedLateness=15min state не очищается — окно остаётся в памяти ещё 15 минут event time.
Late event попадает в окно [10:00, 11:00). С allowedLateness — обрабатывается. EventTimeTrigger возвращает FIRE: окно re-emit с обновлёнными данными.
Watermark прошёл за window.end + allowedLateness (11:00 + 15min = 11:15). Окно окончательно закрывается. Все последующие late events попадают в side output.

Настройка allowedLateness

events.keyBy(Event::getKey)
      .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
      .allowedLateness(Duration.ofMinutes(15))
      .reduce((a, b) -> a.merge(b));

Параметр allowedLateness определяет, как долго окно живёт после своего нормального закрытия. В этот период late events:

  1. Принимаются окном.
  2. Вызывают повторный FIRE.
  3. Обновлённый результат идёт downstream.

После истечения window.end + allowedLateness окно окончательно closing — state очищается, late events дропаются (или идут в side output, см. ниже).

events \
    .key_by(lambda e: e.key) \
    .window(TumblingEventTimeWindows.of(Duration.of_minutes(5))) \
    .allowed_lateness(Duration.of_minutes(15)) \
    .reduce(lambda a, b: a.merge(b))

Side output для дропнутых late events

События, которые слишком поздно даже для allowedLateness, по умолчанию молча дропаются. В production это плохо — вы не узнаете, что часть данных потеряна. Side output даёт способ их собрать:

OutputTag<Event> lateTag = new OutputTag<Event>("late-events"){};

SingleOutputStreamOperator<Result> windowed = events
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
    .allowedLateness(Duration.ofMinutes(15))
    .sideOutputLateData(lateTag)
    .reduce(...);

DataStream<Event> lateEvents = windowed.getSideOutput(lateTag);

lateEvents
    .map(e -> new LateEventAlert(e))
    .sinkTo(kafkaLateSink);  // отправить в DLQ

OutputTag — это типизированный handle для побочного потока. sideOutputLateData сообщает оператору окна: “вместо drop отправь в этот tag”. Затем через getSideOutput(tag) можно получить отдельный DataStream и обработать его (например, отправить в alerting или специальный backfill topic).

TIP

Всегда используйте sideOutputLateData в production. Это даёт observability: вы видите количество дропнутых событий в метриках и можете отреагировать (поднять allowedLateness, разобраться с upstream-задержками). Без side output потеря данных тиха и долго остаётся незамеченной.


Cost of allowed lateness

Большой allowedLateness означает, что окно дольше живёт в state:

allowedLatenessWindow state lifetime
0 (по умолчанию)window length only
15 minwindow length + 15 min
1 hourwindow length + 1 hour
24 hourswindow length + 24 hours

Для tumbling-окна 1 час с allowedLateness=24h:

  • В любой момент в памяти живут 25 часов окон одновременно (24 уже закрытых + 1 текущее).
  • Размер state per ключ умножается на 25.

Для sliding-окон эффект ещё хуже: каждое из перекрывающихся окон живёт дополнительное время.

Установка allowedLateness=Duration.ofDays(7) — это конкретно плохая идея для активных потоков. Размер state быстро становится неуправляемым.

Правило большого пальца: allowedLateness должно быть на порядок меньше windowLength. Для часового окна — 5-15 минут. Для дневного — до 4-6 часов. Для месячного — до 1-2 дней.


Поведение разных функций при re-emit

При повторных FIRE из-за late events результат окна обновляется. Что это значит для downstream:

ReduceFunction / AggregateFunction. Reducer/aggregator обновляет accumulator с late event и эмиттит новый результат.

High watermark в Kafka: схожая концепция прогресса Downstream получает поток (window_key, result_v1), (window_key, result_v2) для одного и того же окна. Sink должен поддерживать upsert-семантику — replace по window_key, не append.

ProcessWindowFunction. При re-emit функция вызывается заново со всем Iterable окна (включая late events). Это значит, что повторная обработка может быть медленной для больших окон. Лучше использовать combination AggregateFunction + ProcessWindowFunction, где первая обновляет accumulator инкрементально.

Window output for SQL/Table API. В Table API late updates автоматически генерируют retract/append-сообщения для downstream changelog. Это позволяет sinks вроде upsert-kafka корректно обработать обновления.


Late events vs early events

Симметричный сценарий — события из будущего (timestamp > current_watermark + значительный gap). Flink их принимает — для окна они “обычные”, но они могут двигать watermark в большое будущее, что закроет много окон преждевременно.

Это не late events, это “outliers” по событийному времени. Обработка:

  • Фильтровать в source — отбрасывать события с подозрительно большим future timestamp.
  • Не использовать как источник watermarkwithTimestampAssigner отдельно от forBoundedOutOfOrderness.
  • forMonotonousTimestamps строже — не подходит для unordered входных данных, но фильтрует extreme outliers.

Allowed lateness vs side output

Эти два механизма решают разные задачи:

Allowed lateness: “позволяем окну принимать late events до этого момента и обновлять output”.

Side output для late: “когда даже allowedLateness не помогло — не теряй данные, собери в отдельный поток”.

Они комбинируются. Полная production-конфигурация:

OutputTag<Event> lateTag = new OutputTag<Event>("late-events"){};

events
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
    .allowedLateness(Duration.ofMinutes(15))     // re-emit для умеренно поздних
    .sideOutputLateData(lateTag)                  // капчер супер-поздних
    .aggregate(new MyAggregator(), new MyProcessFn());

Result:

  • События до window.end — нормальная обработка.
  • События от window.end до window.end + 15min — re-emit окна.
  • События после window.end + 15min — в lateTag side output для отдельной обработки.

Downstream и upserts

При re-emit окно посылает обновления для одного и того же window_key (и timestamp). Sink должен это правильно обработать.

Kafka upsert-sink:

KafkaSink<Result> sink = KafkaSink.<Result>builder()
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("results")
        .setKeySerializationSchema(r -> r.windowKey.getBytes())  // ключ = (window_key, window_start)
        .setValueSerializationSchema(new ResultSerializer())
        .build())
    .build();

Compacted topic с правильным ключом сохранит только последнее значение для каждого окна — это даёт корректный upsert downstream.

Database upsert-sink:

INSERT INTO window_results (window_key, window_start, result_value)
VALUES (?, ?, ?)
ON CONFLICT (window_key, window_start) DO UPDATE SET
    result_value = EXCLUDED.result_value;

Без upsert downstream накопит несколько записей для одного окна — придётся фильтровать “по последней”.

WARNING

Append-only sinks (например, file sink без compaction, или Kafka topic с RETAIN_ALL) при re-emit получат несколько записей для одного окна. Downstream analytics увидит дублирующиеся данные — суммы будут завышены, count неверный. Всегда проектируйте sink с учётом возможных re-emit, если allowedLateness > 0.


Сценарии: когда какое allowedLateness

Internal microservices, низкая задержка партиций. allowedLateness = 0 — данные приходят почти строго по порядку. Side output для отлова единичных аномалий.

Public Kafka с partitions из разных DC. allowedLateness = 5-15 минут — учитывает обычную сетевую задержку. Side output для catastrophic delays.

Mobile event collection. allowedLateness = 1-4 часа — мобильные клиенты могут быть офлайн. Бизнес-решение: насколько важна “недавняя” аналитика vs полнота.

CDC из historical sources. allowedLateness в обычном смысле не нужен — лучше использовать withWatermarkAlignment (см. урок 6.4). При backfill события могут “опережать” друг друга в плане event time.


Production-чеклист

  • Установите явный allowedLateness для всех event-time окон в production. Дефолт 0 — обычно слишком строгий.
  • Добавьте sideOutputLateData с маршрутизацией в DLQ-топик или alerting. Мониторинг: количество late events per minute.
  • Sink должен быть upsert-aware. Если sink append-only — рассмотрите ProcessWindowFunction с явной “первый или последний emit?” логикой через windowState.
  • Замерьте дополнительный размер state от allowedLateness. Если он критичен — уменьшите длительность или используйте более sparse окна.

Попробуй сам

  1. Re-emit observation. Запустите job с 5-минутным tumbling-окном и allowedLateness(10min). Отправьте сначала нормальный поток, потом задержанное событие. Посмотрите downstream — должно прийти 2 результата для того же окна.

  2. Side output behavior. Установите allowedLateness(0) и sideOutputLateData. Отправьте late event. Убедитесь, что оно появилось в side output stream, а не в основном.

  3. Memory cost of long allowedLateness. Создайте job с tumbling-окном 5 минут и allowedLateness 24 часа. Через несколько часов работы посмотрите размер state и количество live окон в памяти.

Проверка знанийKnowledge check
Job вычисляет hourly revenue per shop с allowedLateness=30min и sink — обычный Kafka topic (без upsert). Через неделю BI-команда жалуется на завышенные цифры. Что произошло и как исправить?
ОтветAnswer
При late events окно re-emit-ится с обновлённой суммой revenue. Kafka topic без upsert-семантики накапливает все эмиссии — для одного окна (например shop=A, [10:00, 11:00)) может быть несколько сообщений с разными значениями revenue. Downstream BI-tools читают все сообщения и суммируют, получая многократно завышенную выручку. Решение: либо использовать upsert-kafka sink с ключом (shop_id, window_start) — Kafka compacted topic оставит только последнее значение per ключ; либо переключить на ProcessWindowFunction с явным emit только при финальном FIRE_AND_PURGE (но это потеряет re-emit для late events — компромисс между точностью и observability); либо в downstream BI добавить дедупликацию по (shop_id, window_start) с выбором max(emit_timestamp). Самый чистый вариант — upsert-sink, это decoupling Flink-семантики от downstream-системы.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. У job-а tumbling EventTime окно 1 час и allowedLateness(15min). Окно [10:00, 11:00) уже эмитнуло в 11:05 (watermark прошёл). Сейчас 11:12, приходит событие с timestamp 10:45. Что произойдёт?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5