Learning Platform
Глоссарий Troubleshooting
Урок 08.04 · 30 мин
Продвинутый
WindowingTumbling WindowHopping WindowSession WindowSliding WindowGrace PeriodWindowedSerdeSuppress

Windowing

Потоки данных — бесконечны. Невозможно посчитать “количество транзакций за всё время”, не дождавшись конца истории. Оконные агрегации решают эту проблему: они делят бесконечный поток на конечные временные отрезки (окна) и позволяют агрегировать данные в каждом окне.

Windowing — один из самых часто проверяемых тем в сертификации CCDAK. Нужно чётко понимать семантику каждого типа окна.


Почему нужны окна

Без оконных агрегаций count() считает ВСЕ записи за всё время работы приложения. С оконными агрегациями — только записи, попавшие в данное временное окно.

ЗапросБез оконС окнами
Количество транзакцийВсего с начала времёнЗа последние 5 минут
ОшибкиВсего накопленоВ минуту / в час
Активные сессииНевозможно определитьСессии с активностью в последние 30 минут

Время в Kafka Streams — это event time (timestamp самой записи), не wall clock time обработки. Это важно: запись с timestamp из прошлого попадёт в “прошлое” окно, а не в “текущее”.


Tumbling Window — фиксированное, без перекрытия

Tumbling window — простейший тип. Фиксированный размер, записи не перекрываются. Поток делится на смежные окна фиксированного размера.

KTable<Windowed<String>, Long> errorsByMinute = errors
    .groupByKey()
    .windowedBy(
        TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))
    )
    .count(Materialized.as("errors-by-minute"));

Границы окон: [0:00, 1:00), [1:00, 2:00), [2:00, 3:00)

Запись с timestamp 0:37 принадлежит только окну [0:00, 1:00). Ровно одному окну. Это ключевое свойство tumbling window.

Использование:

  • Подсчёт ошибок в минуту / транзакций в час
  • Агрегация метрик по временным периодам
  • Биллинг с фиксированными периодами
Tumbling vs Hopping: одни и те же события, разные окна
События: A@0:30, B@1:20, C@1:50, D@2:40
Tumbling 2 мин[0:00-2:00): события A, B, C — count=3. [2:00-4:00): событие D — count=1. Каждое событие строго в одном окне. Простая, предсказуемая агрегация.
Hopping 2 мин / advance 1 мин[0:00-2:00): A,B,C — count=3. [1:00-3:00): B,C — count=2. [2:00-4:00): D — count=1. Событие B принадлежит сразу двум окнам. Более сглаженные метрики.

Hopping Window — фиксированное, с перекрытием

Hopping window имеет фиксированный размер, но окна перекрываются: окно сдвигается на advanceBy интервал, меньший, чем размер окна.

KTable<Windowed<String>, Long> rolling5min = clicks
    .groupByKey()
    .windowedBy(
        TimeWindows
            .ofSizeWithNoGrace(Duration.ofMinutes(5))
            .advanceBy(Duration.ofMinutes(1))
    )
    .count();

Семантика: Запись с timestamp 3:30 принадлежит окнам [0:00-5:00), [1:00-6:00), [2:00-7:00), [3:00-8:00) — всем окнам, которые “включают” этот момент времени.

Trade-off по сравнению с tumbling:

  • Больше выходных записей (одна запись → несколько обновлений в разных окнах)
  • Больше state (перекрывающиеся окна = больший WindowStore)
  • Сглаженные метрики (нет “перескока” на границе окна)

Использование:

  • Скользящее среднее (rolling average) за последние N минут, обновляемое каждую минуту
  • Мониторинг: “количество запросов за последние 5 минут” с обновлением раз в минуту

Session Window — динамическое, по активности

Session window не имеет фиксированного размера. Окно формируется вокруг активности: записи, поступающие в пределах inactivityGap друг от друга, попадают в одну сессию.

