Learning Platform
Глоссарий Troubleshooting
Урок 13.03 · 22 мин
Средний
SQLGROUP BYWindowTumbleHopWatermarkEvent-time

Базовые SQL-запросы и временные окна

Flink SQL на 90% совпадает со стандартом SQL: тот же SELECT, WHERE, GROUP BY, JOIN, ORDER BY. Если вы пишете аналитику в Postgres или Snowflake, базовая часть Flink SQL вам уже знакома.

Streaming SQL в Spark Structured Streaming Различия начинаются там, где появляется streaming-семантика: бесконечные источники, event-time, windowing. В этом уроке разберём базовый набор операций, акцентируя внимание на streaming-нюансах.


Простые запросы: SELECT, WHERE, projection

Самый простой continuous query — projection без агрегаций:

SELECT
  user_id,
  url,
  ts
FROM clicks
WHERE url LIKE '/product/%';

Результат — append-only dynamic table. Каждое входящее событие либо проходит фильтр и появляется в результате (+I), либо отсекается. Никаких -U/+U, никакого state. Этот запрос имеет нулевой keyed state и горизонтально масштабируется параллелизмом source.

Преобразования в SELECT:

SELECT
  user_id,
  LOWER(url) AS url_lower,
  DATE_FORMAT(ts, 'yyyy-MM-dd') AS event_date,
  CASE WHEN url LIKE '/product/%' THEN 'product' ELSE 'other' END AS page_type
FROM clicks;

Все scalar-функции стандартного SQL поддерживаются: строковые (LOWER, UPPER, SUBSTRING, REGEXP_EXTRACT), даты (DATE_FORMAT, EXTRACT, TIMESTAMPADD), числовые, JSON-функции, типы данных. Полный list — в Flink SQL docs.


GROUP BY и агрегации

GROUP BY в streaming-контексте работает как ожидаемо — но порождает state и retract-changelog:

SELECT
  user_id,
  COUNT(*) AS clicks_count,
  AVG(load_time_ms) AS avg_load_time
FROM clicks
GROUP BY user_id;

Что происходит под капотом:

  1. Каждое новое событие попадает в keyed state по user_id (как HashMap внутри Flink).
  2. Запоминается текущий count и running sum для каждого ключа.
  3. На выход идёт пара -U (alice, old_count, old_avg) / +U (alice, new_count, new_avg).

Важно: state растёт линейно от количества уникальных ключей. Если ключи — это user_id и у вас 100 миллионов уникальных пользователей, state будет содержать 100 миллионов записей. Без TTL это будет расти бесконечно. Решения:

  • Установить state TTL через table.exec.state.ttl (или per-operator hint).
  • Добавить временное окно — тогда state очищается после закрытия окна.
  • Использовать upsert sink с явным удалением старых записей.
WARNING

GROUP BY без time window — это unbounded state. Запускайте такой запрос только если уверены в ограниченной кардинальности ключа или настроили TTL. На high-cardinality потоке (user_id, session_id, transaction_id) без TTL state быстро вырастет до десятков GB и убьёт TaskManager OOM.


Watermark в DDL

Чтобы использовать event-time окна, в DDL источника нужно объявить WATERMARK:

