Learning Platform
Глоссарий Troubleshooting
Урок 08.06 · 20 мин
Продвинутый
Interactive QueriesStandby ReplicasEOS-v2Processor APISummary

Итоги модуля: Kafka Streams

Этот урок завершает Модуль 07. Здесь собраны дополнительные продвинутые темы, которые не уместились в предыдущих уроках, и систематизированы все ключевые концепции для подготовки к CCDAK.


Interactive Queries: Kafka Streams как queryable сервис

Kafka Streams — не только вычислительный движок. Приложение с materialized state store можно превратить в queryable микросервис: клиент запрашивает текущее состояние агрегации напрямую, без чтения output-топика.

Архитектура Interactive Queries:

  1. Kafka Streams приложение с 3 инстансами. Каждый инстанс обрабатывает свои партиции и хранит соответствующее подмножество state store.
  2. Встроенный HTTP-сервер (Javalin, Spring Boot, Micronaut) на каждом инстансе.
  3. Клиент делает запрос к любому инстансу. Если нужный ключ находится на другом инстансе — текущий инстанс делает RPC-запрос к нужному и возвращает результат.
// Обнаружить, на каком инстансе находится ключ
StreamsMetadata metadata = streams.queryMetadataForKey(
    "order-counts-store",
    "customer-123",
    Serdes.String().serializer()
);

// Если ключ на текущем инстансе — прямой запрос
if (metadata.hostInfo().equals(thisHostInfo)) {
    ReadOnlyKeyValueStore<String, Long> store = streams.store(
        StoreQueryParameters.fromNameAndType(
            "order-counts-store",
            QueryableStoreTypes.keyValueStore()
        )
    );
    Long count = store.get("customer-123");
} else {
    // RPC к инстансу metadata.hostInfo()
    Long count = httpClient.get(
        metadata.hostInfo().host(),
        metadata.hostInfo().port(),
        "/state/order-counts/customer-123"
    );
}

Типы запросов к store:

  • store.get(key) — точечный запрос по ключу
  • store.range(from, to) — диапазонный запрос
  • store.all() — полный обход store (осторожно: может быть медленно для больших state)
  • windowStore.fetch(key, from, to) — для WindowStore: записи в заданном диапазоне времени
Interactive Queries: multi-instance routing

Клиент

Клиент запрашивает count для customer-123. Запрос направлен на инстанс 1.

Инстанс 1 (partition 0-3)

Инстанс 1: проверяет StreamsMetadata. customer-123 партиционируется в partition 5, которая назначена инстансу 3.

Инстанс 3 (partition 8-11)

Инстанс 3: ключ customer-123 находится здесь. Локальный store.get(customer-123) = 42. Возвращает результат инстансу 1.

Ответ: 42

Клиент получает ответ: {'customer-123': 42}. Весь routing прозрачен для клиента.

Standby Replicas: детали настройки

Standby replicas обеспечивают быстрое восстановление при сбое. Важные детали:

# Количество горячих копий для каждого state store
num.standby.replicas=1

# При num.standby.replicas=1 и 3 инстансах:
# - Инстанс A: активный для partition 0-3, standby для 8-11
# - Инстанс B: активный для partition 4-7, standby для 0-3
# - Инстанс C: активный для partition 8-11, standby для 4-7

Время восстановления с standby vs без:

СценарийБез standbyС standby (num.standby.replicas=1)
State 100 МБ~10 секунд~1 секунда
State 10 ГБ~10-50 минут~1-5 секунд
State 100 ГБчасы~10-30 секунд

Рекомендация для production: num.standby.replicas=1 для любого приложения с state store объёмом > 1 ГБ.


EOS-v2: Exactly-Once Semantics

По умолчанию Kafka Streams работает в режиме at-least-once: при сбое запись может быть обработана дважды. Это приемлемо для идемпотентных операций (логи, метрики), но недопустимо для финансовых транзакций.