KTable<Windowed<String>, Long> sessions = userEvents
    .groupByKey()
    .windowedBy(
        SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30))
    )
    .count();

Правила session window:

  • Два события в пределах 30 минут — одна сессия
  • Разрыв > 30 минут — разные сессии
  • Session merging: если позднее приходит событие, которое “мостит” два ранее закрытых окна (timestamp между ними), сессии объединяются в одну

Пример с inactivityGap=10 мин:

  • 10:00, 10:07, 10:15 → одна сессия [10:00, 10:15]
  • 10:30 → новая сессия [10:30, 10:30]
  • 10:38 → обновление → [10:30, 10:38]

Использование:

  • Длительность пользовательских сессий на сайте
  • Группировка кликов в интерактивные сессии
  • Анализ активности с переменной интенсивностью

Sliding Window — запись-центричное

Sliding window (Kafka 2.7+) кардинально отличается от hopping. Вместо выравнивания по wall clock, каждое событие определяет своё собственное окно: включаются все записи с тем же ключом, чей timestamp находится в пределах timeDifference от данной записи.

KTable<Windowed<String>, Long> dedup = events
    .groupByKey()
    .windowedBy(
        SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
    )
    .count();

Основное применение: JoinWindows в stream-stream joins (не самостоятельно для агрегации):

// JoinWindows: найти соответствующую пару в пределах 5 минут
KStream<String, Enriched> joined = orders.join(
    payments,
    (order, payment) -> enrich(order, payment),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
);

Использование:

  • Stream-stream joins (поиск пар событий в окне)
  • Дедупликация: обнаружить дублирующиеся события в пределах 5 минут

WindowingDiagram

