Итоги модуля: Kafka Streams
Этот урок завершает Модуль 07. Здесь собраны дополнительные продвинутые темы, которые не уместились в предыдущих уроках, и систематизированы все ключевые концепции для подготовки к CCDAK.
Interactive Queries: Kafka Streams как queryable сервис
Kafka Streams — не только вычислительный движок. Приложение с materialized state store можно превратить в queryable микросервис: клиент запрашивает текущее состояние агрегации напрямую, без чтения output-топика.
Архитектура Interactive Queries:
- Kafka Streams приложение с 3 инстансами. Каждый инстанс обрабатывает свои партиции и хранит соответствующее подмножество state store.
- Встроенный HTTP-сервер (Javalin, Spring Boot, Micronaut) на каждом инстансе.
- Клиент делает запрос к любому инстансу. Если нужный ключ находится на другом инстансе — текущий инстанс делает RPC-запрос к нужному и возвращает результат.
// Обнаружить, на каком инстансе находится ключ
StreamsMetadata metadata = streams.queryMetadataForKey(
"order-counts-store",
"customer-123",
Serdes.String().serializer()
);
// Если ключ на текущем инстансе — прямой запрос
if (metadata.hostInfo().equals(thisHostInfo)) {
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(
"order-counts-store",
QueryableStoreTypes.keyValueStore()
)
);
Long count = store.get("customer-123");
} else {
// RPC к инстансу metadata.hostInfo()
Long count = httpClient.get(
metadata.hostInfo().host(),
metadata.hostInfo().port(),
"/state/order-counts/customer-123"
);
}
Типы запросов к store:
store.get(key)— точечный запрос по ключуstore.range(from, to)— диапазонный запросstore.all()— полный обход store (осторожно: может быть медленно для больших state)windowStore.fetch(key, from, to)— для WindowStore: записи в заданном диапазоне времени
Клиент
Клиент запрашивает count для customer-123. Запрос направлен на инстанс 1.Инстанс 1 (partition 0-3)
Инстанс 1: проверяет StreamsMetadata. customer-123 партиционируется в partition 5, которая назначена инстансу 3.Инстанс 3 (partition 8-11)
Инстанс 3: ключ customer-123 находится здесь. Локальный store.get(customer-123) = 42. Возвращает результат инстансу 1.Ответ: 42
Клиент получает ответ: {'customer-123': 42}. Весь routing прозрачен для клиента.Standby Replicas: детали настройки
Standby replicas обеспечивают быстрое восстановление при сбое. Важные детали:
# Количество горячих копий для каждого state store
num.standby.replicas=1
# При num.standby.replicas=1 и 3 инстансах:
# - Инстанс A: активный для partition 0-3, standby для 8-11
# - Инстанс B: активный для partition 4-7, standby для 0-3
# - Инстанс C: активный для partition 8-11, standby для 4-7
Время восстановления с standby vs без:
| Сценарий | Без standby | С standby (num.standby.replicas=1) |
|---|---|---|
| State 100 МБ | ~10 секунд | ~1 секунда |
| State 10 ГБ | ~10-50 минут | ~1-5 секунд |
| State 100 ГБ | часы | ~10-30 секунд |
Рекомендация для production: num.standby.replicas=1 для любого приложения с state store объёмом > 1 ГБ.
EOS-v2: Exactly-Once Semantics
По умолчанию Kafka Streams работает в режиме at-least-once: при сбое запись может быть обработана дважды. Это приемлемо для идемпотентных операций (логи, метрики), но недопустимо для финансовых транзакций.
# В StreamsConfig
processing.guarantee=exactly_once_v2
Что гарантирует EOS-v2:
Атомарная единица commit включает:
- Чтение записей из input-топика (consume offset commit)
- Запись результатов в output-топик (produce)
- Обновление state store + changelog
- Всё это как одна Kafka-транзакция
Если что-то падает между шагами — транзакция откатывается. Запись будет обработана снова, но результат не будет опубликован дважды.
EOS-v2 vs EOS-v1:
- EOS-v1 (
exactly_once): одна транзакция на поток. Много координации между потоками. - EOS-v2 (
exactly_once_v2, Kafka 2.6+): одна транзакция на задачу (task). Меньше координации, выше пропускная способность.
Требования для EOS-v2:
- Kafka брокеры версии >= 2.5
replication.factor >= 3для internal topics (changelog, repartition)min.insync.replicas >= 2
Overhead: ~5-10% снижение пропускной способности по сравнению с at-least-once. Принимайте это как цену корректности.
EOS-v2 (exactly_once_v2) — это транзакционные гарантии внутри Kafka. Если ваша топология пишет в внешние системы (база данных, REST API) — EOS-v2 не распространяется на эти записи. Для end-to-end exactly-once включая внешние системы нужна идемпотентность на стороне приёмника.
Processor API: низкоуровневый контроль
DSL Kafka Streams покрывает 95% случаев. Для оставшихся 5% — нестандартная логика, которую нельзя выразить операторами DSL — существует Processor API.
// Реализация кастомного процессора
public class OrderEnricher implements Processor<String, Order, String, EnrichedOrder> {
private ProcessorContext<String, EnrichedOrder> context;
private KeyValueStore<String, CustomerProfile> profileStore;
@Override
public void init(ProcessorContext<String, EnrichedOrder> context) {
this.context = context;
this.profileStore = context.getStateStore("customer-profiles");
// Планировщик: выполнять каждые 60 секунд по wall clock
context.schedule(
Duration.ofSeconds(60),
PunctuationType.WALL_CLOCK_TIME,
timestamp -> emitHeartbeat(timestamp)
);
}
@Override
public void process(Record<String, Order> record) {
String customerId = record.value().getCustomerId();
CustomerProfile profile = profileStore.get(customerId);
EnrichedOrder enriched = new EnrichedOrder(record.value(), profile);
// Условная пересылка: в разные топики в зависимости от логики
if (enriched.isVip()) {
context.forward(record.withValue(enriched), "vip-processor");
} else {
context.forward(record.withValue(enriched));
}
}
@Override
public void close() { /* освободить ресурсы */ }
}
Ключевые возможности Processor API:
context.forward()— передать запись следующему процессоруcontext.schedule()— планировщик сPunctuationType.STREAM_TIME(по event time) илиWALL_CLOCK_TIMEcontext.getStateStore()— доступ к state store- Произвольные условия пересылки (conditional forwarding)
PunctuationType:
STREAM_TIME— продвигается с event time записей. Не вызывается, если нет входящих данных.WALL_CLOCK_TIME— продвигается с реальным временем. Полезно для flush/heartbeat независимо от data flow.
Когда использовать Processor API:
- Сложная логика ветвления с доступом к нескольким state stores
- Кастомные таймеры (punctuate каждые N секунд)
- Внешние lookups внутри processor (кэш)
- Специальная логика forwarding (условная, с задержкой, в несколько дочерних процессоров)
Общая карта модуля
KStream / KTable / GlobalKTable
│
▼
DSL Operations
├── Stateless: filter, map, mapValues, flatMap, peek, branch, merge
└── Stateful: groupByKey → count / reduce / aggregate
groupBy → (repartition) → count / reduce / aggregate
│
▼
State Stores
├── In-Memory (fast, volatile)
└── RocksDB (default, persistent)
+ Changelog topic (fault tolerance)
+ Standby replicas (fast failover)
│
▼
Windowing
├── Tumbling (non-overlapping)
├── Hopping (overlapping)
├── Session (inactivity-based)
└── Sliding (record-centric)
+ Grace period
+ Suppress (final result)
│
▼
Joins
├── Stream-Stream (windowed, co-partition)
├── Stream-Table (lookup, co-partition)
├── Stream-GlobalKTable (no co-partition)
├── Table-Table (reactive, co-partition)
└── Table-Table FK (no co-partition)
│
▼
Advanced
├── Interactive Queries (queryable store via REST)
├── EOS-v2 (exactly_once_v2)
└── Processor API (low-level)
Kafka Streams vs ksqlDB: когда что выбрать
| Критерий | Kafka Streams | ksqlDB |
|---|---|---|
| Знание Java/Kotlin | Обязательно | Не нужно (SQL) |
| Деплой | Ваше JVM приложение | Отдельный сервер |
| Гибкость логики | Максимальная (любой Java код) | SQL + UDFs |
| Мониторинг | JMX метрики, custom | REST API + CLI |
| Тестирование | TopologyTestDriver (unit tests) | Сложнее юнит-тестировать |
| Когда выбрать | Сложная бизнес-логика, интеграция с JVM экосистемой | Быстрая разработка, SQL команда, прототипирование |
Если вы предпочитаете SQL вместо Java — следующий модуль (ksqlDB) предлагает SQL-интерфейс поверх Kafka Streams. Все концепции, изученные в Модуле 07 (state stores, windowing, joins), применяются напрямую в ksqlDB — только с SQL-синтаксисом.
Фокус для сертификации CCDAK
Kafka Streams составляет около 30% веса экзамена CCDAK — самый большой блок. Приоритизируйте:
- Windowing (самый частый раздел): типы окон, grace period, suppress, WindowedSerde
- Joins: co-partitioning требования, какой join когда использовать, foreign key join
- State Stores: changelog topic, standby replicas, восстановление
- DSL Operations: что вызывает repartitioning (map, selectKey, groupBy)
- EOS-v2: конфигурация, что гарантирует, overhead