# В StreamsConfig
processing.guarantee=exactly_once_v2

Что гарантирует EOS-v2:

Атомарная единица commit включает:

  1. Чтение записей из input-топика (consume offset commit)
  2. Запись результатов в output-топик (produce)
  3. Обновление state store + changelog
  4. Всё это как одна Kafka-транзакция

Если что-то падает между шагами — транзакция откатывается. Запись будет обработана снова, но результат не будет опубликован дважды.

EOS-v2 vs EOS-v1:

  • EOS-v1 (exactly_once): одна транзакция на поток. Много координации между потоками.
  • EOS-v2 (exactly_once_v2, Kafka 2.6+): одна транзакция на задачу (task). Меньше координации, выше пропускная способность.

Требования для EOS-v2:

  • Kafka брокеры версии >= 2.5
  • replication.factor >= 3 для internal topics (changelog, repartition)
  • min.insync.replicas >= 2

Overhead: ~5-10% снижение пропускной способности по сравнению с at-least-once. Принимайте это как цену корректности.

NOTE

EOS-v2 (exactly_once_v2) — это транзакционные гарантии внутри Kafka. Если ваша топология пишет в внешние системы (база данных, REST API) — EOS-v2 не распространяется на эти записи. Для end-to-end exactly-once включая внешние системы нужна идемпотентность на стороне приёмника.


Processor API: низкоуровневый контроль

DSL Kafka Streams покрывает 95% случаев. Для оставшихся 5% — нестандартная логика, которую нельзя выразить операторами DSL — существует Processor API.

// Реализация кастомного процессора
public class OrderEnricher implements Processor<String, Order, String, EnrichedOrder> {
    private ProcessorContext<String, EnrichedOrder> context;
    private KeyValueStore<String, CustomerProfile> profileStore;

    @Override
    public void init(ProcessorContext<String, EnrichedOrder> context) {
        this.context = context;
        this.profileStore = context.getStateStore("customer-profiles");

        // Планировщик: выполнять каждые 60 секунд по wall clock
        context.schedule(
            Duration.ofSeconds(60),
            PunctuationType.WALL_CLOCK_TIME,
            timestamp -> emitHeartbeat(timestamp)
        );
    }

    @Override
    public void process(Record<String, Order> record) {
        String customerId = record.value().getCustomerId();
        CustomerProfile profile = profileStore.get(customerId);

        EnrichedOrder enriched = new EnrichedOrder(record.value(), profile);

        // Условная пересылка: в разные топики в зависимости от логики
        if (enriched.isVip()) {
            context.forward(record.withValue(enriched), "vip-processor");
        } else {
            context.forward(record.withValue(enriched));
        }
    }

    @Override
    public void close() { /* освободить ресурсы */ }
}

Ключевые возможности Processor API:

  • context.forward() — передать запись следующему процессору
  • context.schedule() — планировщик с PunctuationType.STREAM_TIME (по event time) или WALL_CLOCK_TIME
  • context.getStateStore() — доступ к state store
  • Произвольные условия пересылки (conditional forwarding)

PunctuationType:

  • STREAM_TIME — продвигается с event time записей. Не вызывается, если нет входящих данных.
  • WALL_CLOCK_TIME — продвигается с реальным временем. Полезно для flush/heartbeat независимо от data flow.

Когда использовать Processor API:

  • Сложная логика ветвления с доступом к нескольким state stores
  • Кастомные таймеры (punctuate каждые N секунд)
  • Внешние lookups внутри processor (кэш)
  • Специальная логика forwarding (условная, с задержкой, в несколько дочерних процессоров)

Общая карта модуля