Kafka Streams: Типы оконных операций
Тип окнаTumblingФиксированные неперекрывающиеся окна. Каждая запись принадлежит ровно одному окну. DSL: TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)). С grace: TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30)). Используйте для: метрики за период (requests/min), агрегации за фиксированный интервал.
[00:00 — 05:00)Окно 1: [00:00 — 05:00). Все события с timestamp в этом диапазоне попадают в это окно. После закрытия (t=05:00) — окно финализируется и результат выдаётся downstream. При grace=0 поздние записи отбрасываются немедленно.
[05:00 — 10:00)Окно 2: [05:00 — 10:00). Начинается ровно там, где закончилось предыдущее. Между окнами нет перекрытия и нет пропусков — временная ось полностью покрыта неперекрывающимися сегментами. Каждая запись входит в ровно одно окно.
[10:00 — 15:00)Окно 3: [10:00 — 15:00). Последовательность окон продолжается. Tumbling window — частный случай Hopping window, где advanceBy = size. Kafka Streams использует WindowStore (RocksDB) для хранения частичных агрегатов до финализации окна.
Тип окнаHoppingФиксированные перекрывающиеся окна. Каждая запись может принадлежать нескольким окнам одновременно. DSL: TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)). Используйте для: скользящий средний, SLA-мониторинг с перекрытием, обнаружение аномалий в скользящем окне.
[00:00 — 05:00)Hopping окно A: [00:00 — 05:00). Запись в t=02:00 входит в окна A, B, C, D, E (все 5 минутных окон, начало которых от 00:00 до 02:00). Количество окон, в которые попадает запись = size/advanceBy = 5мин/1мин = 5 окон одновременно.
[01:00 — 06:00)Hopping окно B: [01:00 — 06:00). Перекрывается с окном A на 4 минуты. Каждую минуту открывается новое окно. Это создаёт большую нагрузку на state store: записей в 5 раз больше, чем при Tumbling. Выбирайте advanceBy пропорционально требуемой точности.
[02:00 — 07:00)Hopping окно C: [02:00 — 07:00). Каждое новое окно сдвигается на advanceBy. Если advanceBy = size — получаем Tumbling. Если advanceBy < size — Hopping (записи в нескольких окнах). Если advanceBy > size — есть gaps (некоторые записи ни в каком окне).
Тип окнаSessionДинамические окна, определяемые периодом бездействия. Нет фиксированного размера — окно закрывается после inactivity gap. DSL: SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(10)). Используйте для: пользовательские сессии, анализ последовательностей действий, обнаружение пауз в потоке.
Сессия 1 [02 — 07]Сессия 1: активность с 00:02 по 00:07 (3 события). Следующее событие в 00:20 — разрыв 13 минут, больше inactivity gap (10 мин) — сессия 1 закрывается. Размер окна = время от первого до последнего события сессии. SessionStore агрегирует результат по каждой сессии.
— — gap > 10 мин — —
Сессия 2 [20 — 35]Сессия 2: активность с 00:20 по 00:35 (4 события). Между ними паузы менее 10 минут — они объединяются в одну сессию. Kafka Streams может объединять (merge) смежные сессии: если новое событие попадает в gap между двумя открытыми сессиями, они сливаются в одну.
— — gap > 10 мин — —
Сессия 3 [50]Сессия 3: одиночное событие в 00:50. Если следующее событие не придёт в течение inactivity gap (10 мин), сессия закрывается с длиной 0. Минимальный размер сессии = 0 (одно событие). Отличие от TimeWindows: размер определяется данными, а не конфигурацией.
Тип окнаSlidingОкно привязано к каждой записи: включает все записи в пределах timeDifference от данной. DSL: SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)). Отличие от Hopping: границы определяются временем записи, а не фиксированным шагом. Используйте для: обнаружение дубликатов, join-окна (JoinWindows используют тот же принцип).
E1: [(-3) — 02:00]Sliding окно для события E1 (t=02:00): включает все события с t в [02:00-5мин, 02:00+5мин] = [не раньше t=minus(timeDifference), не позже t=plus(timeDifference)]. Точнее: стандартное sliding window покрывает [t-timeDifference, t]. Размер фиксирован (timeDifference), но начало/конец привязаны к каждой конкретной записи.
E2: [(-1) — 04:00]Sliding окно для события E2 (t=04:00): покрывает [-1:00, 04:00]. Перекрывается с окном E1 на интервале [-1:00, 02:00]. Число записей в каждом окне разное — зависит от плотности потока. Kafka Streams использует WindowStore с ключом (record_key, window_start, window_end).
E3: [01:00 — 06:00]Sliding окно для события E3 (t=06:00): покрывает [01:00, 06:00]. Окно E3 включает E2 (t=04:00) но не E1 (t=02:00, вне диапазона если timeDifference=5мин). Sliding windows создают больше всего уникальных окон (по одному на запись) — наибольшая нагрузка на state store.
grace periodGrace period определяет, как долго после закрытия окна принимаются поздние записи. DSL: ofSizeAndGrace(size, grace). По умолчанию grace=24 часа (Kafka 3.x+). После истечения grace — запись отбрасывается. ofSizeWithNoGrace() = grace Duration.ZERO. Важно: grace не влияет на размер окна, только на буферизацию поздних данных.
Suppressed.untilWindowCloses()Emit behaviour: по умолчанию Kafka Streams выдаёт обновлённый результат окна при каждой новой записи (emit on each update). Для downstream это означает серию промежуточных результатов до финального. suppress(Suppressed.untilWindowCloses(BufferConfig)) задерживает вывод до закрытия окна + grace.

Grace Period — запаздывающие записи

Реальные системы: данные от IoT-устройств, мобильных приложений, региональных ДЦ приходят с задержкой. Событие с timestamp 10:03 может прийти в 10:08, когда окно [10:00-10:05) уже “закрылось”.

Grace period определяет, как долго после закрытия окна принимаются запоздавшие записи.

// Okно 5 минут, grace 30 секунд
TimeWindows tumblingWithGrace = TimeWindows
    .ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30));

// Без grace — записи после закрытия окна отбрасываются
TimeWindows noGrace = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));
WARNING

