Streams и Tables в ksqlDB
В Модуле 07 вы освоили Kafka Streams как Java-библиотеку: KStream, KTable, state stores, windowing — всё через Java DSL. Теперь рассмотрим другой подход к потоковой обработке поверх той же инфраструктуры. ksqlDB — это SQL-интерфейс, который компилирует SQL-выражения в Kafka Streams топологии. Одна и та же задача, разные языки.
Что такое ksqlDB
ksqlDB — это сервер потоковой обработки, разработанный Confluent. Он принимает SQL-запросы, компилирует их в Kafka Streams топологии и выполняет их непрерывно. Вы не пишете Java-код — вы пишете SQL.
Ключевые характеристики:
- Отдельный JVM-процесс. ksqlDB — не библиотека, а standalone-сервер (в отличие от Kafka Streams, который встраивается в ваше приложение).
- Кластерный режим. Несколько серверов ksqlDB с одинаковым
ksql.service.idобразуют кластер. Запросы распределяются между узлами. - Kafka Streams под капотом. Каждое SQL-выражение компилируется в Kafka Streams топологию. State stores, changelog topics, consumer groups — всё из Модуля 07 применимо здесь.
- Confluent Community License. Open-source, но не Apache License — проверьте ограничения перед коммерческим использованием.
Зачем нужен ksqlDB если есть Kafka Streams? Ответ прост: не все разработчики хотят писать на Java. Аналитики данных, SQL-инженеры, команды без Java-экспертизы — для них ksqlDB открывает потоковую обработку через знакомый язык.
Архитектура ksqlDB
Внутри каждого ksqlDB-сервера работает экземпляр Kafka Streams. Когда вы отправляете SQL-выражение через CLI или REST API, происходит следующее:
- Парсинг. SQL разбирается в абстрактное синтаксическое дерево.
- Планирование. Создаётся план выполнения — логический граф операций.
- Компиляция. План транслируется в Kafka Streams топологию (набор Source/Processor/Sink узлов).
- Выполнение. Kafka Streams экземпляр внутри ksqlDB-сервера запускает топологию как обычное Streams-приложение.
SQL-клиент отправляет выражение — ksqlDB компилирует его в Kafka Streams топологию
SQL-клиент (CLI / REST)
SQL-клиент: ksqlDB CLI или REST API. Отправляет SQL-выражения на ksqlDB-сервер. Получает результаты в реальном времени (push query) или разово (pull query).ksqlDB-сервер (кластер)
ksqlDB-сервер: парсит SQL, строит план, компилирует в Kafka Streams топологию. Несколько серверов с одинаковым ksql.service.id образуют кластер. Запросы распределяются между узлами через Raft-подобный протокол.Kafka Streams (внутренний)
Kafka Streams экземпляр внутри ksqlDB-сервера. Именно здесь происходит реальная обработка: Source процессор читает из Kafka, Stream процессоры трансформируют данные, Sink процессор пишет результаты обратно в Kafka. State stores — в RocksDB на диске ksqlDB-сервера.Kafka Cluster
Kafka Cluster: исходные топики (orders, users) и выходные топики (создаются автоматически для CSAS/CTAS запросов). ksqlDB хранит свои внутренние топики с префиксом ksql.service.id.Каждое SQL-выражение CREATE STREAM AS SELECT или CREATE TABLE AS SELECT компилируется в отдельную Kafka Streams топологию.
ksqlDB построен поверх Kafka Streams. Каждое SQL-выражение CREATE STREAM AS SELECT или CREATE TABLE AS SELECT компилируется в Kafka Streams topology и выполняется как стандартное Streams-приложение. Понимание Модуля 07 (KStream/KTable, state stores, windowing) даёт глубокое понимание того, что происходит под капотом.
STREAM vs TABLE в ksqlDB
Центральная идея ksqlDB — прямая проекция Kafka Streams абстракций на SQL:
| ksqlDB | Kafka Streams | Kafka-семантика |
|---|---|---|
STREAM | KStream | Append-only лог. Каждая строка — независимое событие |
TABLE | KTable | Changelog. Каждая строка — актуальное состояние ключа |
STREAM — это KStream в SQL. Записи иммутабельны. Новая строка с тем же ключом — это новое событие, а не обновление предыдущего. Поток транзакций, кликов, логов — всё это STREAM.
TABLE — это KTable в SQL. Каждый ключ имеет ровно одно актуальное значение. Новая строка с существующим ключом перезаписывает предыдущее значение. Словарь пользователей, инвентарь товаров, конфигурация — всё это TABLE.
CREATE STREAM
CREATE STREAM pageviews (
user_id VARCHAR,
page VARCHAR,
duration INT,
ts BIGINT
) WITH (
kafka_topic = 'pageviews',
value_format = 'JSON'
);
Это не создаёт новый Kafka-топик — это регистрирует существующий топик pageviews как STREAM в ksqlDB. Каждое сообщение в топике становится строкой в потоке.
CREATE TABLE
CREATE TABLE users (
user_id VARCHAR PRIMARY KEY,
name VARCHAR,
email VARCHAR,
city VARCHAR
) WITH (
kafka_topic = 'users',
value_format = 'AVRO'
);
Здесь PRIMARY KEY обязателен для TABLE — ksqlDB должен знать, по какому полю хранить актуальное состояние. Это прямой аналог ключа в KTable. Сообщение с user_id = 'user-42' и value = null является tombstone — удалением записи из таблицы.
Value Formats: сериализация данных
Параметр value_format в WITH (...) определяет, как сериализованы значения в Kafka-топике:
| Format | Описание | Требует Schema Registry |
|---|---|---|
JSON | Обычный JSON без схемы | Нет |
JSON_SR | JSON с регистрацией схемы в Schema Registry | Да |
AVRO | Avro binary с Confluent Wire Format | Да |
PROTOBUF | Protocol Buffers | Да |
DELIMITED | CSV / разделённый текст | Нет |
Для AVRO, PROTOBUF и JSON_SR ksqlDB автоматически регистрирует и получает схемы из Schema Registry (Модуль 06). При создании CSAS или CTAS ksqlDB регистрирует выходную схему от имени сервера.
Инференс схемы из Schema Registry
Для Avro и Protobuf ksqlDB может автоматически вывести структуру столбцов из Schema Registry — без явного объявления полей:
-- Без объявления столбцов — схема берётся из Schema Registry
CREATE STREAM pageviews WITH (
kafka_topic = 'pageviews',
value_format = 'AVRO'
);
Это работает, если продюсер уже зарегистрировал схему для топика pageviews. ksqlDB делает запрос в Schema Registry, получает структуру и создаёт STREAM с соответствующими столбцами.
Kafka Streams Java vs ksqlDB SQL: сравнение
Рассмотрим одну и ту же задачу — фильтрация событий просмотра страниц с длительностью более 10 секунд.
Kafka Streams (Java), Модуль 07:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, PageView> pageviews = builder.stream(
"pageviews",
Consumed.with(Serdes.String(), pageViewSerde)
);
KStream<String, PageView> filtered = pageviews
.filter((key, view) -> view.getDuration() > 10);
filtered.to("pageviews-filtered",
Produced.with(Serdes.String(), pageViewSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
ksqlDB (SQL):
CREATE STREAM pageviews (
user_id VARCHAR,
page VARCHAR,
duration INT
) WITH (kafka_topic = 'pageviews', value_format = 'JSON');
CREATE STREAM pageviews_filtered AS
SELECT user_id, page, duration
FROM pageviews
WHERE duration > 10
EMIT CHANGES;
Оба варианта создают идентичную Kafka Streams топологию под капотом: Source Processor (читает pageviews) → Filter Processor (duration > 10) → Sink Processor (пишет в выходной топик). ksqlDB просто генерирует Java-код автоматически.
Ключевые выводы
- ksqlDB — это сервер, который компилирует SQL в Kafka Streams топологии. Не библиотека, а отдельный процесс.
STREAM=KStream: append-only события.TABLE=KTable: актуальное состояние по ключу.CREATE STREAM ... WITH (kafka_topic=...)регистрирует существующий топик как STREAM, не создаёт новый.value_formatопределяет сериализацию:JSONбез схемы,AVRO/PROTOBUF/JSON_SRчерез Schema Registry.- Понимание Kafka Streams (Модуль 07) объясняет поведение ksqlDB: те же state stores, те же changelog topics, те же consumer groups.