Базовые SQL-запросы и временные окна
Flink SQL на 90% совпадает со стандартом SQL: тот же SELECT, WHERE, GROUP BY, JOIN, ORDER BY. Если вы пишете аналитику в Postgres или Snowflake, базовая часть Flink SQL вам уже знакома.
Streaming SQL в Spark Structured Streaming Различия начинаются там, где появляется streaming-семантика: бесконечные источники, event-time, windowing. В этом уроке разберём базовый набор операций, акцентируя внимание на streaming-нюансах.
Простые запросы: SELECT, WHERE, projection
Самый простой continuous query — projection без агрегаций:
SELECT
user_id,
url,
ts
FROM clicks
WHERE url LIKE '/product/%';
Результат — append-only dynamic table. Каждое входящее событие либо проходит фильтр и появляется в результате (+I), либо отсекается. Никаких -U/+U, никакого state. Этот запрос имеет нулевой keyed state и горизонтально масштабируется параллелизмом source.
Преобразования в SELECT:
SELECT
user_id,
LOWER(url) AS url_lower,
DATE_FORMAT(ts, 'yyyy-MM-dd') AS event_date,
CASE WHEN url LIKE '/product/%' THEN 'product' ELSE 'other' END AS page_type
FROM clicks;
Все scalar-функции стандартного SQL поддерживаются: строковые (LOWER, UPPER, SUBSTRING, REGEXP_EXTRACT), даты (DATE_FORMAT, EXTRACT, TIMESTAMPADD), числовые, JSON-функции, типы данных. Полный list — в Flink SQL docs.
GROUP BY и агрегации
GROUP BY в streaming-контексте работает как ожидаемо — но порождает state и retract-changelog:
SELECT
user_id,
COUNT(*) AS clicks_count,
AVG(load_time_ms) AS avg_load_time
FROM clicks
GROUP BY user_id;
Что происходит под капотом:
- Каждое новое событие попадает в keyed state по
user_id(как HashMap внутри Flink). - Запоминается текущий count и running sum для каждого ключа.
- На выход идёт пара
-U (alice, old_count, old_avg)/+U (alice, new_count, new_avg).
Важно: state растёт линейно от количества уникальных ключей. Если ключи — это user_id и у вас 100 миллионов уникальных пользователей, state будет содержать 100 миллионов записей. Без TTL это будет расти бесконечно. Решения:
- Установить state TTL через
table.exec.state.ttl(или per-operator hint). - Добавить временное окно — тогда state очищается после закрытия окна.
- Использовать upsert sink с явным удалением старых записей.
GROUP BY без time window — это unbounded state. Запускайте такой запрос только если уверены в ограниченной кардинальности ключа или настроили TTL. На high-cardinality потоке (user_id, session_id, transaction_id) без TTL state быстро вырастет до десятков GB и убьёт TaskManager OOM.
Watermark в DDL
Чтобы использовать event-time окна, в DDL источника нужно объявить WATERMARK:
CREATE TABLE clicks (
user_id STRING,
url STRING,
event_time TIMESTAMP_LTZ(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'clicks',
...
);
Что означает WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND:
event_time— колонка, которая является event-time attribute.event_time - INTERVAL '5' SECOND— формула расчёта watermark. Здесь: watermark всегда отстаёт от maximum observed event_time на 5 секунд. Это значит, что мы готовы ждать события до 5 секунд out-of-orderness; после этого окна сt < watermarkзакрываются.
Watermark — фундамент event-time обработки. Если в DDL нет WATERMARK, нельзя использовать event-time оконные функции; Flink выкинет ошибку планирования. Подробно watermarks в модуле 06.
Tumbling windows (TUMBLE)
Tumbling window — это непересекающиеся окна фиксированной длины. Каждое событие попадает ровно в одно окно.
SELECT
window_start,
window_end,
user_id,
COUNT(*) AS clicks
FROM TABLE(
TUMBLE(TABLE clicks, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end, user_id;
Синтаксис Windowing TVF (table-valued function, Flink 1.13+). Он заменил старый синтаксис GROUP BY TUMBLE(event_time, ...) и стал каноничным.
Что возвращает: для каждого пользователя — count кликов за каждое одноминутное окно. Окно закрывается, когда watermark пересекает window_end. После закрытия результат финален и больше не обновляется — append-only changelog.
Hopping windows (HOP)
Hopping window — это пересекающиеся окна. Размер окна больше, чем шаг. Применяется для скользящих средних, anomaly detection с overlap.
SELECT
window_start,
window_end,
AVG(load_time_ms) AS avg_load
FROM TABLE(
HOP(TABLE clicks, DESCRIPTOR(event_time), INTERVAL '15' SECOND, INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end;
Параметры HOP: slide (шаг, 15s), size (длина окна, 1 минута). Каждое событие в этой схеме попадает в size / slide = 4 окна.
HOP state-heavy: для каждого события нужно держать его в нескольких active windows одновременно. State растёт линейно от количества окон в которые событие попадает. Если slide=1s, size=1h — событие в 3600 окон одновременно. Это редко практично — обычно ограничиваются 4-12 окнами. Если нужна высокая частота update — рассмотрите CUMULATE или другие подходы (например, materialized aggregations с slot-based downsample).
Cumulate windows (CUMULATE)
Cumulate window — это накопительное окно. Полезно для интрадей-агрегации: “сколько кликов с начала суток на каждый час”.
SELECT
window_start,
window_end,
COUNT(*) AS cumulative_clicks
FROM TABLE(
CUMULATE(TABLE clicks, DESCRIPTOR(event_time), INTERVAL '1' HOUR, INTERVAL '1' DAY)
)
GROUP BY window_start, window_end;
Параметры: step (1 час), size (1 день). Окна: [00:00, 01:00), [00:00, 02:00), …, [00:00, 24:00). Все окна имеют одинаковое начало — увеличивается только window_end.
ORDER BY в streaming
В обычном SQL ORDER BY сортирует весь результат. В streaming это не имеет смысла — поток бесконечен, отсортировать его финально невозможно. Flink ограничивает использование ORDER BY:
ORDER BYпо event-time возрастанию — разрешён. Это просто гарантирует выдачу в порядке event-time.
SELECT * FROM clicks
ORDER BY event_time;
ORDER BY+LIMIT— разрешён. Top-N запросы.
SELECT user_id, total_amount
FROM (
SELECT user_id, SUM(amount) AS total_amount,
ROW_NUMBER() OVER (ORDER BY SUM(amount) DESC) AS rn
FROM orders
GROUP BY user_id
)
WHERE rn <= 10;
ORDER BYбез time или LIMIT — запрещён. Compile-time error.
Простые SUM/AVG примеры
-- Tumbling: count кликов на сайт каждую минуту
SELECT
window_start,
url,
COUNT(*) AS click_count,
COUNT(DISTINCT user_id) AS unique_users
FROM TABLE(
TUMBLE(TABLE clicks, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end, url;
-- AVG response time за окно, фильтр по success-кодам
SELECT
window_start,
AVG(response_time_ms) AS avg_response,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY response_time_ms) AS p95
FROM TABLE(
TUMBLE(TABLE requests, DESCRIPTOR(event_time), INTERVAL '30' SECOND)
)
WHERE status_code BETWEEN 200 AND 299
GROUP BY window_start, window_end;
PERCENTILE_CONT — пример approximate-aggregation; точные percentile в streaming дорого (надо хранить все значения), Flink использует TDigest или подобный sketch.
Попробуй сам
- Напиши запрос, считающий “топ-5 URL по уникальным посетителям за каждый час”. Какой это будет тип окна и какой changelog породит?
- Объясни, что произойдёт, если в DDL Kafka source нет WATERMARK, а вы пытаетесь использовать
TUMBLE(TABLE t, DESCRIPTOR(ts), INTERVAL '1' MINUTE). Какая будет ошибка? - Подумай, в каком сценарии CUMULATE предпочтительнее HOP: представь, что нужно “сумма продаж нарастающим итогом по часам с начала дня”.