ksqlDB в Production
Разработка ksqlDB-пайплайна в interactive mode с CLI — это удобно. Но запуск в production требует другого подхода: декларативные запросы, управление ресурсами, мониторинг, высокая доступность. Рассмотрим, как правильно развернуть ksqlDB для production-нагрузки.
Interactive vs Headless mode
ksqlDB поддерживает два режима работы:
Interactive mode (по умолчанию)
# ksqlDB server config (ksql-server.properties)
ksql.headless=false
- REST API и CLI полностью доступны.
- Разработчики могут создавать, изменять, удалять запросы в любой момент.
- Используется для разработки, отладки, экспериментов.
- Риск в production: один случайный
TERMINATE query_idуничтожает критичный pipeline.
Headless mode (рекомендован для production)
# ksqlDB server config
ksql.headless=true
ksql.queries.file=/etc/ksqldb/queries.sql
- Сервер при старте читает SQL-файл и запускает все указанные queries.
- REST API для создания/изменения запросов недоступен — только чтение состояния.
- CLI не может подключиться для изменения запросов.
- Все изменения идут через Git: обновил
queries.sql→ задеплоил → ksqlDB-сервер перезапустился с новым набором запросов.
queries.sql для headless mode:
-- Создание источников
CREATE STREAM IF NOT EXISTS orders_stream (
order_id VARCHAR KEY,
customer_id VARCHAR,
amount DOUBLE,
product VARCHAR
) WITH (kafka_topic = 'orders', value_format = 'JSON');
CREATE TABLE IF NOT EXISTS customers_table (
customer_id VARCHAR PRIMARY KEY,
name VARCHAR,
tier VARCHAR
) WITH (kafka_topic = 'customers', value_format = 'AVRO');
-- Persistent queries
CREATE STREAM IF NOT EXISTS enriched_orders AS
SELECT o.order_id, o.amount, c.name AS customer_name, c.tier
FROM orders_stream o
LEFT JOIN customers_table c ON o.customer_id = c.customer_id
EMIT CHANGES;
CREATE TABLE IF NOT EXISTS customer_order_totals AS
SELECT customer_id, COUNT(*) AS total_orders, SUM(amount) AS total_amount
FROM orders_stream
GROUP BY customer_id
EMIT CHANGES;
В production используйте headless mode (ksql.queries.file). Interactive mode удобен для разработки, но в production один случайный TERMINATE уничтожает критичный pipeline. Headless mode делает все запросы декларативными и воспроизводимыми через систему контроля версий.
Capacity planning для ksqlDB кластера
Каждый persistent query = Kafka Streams топология = набор ресурсов:
На каждый persistent query выделяются:
- Один или несколько consumer group с задачами (tasks = количество партиций источника).
- State store на диске (RocksDB) пропорциональный размеру state: агрегации по ключам, windowed stores, join stores.
- CPU для stream processing: фильтрация, трансформация, агрегация.
- Сетевой bandwidth для чтения из Kafka и записи результатов.
Практические ориентиры:
| Параметр | Рекомендация |
|---|---|
| CPU | 1-2 ядра на каждые 10 lightweight queries (фильтр/map) |
| RAM | 2-4 GB per ksqlDB instance + Kafka Streams heap |
| Диск | RocksDB state store: планируйте 10x размер “hot” state |
| Nodes | Минимум 2 ноды для HA; масштабируйте горизонтально при росте числа queries |
Для 50 persistent queries на 2-нодовый кластер: каждая нода обрабатывает ~25 топологий. Если state stores агрессивны (windowed агрегации по миллионам ключей), потребность в диске может быть значительной.
Начальная конфигурация JVM:
# В переменной окружения KSQL_JVM_PERFORMANCE_OPTS
KSQL_JVM_PERFORMANCE_OPTS="-Xms2g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
High Availability
ksqlDB наследует HA от Kafka Streams:
Нодовый сбой:
- Нода A падает. Её persistent queries теряют задачи.
- Оставшиеся ноды (B, C) через consumer group rebalancing получают задачи ноды A.
- Состояние восстанавливается из changelog-топиков в Kafka (как в Kafka Streams).
Standby replicas для быстрого восстановления:
# В ksqlDB server config
ksql.streams.num.standby.replicas=1
С этой настройкой каждая задача имеет горячий резерв на другой ноде, который непрерывно реплицирует changelog. При сбое — восстановление занимает секунды вместо минут.
Конфигурация кластера:
# Все ноды кластера должны использовать один ksql.service.id
ksql.service.id=production-ksqldb
# Listeners — настраивается для каждой ноды отдельно
listeners=http://0.0.0.0:8088
# Для ноды 1
ksql.advertised.listener=http://ksqldb-node-1:8088
В кластере из N нод ksqlDB использует Kafka-топик для координации (аналогично Kafka Connect group координации).
Мониторинг ksqlDB
ksqlDB экспортирует JMX-метрики. Ключевые метрики:
| Метрика | JMX путь | Описание |
|---|---|---|
messages-consumed-per-sec | io.confluent.ksql.metrics | Скорость чтения из Kafka |
messages-produced-per-sec | io.confluent.ksql.metrics | Скорость записи результатов |
num-running-queries | io.confluent.ksql.metrics | Число активных persistent queries |
error-rate | io.confluent.ksql.metrics | Частота ошибок обработки |
query-error-count | Per-query метрика | Ошибки конкретного query |
Prometheus JMX Exporter позволяет собирать JMX-метрики ksqlDB в Prometheus:
# docker-compose дополнение для jmx_exporter
- KSQL_JMX_OPTS="-Dcom.sun.jmx.remote.ssl=false -Dcom.sun.jmx.remote.authenticate=false -Djava.rmi.server.hostname=0.0.0.0 -Dcom.sun.jmx.remote.port=1099"
Grafana дашборды для ksqlDB должны включать:
- Consumer lag на входных топиках (рост = ksqlDB не успевает обрабатывать).
- State store size per query (рост без границ = проблема retention или windowing).
- Query error rate (ненулевой = проблема с данными или логикой).
- JVM heap usage (близко к Xmx = риск OOM и рестарта ноды).
ksqlDB + Connect: полный SQL-пайплайн
ksqlDB позволяет управлять Kafka Connect коннекторами прямо из SQL-интерфейса через CREATE SOURCE CONNECTOR и CREATE SINK CONNECTOR.
-- Создать source connector для чтения из PostgreSQL
CREATE SOURCE CONNECTOR jdbc_source WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.url' = 'jdbc:postgresql://db:5432/mydb',
'connection.user' = 'kafka_user',
'connection.password' = 'secret',
'table.whitelist' = 'users,products',
'mode' = 'incrementing',
'incrementing.column.name' = 'id',
'topic.prefix' = 'jdbc_'
);
-- После создания коннектора — users становится доступен как Kafka-топик
-- Регистрируем его как TABLE в ksqlDB
CREATE TABLE users_from_db (
id INT PRIMARY KEY,
name VARCHAR,
email VARCHAR
) WITH (kafka_topic = 'jdbc_users', value_format = 'JSON');
-- Создать sink connector для записи обработанных данных
CREATE SINK CONNECTOR elasticsearch_sink WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'topics' = 'ENRICHED_ORDERS',
'type.name' = '_doc',
'key.ignore' = 'true'
);
ksqlDB + Connect = единый SQL-интерфейс для data pipeline: CREATE SOURCE CONNECTOR для ingestion, CREATE STREAM AS SELECT для трансформации, CREATE SINK CONNECTOR для egression. Три SQL-выражения = полный ETL pipeline без написания Java-кода. Коннектор выполняется на Connect-кластере (Модуль 05) — ksqlDB только управляет им через Connect REST API.
Важно: коннектор физически работает на Kafka Connect worker (Модуль 05), не на ksqlDB-сервере. ksqlDB только отправляет команды в Connect REST API. Для этого требуется настройка:
ksql.connect.url=http://connect-worker:8083
Безопасность ksqlDB
Основные аспекты security:
Аутентификация:
# HTTP Basic Auth через Jetty
authentication.method=BASIC
authentication.roles=admin,user
authentication.realm=KsqlServer-Props
# Файл паролей в JAAS-формате
SSL/TLS для Kafka:
# Шифрование ksqlDB <-> Kafka
ksql.streams.security.protocol=SSL
ksql.streams.ssl.truststore.location=/etc/ssl/kafka.truststore.jks
ksql.streams.ssl.truststore.password=changeme
RBAC (Confluent Platform): ksqlDB поддерживает role-based access control через Confluent RBAC. Позволяет разграничить: кто может CREATE, кто может DESCRIBE, кто может TERMINATE.
Для детальной настройки security — см. Модуль 09 (Security). Базовое правило: ksqlDB в production должен работать за reverse proxy (Nginx/Traefik) с TLS-терминацией.
Ограничения ksqlDB: когда не стоит использовать
ksqlDB — мощный инструмент, но не универсальное решение:
| Ограничение | Описание |
|---|---|
| Один Kafka-кластер | ksqlDB не поддерживает cross-cluster queries. Если источники в разных кластерах — нужен Kafka Streams или MirrorMaker |
| Нет sliding windows | SlidingWindows из Kafka Streams DSL не имеет SQL-аналога в ksqlDB |
| Ограниченный FK join | KTable-KTable foreign key join из Kafka Streams имеет ограниченную поддержку в SQL |
| UDF на Java | Пользовательские функции пишутся только на Java/Kotlin — нельзя написать UDF на SQL |
| Community License | Confluent Community License запрещает некоторые коммерческие use cases — проверьте перед использованием |
| Сложные multi-stage pipelines | Для цепочек из 10+ трансформаций Kafka Streams DSL читается лучше, чем набор CSAS/CTAS |
Правило выбора: если команда SQL-ориентирована и задачи — стандартные фильтр/агрегация/join, ksqlDB оптимален. Если нужны sliding windows, Processor API, внешние вызовы в процессинге или embedding в микросервис — Kafka Streams.
Ключевые выводы
- Headless mode (
ksql.queries.file) — обязателен для production. Декларативный, воспроизводимый, управляется через Git. - Capacity planning: каждый persistent query требует CPU, диск (RocksDB), RAM. Горизонтальное масштабирование — добавление нод в кластер.
- HA через Kafka Streams rebalancing +
num.standby.replicasдля быстрого восстановления state. - Мониторинг: JMX → Prometheus → Grafana. Ключевые метрики: consumer lag, state store size, error rate.
- ksqlDB + Connect = SQL-интерфейс для полного ETL pipeline. Коннектор работает на Connect-кластере, управляется через ksqlDB SQL.