Learning Platform
Глоссарий Troubleshooting
Урок 08.01 · 25 мин
Продвинутый
KStreamKTableGlobalKTableStream-Table DualityChangelogKafka Streams DSL

KStream, KTable и GlobalKTable

Kafka Streams — это не кластер. Не отдельный сервис. Не инфраструктура. Kafka Streams — это клиентская библиотека (JAR-файл), которую вы добавляете в обычное JVM-приложение. Ваше приложение запускается, читает из Kafka, преобразует данные и пишет обратно в Kafka. Никаких дополнительных серверов. Никаких лицензий. Только зависимость в pom.xml и логика обработки потоков.

Прежде чем писать код — необходимо понять три фундаментальные абстракции, на которых строится весь Kafka Streams DSL.


Что такое Kafka Streams

Архитектурная единица Kafka Streams — топология. Топология — это направленный ациклический граф (DAG) процессоров:

  • Source Processor — читает записи из Kafka-топика. Точка входа топологии.
  • Stream Processor — трансформирует, фильтрует, агрегирует записи. Узлы обработки.
  • Sink Processor — записывает результаты в Kafka-топик. Точка выхода.

Каждое Kafka Streams приложение идентифицируется параметром application.id. Этот ID выполняет двойную роль: он становится идентификатором consumer group (для чтения входных топиков) и префиксом внутренних топиков (repartition, changelog). Все инстансы с одним application.id образуют один streaming-кластер, разделяя партиции между собой.

Kafka Streams: Topology DAG
ordersSource Processor: читает записи из Kafka-топика и подаёт в поток обработки. Один source processor на каждый входной топик. Kafka Streams автоматически создаёт consumer внутри приложения для чтения — вы не управляете им напрямую.
customersSource Processor (второй вход): топик 'customers' читается как KTable — только последнее значение по каждому ключу хранится в state store. Используется для enrichment-join: каждый заказ обогащается актуальными данными клиента. Требует co-partitioning с 'orders'.
KStream / KTable
filter(amount > 100)Stateless оператор filter(): фильтрует записи без сохранения состояния. Не требует state store. Не создаёт changelog topic. Обрабатывает каждую запись независимо — вычислительная сложность O(1). DSL: stream.filter((k, v) -> v.getAmount() > 100)
join(customers)Stateful оператор join(): KStream-KTable join обогащает каждый заказ данными клиента. Требует co-partitioning: оба топика должны иметь одинаковое количество партиций и совпадающий ключ партиционирования. Результат — новый KStream с обогащёнными записями.
groupByKey().count()Stateful агрегация groupByKey().count(): считает количество заказов по ключу клиента. Каждый вызов count() создаёт state store (RocksDB по умолчанию) и changelog topic вида '{app.id}-{store-name}-changelog'. Промежуточные результаты материализуются как KTable.
State Store (RocksDB)Материализованная KTable 'customers'. RocksDB хранит текущее состояние каждого ключа на локальном диске задачи. Changelog topic ({app.id}-customers-changelog) обеспечивает восстановление при crash. num.standby.replicas=1 создаёт горячую копию на втором инстансе — failover без полного replay.
changelog backup
changelog topicChangelog Topic: автоматически создаётся Kafka Streams для каждого persistent state store. cleanup.policy=compact — хранит только последнее значение по каждому ключу. Имя формируется как '{application.id}-{store-name}-changelog'. Не требует ручного создания.
KTable результат
enriched-ordersSink Processor: записывает результат обработки в выходной Kafka-топик. Использует Producer API с exactly-once семантикой при processing.guarantee=exactly_once_v2. DSL: stream.to('enriched-orders', Produced.with(Serdes.String(), orderSerde))
application.idapplication.id: идентификатор Kafka Streams приложения. Используется как префикс consumer group ID и всех внутренних топиков (changelog, repartition). Все инстансы с одинаковым application.id образуют один вычислительный кластер и делят задачи между собой.
Topology.describe()Topology.describe(): метод, возвращающий текстовое описание DAG — все source/processor/sink узлы, связи между ними, state stores. Полезен для отладки: позволяет увидеть, какие внутренние repartition-топики добавил Kafka Streams автоматически при смене ключа.
NOTE

