Windowed Aggregations и UDFs
В Модуле 07 (Kafka Streams) вы изучали оконные операции через Java DSL: TimeWindows.ofSizeWithNoGrace(), SessionWindows.ofInactivityGapWithNoGrace(), grace period. ksqlDB предоставляет те же возможности через SQL-синтаксис с WINDOW clause. Под капотом — те же RocksDB WindowStore и механизмы Kafka Streams.
Windowed aggregations в ksqlDB
Оконная агрегация вычисляет агрегат не за всё время, а за конкретные временные окна. Каждый ключ + каждое окно = отдельная строка в результирующей TABLE.
Базовый пример — количество кликов по пользователю за каждый час:
CREATE TABLE clicks_per_hour AS
SELECT
user_id,
COUNT(*) AS click_count,
WINDOWSTART AS window_start,
WINDOWEND AS window_end
FROM clickstream
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY user_id
EMIT CHANGES;
Что здесь происходит:
WINDOW TUMBLING (SIZE 1 HOUR)— разбивает время на часовые окна: [00:00, 01:00), [01:00, 02:00), …GROUP BY user_id— внутри каждого окна агрегирует по ключу.COUNT(*)— считает клики в окне.WINDOWSTART,WINDOWEND— псевдо-столбцы, содержащие границы окна в виде timestamp (BIGINT, миллисекунды).- Результирующий ключ в TABLE: составной
(user_id, window).
Типы окон в ksqlDB
TUMBLING: фиксированные непересекающиеся окна
WINDOW TUMBLING (SIZE 5 MINUTES)
Каждое событие попадает ровно в одно окно. Окна фиксированы: [0:00, 0:05), [0:05, 0:10), [0:10, 0:15). При агрегации за 5 минут с тысячами событий — один COUNT(*) на окно. Прямой эквивалент TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)) из Kafka Streams.
HOPPING: фиксированные перекрывающиеся окна
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
Окна перекрываются. Каждое событие может принадлежать нескольким окнам одновременно. При SIZE=5m и ADVANCE BY=1m: одно событие попадает в 5 окон — [0:00,0:05), [0:01,0:06), [0:02,0:07), [0:03,0:08), [0:04,0:09). Результат: больше строк в выходной TABLE. Аналог TimeWindows.ofSizeWithNoGrace(5m).advanceBy(1m) в Kafka Streams.
SESSION: динамические окна по пробелам активности
WINDOW SESSION (10 MINUTES)
Окно открывается при первом событии и расширяется, пока события приходят в пределах 10 минут друг от друга. Если пробел превышает 10 минут — текущая сессия закрывается, начинается новая. Идеально для пользовательских сессий. Аналог SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(10)).
Sliding windows из Kafka Streams DSL (SlidingWindows) не имеют прямого SQL-аналога в ksqlDB. Для sliding window семантики используйте Kafka Streams Java API. ksqlDB поддерживает TUMBLING, HOPPING и SESSION.
WINDOWSTART и WINDOWEND
WINDOWSTART и WINDOWEND — встроенные псевдо-столбцы, доступные в любом windowed запросе. Возвращают временные метки начала и конца окна как BIGINT (миллисекунды с Unix epoch).
Применение:
CREATE TABLE hourly_stats AS
SELECT
user_id,
COUNT(*) AS event_count,
SUM(amount) AS total_amount,
WINDOWSTART AS win_start_ms,
WINDOWEND AS win_end_ms,
FORMAT_TIMESTAMP(FROM_UNIXTIME(WINDOWSTART / 1000), 'yyyy-MM-dd HH:mm') AS win_label
FROM orders_stream
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY user_id
EMIT CHANGES;
Здесь FORMAT_TIMESTAMP(FROM_UNIXTIME(...)) конвертирует BIGINT в читаемую дату — это встроенные скалярные функции ksqlDB.
GRACE PERIOD: обработка запоздавших записей
По умолчанию, после закрытия окна ksqlDB отбрасывает записи, пришедшие с опозданием. GRACE PERIOD продлевает приём данных:
CREATE TABLE orders_per_hour AS
SELECT
customer_id,
COUNT(*) AS order_count,
WINDOWSTART AS window_start
FROM orders_stream
WINDOW TUMBLING (SIZE 1 HOUR, GRACE PERIOD 30 MINUTES)
GROUP BY customer_id
EMIT CHANGES;
С GRACE PERIOD 30 MINUTES: даже если часовое окно закрылось, записи, пришедшие в течение следующих 30 минут, всё ещё попадают в это окно. Это прямой аналог grace period в Kafka Streams DSL (Модуль 07).
Когда нужен GRACE PERIOD:
- Источники данных с нестабильной задержкой (IoT-сенсоры, мобильные клиенты).
- Сетевые перебои между продюсером и Kafka.
- Batch-загрузка исторических данных в поток.
Pull queries на windowed tables
Материализованные windowed TABLE поддерживают pull queries со спецификацией окон:
-- Все окна для конкретного пользователя
SELECT *
FROM clicks_per_hour
WHERE user_id = 'user-42';
-- Конкретный диапазон окон
SELECT *
FROM clicks_per_hour
WHERE user_id = 'user-42'
AND WINDOWSTART >= '2024-01-01T00:00:00'
AND WINDOWEND <= '2024-01-01T12:00:00';
Ключ в windowed TABLE составной: (user_id, window). Pull query должен указать значение ключа (user_id). Фильтр по WINDOWSTART/WINDOWEND дополнительно ограничивает диапазон возвращаемых окон.
Поток событий → разбивка по временным окнам → агрегация по ключу → материализованная TABLE
STREAM (клики)
Исходный STREAM: события кликов с user_id и timestamp. Каждое событие несёт event time — время возникновения события (не время прибытия в Kafka).Временные окна
WINDOW TUMBLING (SIZE 1 HOUR): каждое событие помещается в соответствующее часовое окно на основе его event time. Событие в 14:37 → окно [14:00, 15:00). ksqlDB использует Kafka Streams WindowedBy() под капотом.COUNT / SUM
GROUP BY user_id: внутри каждого окна агрегирует события по user_id. COUNT(*) обновляется при каждом новом событии в окне. Состояние хранится в RocksDB WindowStore на диске ksqlDB-сервера.TABLE (windowed)
Материализованная TABLE: составной ключ (user_id, window). Каждое обновление агрегата порождает новую запись в changelog топике. Pull query возвращает текущее значение агрегата для (user_id, window) из RocksDB.UDF: User-Defined Functions
ksqlDB поддерживает три типа пользовательских функций, написанных на Java или Kotlin:
UDF (User-Defined Scalar Function)
Принимает одну строку, возвращает одно значение. Пример: маскирование PII-данных, геодистанция.
@UdfDescription(
name = "mask_email",
description = "Маскирует email-адрес, оставляя первые два символа"
)
public class MaskEmailUdf {
@Udf(description = "Возвращает замаскированный email")
public String maskEmail(@UdfParameter(value = "email") final String email) {
if (email == null) return null;
return email.replaceAll("(?<=.{2}).(?=.*@)", "*");
}
}
После деплоя:
SELECT user_id, MASK_EMAIL(email) AS safe_email FROM users_stream EMIT CHANGES;
UDAF (User-Defined Aggregate Function)
Агрегирует несколько строк в одно значение. Аналог COUNT, SUM, но с произвольной логикой. Пример: пользовательский расчёт перцентилей.
@UdafDescription(
name = "percentile_90",
description = "Вычисляет 90-й перцентиль значений"
)
public class Percentile90Udaf {
@UdafFactory(description = "Перцентиль для DOUBLE")
public static Udaf<Double, List<Double>, Double> createUdaf() {
return UdafUtil.createUdaf(
() -> new ArrayList<>(),
(agg, val) -> { agg.add(val); return agg; },
(a, b) -> { a.addAll(b); return a; },
agg -> percentile(agg, 0.90)
);
}
}
UDTF (User-Defined Table Function)
Принимает одну строку, возвращает несколько строк. Один-ко-многим трансформация. Используется с LATERAL JOIN или EXPLODE(). Пример: разбивка массива тегов в отдельные строки.
@UdtfDescription(
name = "split_tags",
description = "Разбивает строку тегов через запятую в отдельные строки"
)
public class SplitTagsUdtf {
@Udtf
public List<String> splitTags(final String tags) {
if (tags == null || tags.isEmpty()) return Collections.emptyList();
return Arrays.asList(tags.split(","));
}
}
Деплой UDF
- Скомпилировать JAR с аннотированными классами.
- Скопировать JAR в директорию
ext/ksqlDB-сервера (или смонтировать volume в Docker). - Перезапустить ksqlDB-сервер — функции загружаются автоматически.
- Проверить:
SHOW FUNCTIONS;— должна появиться ваша функция.
UDF деплоятся как JAR-файл в директорию ext/ ksqlDB-сервера. После перезапуска сервера функция доступна во всех SQL-выражениях. Для production: версионируйте JAR и включайте в CI/CD pipeline ksqlDB — при обновлении UDF нужен рестарт, что влияет на uptime.
Встроенные функции ksqlDB
ksqlDB поставляется с богатым набором встроенных функций:
| Категория | Функции |
|---|---|
| Строковые | CONCAT, SUBSTRING, TRIM, UPPER, LOWER, REGEXP_EXTRACT, REPLACE |
| Преобразование типов | CAST, TO_BYTES, FROM_BYTES |
| Дата/Время | UNIX_TIMESTAMP, FORMAT_TIMESTAMP, FROM_UNIXTIME, CONVERT_TZ |
| Математика | ABS, CEIL, FLOOR, ROUND, SQRT, POWER |
| Массивы и коллекции | ARRAY_LENGTH, ARRAY_CONTAINS, ARRAY_JOIN, MAP_KEYS, MAP_VALUES |
| JSON | EXTRACTJSONFIELD, IS_JSON_STRING |
| Null-handling | IFNULL, COALESCE, NULLIF |
| Агрегатные | COUNT, SUM, AVG, MIN, MAX, COLLECT_LIST, COLLECT_SET, EARLIEST_BY_OFFSET, LATEST_BY_OFFSET |
Полный список — в официальной документации ksqlDB Function Reference.
Ключевые выводы
- Windowed aggregations в ksqlDB — SQL-синтаксис поверх Kafka Streams WindowedBy().
WINDOW TUMBLING,HOPPING,SESSION— прямые аналоги Java-классов окон из Модуля 07. WINDOWSTARTиWINDOWEND— псевдо-столбцы для границ окна (BIGINT ms). Доступны в любом windowed запросе.GRACE PERIODпродлевает приём запоздавших записей после закрытия окна.- Pull queries на windowed TABLE требуют указания ключа; фильтр по WINDOWSTART/WINDOWEND ограничивает диапазон.
- UDF (скаляр), UDAF (агрегат), UDTF (один-ко-многим) — три типа пользовательских функций, деплоятся как JAR в ext/.