Learning Platform
Глоссарий Troubleshooting
Урок 09.03 · 25 мин
Продвинутый
Windowed AggregationWINDOW TUMBLINGWINDOW HOPPINGWINDOW SESSIONWINDOWSTARTWINDOWENDUDFUDAFUDTF

Windowed Aggregations и UDFs

В Модуле 07 (Kafka Streams) вы изучали оконные операции через Java DSL: TimeWindows.ofSizeWithNoGrace(), SessionWindows.ofInactivityGapWithNoGrace(), grace period. ksqlDB предоставляет те же возможности через SQL-синтаксис с WINDOW clause. Под капотом — те же RocksDB WindowStore и механизмы Kafka Streams.


Windowed aggregations в ksqlDB

Оконная агрегация вычисляет агрегат не за всё время, а за конкретные временные окна. Каждый ключ + каждое окно = отдельная строка в результирующей TABLE.

Базовый пример — количество кликов по пользователю за каждый час:

CREATE TABLE clicks_per_hour AS
  SELECT
    user_id,
    COUNT(*) AS click_count,
    WINDOWSTART AS window_start,
    WINDOWEND AS window_end
  FROM clickstream
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY user_id
  EMIT CHANGES;

Что здесь происходит:

  • WINDOW TUMBLING (SIZE 1 HOUR) — разбивает время на часовые окна: [00:00, 01:00), [01:00, 02:00), …
  • GROUP BY user_id — внутри каждого окна агрегирует по ключу.
  • COUNT(*) — считает клики в окне.
  • WINDOWSTART, WINDOWEND — псевдо-столбцы, содержащие границы окна в виде timestamp (BIGINT, миллисекунды).
  • Результирующий ключ в TABLE: составной (user_id, window).

Типы окон в ksqlDB

TUMBLING: фиксированные непересекающиеся окна

WINDOW TUMBLING (SIZE 5 MINUTES)

Каждое событие попадает ровно в одно окно. Окна фиксированы: [0:00, 0:05), [0:05, 0:10), [0:10, 0:15). При агрегации за 5 минут с тысячами событий — один COUNT(*) на окно. Прямой эквивалент TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)) из Kafka Streams.

HOPPING: фиксированные перекрывающиеся окна

WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)

Окна перекрываются. Каждое событие может принадлежать нескольким окнам одновременно. При SIZE=5m и ADVANCE BY=1m: одно событие попадает в 5 окон — [0:00,0:05), [0:01,0:06), [0:02,0:07), [0:03,0:08), [0:04,0:09). Результат: больше строк в выходной TABLE. Аналог TimeWindows.ofSizeWithNoGrace(5m).advanceBy(1m) в Kafka Streams.

SESSION: динамические окна по пробелам активности

WINDOW SESSION (10 MINUTES)

Окно открывается при первом событии и расширяется, пока события приходят в пределах 10 минут друг от друга. Если пробел превышает 10 минут — текущая сессия закрывается, начинается новая. Идеально для пользовательских сессий. Аналог SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(10)).

NOTE

Sliding windows из Kafka Streams DSL (SlidingWindows) не имеют прямого SQL-аналога в ksqlDB. Для sliding window семантики используйте Kafka Streams Java API. ksqlDB поддерживает TUMBLING, HOPPING и SESSION.


WINDOWSTART и WINDOWEND

WINDOWSTART и WINDOWEND — встроенные псевдо-столбцы, доступные в любом windowed запросе. Возвращают временные метки начала и конца окна как BIGINT (миллисекунды с Unix epoch).

Применение:

CREATE TABLE hourly_stats AS
  SELECT
    user_id,
    COUNT(*) AS event_count,
    SUM(amount) AS total_amount,
    WINDOWSTART AS win_start_ms,
    WINDOWEND AS win_end_ms,
    FORMAT_TIMESTAMP(FROM_UNIXTIME(WINDOWSTART / 1000), 'yyyy-MM-dd HH:mm') AS win_label
  FROM orders_stream
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY user_id
  EMIT CHANGES;

Здесь FORMAT_TIMESTAMP(FROM_UNIXTIME(...)) конвертирует BIGINT в читаемую дату — это встроенные скалярные функции ksqlDB.


GRACE PERIOD: обработка запоздавших записей

