State Stores
Stateful операции Kafka Streams — count, aggregate, reduce, windowed aggregations, joins — требуют хранилища состояния. Без него невозможно помнить предыдущие записи: нельзя посчитать “сколько транзакций сегодня”, не запомнив предыдущие.
Kafka Streams реализует state store локально на инстансе, ответственном за обработку данной партиции. Это ключевое архитектурное решение: в отличие от централизованного хранилища (Redis, PostgreSQL), каждый инстанс обрабатывает своё состояние независимо — никакого latency от сетевых запросов к внешней БД.
Почему локальное состояние
Представьте: вы считаете количество транзакций по merchant_id. В топике 12 партиций, 3 инстанса Kafka Streams. Инстанс 0 обрабатывает партиции 0-3, инстанс 1 — 4-7, инстанс 2 — 8-11.
Для merchant_id=“merchant-42” все транзакции гарантированно попадают в одну партицию (через key-based partitioning). Значит, один инстанс обрабатывает ВСЕ транзакции этого merchant. Его локальный state store содержит полный и корректный счётчик для merchant-42.
Никакого coordination overhead. Никакого contention. Каждый инстанс — независимый, изолированный процессор своего подмножества ключей.
In-Memory State Store
Самый быстрый вариант: данные хранятся исключительно в куче JVM.
// Явно указать in-memory store через Materialized
KTable<String, Long> counts = stream
.groupByKey()
.count(
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(
Stores.inMemoryKeyValueStore("click-counts")
).withValueSerde(Serdes.Long())
);
Свойства in-memory store:
- Максимальная скорость чтения и записи (нет disk I/O)
- Данные теряются при падении или перезапуске приложения
- При рестарте: полное восстановление из changelog-топика (replay всей истории)
- Ограничен размером heap JVM
Когда использовать:
- Небольшой объём состояния (счётчики в пределах GC-приемлемого размера)
- Тесты и разработка
- Топологии, где быстрое время восстановления из changelog важнее экономии памяти
RocksDB State Store
По умолчанию для всех stateful операций в Kafka Streams. RocksDB — это embedded LSM-tree key-value store, оптимизированный для write-heavy нагрузок.
// RocksDB используется по умолчанию
KTable<String, Long> counts = stream
.groupByKey()
.count(Materialized.as("click-counts-store"));
// Явное указание RocksDB store
KTable<String, Long> explicit = stream
.groupByKey()
.count(
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(
Stores.persistentKeyValueStore("click-counts-persistent")
).withValueSerde(Serdes.Long())
);
Свойства RocksDB store:
- Данные персистируются на диск (директория
state.dirиз конфигурации) - Переживает рестарт приложения: при перезапуске Kafka Streams сначала проверяет локальный RocksDB
- Если локальные данные актуальны (совпадают с changelog) — replay не нужен (быстрый старт)
- Если данные устарели или отсутствуют — воспроизводится только дельта из changelog
Конфигурация пути:
state.dir=/var/lib/kafka-streams
RocksDB тюнинг (для продвинутых сценариев):
// Кастомный RocksDB конфиг через RocksDBConfigSetter
public class CustomRocksDBConfig implements RocksDBConfigSetter {
@Override
public void setConfig(String storeName, Options options, Map<String, Object> configs) {
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setBlockCacheSize(64 * 1024 * 1024L); // 64 MB block cache
tableConfig.setFilterPolicy(new BloomFilter(10, false));
options.setTableFormatConfig(tableConfig);
options.setWriteBufferSize(32 * 1024 * 1024L); // 32 MB memtable
}
}
В конфигурации Streams: rocksdb.config.setter=com.example.CustomRocksDBConfig
Сравнение In-Memory vs RocksDB
Changelog Topics
Каждый persistent state store автоматически создаёт changelog-топик в Kafka. Это механизм fault tolerance: любая запись в state store асинхронно реплицируется в changelog.
Имя changelog-топика: {application.id}-{store-name}-changelog
Пример: orders-processor-click-counts-store-changelog
Конфигурация changelog-топика:
cleanup.policy=compact— log compaction хранит только последнее значение для каждого ключа- Это гарантирует, что changelog не растёт бесконечно при обновлениях одних и тех же ключей
// Настройка changelog через Materialized
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("my-store")
.withLoggingEnabled(Map.of(
"min.insync.replicas", "2",
"replication.factor", "3"
));
Без changelog: падение инстанса = потеря состояния = некорректные агрегаты. С changelog: падение = replay из changelog = восстановление корректного состояния.
Процесс восстановления состояния
Когда Kafka Streams инстанс падает:
- Rebalance — consumer group protocol перераспределяет партиции. Другой инстанс получает задачу (task) с упавших партиций.
- Restoration — новый инстанс для этой задачи не имеет локального state store (или имеет устаревший). Начинается чтение changelog с последнего checkpoint.
- Replay — инстанс читает changelog-топик от начала (или с checkpoint), воспроизводя все записи в свой локальный store.
- Resumption — когда changelog полностью воспроизведён, инстанс начинает обрабатывать новые записи из основного топика.
State store restoration может занять минуты или часы для больших состояний. 10 ГБ state store = десятки минут восстановления из changelog при типичной пропускной способности сети. В это время задача не обрабатывает новые записи — накапливается lag. Используйте standby replicas (num.standby.replicas >= 1) для критичных приложений.
Standby Replicas
Standby replicas — горячие копии state store на других инстансах. Настраиваются через num.standby.replicas.
# StreamsConfig
num.standby.replicas=1
Как работают standby replicas:
- Для каждого state store создаётся N дополнительных копий на других инстансах
- Standby непрерывно воспроизводит changelog-топик, поддерживая актуальную копию
- При падении активного инстанса: standby мгновенно становится активным — никакого replay с нуля
- Время восстановления: секунды вместо минут (только небольшая дельта, накопившаяся с момента последнего чтения changelog standby-ом)
Trade-off: дополнительный disk I/O (хранение копии) + network I/O (чтение changelog). Для критичных production приложений это оправданная цена.
Типы Store и Оконные агрегации
Kafka Streams предоставляет разные типы store для разных операций:
| Тип Store | Интерфейс | Используется для |
|---|---|---|
| KeyValueStore | KeyValueStore<K, V> | count, aggregate, reduce (non-windowed) |
| WindowStore | WindowStore<K, V> | Windowed aggregations (tumbling, hopping, sliding) |
| SessionStore | SessionStore<K, V> | Session windows |
| TimestampedKeyValueStore | TimestampedKeyValueStore<K, V> | KTable с timestamp per entry |
// WindowStore для windowed count
KTable<Windowed<String>, Long> windowedCounts = stream
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("windowed-counts")
.withValueSerde(Serdes.Long()));
Interactive Queries
State stores доступны не только внутри топологии — вы можете запрашивать их напрямую из running приложения.
// Получить ссылку на state store
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(
"click-counts-store",
QueryableStoreTypes.keyValueStore()
)
);
// Прямой запрос
Long count = store.get("user-42");
// Диапазонный запрос
KeyValueIterator<String, Long> range = store.range("user-0", "user-9");
Это превращает Kafka Streams приложение в queryable microservice: вместо записи агрегатов в output-топик и их чтения из базы данных, клиент может запросить актуальное состояние напрямую через REST API, который вы добавляете поверх KafkaStreams.store().
Interactive queries позволяют запрашивать state store напрямую через REST API. Это превращает Kafka Streams приложение в микросервис с query endpoint — без необходимости писать результат в отдельный топик для чтения. Для мульти-инстансного приложения используйте StreamsMetadata для обнаружения, на каком инстансе находится нужный ключ, и делайте RPC-запрос к нему.
Отключение changelog
В некоторых случаях changelog не нужен — например, для кешей с коротким временем жизни или когда исходные данные можно восстановить другим способом:
// Отключить changelog logging (осторожно: потеря состояния при падении)
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> noLog =
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("cache-store")
.withLoggingDisabled();
Отключение changelog ускоряет запись (нет overhead реплицирования в Kafka), но теряет fault tolerance. Используйте только если: (a) состояние легко пересчитать, или (b) потеря состояния допустима.