Grace period по умолчанию в Kafka 3.x+ составляет 24 часа. Это означает, что окно остаётся “открытым” для поздних записей целые сутки. Для high-throughput приложений это создаёт огромное количество state: WindowStore хранит данные для 24 часов открытых окон. Всегда явно устанавливайте grace period: .ofSizeAndGrace(size, Duration.ofSeconds(30)).

Записи после grace period: отбрасываются без обработки. Kafka Streams не выдаёт ошибку — запись просто игнорируется. Мониторинг: kafka_streams_task_dropped_records_total метрика.


Suppress — финальный результат

По умолчанию windowed aggregation эмитирует промежуточный результат при поступлении каждой новой записи в окно. Для окна с 1000 записями потребитель выходного топика увидит 1000 обновлений (1, 2, 3, …, 1000).

Suppress изменяет это поведение: только финальный результат после закрытия окна.

KTable<Windowed<String>, Long> finalCounts = stream
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(10)))
    .count()
    .suppress(
        Suppressed.untilWindowCloses(BufferConfig.unbounded())
    );
TIP

Suppress = финальный результат вместо промежуточных обновлений. Без suppress потребитель выходного топика видит десятки или сотни обновлений для каждого окна по мере поступления данных. С suppress — ровно одно сообщение после закрытия окна. Компромисс: suppress добавляет latency (нужно ждать закрытия окна + grace period) и буферизует данные в памяти.

BufferConfig варианты:

  • BufferConfig.unbounded() — буферизует в памяти без ограничений (риск OOM при большом state)
  • BufferConfig.maxBytes(bytes).shutdownWhenFull() — при превышении лимита — штатное завершение
  • BufferConfig.maxRecords(count).emitEarlyWhenFull() — при превышении — эмитировать досрочно

WindowedSerde — сериализация ключей окон

После windowed aggregation ключ записи становится составным: Windowed<K>, содержащим исходный ключ и временные границы окна.

// Сериализация Windowed<String> ключей для записи в топик
stream
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count()
    .toStream()
    .to("windowed-counts",
        Produced.with(
            WindowedSerdes.timeWindowedSerdeFrom(Serdes.String(), 5 * 60 * 1000L),
            Serdes.Long()
        ));

В выходном топике ключи имеют вид: user-42@1706694000000/1706694300000 (ключ@начало_окна/конец_окна).

Проверка знанийKnowledge check
У вас поток кликов пользователей. Нужно считать количество кликов за 10-минутные интервалы. Данные приходят с задержкой до 2 минут. Какой тип окна выбрать, какой grace period установить, и нужен ли Suppress? Объясните каждое решение.
ОтветAnswer
Тип окна: Tumbling Window (TimeWindows.ofSizeAndGrace). Обоснование: нужны фиксированные 10-минутные периоды без перекрытия — это tumbling. Если нужно скользящее обновление каждую минуту — тогда hopping, но условие не указывает этого. Grace period: минимум 2 минуты (Duration.ofMinutes(2)), лучше с запасом: 5 минут. Обоснование: данные приходят с задержкой до 2 минут, значит записи могут прийти через 2 минуты после закрытия окна. Grace period должен покрывать эту задержку. Без grace эти записи будут отброшены. Suppress: зависит от потребителя. Если потребитель хочет видеть только финальный счётчик после закрытия — добавить Suppress.untilWindowCloses(). Если потребитель может обрабатывать промежуточные обновления (например, real-time dashboard) — Suppress не нужен. Итоговый код: TimeWindows.ofSizeAndGrace(Duration.ofMinutes(10), Duration.ofMinutes(2)).

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Система мониторинга считает количество HTTP-ошибок за фиксированные 5-минутные периоды: [0:00-5:00), [5:00-10:00), [10:00-15:00) и т.д. Данные могут опаздывать до 30 секунд. Какой тип окна и конфигурация grace period корректны?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6