Learning Platform
Глоссарий Troubleshooting
Урок 09.04 · 25 мин
Продвинутый
ksqlDB ProductionInteractive ModeHeadless ModeCapacity PlanningMonitoringHigh AvailabilityConnector Integration

ksqlDB в Production

Разработка ksqlDB-пайплайна в interactive mode с CLI — это удобно. Но запуск в production требует другого подхода: декларативные запросы, управление ресурсами, мониторинг, высокая доступность. Рассмотрим, как правильно развернуть ksqlDB для production-нагрузки.


Interactive vs Headless mode

ksqlDB поддерживает два режима работы:

Interactive mode (по умолчанию)

# ksqlDB server config (ksql-server.properties)
ksql.headless=false
  • REST API и CLI полностью доступны.
  • Разработчики могут создавать, изменять, удалять запросы в любой момент.
  • Используется для разработки, отладки, экспериментов.
  • Риск в production: один случайный TERMINATE query_id уничтожает критичный pipeline.

Headless mode (рекомендован для production)

# ksqlDB server config
ksql.headless=true
ksql.queries.file=/etc/ksqldb/queries.sql
  • Сервер при старте читает SQL-файл и запускает все указанные queries.
  • REST API для создания/изменения запросов недоступен — только чтение состояния.
  • CLI не может подключиться для изменения запросов.
  • Все изменения идут через Git: обновил queries.sql → задеплоил → ksqlDB-сервер перезапустился с новым набором запросов.

queries.sql для headless mode:

-- Создание источников
CREATE STREAM IF NOT EXISTS orders_stream (
  order_id VARCHAR KEY,
  customer_id VARCHAR,
  amount DOUBLE,
  product VARCHAR
) WITH (kafka_topic = 'orders', value_format = 'JSON');

CREATE TABLE IF NOT EXISTS customers_table (
  customer_id VARCHAR PRIMARY KEY,
  name VARCHAR,
  tier VARCHAR
) WITH (kafka_topic = 'customers', value_format = 'AVRO');

-- Persistent queries
CREATE STREAM IF NOT EXISTS enriched_orders AS
  SELECT o.order_id, o.amount, c.name AS customer_name, c.tier
  FROM orders_stream o
  LEFT JOIN customers_table c ON o.customer_id = c.customer_id
  EMIT CHANGES;

CREATE TABLE IF NOT EXISTS customer_order_totals AS
  SELECT customer_id, COUNT(*) AS total_orders, SUM(amount) AS total_amount
  FROM orders_stream
  GROUP BY customer_id
  EMIT CHANGES;
WARNING

В production используйте headless mode (ksql.queries.file). Interactive mode удобен для разработки, но в production один случайный TERMINATE уничтожает критичный pipeline. Headless mode делает все запросы декларативными и воспроизводимыми через систему контроля версий.


Capacity planning для ksqlDB кластера

Каждый persistent query = Kafka Streams топология = набор ресурсов:

На каждый persistent query выделяются:

  • Один или несколько consumer group с задачами (tasks = количество партиций источника).
  • State store на диске (RocksDB) пропорциональный размеру state: агрегации по ключам, windowed stores, join stores.
  • CPU для stream processing: фильтрация, трансформация, агрегация.
  • Сетевой bandwidth для чтения из Kafka и записи результатов.

Практические ориентиры:

ПараметрРекомендация
CPU1-2 ядра на каждые 10 lightweight queries (фильтр/map)
RAM2-4 GB per ksqlDB instance + Kafka Streams heap
ДискRocksDB state store: планируйте 10x размер “hot” state
NodesМинимум 2 ноды для HA; масштабируйте горизонтально при росте числа queries

Для 50 persistent queries на 2-нодовый кластер: каждая нода обрабатывает ~25 топологий. Если state stores агрессивны (windowed агрегации по миллионам ключей), потребность в диске может быть значительной.

Начальная конфигурация JVM:

# В переменной окружения KSQL_JVM_PERFORMANCE_OPTS
KSQL_JVM_PERFORMANCE_OPTS="-Xms2g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=200"