По умолчанию, после закрытия окна ksqlDB отбрасывает записи, пришедшие с опозданием. GRACE PERIOD продлевает приём данных:

CREATE TABLE orders_per_hour AS
  SELECT
    customer_id,
    COUNT(*) AS order_count,
    WINDOWSTART AS window_start
  FROM orders_stream
  WINDOW TUMBLING (SIZE 1 HOUR, GRACE PERIOD 30 MINUTES)
  GROUP BY customer_id
  EMIT CHANGES;

С GRACE PERIOD 30 MINUTES: даже если часовое окно закрылось, записи, пришедшие в течение следующих 30 минут, всё ещё попадают в это окно. Это прямой аналог grace period в Kafka Streams DSL (Модуль 07).

Когда нужен GRACE PERIOD:

  • Источники данных с нестабильной задержкой (IoT-сенсоры, мобильные клиенты).
  • Сетевые перебои между продюсером и Kafka.
  • Batch-загрузка исторических данных в поток.

Pull queries на windowed tables

Материализованные windowed TABLE поддерживают pull queries со спецификацией окон:

-- Все окна для конкретного пользователя
SELECT *
FROM clicks_per_hour
WHERE user_id = 'user-42';

-- Конкретный диапазон окон
SELECT *
FROM clicks_per_hour
WHERE user_id = 'user-42'
  AND WINDOWSTART >= '2024-01-01T00:00:00'
  AND WINDOWEND <= '2024-01-01T12:00:00';

Ключ в windowed TABLE составной: (user_id, window). Pull query должен указать значение ключа (user_id). Фильтр по WINDOWSTART/WINDOWEND дополнительно ограничивает диапазон возвращаемых окон.

Windowed Aggregation Pipeline

Поток событий → разбивка по временным окнам → агрегация по ключу → материализованная TABLE

STREAM (клики)

Исходный STREAM: события кликов с user_id и timestamp. Каждое событие несёт event time — время возникновения события (не время прибытия в Kafka).
WINDOW TUMBLING

Временные окна

WINDOW TUMBLING (SIZE 1 HOUR): каждое событие помещается в соответствующее часовое окно на основе его event time. Событие в 14:37 → окно [14:00, 15:00). ksqlDB использует Kafka Streams WindowedBy() под капотом.
GROUP BY user_id

COUNT / SUM

GROUP BY user_id: внутри каждого окна агрегирует события по user_id. COUNT(*) обновляется при каждом новом событии в окне. Состояние хранится в RocksDB WindowStore на диске ksqlDB-сервера.
EMIT CHANGES

TABLE (windowed)

Материализованная TABLE: составной ключ (user_id, window). Каждое обновление агрегата порождает новую запись в changelog топике. Pull query возвращает текущее значение агрегата для (user_id, window) из RocksDB.

UDF: User-Defined Functions

ksqlDB поддерживает три типа пользовательских функций, написанных на Java или Kotlin:

UDF (User-Defined Scalar Function)

Принимает одну строку, возвращает одно значение. Пример: маскирование PII-данных, геодистанция.

@UdfDescription(
    name = "mask_email",
    description = "Маскирует email-адрес, оставляя первые два символа"
)
public class MaskEmailUdf {

    @Udf(description = "Возвращает замаскированный email")
    public String maskEmail(@UdfParameter(value = "email") final String email) {
        if (email == null) return null;
        return email.replaceAll("(?<=.{2}).(?=.*@)", "*");
    }
}

После деплоя:

SELECT user_id, MASK_EMAIL(email) AS safe_email FROM users_stream EMIT CHANGES;

UDAF (User-Defined Aggregate Function)

Агрегирует несколько строк в одно значение. Аналог COUNT, SUM, но с произвольной логикой. Пример: пользовательский расчёт перцентилей.

@UdafDescription(
    name = "percentile_90",
    description = "Вычисляет 90-й перцентиль значений"
)
public class Percentile90Udaf {

    @UdafFactory(description = "Перцентиль для DOUBLE")
    public static Udaf<Double, List<Double>, Double> createUdaf() {
        return UdafUtil.createUdaf(
            () -> new ArrayList<>(),
            (agg, val) -> { agg.add(val); return agg; },
            (a, b) -> { a.addAll(b); return a; },
            agg -> percentile(agg, 0.90)
        );
    }
}