Kafka Streams — это клиентская библиотека, не кластер. Ваше приложение с Kafka Streams — это обычный JVM-процесс, который читает из Kafka и пишет в Kafka. Масштабирование = запуск дополнительных инстансов с одним application.id. Kafka автоматически перераспределяет партиции между инстансами через consumer group protocol.


KStream — поток событий

KStream представляет поток записей с INSERT-семантикой. Каждая запись — это независимый факт. Запись с ключом user-1 и значением {"action": "click"} — это отдельное событие, не связанное с предыдущими записями с тем же ключом.

Аналогия: журнал транзакций. Каждая строка журнала — независимая запись. Пять записей с ключом account-42 — это пять разных транзакций, не обновление одной записи.

StreamsBuilder builder = new StreamsBuilder();

// Создать KStream из топика "page-views"
KStream<String, PageView> pageViews = builder.stream(
    "page-views",
    Consumed.with(Serdes.String(), pageViewSerde)
);

Типичные данные для KStream:

  • События кликов пользователей
  • Транзакции (каждая транзакция — отдельное событие)
  • Логи и метрики (каждая запись — измерение в момент времени)
  • Показания IoT-датчиков

KTable — таблица изменений

KTable представляет changelog-поток с UPSERT-семантикой. Каждая запись — это обновление состояния для ключа. Последнее значение для каждого ключа — текущее состояние. Предыдущие значения для того же ключа устаревают.

Аналогия: таблица в базе данных с upsert. Если в таблице user_profiles появляется запись {user_id: "u-1", name: "Alice", email: "[email protected]"} — она создаёт или обновляет профиль пользователя u-1.

// Создать KTable из топика "user-profiles"
KTable<String, UserProfile> profiles = builder.table(
    "user-profiles",
    Consumed.with(Serdes.String(), profileSerde)
);

Внутри KTable хранит своё состояние в state store — по умолчанию RocksDB на локальном диске. При перезапуске или сбое состояние восстанавливается из changelog-топика.

Типичные данные для KTable:

  • Профили пользователей (ключ = user_id, значение = текущий профиль)
  • Инвентарь товаров (ключ = product_id, значение = текущий остаток)
  • Балансы аккаунтов (ключ = account_id, значение = текущий баланс)
  • Конфигурации (ключ = config_key, значение = текущее значение)

Stream-Table Duality

Потоки и таблицы — это два представления одних и тех же данных. Это центральная идея Kafka Streams.

Поток → Таблица: Материализуйте поток, агрегируя записи по ключу. KStream с транзакциями → groupByKey().aggregate() → KTable с текущим балансом.

Таблица → Поток: Каждое изменение в KTable превращается в запись в KStream. kTable.toStream() возвращает KStream, где каждая запись — это изменение в таблице.

INSERT vs UPSERT: KStream и KTable с одними данными
Три записи поступают в систему: ключ A=1, ключ B=2, ключ A=3 (обновление)

Входящие записи

Входящие записи: (A,1), (B,2), (A,3). Эти же три записи будут интерпретированы по-разному KStream и KTable.
fan-out

KStream: 3 записи

KStream (INSERT semantics): все три записи сохраняются независимо. (A,1), (B,2), (A,3) — три отдельных события. KStream не знает, что (A,1) и (A,3) связаны.

KTable: A=3, B=2

KTable (UPSERT semantics): (A,3) заменяет (A,1). Текущее состояние: A=3, B=2. KTable хранит только последнее значение для каждого ключа.

GlobalKTable — глобальная таблица

GlobalKTable — это KTable, реплицированная на все инстансы приложения. Обычная KTable партиционирована: инстанс 0 видит партиции 0-1, инстанс 1 — партиции 2-3. GlobalKTable: каждый инстанс загружает весь топик целиком.

// Создать GlobalKTable из топика "countries"
GlobalKTable<String, Country> countries = builder.globalTable(
    "countries",
    Consumed.with(Serdes.String(), countrySerde)
);

