Learning Platform
Глоссарий Troubleshooting
Урок 08.03 · 30 мин
Продвинутый
State StoreRocksDBIn-MemoryChangelog TopicStandby ReplicasState RestorationMaterialized

State Stores

Stateful операции Kafka Streams — count, aggregate, reduce, windowed aggregations, joins — требуют хранилища состояния. Без него невозможно помнить предыдущие записи: нельзя посчитать “сколько транзакций сегодня”, не запомнив предыдущие.

Kafka Streams реализует state store локально на инстансе, ответственном за обработку данной партиции. Это ключевое архитектурное решение: в отличие от централизованного хранилища (Redis, PostgreSQL), каждый инстанс обрабатывает своё состояние независимо — никакого latency от сетевых запросов к внешней БД.


Почему локальное состояние

Представьте: вы считаете количество транзакций по merchant_id. В топике 12 партиций, 3 инстанса Kafka Streams. Инстанс 0 обрабатывает партиции 0-3, инстанс 1 — 4-7, инстанс 2 — 8-11.

Для merchant_id=“merchant-42” все транзакции гарантированно попадают в одну партицию (через key-based partitioning). Значит, один инстанс обрабатывает ВСЕ транзакции этого merchant. Его локальный state store содержит полный и корректный счётчик для merchant-42.

Никакого coordination overhead. Никакого contention. Каждый инстанс — независимый, изолированный процессор своего подмножества ключей.


In-Memory State Store

Самый быстрый вариант: данные хранятся исключительно в куче JVM.

// Явно указать in-memory store через Materialized
KTable<String, Long> counts = stream
    .groupByKey()
    .count(
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(
            Stores.inMemoryKeyValueStore("click-counts")
        ).withValueSerde(Serdes.Long())
    );

Свойства in-memory store:

  • Максимальная скорость чтения и записи (нет disk I/O)
  • Данные теряются при падении или перезапуске приложения
  • При рестарте: полное восстановление из changelog-топика (replay всей истории)
  • Ограничен размером heap JVM

Когда использовать:

  • Небольшой объём состояния (счётчики в пределах GC-приемлемого размера)
  • Тесты и разработка
  • Топологии, где быстрое время восстановления из changelog важнее экономии памяти

RocksDB State Store

По умолчанию для всех stateful операций в Kafka Streams. RocksDB — это embedded LSM-tree key-value store, оптимизированный для write-heavy нагрузок.

// RocksDB используется по умолчанию
KTable<String, Long> counts = stream
    .groupByKey()
    .count(Materialized.as("click-counts-store"));

// Явное указание RocksDB store
KTable<String, Long> explicit = stream
    .groupByKey()
    .count(
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(
            Stores.persistentKeyValueStore("click-counts-persistent")
        ).withValueSerde(Serdes.Long())
    );

Свойства RocksDB store:

  • Данные персистируются на диск (директория state.dir из конфигурации)
  • Переживает рестарт приложения: при перезапуске Kafka Streams сначала проверяет локальный RocksDB
  • Если локальные данные актуальны (совпадают с changelog) — replay не нужен (быстрый старт)
  • Если данные устарели или отсутствуют — воспроизводится только дельта из changelog

Конфигурация пути:

state.dir=/var/lib/kafka-streams

RocksDB тюнинг (для продвинутых сценариев):

// Кастомный RocksDB конфиг через RocksDBConfigSetter
public class CustomRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(64 * 1024 * 1024L);  // 64 MB block cache
        tableConfig.setFilterPolicy(new BloomFilter(10, false));
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferSize(32 * 1024 * 1024L);  // 32 MB memtable
    }
}

В конфигурации Streams: rocksdb.config.setter=com.example.CustomRocksDBConfig


Сравнение In-Memory vs RocksDB

In-Memory vs RocksDB: характеристики
In-MemoryДанные в куче JVM. Максимальная скорость: нет обращений к диску. Ограничен heap size. JVM GC давление при больших объёмах.
ПерсистентностьНет — данные теряются при рестарте. Восстановление: полный replay changelog с offset 0. Для 10 GB состояния = десятки минут восстановления.
ВосстановлениеПолный replay changelog от начала. Время восстановления = размер changelog / скорость чтения из Kafka. Блокирует обработку на время восстановления.
RocksDBДанные на диске (LSM-tree). Небольшой overhead по сравнению с in-memory, но поддерживает объёмы, превышающие доступную RAM.
ПерсистентностьДа — данные сохраняются в state.dir. При рестарте: сравнить local checkpoint с changelog, воспроизвести только дельту. Быстрый старт если данные актуальны.
ВосстановлениеЕсли локальный RocksDB актуален — минимальный replay (только дельта). Если данные потеряны (диск упал) — полный replay как у in-memory.

Changelog Topics

Каждый persistent state store автоматически создаёт changelog-топик в Kafka. Это механизм fault tolerance: любая запись в state store асинхронно реплицируется в changelog.