KStream / KTable / GlobalKTable


    DSL Operations
    ├── Stateless: filter, map, mapValues, flatMap, peek, branch, merge
    └── Stateful: groupByKey → count / reduce / aggregate
                  groupBy → (repartition) → count / reduce / aggregate


    State Stores
    ├── In-Memory (fast, volatile)
    └── RocksDB (default, persistent)
         + Changelog topic (fault tolerance)
         + Standby replicas (fast failover)


    Windowing
    ├── Tumbling (non-overlapping)
    ├── Hopping (overlapping)
    ├── Session (inactivity-based)
    └── Sliding (record-centric)
         + Grace period
         + Suppress (final result)


    Joins
    ├── Stream-Stream (windowed, co-partition)
    ├── Stream-Table (lookup, co-partition)
    ├── Stream-GlobalKTable (no co-partition)
    ├── Table-Table (reactive, co-partition)
    └── Table-Table FK (no co-partition)


    Advanced
    ├── Interactive Queries (queryable store via REST)
    ├── EOS-v2 (exactly_once_v2)
    └── Processor API (low-level)

Kafka Streams vs ksqlDB: когда что выбрать

КритерийKafka StreamsksqlDB
Знание Java/KotlinОбязательноНе нужно (SQL)
ДеплойВаше JVM приложениеОтдельный сервер
Гибкость логикиМаксимальная (любой Java код)SQL + UDFs
МониторингJMX метрики, customREST API + CLI
ТестированиеTopologyTestDriver (unit tests)Сложнее юнит-тестировать
Когда выбратьСложная бизнес-логика, интеграция с JVM экосистемойБыстрая разработка, SQL команда, прототипирование

Если вы предпочитаете SQL вместо Java — следующий модуль (ksqlDB) предлагает SQL-интерфейс поверх Kafka Streams. Все концепции, изученные в Модуле 07 (state stores, windowing, joins), применяются напрямую в ksqlDB — только с SQL-синтаксисом.


Фокус для сертификации CCDAK

Kafka Streams составляет около 30% веса экзамена CCDAK — самый большой блок. Приоритизируйте:

  1. Windowing (самый частый раздел): типы окон, grace period, suppress, WindowedSerde
  2. Joins: co-partitioning требования, какой join когда использовать, foreign key join
  3. State Stores: changelog topic, standby replicas, восстановление
  4. DSL Operations: что вызывает repartitioning (map, selectKey, groupBy)
  5. EOS-v2: конфигурация, что гарантирует, overhead
Проверка знанийKnowledge check
Спроектируйте архитектуру Kafka Streams приложения для следующей задачи: поток транзакций (12 партиций), обогащение данными о клиенте (таблица 50K записей), подсчёт транзакций в скользящем окне 1 час с обновлением каждые 10 минут, exposed через REST API. Требования: быстрое восстановление при сбое, финансовые транзакции (no duplicates). Назовите конкретные компоненты и конфигурации.
ОтветAnswer
Topology: (1) KStream из 'transactions' (12 партиций). (2) GlobalKTable из 'customers' (50K записей — небольшой, нет co-partitioning проблем, нет RAM ограничений). (3) stream.join(globalKTable, keyMapper, joiner) — Stream-GlobalKTable join для обогащения. (4) enriched.groupByKey().windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(2)).advanceBy(Duration.ofMinutes(10))).count(Materialized.as('tx-counts-store')) — hopping window 1 час с advance 10 минут. Конфигурации: processing.guarantee=exactly_once_v2 (финансовые транзакции, no duplicates). num.standby.replicas=1 (быстрое восстановление). state.dir=/data/kafka-streams. REST API: KafkaStreams.store(StoreQueryParameters.fromNameAndType('tx-counts-store', QueryableStoreTypes.windowStore())) для запросов по ключу и временному окну. StreamsMetadata для routing между инстансами.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Kafka Streams приложение с 3 инстансами и state store 'order-counts'. Клиент отправляет HTTP GET на инстанс 1, запрашивая count для customer_id='cust-777'. StreamsMetadata показывает, что 'cust-777' принадлежит инстансу 3. Какова архитектура обработки этого запроса?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6