Windowing
Потоки данных — бесконечны. Невозможно посчитать “количество транзакций за всё время”, не дождавшись конца истории. Оконные агрегации решают эту проблему: они делят бесконечный поток на конечные временные отрезки (окна) и позволяют агрегировать данные в каждом окне.
Windowing — один из самых часто проверяемых тем в сертификации CCDAK. Нужно чётко понимать семантику каждого типа окна.
Почему нужны окна
Без оконных агрегаций count() считает ВСЕ записи за всё время работы приложения. С оконными агрегациями — только записи, попавшие в данное временное окно.
| Запрос | Без окон | С окнами |
|---|---|---|
| Количество транзакций | Всего с начала времён | За последние 5 минут |
| Ошибки | Всего накоплено | В минуту / в час |
| Активные сессии | Невозможно определить | Сессии с активностью в последние 30 минут |
Время в Kafka Streams — это event time (timestamp самой записи), не wall clock time обработки. Это важно: запись с timestamp из прошлого попадёт в “прошлое” окно, а не в “текущее”.
Tumbling Window — фиксированное, без перекрытия
Tumbling window — простейший тип. Фиксированный размер, записи не перекрываются. Поток делится на смежные окна фиксированного размера.
KTable<Windowed<String>, Long> errorsByMinute = errors
.groupByKey()
.windowedBy(
TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))
)
.count(Materialized.as("errors-by-minute"));
Границы окон: [0:00, 1:00), [1:00, 2:00), [2:00, 3:00) …
Запись с timestamp 0:37 принадлежит только окну [0:00, 1:00). Ровно одному окну. Это ключевое свойство tumbling window.
Использование:
- Подсчёт ошибок в минуту / транзакций в час
- Агрегация метрик по временным периодам
- Биллинг с фиксированными периодами
Hopping Window — фиксированное, с перекрытием
Hopping window имеет фиксированный размер, но окна перекрываются: окно сдвигается на advanceBy интервал, меньший, чем размер окна.
KTable<Windowed<String>, Long> rolling5min = clicks
.groupByKey()
.windowedBy(
TimeWindows
.ofSizeWithNoGrace(Duration.ofMinutes(5))
.advanceBy(Duration.ofMinutes(1))
)
.count();
Семантика: Запись с timestamp 3:30 принадлежит окнам [0:00-5:00), [1:00-6:00), [2:00-7:00), [3:00-8:00) — всем окнам, которые “включают” этот момент времени.
Trade-off по сравнению с tumbling:
- Больше выходных записей (одна запись → несколько обновлений в разных окнах)
- Больше state (перекрывающиеся окна = больший WindowStore)
- Сглаженные метрики (нет “перескока” на границе окна)
Использование:
- Скользящее среднее (rolling average) за последние N минут, обновляемое каждую минуту
- Мониторинг: “количество запросов за последние 5 минут” с обновлением раз в минуту
Session Window — динамическое, по активности
Session window не имеет фиксированного размера. Окно формируется вокруг активности: записи, поступающие в пределах inactivityGap друг от друга, попадают в одну сессию.
KTable<Windowed<String>, Long> sessions = userEvents
.groupByKey()
.windowedBy(
SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30))
)
.count();
Правила session window:
- Два события в пределах 30 минут — одна сессия
- Разрыв > 30 минут — разные сессии
- Session merging: если позднее приходит событие, которое “мостит” два ранее закрытых окна (timestamp между ними), сессии объединяются в одну
Пример с inactivityGap=10 мин:
- 10:00, 10:07, 10:15 → одна сессия [10:00, 10:15]
- 10:30 → новая сессия [10:30, 10:30]
- 10:38 → обновление → [10:30, 10:38]
Использование:
- Длительность пользовательских сессий на сайте
- Группировка кликов в интерактивные сессии
- Анализ активности с переменной интенсивностью
Sliding Window — запись-центричное
Sliding window (Kafka 2.7+) кардинально отличается от hopping. Вместо выравнивания по wall clock, каждое событие определяет своё собственное окно: включаются все записи с тем же ключом, чей timestamp находится в пределах timeDifference от данной записи.
KTable<Windowed<String>, Long> dedup = events
.groupByKey()
.windowedBy(
SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
)
.count();
Основное применение: JoinWindows в stream-stream joins (не самостоятельно для агрегации):
// JoinWindows: найти соответствующую пару в пределах 5 минут
KStream<String, Enriched> joined = orders.join(
payments,
(order, payment) -> enrich(order, payment),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
);
Использование:
- Stream-stream joins (поиск пар событий в окне)
- Дедупликация: обнаружить дублирующиеся события в пределах 5 минут
WindowingDiagram
Grace Period — запаздывающие записи
Реальные системы: данные от IoT-устройств, мобильных приложений, региональных ДЦ приходят с задержкой. Событие с timestamp 10:03 может прийти в 10:08, когда окно [10:00-10:05) уже “закрылось”.
Grace period определяет, как долго после закрытия окна принимаются запоздавшие записи.
// Okно 5 минут, grace 30 секунд
TimeWindows tumblingWithGrace = TimeWindows
.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30));
// Без grace — записи после закрытия окна отбрасываются
TimeWindows noGrace = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));
Grace period по умолчанию в Kafka 3.x+ составляет 24 часа. Это означает, что окно остаётся “открытым” для поздних записей целые сутки. Для high-throughput приложений это создаёт огромное количество state: WindowStore хранит данные для 24 часов открытых окон. Всегда явно устанавливайте grace period: .ofSizeAndGrace(size, Duration.ofSeconds(30)).
Записи после grace period: отбрасываются без обработки. Kafka Streams не выдаёт ошибку — запись просто игнорируется. Мониторинг: kafka_streams_task_dropped_records_total метрика.
Suppress — финальный результат
По умолчанию windowed aggregation эмитирует промежуточный результат при поступлении каждой новой записи в окно. Для окна с 1000 записями потребитель выходного топика увидит 1000 обновлений (1, 2, 3, …, 1000).
Suppress изменяет это поведение: только финальный результат после закрытия окна.
KTable<Windowed<String>, Long> finalCounts = stream
.groupByKey()
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(10)))
.count()
.suppress(
Suppressed.untilWindowCloses(BufferConfig.unbounded())
);
Suppress = финальный результат вместо промежуточных обновлений. Без suppress потребитель выходного топика видит десятки или сотни обновлений для каждого окна по мере поступления данных. С suppress — ровно одно сообщение после закрытия окна. Компромисс: suppress добавляет latency (нужно ждать закрытия окна + grace period) и буферизует данные в памяти.
BufferConfig варианты:
BufferConfig.unbounded()— буферизует в памяти без ограничений (риск OOM при большом state)BufferConfig.maxBytes(bytes).shutdownWhenFull()— при превышении лимита — штатное завершениеBufferConfig.maxRecords(count).emitEarlyWhenFull()— при превышении — эмитировать досрочно
WindowedSerde — сериализация ключей окон
После windowed aggregation ключ записи становится составным: Windowed<K>, содержащим исходный ключ и временные границы окна.
// Сериализация Windowed<String> ключей для записи в топик
stream
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count()
.toStream()
.to("windowed-counts",
Produced.with(
WindowedSerdes.timeWindowedSerdeFrom(Serdes.String(), 5 * 60 * 1000L),
Serdes.Long()
));
В выходном топике ключи имеют вид: user-42@1706694000000/1706694300000 (ключ@начало_окна/конец_окна).