Главное преимущество GlobalKTable: для join с KStream не нужно co-partitioning. Поскольку каждый инстанс имеет полную копию GlobalKTable, любая запись KStream может быть обогащена данными из GlobalKTable без привязки к партиции.

WARNING

GlobalKTable загружает ВСЁ содержимое топика в память каждого инстанса. Для топика с миллионами записей это может потребовать гигабайты RAM. Используйте GlobalKTable только для справочных данных (коды стран, конфигурации). Для больших таблиц используйте обычный KTable join (с обязательным co-partitioning).


Сравнение: KStream vs KTable vs GlobalKTable

Три абстракции: характеристики
KStreamINSERT semantics. Каждая запись — независимое событие. Не хранит состояние. Не требует state store. Источник: append-only события.
ПартиционированиеОбрабатывает свои партиции. Инстанс с партицией N обрабатывает только записи этой партиции.
JoinДля join с другим KStream или KTable требует co-partitioning: одинаковое число партиций и одинаковую стратегию партиционирования.
KTableUPSERT semantics. Последнее значение для каждого ключа = текущее состояние. Требует state store (RocksDB). Источник: changelog топик.
ПартиционированиеОбрабатывает свои партиции. State store содержит только ключи своих партиций.
JoinДля join с KStream требует co-partitioning. State store хранит только локальные партиции.
GlobalKTableUPSERT semantics на всех инстансах. Весь топик реплицируется на каждый инстанс. Нет co-partitioning ограничений.
ПартиционированиеКаждый инстанс хранит полную копию всех партиций. Нет разделения — каждый инстанс = вся таблица.
JoinНе требует co-partitioning. KStream любой партиции может обогащаться данными GlobalKTable без переназначения.
ХарактеристикаKStreamKTableGlobalKTable
СемантикаINSERTUPSERTUPSERT
State StoreНетДа (локальный)Да (полная копия)
Для joinCo-partitioningCo-partitioningНе нужен
ПамятьМинимумПропорционально партицииВесь топик на каждом инстансе
Типичные данныеКлики, транзакции, логиПрофили, инвентарь, балансыКоды стран, тарифы, конфиги
MasштабируемостьГоризонтально (по партициям)Горизонтально (по партициям)Ограничена размером топика

Как выбрать абстракцию

Используйте KStream когда:

  • Каждое событие — независимый факт (транзакция, лог, клик)
  • Вас интересует история событий, а не только текущее состояние
  • Нужно обрабатывать каждое изменение отдельно

Используйте KTable когда:

  • Вас интересует текущее состояние для каждого ключа
  • Данные обновляются (профили пользователей, инвентарь)
  • Нужен join с потоком с co-partitioning (исходный топик с правильным числом партиций)

Используйте GlobalKTable когда:

  • Данные небольшие (тысячи или миллионы записей, не сотни миллионов)
  • Нужен join без co-partitioning ограничений
  • Данные — справочные (коды, конфигурации, тарифы)
Проверка знанийKnowledge check
У вас топик с обновлениями курсов валют, где ключ — пара валют (USD/EUR). Вы хотите обогатить поток транзакций текущим курсом. Поток транзакций имеет 12 партиций. Топик с курсами валют содержит около 500 уникальных пар и обновляется каждые 10 секунд. Какую абстракцию использовать для курсов валют и почему?
ОтветAnswer
KTable или GlobalKTable — оба подходят, но с разными компромиссами. GlobalKTable предпочтительнее здесь: 500 пар валют — это крошечный набор данных (несколько килобайт), поэтому загрузка полной копии на каждый инстанс не создаёт проблем с памятью. GlobalKTable устраняет требование co-partitioning: топик с курсами может иметь любое число партиций независимо от 12 партиций топика транзакций. Если бы данных были миллионы записей, правильным выбором был бы KTable с co-partitioning (обеспечить одинаковое число партиций для обоих топиков).
Spark Structured Streaming: основы Spark transformWithState: stateful processing

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Разработчик строит топологию Kafka Streams: поток событий кликов пользователей и таблица профилей пользователей. Он хочет подсчитать количество кликов для каждого пользователя (сохраняя только текущий счётчик, не историю). Какую комбинацию абстракций правильно использовать?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6