UDTF (User-Defined Table Function)

Принимает одну строку, возвращает несколько строк. Один-ко-многим трансформация. Используется с LATERAL JOIN или EXPLODE(). Пример: разбивка массива тегов в отдельные строки.

@UdtfDescription(
    name = "split_tags",
    description = "Разбивает строку тегов через запятую в отдельные строки"
)
public class SplitTagsUdtf {

    @Udtf
    public List<String> splitTags(final String tags) {
        if (tags == null || tags.isEmpty()) return Collections.emptyList();
        return Arrays.asList(tags.split(","));
    }
}

Деплой UDF

  1. Скомпилировать JAR с аннотированными классами.
  2. Скопировать JAR в директорию ext/ ksqlDB-сервера (или смонтировать volume в Docker).
  3. Перезапустить ksqlDB-сервер — функции загружаются автоматически.
  4. Проверить: SHOW FUNCTIONS; — должна появиться ваша функция.
TIP

UDF деплоятся как JAR-файл в директорию ext/ ksqlDB-сервера. После перезапуска сервера функция доступна во всех SQL-выражениях. Для production: версионируйте JAR и включайте в CI/CD pipeline ksqlDB — при обновлении UDF нужен рестарт, что влияет на uptime.


Встроенные функции ksqlDB

ksqlDB поставляется с богатым набором встроенных функций:

КатегорияФункции
СтроковыеCONCAT, SUBSTRING, TRIM, UPPER, LOWER, REGEXP_EXTRACT, REPLACE
Преобразование типовCAST, TO_BYTES, FROM_BYTES
Дата/ВремяUNIX_TIMESTAMP, FORMAT_TIMESTAMP, FROM_UNIXTIME, CONVERT_TZ
МатематикаABS, CEIL, FLOOR, ROUND, SQRT, POWER
Массивы и коллекцииARRAY_LENGTH, ARRAY_CONTAINS, ARRAY_JOIN, MAP_KEYS, MAP_VALUES
JSONEXTRACTJSONFIELD, IS_JSON_STRING
Null-handlingIFNULL, COALESCE, NULLIF
АгрегатныеCOUNT, SUM, AVG, MIN, MAX, COLLECT_LIST, COLLECT_SET, EARLIEST_BY_OFFSET, LATEST_BY_OFFSET

Полный список — в официальной документации ksqlDB Function Reference.


Ключевые выводы

  1. Windowed aggregations в ksqlDB — SQL-синтаксис поверх Kafka Streams WindowedBy(). WINDOW TUMBLING, HOPPING, SESSION — прямые аналоги Java-классов окон из Модуля 07.
  2. WINDOWSTART и WINDOWEND — псевдо-столбцы для границ окна (BIGINT ms). Доступны в любом windowed запросе.
  3. GRACE PERIOD продлевает приём запоздавших записей после закрытия окна.
  4. Pull queries на windowed TABLE требуют указания ключа; фильтр по WINDOWSTART/WINDOWEND ограничивает диапазон.
  5. UDF (скаляр), UDAF (агрегат), UDTF (один-ко-многим) — три типа пользовательских функций, деплоятся как JAR в ext/.
Проверка знанийKnowledge check
Напишите ksqlDB запрос, который считает количество ошибок по HTTP-коду за 10-минутные tumbling окна с grace period 2 минуты из потока http_logs.
ОтветAnswer
Сначала нужно убедиться, что STREAM зарегистрирован: CREATE STREAM http_logs (status_code INT, path VARCHAR, ts BIGINT) WITH (kafka_topic='http-logs', value_format='JSON'); Затем windowed CTAS: CREATE TABLE errors_per_code AS SELECT status_code, COUNT(*) AS error_count, WINDOWSTART AS win_start FROM http_logs WHERE status_code >= 400 WINDOW TUMBLING (SIZE 10 MINUTES, GRACE PERIOD 2 MINUTES) GROUP BY status_code EMIT CHANGES; Это создаёт persistent query с Kafka Streams TopologyBuilder: источник http_logs → фильтр status_code >= 400 → TumblingWindow(10m, grace=2m) → GROUP BY status_code → COUNT(*) → материализованная TABLE.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что возвращают псевдо-столбцы WINDOWSTART и WINDOWEND в windowed запросе ksqlDB? Какой тип данных и что они означают?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5