DSL Operations
Kafka Streams DSL предоставляет богатый набор операций для построения потоковых топологий. Все операции делятся на два фундаментальных класса: stateless (не требуют состояния) и stateful (требуют state store). Понимание этого разделения — ключ к правильному дизайну топологии.
Stateless операции
Stateless операции обрабатывают каждую запись в изоляции — независимо от предыдущих или последующих записей. Нет state store, нет changelog-топика. Стоимость: O(1) на запись. Это самые эффективные операции в DSL.
filter и filterNot
Пропускают только записи, соответствующие предикату.
// Оставить только транзакции на сумму > 100
KStream<String, Transaction> highValue = transactions
.filter((key, value) -> value.getAmount() > 100);
// Отфильтровать удалённые записи
KStream<String, Order> activeOrders = orders
.filterNot((key, value) -> value.isDeleted());
filter не меняет ключ — не вызывает repartitioning.
map и mapValues
map трансформирует и ключ, и значение. mapValues трансформирует только значение.
// map: изменяет ключ с order_id на customer_id
KStream<String, Order> byCustomer = orders
.map((orderId, order) -> KeyValue.pair(order.getCustomerId(), order));
// mapValues: трансформирует только значение, ключ не меняется
KStream<String, OrderDTO> dtos = orders
.mapValues(order -> order.toDTO());
Предпочитайте mapValues() вместо map(), когда ключ не меняется. mapValues() не вызывает repartitioning — это значительная экономия сети и диска. Repartitioning = автоматически созданный внутренний топик, запись в него и чтение из него, что удваивает I/O для этой части пипелайна.
flatMap и flatMapValues
Один входной запись → ноль или несколько выходных записей.
// flatMap: одна транзакция → несколько записей по тегам
KStream<String, TaggedTransaction> tagged = transactions
.flatMap((txId, tx) -> tx.getTags().stream()
.map(tag -> KeyValue.pair(tag, new TaggedTransaction(tx, tag)))
.collect(Collectors.toList()));
// flatMapValues: split одного значения на части (ключ не меняется)
KStream<String, LineItem> lineItems = orders
.flatMapValues(order -> order.getLineItems());
flatMap может изменять ключ — вызывает repartitioning. flatMapValues — нет.
peek
Наблюдает за записями без изменения. Используется для логирования и отладки.
KStream<String, Order> observed = orders
.peek((key, value) -> log.info("Processing order: key={}, amount={}", key, value.getAmount()));
Не мутируйте состояние в peek — это нарушает семантику оператора и может привести к неопределённому поведению при переобработке.
selectKey
Заменяет ключ записи, вызывает repartitioning.
// Изменить ключ с order_id на region
KStream<String, Order> byRegion = orders
.selectKey((orderId, order) -> order.getRegion());
branch
Разделяет один поток на несколько суб-потоков по предикатам. В Kafka Streams 3.x+ API изменился:
Map<String, KStream<String, Order>> branches = orders
.split(Named.as("branch-"))
.branch((key, value) -> value.getRegion().equals("EU"),
Branched.as("eu"))
.branch((key, value) -> value.getRegion().equals("US"),
Branched.as("us"))
.defaultBranch(Branched.as("other"));
KStream<String, Order> euOrders = branches.get("branch-eu");
KStream<String, Order> usOrders = branches.get("branch-us");
merge
Объединяет два KStream в один. Порядок записей из двух потоков не гарантирован.
KStream<String, Event> allEvents = euEvents.merge(usEvents);
Stateful операции
Stateful операции требуют state store — хранилища состояния, которое ведёт учёт прошлых записей. Kafka Streams автоматически создаёт state store (RocksDB по умолчанию) и соответствующий changelog-топик для каждой stateful операции.
groupByKey и groupBy
Группировка — обязательный шаг перед любой агрегацией.
// groupByKey: группировать по существующему ключу (НЕ вызывает repartitioning)
KGroupedStream<String, Transaction> grouped = transactions.groupByKey();
// groupBy: группировать по новому ключу (ВЫЗЫВАЕТ repartitioning)
KGroupedStream<String, Transaction> byRegion = transactions
.groupBy((txId, tx) -> tx.getRegion(),
Grouped.with(Serdes.String(), transactionSerde));
count
Подсчёт записей по ключу. Возвращает KTable.
KTable<String, Long> transactionCounts = transactions
.groupByKey()
.count(Materialized.as("tx-counts-store"));
reduce
Объединяет значения одного типа per ключ. Возвращает KTable.
// Суммировать суммы транзакций по ключу
KTable<String, Double> totalByKey = transactions
.groupByKey()
.reduce((accumulated, newValue) ->
accumulated + newValue.getAmount());
aggregate
Наиболее гибкая агрегация — аккумулятор может иметь отличный тип от входного значения.
// Считать статистику по каждому ключу
KTable<String, TransactionStats> stats = transactions
.groupByKey()
.aggregate(
TransactionStats::new, // initializer: создать пустой аккумулятор
(key, tx, acc) -> acc.update(tx), // adder: обновить аккумулятор
Materialized.<String, TransactionStats, KeyValueStore<Bytes, byte[]>>as("stats-store")
.withValueSerde(statsSerde)
);
Repartitioning: когда и почему
Repartitioning происходит, когда операция изменяет ключ записи. Kafka Streams автоматически вставляет внутренний repartition-топик в топологию.
orders (12 партиций)
Исходный топик 'orders'. 12 партиций. Ключ = order_id.map() — новый ключ
map(): изменяет ключ с order_id на customer_id. Это ключевая операция! Она помечает поток как 'нуждающийся в repartitioning'.repartition topic
Автоматически созданный repartition-топик: {app-id}-KSTREAM-MAP-0000000001-repartition. Kafka Streams пишет ТУДА, затем читает ОТТУДА. Дополнительный network I/O + disk I/O.groupByKey().count()
groupByKey().count() — статeful операция. Теперь записи с одним customer_id гарантированно попадают в один task, так как ключ был правильно перераспределён через repartition-топик.KTable (counts)
KTable с результатами count по каждому customer_id. State store хранит агрегаты локально.Операции, вызывающие repartitioning:
| Операция | Repartitioning | Причина |
|---|---|---|
map() | Да | Изменяет ключ |
flatMap() | Да | Может изменять ключ |
selectKey() | Да | Явная замена ключа |
groupBy() | Да | Новый ключ группировки |
mapValues() | Нет | Ключ не меняется |
flatMapValues() | Нет | Ключ не меняется |
filter() | Нет | Ключ не меняется |
groupByKey() | Нет | Использует существующий ключ |
Terminal операции
Терминальные операции — конечные точки топологии, после них DSL-цепочка не продолжается.
// to(): записать в выходной топик (sink processor)
stream.to("output-topic", Produced.with(Serdes.String(), orderSerde));
// through(): записать во временный топик и вернуть новый KStream
// (устарело в пользу repartition() в новых версиях)
KStream<String, Order> repartitioned = stream.through("intermediate-topic");
// forEach(): side-effect, без записи в Kafka. Нельзя использовать для stateful логики.
stream.forEach((key, value) -> externalSystem.send(key, value));
Полный пример: многоступенчатый пипелайн
StreamsBuilder builder = new StreamsBuilder();
// Чтение из топика
KStream<String, Transaction> raw = builder.stream(
"raw-transactions",
Consumed.with(Serdes.String(), txSerde)
);
// Пипелайн: фильтр -> трансформация -> группировка -> агрегация -> запись
KTable<String, Long> dailyCounts = raw
.filter((txId, tx) -> tx.getStatus().equals("COMPLETED")) // stateless
.mapValues(tx -> tx.normalize()) // stateless, no repartition
.groupBy( // repartitioning по merchant_id
(txId, tx) -> tx.getMerchantId(),
Grouped.with(Serdes.String(), normalizedTxSerde)
)
.count(Materialized.as("merchant-daily-counts")); // stateful
// Записать результаты в топик
dailyCounts
.toStream()
.to("merchant-counts", Produced.with(Serdes.String(), Serdes.Long()));
Паттерн Branch-and-Merge
Разделить поток по условию, обработать каждую ветку по-своему, объединить результаты:
Map<String, KStream<String, Order>> branches = orders
.split(Named.as("type-"))
.branch((key, order) -> order.isExpress(), Branched.as("express"))
.defaultBranch(Branched.as("standard"));
KStream<String, ProcessedOrder> expressProcessed =
branches.get("type-express").mapValues(order -> processExpress(order));
KStream<String, ProcessedOrder> standardProcessed =
branches.get("type-standard").mapValues(order -> processStandard(order));
// Объединить результаты
expressProcessed.merge(standardProcessed)
.to("processed-orders");