High Availability

ksqlDB наследует HA от Kafka Streams:

Нодовый сбой:

  1. Нода A падает. Её persistent queries теряют задачи.
  2. Оставшиеся ноды (B, C) через consumer group rebalancing получают задачи ноды A.
  3. Состояние восстанавливается из changelog-топиков в Kafka (как в Kafka Streams).

Standby replicas для быстрого восстановления:

# В ksqlDB server config
ksql.streams.num.standby.replicas=1

С этой настройкой каждая задача имеет горячий резерв на другой ноде, который непрерывно реплицирует changelog. При сбое — восстановление занимает секунды вместо минут.

Конфигурация кластера:

# Все ноды кластера должны использовать один ksql.service.id
ksql.service.id=production-ksqldb

# Listeners — настраивается для каждой ноды отдельно
listeners=http://0.0.0.0:8088

# Для ноды 1
ksql.advertised.listener=http://ksqldb-node-1:8088

В кластере из N нод ksqlDB использует Kafka-топик для координации (аналогично Kafka Connect group координации).


Мониторинг ksqlDB

ksqlDB экспортирует JMX-метрики. Ключевые метрики:

МетрикаJMX путьОписание
messages-consumed-per-secio.confluent.ksql.metricsСкорость чтения из Kafka
messages-produced-per-secio.confluent.ksql.metricsСкорость записи результатов
num-running-queriesio.confluent.ksql.metricsЧисло активных persistent queries
error-rateio.confluent.ksql.metricsЧастота ошибок обработки
query-error-countPer-query метрикаОшибки конкретного query

Prometheus JMX Exporter позволяет собирать JMX-метрики ksqlDB в Prometheus:

# docker-compose дополнение для jmx_exporter
- KSQL_JMX_OPTS="-Dcom.sun.jmx.remote.ssl=false -Dcom.sun.jmx.remote.authenticate=false -Djava.rmi.server.hostname=0.0.0.0 -Dcom.sun.jmx.remote.port=1099"

Grafana дашборды для ksqlDB должны включать:

  • Consumer lag на входных топиках (рост = ksqlDB не успевает обрабатывать).
  • State store size per query (рост без границ = проблема retention или windowing).
  • Query error rate (ненулевой = проблема с данными или логикой).
  • JVM heap usage (близко к Xmx = риск OOM и рестарта ноды).

ksqlDB + Connect: полный SQL-пайплайн

ksqlDB позволяет управлять Kafka Connect коннекторами прямо из SQL-интерфейса через CREATE SOURCE CONNECTOR и CREATE SINK CONNECTOR.

-- Создать source connector для чтения из PostgreSQL
CREATE SOURCE CONNECTOR jdbc_source WITH (
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url' = 'jdbc:postgresql://db:5432/mydb',
  'connection.user' = 'kafka_user',
  'connection.password' = 'secret',
  'table.whitelist' = 'users,products',
  'mode' = 'incrementing',
  'incrementing.column.name' = 'id',
  'topic.prefix' = 'jdbc_'
);

-- После создания коннектора — users становится доступен как Kafka-топик
-- Регистрируем его как TABLE в ksqlDB
CREATE TABLE users_from_db (
  id INT PRIMARY KEY,
  name VARCHAR,
  email VARCHAR
) WITH (kafka_topic = 'jdbc_users', value_format = 'JSON');

-- Создать sink connector для записи обработанных данных
CREATE SINK CONNECTOR elasticsearch_sink WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url' = 'http://elasticsearch:9200',
  'topics' = 'ENRICHED_ORDERS',
  'type.name' = '_doc',
  'key.ignore' = 'true'
);
TIP

ksqlDB + Connect = единый SQL-интерфейс для data pipeline: CREATE SOURCE CONNECTOR для ingestion, CREATE STREAM AS SELECT для трансформации, CREATE SINK CONNECTOR для egression. Три SQL-выражения = полный ETL pipeline без написания Java-кода. Коннектор выполняется на Connect-кластере (Модуль 05) — ksqlDB только управляет им через Connect REST API.

