Learning Platform
Глоссарий Troubleshooting
Урок 09.02 · 25 мин
Продвинутый
CSASCTASPush QueryPull QueryEMIT CHANGESMaterialized ViewPersistent Query

CSAS/CTAS и Push/Pull запросы

В предыдущем уроке мы создавали STREAM и TABLE, регистрируя существующие Kafka-топики. Теперь рассмотрим, как создавать непрерывные запросы — выражения, которые непрерывно читают из источника, трансформируют данные и пишут результаты в новый топик. Это и есть ядро потоковой обработки в ksqlDB.


CSAS: CREATE STREAM AS SELECT

CSAS (Create Stream As Select) — создаёт persistent query, который непрерывно читает из исходного STREAM, трансформирует данные и записывает результат в новый Kafka-топик.

CREATE STREAM enriched_orders AS
  SELECT
    o.order_id,
    o.amount,
    o.product,
    c.name AS customer_name,
    c.city AS customer_city
  FROM orders_stream o
  LEFT JOIN customers_table c ON o.customer_id = c.user_id
  EMIT CHANGES;

Что происходит при выполнении этого выражения:

  1. ksqlDB автоматически создаёт новый Kafka-топик ENRICHED_ORDERS (имя берётся из названия STREAM, приводится к верхнему регистру).
  2. Запускается Kafka Streams топология: читает из orders и customers, выполняет LEFT JOIN, пишет в ENRICHED_ORDERS.
  3. Topology работает непрерывно. Каждое новое событие в orders немедленно проходит через джойн и появляется в ENRICHED_ORDERS.
  4. Запрос становится persistent — он сохраняется на сервере, переживает рестарты ksqlDB и виден в SHOW QUERIES.

CSAS — это эквивалент .filter(), .map(), .join().to() в Kafka Streams DSL, только в одном SQL-выражении.


CTAS: CREATE TABLE AS SELECT

CTAS (Create Table As Select) — создаёт persistent query, который непрерывно читает из источника, агрегирует данные и материализует результат как TABLE.

CREATE TABLE order_counts AS
  SELECT
    customer_id,
    COUNT(*) AS total_orders,
    SUM(amount) AS total_amount
  FROM orders_stream
  GROUP BY customer_id
  EMIT CHANGES;

Что происходит:

  1. Создаётся Kafka-топик ORDER_COUNTS — выходной changelog.
  2. Запускается Kafka Streams топология с state store (RocksDB): агрегирует orders по customer_id.
  3. При каждом новом заказе: счётчик и сумма для customer_id обновляются, новое значение записывается в выходной топик.
  4. TABLE order_counts становится материализованным представлением — оно доступно для pull-запросов (точечных lookups по ключу).

CTAS — это эквивалент .groupByKey().aggregate(..., Materialized.as("store")) в Kafka Streams DSL.


Push queries: непрерывная подписка

Push query — это запрос с EMIT CHANGES, который возвращает новые строки по мере их появления. Соединение не закрывается.

SELECT * FROM enriched_orders EMIT CHANGES;

Или с условием:

SELECT order_id, amount, customer_name
FROM enriched_orders
WHERE amount > 1000
EMIT CHANGES;

Поведение:

  • Клиент (CLI или HTTP-соединение) получает каждую новую строку в момент её появления.
  • Соединение остаётся открытым бессрочно — пока клиент не закроет его.
  • Работает как с STREAM, так и с TABLE.
  • Transient (временный): если клиент отключился, запрос завершается. Никаких постоянных ресурсов не расходуется.

Push query через REST API:

curl -X POST http://localhost:8088/query-stream \
  -H 'Content-Type: application/vnd.ksql.v1+json' \
  -d '{"sql": "SELECT * FROM enriched_orders EMIT CHANGES;"}'

Сервер возвращает chunked HTTP-ответ: каждая строка — отдельный JSON-объект в потоке.

Применение: реальная-временные дашборды, мониторинг событий, WebSocket-обновления UI.


Pull queries: точечный lookup

Pull query — это обычный SELECT без EMIT CHANGES. Возвращает текущее состояние, затем закрывает соединение.

SELECT customer_id, total_orders, total_amount
FROM order_counts
WHERE customer_id = 'customer-42';

Поведение:

  • Немедленно возвращает текущее значение для указанного ключа.
  • Соединение закрывается сразу после ответа.
  • Работает только с материализованными TABLE (результат CTAS или TABLE с явным state store).
  • Под капотом: запрос идёт в RocksDB state store ksqlDB-сервера — это O(1) lookup по ключу.
TIP

Pull queries работают только с материализованными таблицами (результат CTAS или CREATE TABLE с query). Попытка выполнить pull query на STREAM вернёт ошибку. Если нужен точечный запрос — материализуйте данные через CTAS.

Pull query через REST API:

curl -X POST http://localhost:8088/query \
  -H 'Content-Type: application/vnd.ksql.v1+json' \
  -d '{"ksql": "SELECT * FROM order_counts WHERE customer_id = '\''customer-42'\'';", "streamsProperties": {}}'

Ответ: массив строк с текущим состоянием. Одноразовый запрос, как обычный SELECT к базе данных.

Применение: REST API backend, сервисный слой, ответы на запросы пользователей о текущем состоянии.


Push vs Pull: визуальное сравнение

Push Query vs Pull Query

Push Query (EMIT CHANGES)

Push Query (EMIT CHANGES): клиент открывает соединение один раз. Каждое новое событие в потоке немедленно доставляется клиенту. Соединение не закрывается — клиент получает данные в реальном времени. Работает на STREAM и TABLE.
непрерывный поток строк

Клиент (дашборд)

Push Query клиент: получает строки по мере появления. Timeout или явное закрытие — единственный способ завершить. Используется для дашбордов, мониторинга, событийных уведомлений.

Pull Query (WHERE key=)

Pull Query (без EMIT CHANGES): клиент делает один запрос. ksqlDB идёт в RocksDB state store, берёт текущее значение по ключу и возвращает результат. Соединение закрывается. Только для материализованных TABLE (CTAS).
запрос / ответ

Клиент (REST API)

Pull Query клиент: получает текущее значение и сразу закрывает соединение. Ведёт себя как обычный SELECT к базе данных. Подходит для REST API, микросервисов, запросов на конкретный ключ.
WARNING

Каждый persistent query (CSAS/CTAS) = один Kafka Streams topology = один consumer group. 100 persistent queries = 100 consumer groups = 100 наборов state stores. Планируйте ёмкость ksqlDB серверов соответственно: CPU для stream processing, диск для RocksDB state stores, память для кэша.


Persistent vs transient queries

Не все запросы в ksqlDB живут вечно:

Тип запросаСоздаётся черезLifetimeВиден в SHOW QUERIES
Persistent queryCSAS / CTAS / INSERT INTOДо явного TERMINATEДа
Transient push querySELECT … EMIT CHANGESДо отключения клиентаНет
Transient pull querySELECT … WHERE key=До получения ответаНет

Persistent queries — это продакшн-рабочая нагрузка. Transient queries — это инструмент разработки и отладки.


INSERT INTO: слияние потоков

INSERT INTO добавляет записи в существующий STREAM или объединяет несколько потоков:

-- Два источника событий сливаются в один поток
INSERT INTO combined_events
  SELECT event_id, event_type, payload, ts
  FROM events_source_a;

INSERT INTO combined_events
  SELECT event_id, event_type, payload, ts
  FROM events_source_b;

Каждый INSERT INTO создаёт отдельный persistent query. Оба потока записывают в один и тот же Kafka-топик combined_events. Это Kafka Streams merge() в SQL-синтаксисе.


Управление запросами

-- Просмотр всех persistent queries
SHOW QUERIES;

-- Детальное описание топологии для конкретного запроса
EXPLAIN CSAS_ENRICHED_ORDERS_0;

-- Остановка persistent query
TERMINATE CSAS_ENRICHED_ORDERS_0;

EXPLAIN возвращает текстовое описание Kafka Streams топологии — те же Source Processor, Filter Processor, Sink Processor, что вы видели в Модуле 07. Это полезный инструмент для отладки производительности.


Ключевые выводы

  1. CSAS создаёт persistent query, который непрерывно трансформирует один STREAM в другой.
  2. CTAS создаёт persistent query, который агрегирует данные в материализованную TABLE (с state store).
  3. Push query (EMIT CHANGES) — непрерывная подписка на поток, соединение открыто, работает на STREAM и TABLE.
  4. Pull query (без EMIT CHANGES) — точечный lookup по ключу, только для материализованных TABLE.
  5. Каждый persistent query = отдельная Kafka Streams топология = отдельный consumer group. Ресурсы планируются соответственно.
Проверка знанийKnowledge check
У вас есть поток заказов. Вы хотите: (1) в реальном времени видеть все заказы дороже 1000, (2) по запросу узнать текущую сумму заказов конкретного клиента. Какие SQL-выражения ksqlDB нужно создать?
ОтветAnswer
Для (1) нужен CSAS с push query: сначала CREATE STREAM filtered_orders AS SELECT * FROM orders_stream WHERE amount > 1000 EMIT CHANGES; — это создаёт persistent query с фильтром. Затем SELECT * FROM filtered_orders EMIT CHANGES; — push query для реального наблюдения. Для (2) нужен CTAS: CREATE TABLE customer_totals AS SELECT customer_id, SUM(amount) AS total FROM orders_stream GROUP BY customer_id EMIT CHANGES; — материализует агрегацию. Затем pull query: SELECT * FROM customer_totals WHERE customer_id = 'cust-42'; — возвращает текущую сумму и закрывает соединение.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. В чём разница между persistent query и transient query в ksqlDB?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5