KStream, KTable и GlobalKTable
Kafka Streams — это не кластер. Не отдельный сервис. Не инфраструктура. Kafka Streams — это клиентская библиотека (JAR-файл), которую вы добавляете в обычное JVM-приложение. Ваше приложение запускается, читает из Kafka, преобразует данные и пишет обратно в Kafka. Никаких дополнительных серверов. Никаких лицензий. Только зависимость в pom.xml и логика обработки потоков.
Прежде чем писать код — необходимо понять три фундаментальные абстракции, на которых строится весь Kafka Streams DSL.
Что такое Kafka Streams
Архитектурная единица Kafka Streams — топология. Топология — это направленный ациклический граф (DAG) процессоров:
- Source Processor — читает записи из Kafka-топика. Точка входа топологии.
- Stream Processor — трансформирует, фильтрует, агрегирует записи. Узлы обработки.
- Sink Processor — записывает результаты в Kafka-топик. Точка выхода.
Каждое Kafka Streams приложение идентифицируется параметром application.id. Этот ID выполняет двойную роль: он становится идентификатором consumer group (для чтения входных топиков) и префиксом внутренних топиков (repartition, changelog). Все инстансы с одним application.id образуют один streaming-кластер, разделяя партиции между собой.
Kafka Streams — это клиентская библиотека, не кластер. Ваше приложение с Kafka Streams — это обычный JVM-процесс, который читает из Kafka и пишет в Kafka. Масштабирование = запуск дополнительных инстансов с одним application.id. Kafka автоматически перераспределяет партиции между инстансами через consumer group protocol.
KStream — поток событий
KStream представляет поток записей с INSERT-семантикой. Каждая запись — это независимый факт. Запись с ключом user-1 и значением {"action": "click"} — это отдельное событие, не связанное с предыдущими записями с тем же ключом.
Аналогия: журнал транзакций. Каждая строка журнала — независимая запись. Пять записей с ключом account-42 — это пять разных транзакций, не обновление одной записи.
StreamsBuilder builder = new StreamsBuilder();
// Создать KStream из топика "page-views"
KStream<String, PageView> pageViews = builder.stream(
"page-views",
Consumed.with(Serdes.String(), pageViewSerde)
);
Типичные данные для KStream:
- События кликов пользователей
- Транзакции (каждая транзакция — отдельное событие)
- Логи и метрики (каждая запись — измерение в момент времени)
- Показания IoT-датчиков
KTable — таблица изменений
KTable представляет changelog-поток с UPSERT-семантикой. Каждая запись — это обновление состояния для ключа. Последнее значение для каждого ключа — текущее состояние. Предыдущие значения для того же ключа устаревают.
Аналогия: таблица в базе данных с upsert. Если в таблице user_profiles появляется запись {user_id: "u-1", name: "Alice", email: "[email protected]"} — она создаёт или обновляет профиль пользователя u-1.
// Создать KTable из топика "user-profiles"
KTable<String, UserProfile> profiles = builder.table(
"user-profiles",
Consumed.with(Serdes.String(), profileSerde)
);
Внутри KTable хранит своё состояние в state store — по умолчанию RocksDB на локальном диске. При перезапуске или сбое состояние восстанавливается из changelog-топика.
Типичные данные для KTable:
- Профили пользователей (ключ = user_id, значение = текущий профиль)
- Инвентарь товаров (ключ = product_id, значение = текущий остаток)
- Балансы аккаунтов (ключ = account_id, значение = текущий баланс)
- Конфигурации (ключ = config_key, значение = текущее значение)
Stream-Table Duality
Потоки и таблицы — это два представления одних и тех же данных. Это центральная идея Kafka Streams.
Поток → Таблица: Материализуйте поток, агрегируя записи по ключу. KStream с транзакциями → groupByKey().aggregate() → KTable с текущим балансом.
Таблица → Поток: Каждое изменение в KTable превращается в запись в KStream. kTable.toStream() возвращает KStream, где каждая запись — это изменение в таблице.
Входящие записи
Входящие записи: (A,1), (B,2), (A,3). Эти же три записи будут интерпретированы по-разному KStream и KTable.KStream: 3 записи
KStream (INSERT semantics): все три записи сохраняются независимо. (A,1), (B,2), (A,3) — три отдельных события. KStream не знает, что (A,1) и (A,3) связаны.KTable: A=3, B=2
KTable (UPSERT semantics): (A,3) заменяет (A,1). Текущее состояние: A=3, B=2. KTable хранит только последнее значение для каждого ключа.GlobalKTable — глобальная таблица
GlobalKTable — это KTable, реплицированная на все инстансы приложения. Обычная KTable партиционирована: инстанс 0 видит партиции 0-1, инстанс 1 — партиции 2-3. GlobalKTable: каждый инстанс загружает весь топик целиком.
// Создать GlobalKTable из топика "countries"
GlobalKTable<String, Country> countries = builder.globalTable(
"countries",
Consumed.with(Serdes.String(), countrySerde)
);
Главное преимущество GlobalKTable: для join с KStream не нужно co-partitioning. Поскольку каждый инстанс имеет полную копию GlobalKTable, любая запись KStream может быть обогащена данными из GlobalKTable без привязки к партиции.
GlobalKTable загружает ВСЁ содержимое топика в память каждого инстанса. Для топика с миллионами записей это может потребовать гигабайты RAM. Используйте GlobalKTable только для справочных данных (коды стран, конфигурации). Для больших таблиц используйте обычный KTable join (с обязательным co-partitioning).
Сравнение: KStream vs KTable vs GlobalKTable
| Характеристика | KStream | KTable | GlobalKTable |
|---|---|---|---|
| Семантика | INSERT | UPSERT | UPSERT |
| State Store | Нет | Да (локальный) | Да (полная копия) |
| Для join | Co-partitioning | Co-partitioning | Не нужен |
| Память | Минимум | Пропорционально партиции | Весь топик на каждом инстансе |
| Типичные данные | Клики, транзакции, логи | Профили, инвентарь, балансы | Коды стран, тарифы, конфиги |
| Masштабируемость | Горизонтально (по партициям) | Горизонтально (по партициям) | Ограничена размером топика |
Как выбрать абстракцию
Используйте KStream когда:
- Каждое событие — независимый факт (транзакция, лог, клик)
- Вас интересует история событий, а не только текущее состояние
- Нужно обрабатывать каждое изменение отдельно
Используйте KTable когда:
- Вас интересует текущее состояние для каждого ключа
- Данные обновляются (профили пользователей, инвентарь)
- Нужен join с потоком с co-partitioning (исходный топик с правильным числом партиций)
Используйте GlobalKTable когда:
- Данные небольшие (тысячи или миллионы записей, не сотни миллионов)
- Нужен join без co-partitioning ограничений
- Данные — справочные (коды, конфигурации, тарифы)