Learning Platform
Глоссарий Troubleshooting
Урок 09.01 · 25 мин
Продвинутый
ksqlDBCREATE STREAMCREATE TABLEKStreamKTablevalue_formatSchema Registry

Streams и Tables в ksqlDB

В Модуле 07 вы освоили Kafka Streams как Java-библиотеку: KStream, KTable, state stores, windowing — всё через Java DSL. Теперь рассмотрим другой подход к потоковой обработке поверх той же инфраструктуры. ksqlDB — это SQL-интерфейс, который компилирует SQL-выражения в Kafka Streams топологии. Одна и та же задача, разные языки.


Что такое ksqlDB

ksqlDB — это сервер потоковой обработки, разработанный Confluent. Он принимает SQL-запросы, компилирует их в Kafka Streams топологии и выполняет их непрерывно. Вы не пишете Java-код — вы пишете SQL.

Ключевые характеристики:

  • Отдельный JVM-процесс. ksqlDB — не библиотека, а standalone-сервер (в отличие от Kafka Streams, который встраивается в ваше приложение).
  • Кластерный режим. Несколько серверов ksqlDB с одинаковым ksql.service.id образуют кластер. Запросы распределяются между узлами.
  • Kafka Streams под капотом. Каждое SQL-выражение компилируется в Kafka Streams топологию. State stores, changelog topics, consumer groups — всё из Модуля 07 применимо здесь.
  • Confluent Community License. Open-source, но не Apache License — проверьте ограничения перед коммерческим использованием.

Зачем нужен ksqlDB если есть Kafka Streams? Ответ прост: не все разработчики хотят писать на Java. Аналитики данных, SQL-инженеры, команды без Java-экспертизы — для них ksqlDB открывает потоковую обработку через знакомый язык.


Архитектура ksqlDB

Внутри каждого ksqlDB-сервера работает экземпляр Kafka Streams. Когда вы отправляете SQL-выражение через CLI или REST API, происходит следующее:

  1. Парсинг. SQL разбирается в абстрактное синтаксическое дерево.
  2. Планирование. Создаётся план выполнения — логический граф операций.
  3. Компиляция. План транслируется в Kafka Streams топологию (набор Source/Processor/Sink узлов).
  4. Выполнение. Kafka Streams экземпляр внутри ksqlDB-сервера запускает топологию как обычное Streams-приложение.
Архитектура ksqlDB

SQL-клиент отправляет выражение — ksqlDB компилирует его в Kafka Streams топологию

SQL-клиент (CLI / REST)

SQL-клиент: ksqlDB CLI или REST API. Отправляет SQL-выражения на ksqlDB-сервер. Получает результаты в реальном времени (push query) или разово (pull query).
SQL-запрос

ksqlDB-сервер (кластер)

ksqlDB-сервер: парсит SQL, строит план, компилирует в Kafka Streams топологию. Несколько серверов с одинаковым ksql.service.id образуют кластер. Запросы распределяются между узлами через Raft-подобный протокол.
Kafka Streams топология

Kafka Streams (внутренний)

Kafka Streams экземпляр внутри ksqlDB-сервера. Именно здесь происходит реальная обработка: Source процессор читает из Kafka, Stream процессоры трансформируют данные, Sink процессор пишет результаты обратно в Kafka. State stores — в RocksDB на диске ksqlDB-сервера.
чтение / запись

Kafka Cluster

Kafka Cluster: исходные топики (orders, users) и выходные топики (создаются автоматически для CSAS/CTAS запросов). ksqlDB хранит свои внутренние топики с префиксом ksql.service.id.

Каждое SQL-выражение CREATE STREAM AS SELECT или CREATE TABLE AS SELECT компилируется в отдельную Kafka Streams топологию.

NOTE

ksqlDB построен поверх Kafka Streams. Каждое SQL-выражение CREATE STREAM AS SELECT или CREATE TABLE AS SELECT компилируется в Kafka Streams topology и выполняется как стандартное Streams-приложение. Понимание Модуля 07 (KStream/KTable, state stores, windowing) даёт глубокое понимание того, что происходит под капотом.


STREAM vs TABLE в ksqlDB

Центральная идея ksqlDB — прямая проекция Kafka Streams абстракций на SQL:

ksqlDBKafka StreamsKafka-семантика
STREAMKStreamAppend-only лог. Каждая строка — независимое событие
TABLEKTableChangelog. Каждая строка — актуальное состояние ключа

STREAM — это KStream в SQL. Записи иммутабельны. Новая строка с тем же ключом — это новое событие, а не обновление предыдущего. Поток транзакций, кликов, логов — всё это STREAM.

TABLE — это KTable в SQL. Каждый ключ имеет ровно одно актуальное значение. Новая строка с существующим ключом перезаписывает предыдущее значение. Словарь пользователей, инвентарь товаров, конфигурация — всё это TABLE.

CREATE STREAM