Имя changelog-топика: {application.id}-{store-name}-changelog

Пример: orders-processor-click-counts-store-changelog

Конфигурация changelog-топика:

  • cleanup.policy=compact — log compaction хранит только последнее значение для каждого ключа
  • Это гарантирует, что changelog не растёт бесконечно при обновлениях одних и тех же ключей
// Настройка changelog через Materialized
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized =
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("my-store")
        .withLoggingEnabled(Map.of(
            "min.insync.replicas", "2",
            "replication.factor", "3"
        ));

Без changelog: падение инстанса = потеря состояния = некорректные агрегаты. С changelog: падение = replay из changelog = восстановление корректного состояния.

Kafka Streams: Жизненный цикл State Store
Input TopicInput Topic: Kafka Streams задача читает записи из входного топика через внутренний KafkaConsumer. Задача привязана к набору партиций — один task на одну партицию (по умолчанию). Kafka Streams автоматически назначает задачи через механизм consumer group rebalance.
consume
Stream ProcessorStream Processor: обрабатывает каждую запись из входного топика. При stateful операции (aggregate, count, join) обращается к local state store для чтения и записи. Все операции со state store локальны — нет сетевых вызовов в ходе обработки.
read/write
State Store (RocksDB)State Store (RocksDB): локальная встроенная база данных на диске задачи. RocksDB — LSM-дерево с компакцией, оптимизировано для write-heavy нагрузки. Путь хранения: state.dir/{application.id}/{task.id}/{store-name}. Данные персистентны между перезапусками приложения (пока state.dir доступен).
changelog write
Changelog TopicChangelog Topic: каждая запись в state store автоматически реплицируется в changelog topic с cleanup.policy=compact. Формат: key=store_key, value=store_value. Имя: '{application.id}-{store-name}-changelog'. Это страховка — если state store потерян (диск умер, задача переехала), он пересоздаётся из changelog.
Crash / RebalanceCrash / Rebalance: при crash задачи или rebalance партиций — state store на упавшей задаче становится недоступен. Kafka Streams coordinator (consumer group coordinator) обнаруживает недоступный инстанс через heartbeat timeout (session.timeout.ms). Задача переназначается другому инстансу.
восстановление
Changelog TopicChangelog Topic: новый инстанс читает changelog с начала (offset 0) или с последнего checkpoint (localCheckpoint файл в state.dir). Checkpoint периодически записывается (commit.interval.ms). Восстановление из checkpoint значительно быстрее полного replay.
replay
State Store (новый)Восстановленный State Store: новый инстанс воспроизводит все записи из changelog в локальный RocksDB. Время восстановления зависит от: размера state store, пропускной способности broker-to-consumer, наличия checkpoint. Во время восстановления задача находится в состоянии REBALANCING — не обрабатывает новые записи.
Active StoreActive Store: основной инстанс задачи, обрабатывающий записи. Каждое изменение state store отражается в changelog topic. Активная задача управляет ровно одной копией state store. Используется для всех операций чтения и записи в ходе обработки.
changelog
Changelog TopicChangelog Topic: буфер между активным store и standby репликой. Standby инстанс подписан на changelog и непрерывно воспроизводит все изменения. Лаг standby = задержка чтения changelog (обычно секунды). Мониторинг: JMX метрика kafka.streams:type=stream-task-metrics,task-id={id},store-id={name},restore-remaining-records.
непрерывный replay
Standby ReplicaStandby Replica: второй инстанс Kafka Streams приложения непрерывно воспроизводит changelog, поддерживая горячую копию state store. При failover активного инстанса: Kafka Streams coordinator назначает задачу на инстанс со standby — восстановление мгновенное, standby уже содержит актуальные данные. Config: num.standby.replicas=1.
RocksDB (persistent)Persistent RocksDB (по умолчанию): данные на диске, changelog topic, переживает перезапуск JVM. Materialized.as('store-name'). Медленнее in-memory при одиночных операциях, но справляется с данными больше RAM. Используйте для production.
In-MemoryIn-Memory Store: данные только в RAM, changelog topic не создаётся. Materialized.as(Stores.inMemoryKeyValueStore('store-name')). Быстрее, но данные теряются при перезапуске — полный replay changelog при каждом старте. Используйте для тестов или tiny datasets.
Interactive QueriesInteractive Queries: доступ к state store из REST-эндпоинта вашего приложения. streams.store(StoreQueryParameters.fromNameAndType('my-store', QueryableStoreTypes.keyValueStore())). Позволяет запрашивать состояние без записи в Kafka. Поддерживает point lookup, range scan, all(). Требует реализации роутинга запросов между инстансами.

Процесс восстановления состояния

Когда Kafka Streams инстанс падает:

  1. Rebalance — consumer group protocol перераспределяет партиции. Другой инстанс получает задачу (task) с упавших партиций.
  2. Restoration — новый инстанс для этой задачи не имеет локального state store (или имеет устаревший). Начинается чтение changelog с последнего checkpoint.
  3. Replay — инстанс читает changelog-топик от начала (или с checkpoint), воспроизводя все записи в свой локальный store.
  4. Resumption — когда changelog полностью воспроизведён, инстанс начинает обрабатывать новые записи из основного топика.