CREATE TABLE clicks (
  user_id STRING,
  url STRING,
  event_time TIMESTAMP_LTZ(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'clicks',
  ...
);

Что означает WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND:

  • event_time — колонка, которая является event-time attribute.
  • event_time - INTERVAL '5' SECOND — формула расчёта watermark. Здесь: watermark всегда отстаёт от maximum observed event_time на 5 секунд. Это значит, что мы готовы ждать события до 5 секунд out-of-orderness; после этого окна с t < watermark закрываются.

Watermark — фундамент event-time обработки. Если в DDL нет WATERMARK, нельзя использовать event-time оконные функции; Flink выкинет ошибку планирования. Подробно watermarks в модуле 06.


Tumbling windows (TUMBLE)

Tumbling window — это непересекающиеся окна фиксированной длины. Каждое событие попадает ровно в одно окно.

SELECT
  window_start,
  window_end,
  user_id,
  COUNT(*) AS clicks
FROM TABLE(
  TUMBLE(TABLE clicks, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end, user_id;

Синтаксис Windowing TVF (table-valued function, Flink 1.13+). Он заменил старый синтаксис GROUP BY TUMBLE(event_time, ...) и стал каноничным.

Что возвращает: для каждого пользователя — count кликов за каждое одноминутное окно. Окно закрывается, когда watermark пересекает window_end. После закрытия результат финален и больше не обновляется — append-only changelog.

Tumbling vs Hopping vs Sliding windows
Окна длиной 1 минута на потоке кликов
TUMBLEНепересекающиеся окна. Окно [10:00, 10:01), [10:01, 10:02), [10:02, 10:03). Каждое событие попадает ровно в одно окно. Минимальный state.
HOPПересекающиеся (sliding) окна. Размер 1 мин, шаг 15 сек. Окна: [10:00:00, 10:01:00), [10:00:15, 10:01:15), [10:00:30, 10:01:30). Одно событие попадает в 4 окна. State в 4 раза больше.
CUMULATEНакопительные окна. Например за день: окна [00:00, 01:00), [00:00, 02:00), ..., [00:00, 24:00). Дают прогрессивные snapshot'ы — полезно для интрадей-агрегации.

Hopping windows (HOP)

Hopping window — это пересекающиеся окна. Размер окна больше, чем шаг. Применяется для скользящих средних, anomaly detection с overlap.

SELECT
  window_start,
  window_end,
  AVG(load_time_ms) AS avg_load
FROM TABLE(
  HOP(TABLE clicks, DESCRIPTOR(event_time), INTERVAL '15' SECOND, INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end;

Параметры HOP: slide (шаг, 15s), size (длина окна, 1 минута). Каждое событие в этой схеме попадает в size / slide = 4 окна.

TIP

HOP state-heavy: для каждого события нужно держать его в нескольких active windows одновременно. State растёт линейно от количества окон в которые событие попадает. Если slide=1s, size=1h — событие в 3600 окон одновременно. Это редко практично — обычно ограничиваются 4-12 окнами. Если нужна высокая частота update — рассмотрите CUMULATE или другие подходы (например, materialized aggregations с slot-based downsample).


Cumulate windows (CUMULATE)

Cumulate window — это накопительное окно. Полезно для интрадей-агрегации: “сколько кликов с начала суток на каждый час”.

SELECT
  window_start,
  window_end,
  COUNT(*) AS cumulative_clicks
FROM TABLE(
  CUMULATE(TABLE clicks, DESCRIPTOR(event_time), INTERVAL '1' HOUR, INTERVAL '1' DAY)
)
GROUP BY window_start, window_end;

Параметры: step (1 час), size (1 день). Окна: [00:00, 01:00), [00:00, 02:00), …, [00:00, 24:00). Все окна имеют одинаковое начало — увеличивается только window_end.


ORDER BY в streaming

В обычном SQL ORDER BY сортирует весь результат. В streaming это не имеет смысла — поток бесконечен, отсортировать его финально невозможно. Flink ограничивает использование ORDER BY:

  1. ORDER BY по event-time возрастанию — разрешён. Это просто гарантирует выдачу в порядке event-time.
SELECT * FROM clicks
ORDER BY event_time;
  1. ORDER BY + LIMIT — разрешён. Top-N запросы.
SELECT user_id, total_amount
FROM (
  SELECT user_id, SUM(amount) AS total_amount,
         ROW_NUMBER() OVER (ORDER BY SUM(amount) DESC) AS rn
  FROM orders
  GROUP BY user_id
)
WHERE rn <= 10;
  1. ORDER BY без time или LIMITзапрещён. Compile-time error.

Простые SUM/AVG примеры

-- Tumbling: count кликов на сайт каждую минуту
SELECT
  window_start,
  url,
  COUNT(*) AS click_count,
  COUNT(DISTINCT user_id) AS unique_users
FROM TABLE(
  TUMBLE(TABLE clicks, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end, url;

-- AVG response time за окно, фильтр по success-кодам
SELECT
  window_start,
  AVG(response_time_ms) AS avg_response,
  PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY response_time_ms) AS p95
FROM TABLE(
  TUMBLE(TABLE requests, DESCRIPTOR(event_time), INTERVAL '30' SECOND)
)
WHERE status_code BETWEEN 200 AND 299
GROUP BY window_start, window_end;

PERCENTILE_CONT — пример approximate-aggregation; точные percentile в streaming дорого (надо хранить все значения), Flink использует TDigest или подобный sketch.


Попробуй сам

  1. Напиши запрос, считающий “топ-5 URL по уникальным посетителям за каждый час”. Какой это будет тип окна и какой changelog породит?
  2. Объясни, что произойдёт, если в DDL Kafka source нет WATERMARK, а вы пытаетесь использовать TUMBLE(TABLE t, DESCRIPTOR(ts), INTERVAL '1' MINUTE). Какая будет ошибка?
  3. Подумай, в каком сценарии CUMULATE предпочтительнее HOP: представь, что нужно “сумма продаж нарастающим итогом по часам с начала дня”.
Проверка знанийKnowledge check
Запрос: SELECT user_id, COUNT(*) FROM clicks GROUP BY user_id запускается на потоке с 50 млн уникальных пользователей в месяц. Через 3 месяца TaskManager падает с OOM. В чём проблема и как её решить, не меняя бизнес-логику?
ОтветAnswer
Проблема — unbounded state. GROUP BY user_id без time window держит в keyed state одну запись на каждый уникальный user_id. За 3 месяца накопилось ~150 млн ключей; даже если каждая запись 200 байт (счётчик + ключ + RocksDB-overhead), это 30 ГБ state. RocksDB поддерживает spillover на диск, но если working set большой — page-faults убивают throughput, а pinned memory растёт; в итоге OOM. Решения: (1) state TTL: SET table.exec.state.ttl = '7 days' — Flink периодически чистит записи, не обновлявшиеся 7 дней; (2) переписать на window: вместо unbounded counter использовать TUMBLE окно (например, по дням), тогда state очищается при закрытии окна; (3) изменить запрос на materialized aggregation с downsampling и периодической пересчисткой. Бизнес-логика "топ-N кликеров за всё время" может потребовать (3) или альтернативный storage (Redis с TTL), но в большинстве случаев TTL или окна решают проблему.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Какое SQL-выражение в Flink SQL DDL ОБЯЗАТЕЛЬНО для использования event-time оконных функций (TUMBLE, HOP, CUMULATE)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5