Важно: коннектор физически работает на Kafka Connect worker (Модуль 05), не на ksqlDB-сервере. ksqlDB только отправляет команды в Connect REST API. Для этого требуется настройка:

ksql.connect.url=http://connect-worker:8083

Безопасность ksqlDB

Основные аспекты security:

Аутентификация:

# HTTP Basic Auth через Jetty
authentication.method=BASIC
authentication.roles=admin,user
authentication.realm=KsqlServer-Props
# Файл паролей в JAAS-формате

SSL/TLS для Kafka:

# Шифрование ksqlDB <-> Kafka
ksql.streams.security.protocol=SSL
ksql.streams.ssl.truststore.location=/etc/ssl/kafka.truststore.jks
ksql.streams.ssl.truststore.password=changeme

RBAC (Confluent Platform): ksqlDB поддерживает role-based access control через Confluent RBAC. Позволяет разграничить: кто может CREATE, кто может DESCRIBE, кто может TERMINATE.

Для детальной настройки security — см. Модуль 09 (Security). Базовое правило: ksqlDB в production должен работать за reverse proxy (Nginx/Traefik) с TLS-терминацией.


Ограничения ksqlDB: когда не стоит использовать

ksqlDB — мощный инструмент, но не универсальное решение:

ОграничениеОписание
Один Kafka-кластерksqlDB не поддерживает cross-cluster queries. Если источники в разных кластерах — нужен Kafka Streams или MirrorMaker
Нет sliding windowsSlidingWindows из Kafka Streams DSL не имеет SQL-аналога в ksqlDB
Ограниченный FK joinKTable-KTable foreign key join из Kafka Streams имеет ограниченную поддержку в SQL
UDF на JavaПользовательские функции пишутся только на Java/Kotlin — нельзя написать UDF на SQL
Community LicenseConfluent Community License запрещает некоторые коммерческие use cases — проверьте перед использованием
Сложные multi-stage pipelinesДля цепочек из 10+ трансформаций Kafka Streams DSL читается лучше, чем набор CSAS/CTAS

Правило выбора: если команда SQL-ориентирована и задачи — стандартные фильтр/агрегация/join, ksqlDB оптимален. Если нужны sliding windows, Processor API, внешние вызовы в процессинге или embedding в микросервис — Kafka Streams.


Ключевые выводы

  1. Headless mode (ksql.queries.file) — обязателен для production. Декларативный, воспроизводимый, управляется через Git.
  2. Capacity planning: каждый persistent query требует CPU, диск (RocksDB), RAM. Горизонтальное масштабирование — добавление нод в кластер.
  3. HA через Kafka Streams rebalancing + num.standby.replicas для быстрого восстановления state.
  4. Мониторинг: JMX → Prometheus → Grafana. Ключевые метрики: consumer lag, state store size, error rate.
  5. ksqlDB + Connect = SQL-интерфейс для полного ETL pipeline. Коннектор работает на Connect-кластере, управляется через ksqlDB SQL.
Проверка знанийKnowledge check
Команда планирует запустить 50 persistent queries в ksqlDB. Какие ресурсы нужно учесть при capacity planning и какой deployment mode рекомендуется?
ОтветAnswer
При capacity planning для 50 queries нужно учесть: (1) CPU — каждый query это Kafka Streams топология, нужны ядра для обработки; ориентир: 2-4 ноды с 8-16 vCPU. (2) Диск — каждый stateful query (CTAS, windowed агрегации, join) создаёт RocksDB state store; при большом объёме state нужны быстрые SSD. (3) RAM — JVM heap для Kafka Streams внутри ksqlDB, RocksDB block cache; минимум 8GB per нода. (4) Changelog topics — каждый state store создаёт changelog топик в Kafka, планируйте retention и дисковое пространство брокеров. Deployment mode: headless (ksql.queries.file) — all 50 queries описываются в SQL-файле, деплоятся через CI/CD, никаких случайных изменений через CLI в production.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В чём главное отличие interactive mode от headless mode в ksqlDB? Какой режим рекомендуется для production и почему?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5