WARNING

State store restoration может занять минуты или часы для больших состояний. 10 ГБ state store = десятки минут восстановления из changelog при типичной пропускной способности сети. В это время задача не обрабатывает новые записи — накапливается lag. Используйте standby replicas (num.standby.replicas >= 1) для критичных приложений.


Standby Replicas

Standby replicas — горячие копии state store на других инстансах. Настраиваются через num.standby.replicas.

# StreamsConfig
num.standby.replicas=1

Как работают standby replicas:

  • Для каждого state store создаётся N дополнительных копий на других инстансах
  • Standby непрерывно воспроизводит changelog-топик, поддерживая актуальную копию
  • При падении активного инстанса: standby мгновенно становится активным — никакого replay с нуля
  • Время восстановления: секунды вместо минут (только небольшая дельта, накопившаяся с момента последнего чтения changelog standby-ом)

Trade-off: дополнительный disk I/O (хранение копии) + network I/O (чтение changelog). Для критичных production приложений это оправданная цена.


Типы Store и Оконные агрегации

Kafka Streams предоставляет разные типы store для разных операций:

Тип StoreИнтерфейсИспользуется для
KeyValueStoreKeyValueStore<K, V>count, aggregate, reduce (non-windowed)
WindowStoreWindowStore<K, V>Windowed aggregations (tumbling, hopping, sliding)
SessionStoreSessionStore<K, V>Session windows
TimestampedKeyValueStoreTimestampedKeyValueStore<K, V>KTable с timestamp per entry
// WindowStore для windowed count
KTable<Windowed<String>, Long> windowedCounts = stream
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("windowed-counts")
        .withValueSerde(Serdes.Long()));

Interactive Queries

State stores доступны не только внутри топологии — вы можете запрашивать их напрямую из running приложения.

// Получить ссылку на state store
ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType(
        "click-counts-store",
        QueryableStoreTypes.keyValueStore()
    )
);

// Прямой запрос
Long count = store.get("user-42");

// Диапазонный запрос
KeyValueIterator<String, Long> range = store.range("user-0", "user-9");

Это превращает Kafka Streams приложение в queryable microservice: вместо записи агрегатов в output-топик и их чтения из базы данных, клиент может запросить актуальное состояние напрямую через REST API, который вы добавляете поверх KafkaStreams.store().

TIP

Interactive queries позволяют запрашивать state store напрямую через REST API. Это превращает Kafka Streams приложение в микросервис с query endpoint — без необходимости писать результат в отдельный топик для чтения. Для мульти-инстансного приложения используйте StreamsMetadata для обнаружения, на каком инстансе находится нужный ключ, и делайте RPC-запрос к нему.


Отключение changelog

В некоторых случаях changelog не нужен — например, для кешей с коротким временем жизни или когда исходные данные можно восстановить другим способом:

// Отключить changelog logging (осторожно: потеря состояния при падении)
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> noLog =
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("cache-store")
        .withLoggingDisabled();

Отключение changelog ускоряет запись (нет overhead реплицирования в Kafka), но теряет fault tolerance. Используйте только если: (a) состояние легко пересчитать, или (b) потеря состояния допустима.

Проверка знанийKnowledge check
Kafka Streams инстанс обрабатывает партиции 0-3 топика 'transactions'. State store 'tx-counts-store' содержит 5 ГБ данных. Инстанс падает. Standby replicas не настроены (num.standby.replicas=0). Опишите полную последовательность событий от момента падения до возобновления обработки записей.
ОтветAnswer
1. Heartbeat timeout: Kafka consumer group coordinator обнаруживает потерю инстанса через session.timeout.ms (по умолчанию 10 секунд). 2. Rebalance: инициируется consumer group rebalance. Партиции 0-3 переназначаются на один из оставшихся инстансов. 3. Task assignment: новый инстанс получает задачи для партиций 0-3. Он создаёт новый локальный state store (или находит устаревший RocksDB). 4. Restoration: инстанс начинает читать changelog-топик '{app-id}-tx-counts-store-changelog' с начала (или с последнего checkpoint). Нужно воспроизвести данные для 5 ГБ состояния. 5. Blocking: во время restoration инстанс НЕ обрабатывает новые записи из 'transactions'. Накапливается consumer lag. 6. Completion: когда changelog полностью воспроизведён и state store восстановлен, инстанс начинает обработку новых записей. Время = 5 ГБ / скорость чтения из Kafka (обычно 100-500 МБ/с) = 10-50 секунд плюс время rebalance.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Kafka Streams приложение использует stateful aggregate() для подсчёта статистики. После ребалансировки один инстанс начинает обрабатывать новые партиции. Какова роль changelog-топика в этом процессе?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6