CREATE STREAM pageviews (
  user_id VARCHAR,
  page VARCHAR,
  duration INT,
  ts BIGINT
) WITH (
  kafka_topic = 'pageviews',
  value_format = 'JSON'
);

Это не создаёт новый Kafka-топик — это регистрирует существующий топик pageviews как STREAM в ksqlDB. Каждое сообщение в топике становится строкой в потоке.

CREATE TABLE

CREATE TABLE users (
  user_id VARCHAR PRIMARY KEY,
  name VARCHAR,
  email VARCHAR,
  city VARCHAR
) WITH (
  kafka_topic = 'users',
  value_format = 'AVRO'
);

Здесь PRIMARY KEY обязателен для TABLE — ksqlDB должен знать, по какому полю хранить актуальное состояние. Это прямой аналог ключа в KTable. Сообщение с user_id = 'user-42' и value = null является tombstone — удалением записи из таблицы.


Value Formats: сериализация данных

Параметр value_format в WITH (...) определяет, как сериализованы значения в Kafka-топике:

FormatОписаниеТребует Schema Registry
JSONОбычный JSON без схемыНет
JSON_SRJSON с регистрацией схемы в Schema RegistryДа
AVROAvro binary с Confluent Wire FormatДа
PROTOBUFProtocol BuffersДа
DELIMITEDCSV / разделённый текстНет

Для AVRO, PROTOBUF и JSON_SR ksqlDB автоматически регистрирует и получает схемы из Schema Registry (Модуль 06). При создании CSAS или CTAS ksqlDB регистрирует выходную схему от имени сервера.


Инференс схемы из Schema Registry

Для Avro и Protobuf ksqlDB может автоматически вывести структуру столбцов из Schema Registry — без явного объявления полей:

-- Без объявления столбцов — схема берётся из Schema Registry
CREATE STREAM pageviews WITH (
  kafka_topic = 'pageviews',
  value_format = 'AVRO'
);

Это работает, если продюсер уже зарегистрировал схему для топика pageviews. ksqlDB делает запрос в Schema Registry, получает структуру и создаёт STREAM с соответствующими столбцами.


Kafka Streams Java vs ksqlDB SQL: сравнение

Рассмотрим одну и ту же задачу — фильтрация событий просмотра страниц с длительностью более 10 секунд.

Kafka Streams (Java), Модуль 07:

StreamsBuilder builder = new StreamsBuilder();

KStream<String, PageView> pageviews = builder.stream(
    "pageviews",
    Consumed.with(Serdes.String(), pageViewSerde)
);

KStream<String, PageView> filtered = pageviews
    .filter((key, view) -> view.getDuration() > 10);

filtered.to("pageviews-filtered",
    Produced.with(Serdes.String(), pageViewSerde));

KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();

ksqlDB (SQL):

CREATE STREAM pageviews (
  user_id VARCHAR,
  page VARCHAR,
  duration INT
) WITH (kafka_topic = 'pageviews', value_format = 'JSON');

CREATE STREAM pageviews_filtered AS
  SELECT user_id, page, duration
  FROM pageviews
  WHERE duration > 10
  EMIT CHANGES;

Оба варианта создают идентичную Kafka Streams топологию под капотом: Source Processor (читает pageviews) → Filter Processor (duration > 10) → Sink Processor (пишет в выходной топик). ksqlDB просто генерирует Java-код автоматически.


Ключевые выводы

  1. ksqlDB — это сервер, который компилирует SQL в Kafka Streams топологии. Не библиотека, а отдельный процесс.
  2. STREAM = KStream: append-only события. TABLE = KTable: актуальное состояние по ключу.
  3. CREATE STREAM ... WITH (kafka_topic=...) регистрирует существующий топик как STREAM, не создаёт новый.
  4. value_format определяет сериализацию: JSON без схемы, AVRO/PROTOBUF/JSON_SR через Schema Registry.
  5. Понимание Kafka Streams (Модуль 07) объясняет поведение ksqlDB: те же state stores, те же changelog topics, те же consumer groups.
Проверка знанийKnowledge check
В чём разница между STREAM и TABLE в ksqlDB? Как они соотносятся с KStream и KTable из Kafka Streams?
ОтветAnswer
STREAM соответствует KStream: это append-only поток событий, где каждая строка — независимое неизменяемое событие. Новая строка с тем же ключом — это новое событие, не обновление. TABLE соответствует KTable: это актуальное состояние по ключу, где новая строка с существующим ключом перезаписывает предыдущее значение (UPSERT-семантика). Разница критична при выборе абстракции: поток транзакций и кликов → STREAM, словарь пользователей и инвентарь → TABLE. ksqlDB компилирует оба типа в Kafka Streams KStream/KTable под капотом, используя те же state stores и changelog topics.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В чём принципиальная разница между STREAM и TABLE в ksqlDB с точки зрения семантики Kafka-топика?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5