CSAS/CTAS и Push/Pull запросы
В предыдущем уроке мы создавали STREAM и TABLE, регистрируя существующие Kafka-топики. Теперь рассмотрим, как создавать непрерывные запросы — выражения, которые непрерывно читают из источника, трансформируют данные и пишут результаты в новый топик. Это и есть ядро потоковой обработки в ksqlDB.
CSAS: CREATE STREAM AS SELECT
CSAS (Create Stream As Select) — создаёт persistent query, который непрерывно читает из исходного STREAM, трансформирует данные и записывает результат в новый Kafka-топик.
CREATE STREAM enriched_orders AS
SELECT
o.order_id,
o.amount,
o.product,
c.name AS customer_name,
c.city AS customer_city
FROM orders_stream o
LEFT JOIN customers_table c ON o.customer_id = c.user_id
EMIT CHANGES;
Что происходит при выполнении этого выражения:
- ksqlDB автоматически создаёт новый Kafka-топик
ENRICHED_ORDERS(имя берётся из названия STREAM, приводится к верхнему регистру). - Запускается Kafka Streams топология: читает из
ordersиcustomers, выполняет LEFT JOIN, пишет вENRICHED_ORDERS. - Topology работает непрерывно. Каждое новое событие в
ordersнемедленно проходит через джойн и появляется вENRICHED_ORDERS. - Запрос становится persistent — он сохраняется на сервере, переживает рестарты ksqlDB и виден в
SHOW QUERIES.
CSAS — это эквивалент .filter(), .map(), .join().to() в Kafka Streams DSL, только в одном SQL-выражении.
CTAS: CREATE TABLE AS SELECT
CTAS (Create Table As Select) — создаёт persistent query, который непрерывно читает из источника, агрегирует данные и материализует результат как TABLE.
CREATE TABLE order_counts AS
SELECT
customer_id,
COUNT(*) AS total_orders,
SUM(amount) AS total_amount
FROM orders_stream
GROUP BY customer_id
EMIT CHANGES;
Что происходит:
- Создаётся Kafka-топик
ORDER_COUNTS— выходной changelog. - Запускается Kafka Streams топология с state store (RocksDB): агрегирует orders по customer_id.
- При каждом новом заказе: счётчик и сумма для customer_id обновляются, новое значение записывается в выходной топик.
- TABLE
order_countsстановится материализованным представлением — оно доступно для pull-запросов (точечных lookups по ключу).
CTAS — это эквивалент .groupByKey().aggregate(..., Materialized.as("store")) в Kafka Streams DSL.
Push queries: непрерывная подписка
Push query — это запрос с EMIT CHANGES, который возвращает новые строки по мере их появления. Соединение не закрывается.
SELECT * FROM enriched_orders EMIT CHANGES;
Или с условием:
SELECT order_id, amount, customer_name
FROM enriched_orders
WHERE amount > 1000
EMIT CHANGES;
Поведение:
- Клиент (CLI или HTTP-соединение) получает каждую новую строку в момент её появления.
- Соединение остаётся открытым бессрочно — пока клиент не закроет его.
- Работает как с STREAM, так и с TABLE.
- Transient (временный): если клиент отключился, запрос завершается. Никаких постоянных ресурсов не расходуется.
Push query через REST API:
curl -X POST http://localhost:8088/query-stream \
-H 'Content-Type: application/vnd.ksql.v1+json' \
-d '{"sql": "SELECT * FROM enriched_orders EMIT CHANGES;"}'
Сервер возвращает chunked HTTP-ответ: каждая строка — отдельный JSON-объект в потоке.
Применение: реальная-временные дашборды, мониторинг событий, WebSocket-обновления UI.
Pull queries: точечный lookup
Pull query — это обычный SELECT без EMIT CHANGES. Возвращает текущее состояние, затем закрывает соединение.
SELECT customer_id, total_orders, total_amount
FROM order_counts
WHERE customer_id = 'customer-42';
Поведение:
- Немедленно возвращает текущее значение для указанного ключа.
- Соединение закрывается сразу после ответа.
- Работает только с материализованными TABLE (результат CTAS или TABLE с явным state store).
- Под капотом: запрос идёт в RocksDB state store ksqlDB-сервера — это O(1) lookup по ключу.
Pull queries работают только с материализованными таблицами (результат CTAS или CREATE TABLE с query). Попытка выполнить pull query на STREAM вернёт ошибку. Если нужен точечный запрос — материализуйте данные через CTAS.
Pull query через REST API:
curl -X POST http://localhost:8088/query \
-H 'Content-Type: application/vnd.ksql.v1+json' \
-d '{"ksql": "SELECT * FROM order_counts WHERE customer_id = '\''customer-42'\'';", "streamsProperties": {}}'
Ответ: массив строк с текущим состоянием. Одноразовый запрос, как обычный SELECT к базе данных.
Применение: REST API backend, сервисный слой, ответы на запросы пользователей о текущем состоянии.
Push vs Pull: визуальное сравнение
Push Query (EMIT CHANGES)
Push Query (EMIT CHANGES): клиент открывает соединение один раз. Каждое новое событие в потоке немедленно доставляется клиенту. Соединение не закрывается — клиент получает данные в реальном времени. Работает на STREAM и TABLE.Клиент (дашборд)
Push Query клиент: получает строки по мере появления. Timeout или явное закрытие — единственный способ завершить. Используется для дашбордов, мониторинга, событийных уведомлений.Pull Query (WHERE key=)
Pull Query (без EMIT CHANGES): клиент делает один запрос. ksqlDB идёт в RocksDB state store, берёт текущее значение по ключу и возвращает результат. Соединение закрывается. Только для материализованных TABLE (CTAS).Клиент (REST API)
Pull Query клиент: получает текущее значение и сразу закрывает соединение. Ведёт себя как обычный SELECT к базе данных. Подходит для REST API, микросервисов, запросов на конкретный ключ.Каждый persistent query (CSAS/CTAS) = один Kafka Streams topology = один consumer group. 100 persistent queries = 100 consumer groups = 100 наборов state stores. Планируйте ёмкость ksqlDB серверов соответственно: CPU для stream processing, диск для RocksDB state stores, память для кэша.
Persistent vs transient queries
Не все запросы в ksqlDB живут вечно:
| Тип запроса | Создаётся через | Lifetime | Виден в SHOW QUERIES |
|---|---|---|---|
| Persistent query | CSAS / CTAS / INSERT INTO | До явного TERMINATE | Да |
| Transient push query | SELECT … EMIT CHANGES | До отключения клиента | Нет |
| Transient pull query | SELECT … WHERE key= | До получения ответа | Нет |
Persistent queries — это продакшн-рабочая нагрузка. Transient queries — это инструмент разработки и отладки.
INSERT INTO: слияние потоков
INSERT INTO добавляет записи в существующий STREAM или объединяет несколько потоков:
-- Два источника событий сливаются в один поток
INSERT INTO combined_events
SELECT event_id, event_type, payload, ts
FROM events_source_a;
INSERT INTO combined_events
SELECT event_id, event_type, payload, ts
FROM events_source_b;
Каждый INSERT INTO создаёт отдельный persistent query. Оба потока записывают в один и тот же Kafka-топик combined_events. Это Kafka Streams merge() в SQL-синтаксисе.
Управление запросами
-- Просмотр всех persistent queries
SHOW QUERIES;
-- Детальное описание топологии для конкретного запроса
EXPLAIN CSAS_ENRICHED_ORDERS_0;
-- Остановка persistent query
TERMINATE CSAS_ENRICHED_ORDERS_0;
EXPLAIN возвращает текстовое описание Kafka Streams топологии — те же Source Processor, Filter Processor, Sink Processor, что вы видели в Модуле 07. Это полезный инструмент для отладки производительности.
Ключевые выводы
- CSAS создаёт persistent query, который непрерывно трансформирует один STREAM в другой.
- CTAS создаёт persistent query, который агрегирует данные в материализованную TABLE (с state store).
- Push query (
EMIT CHANGES) — непрерывная подписка на поток, соединение открыто, работает на STREAM и TABLE. - Pull query (без
EMIT CHANGES) — точечный lookup по ключу, только для материализованных TABLE. - Каждый persistent query = отдельная Kafka Streams топология = отдельный consumer group. Ресурсы планируются соответственно.