Learning Platform
Глоссарий Troubleshooting
Урок 08.02 · 30 мин
Продвинутый
DSLmapfilterflatMapgroupByaggregatereducecountpeekbranchmergeStatelessStateful

DSL Operations

Kafka Streams DSL предоставляет богатый набор операций для построения потоковых топологий. Все операции делятся на два фундаментальных класса: stateless (не требуют состояния) и stateful (требуют state store). Понимание этого разделения — ключ к правильному дизайну топологии.


Stateless операции

Stateless операции обрабатывают каждую запись в изоляции — независимо от предыдущих или последующих записей. Нет state store, нет changelog-топика. Стоимость: O(1) на запись. Это самые эффективные операции в DSL.

filter и filterNot

Пропускают только записи, соответствующие предикату.

// Оставить только транзакции на сумму > 100
KStream<String, Transaction> highValue = transactions
    .filter((key, value) -> value.getAmount() > 100);

// Отфильтровать удалённые записи
KStream<String, Order> activeOrders = orders
    .filterNot((key, value) -> value.isDeleted());

filter не меняет ключ — не вызывает repartitioning.

map и mapValues

map трансформирует и ключ, и значение. mapValues трансформирует только значение.

// map: изменяет ключ с order_id на customer_id
KStream<String, Order> byCustomer = orders
    .map((orderId, order) -> KeyValue.pair(order.getCustomerId(), order));

// mapValues: трансформирует только значение, ключ не меняется
KStream<String, OrderDTO> dtos = orders
    .mapValues(order -> order.toDTO());
TIP

Предпочитайте mapValues() вместо map(), когда ключ не меняется. mapValues() не вызывает repartitioning — это значительная экономия сети и диска. Repartitioning = автоматически созданный внутренний топик, запись в него и чтение из него, что удваивает I/O для этой части пипелайна.

flatMap и flatMapValues

Один входной запись → ноль или несколько выходных записей.

// flatMap: одна транзакция → несколько записей по тегам
KStream<String, TaggedTransaction> tagged = transactions
    .flatMap((txId, tx) -> tx.getTags().stream()
        .map(tag -> KeyValue.pair(tag, new TaggedTransaction(tx, tag)))
        .collect(Collectors.toList()));

// flatMapValues: split одного значения на части (ключ не меняется)
KStream<String, LineItem> lineItems = orders
    .flatMapValues(order -> order.getLineItems());

flatMap может изменять ключ — вызывает repartitioning. flatMapValues — нет.

peek

Наблюдает за записями без изменения. Используется для логирования и отладки.

KStream<String, Order> observed = orders
    .peek((key, value) -> log.info("Processing order: key={}, amount={}", key, value.getAmount()));

Не мутируйте состояние в peek — это нарушает семантику оператора и может привести к неопределённому поведению при переобработке.

selectKey

Заменяет ключ записи, вызывает repartitioning.

// Изменить ключ с order_id на region
KStream<String, Order> byRegion = orders
    .selectKey((orderId, order) -> order.getRegion());

branch

Разделяет один поток на несколько суб-потоков по предикатам. В Kafka Streams 3.x+ API изменился:

Map<String, KStream<String, Order>> branches = orders
    .split(Named.as("branch-"))
    .branch((key, value) -> value.getRegion().equals("EU"),
        Branched.as("eu"))
    .branch((key, value) -> value.getRegion().equals("US"),
        Branched.as("us"))
    .defaultBranch(Branched.as("other"));

KStream<String, Order> euOrders = branches.get("branch-eu");
KStream<String, Order> usOrders = branches.get("branch-us");

merge

Объединяет два KStream в один. Порядок записей из двух потоков не гарантирован.

KStream<String, Event> allEvents = euEvents.merge(usEvents);

Stateful операции

Stateful операции требуют state store — хранилища состояния, которое ведёт учёт прошлых записей. Kafka Streams автоматически создаёт state store (RocksDB по умолчанию) и соответствующий changelog-топик для каждой stateful операции.

groupByKey и groupBy

Группировка — обязательный шаг перед любой агрегацией.

// groupByKey: группировать по существующему ключу (НЕ вызывает repartitioning)
KGroupedStream<String, Transaction> grouped = transactions.groupByKey();

// groupBy: группировать по новому ключу (ВЫЗЫВАЕТ repartitioning)
KGroupedStream<String, Transaction> byRegion = transactions
    .groupBy((txId, tx) -> tx.getRegion(),
        Grouped.with(Serdes.String(), transactionSerde));

count

Подсчёт записей по ключу. Возвращает KTable.

KTable<String, Long> transactionCounts = transactions
    .groupByKey()
    .count(Materialized.as("tx-counts-store"));

reduce

Объединяет значения одного типа per ключ. Возвращает KTable.

// Суммировать суммы транзакций по ключу
KTable<String, Double> totalByKey = transactions
    .groupByKey()
    .reduce((accumulated, newValue) ->
        accumulated + newValue.getAmount());

aggregate

Наиболее гибкая агрегация — аккумулятор может иметь отличный тип от входного значения.

// Считать статистику по каждому ключу
KTable<String, TransactionStats> stats = transactions
    .groupByKey()
    .aggregate(
        TransactionStats::new,  // initializer: создать пустой аккумулятор
        (key, tx, acc) -> acc.update(tx),  // adder: обновить аккумулятор
        Materialized.<String, TransactionStats, KeyValueStore<Bytes, byte[]>>as("stats-store")
            .withValueSerde(statsSerde)
    );

Repartitioning: когда и почему

Repartitioning происходит, когда операция изменяет ключ записи. Kafka Streams автоматически вставляет внутренний repartition-топик в топологию.

Repartitioning: автоматическая вставка внутреннего топика

orders (12 партиций)

Исходный топик 'orders'. 12 партиций. Ключ = order_id.

map() — новый ключ

map(): изменяет ключ с order_id на customer_id. Это ключевая операция! Она помечает поток как 'нуждающийся в repartitioning'.

repartition topic

Автоматически созданный repartition-топик: {app-id}-KSTREAM-MAP-0000000001-repartition. Kafka Streams пишет ТУДА, затем читает ОТТУДА. Дополнительный network I/O + disk I/O.

groupByKey().count()

groupByKey().count() — статeful операция. Теперь записи с одним customer_id гарантированно попадают в один task, так как ключ был правильно перераспределён через repartition-топик.

KTable (counts)

KTable с результатами count по каждому customer_id. State store хранит агрегаты локально.
Без repartitioning: записи с одним ключом могут попасть в разные tasks — агрегат будет некорректным

Операции, вызывающие repartitioning:

ОперацияRepartitioningПричина
map()ДаИзменяет ключ
flatMap()ДаМожет изменять ключ
selectKey()ДаЯвная замена ключа
groupBy()ДаНовый ключ группировки
mapValues()НетКлюч не меняется
flatMapValues()НетКлюч не меняется
filter()НетКлюч не меняется
groupByKey()НетИспользует существующий ключ

Terminal операции

Терминальные операции — конечные точки топологии, после них DSL-цепочка не продолжается.

// to(): записать в выходной топик (sink processor)
stream.to("output-topic", Produced.with(Serdes.String(), orderSerde));

// through(): записать во временный топик и вернуть новый KStream
// (устарело в пользу repartition() в новых версиях)
KStream<String, Order> repartitioned = stream.through("intermediate-topic");

// forEach(): side-effect, без записи в Kafka. Нельзя использовать для stateful логики.
stream.forEach((key, value) -> externalSystem.send(key, value));

Полный пример: многоступенчатый пипелайн

StreamsBuilder builder = new StreamsBuilder();

// Чтение из топика
KStream<String, Transaction> raw = builder.stream(
    "raw-transactions",
    Consumed.with(Serdes.String(), txSerde)
);

// Пипелайн: фильтр -> трансформация -> группировка -> агрегация -> запись
KTable<String, Long> dailyCounts = raw
    .filter((txId, tx) -> tx.getStatus().equals("COMPLETED"))  // stateless
    .mapValues(tx -> tx.normalize())                           // stateless, no repartition
    .groupBy(                                                  // repartitioning по merchant_id
        (txId, tx) -> tx.getMerchantId(),
        Grouped.with(Serdes.String(), normalizedTxSerde)
    )
    .count(Materialized.as("merchant-daily-counts"));          // stateful

// Записать результаты в топик
dailyCounts
    .toStream()
    .to("merchant-counts", Produced.with(Serdes.String(), Serdes.Long()));

Паттерн Branch-and-Merge

Разделить поток по условию, обработать каждую ветку по-своему, объединить результаты:

Map<String, KStream<String, Order>> branches = orders
    .split(Named.as("type-"))
    .branch((key, order) -> order.isExpress(), Branched.as("express"))
    .defaultBranch(Branched.as("standard"));

KStream<String, ProcessedOrder> expressProcessed =
    branches.get("type-express").mapValues(order -> processExpress(order));

KStream<String, ProcessedOrder> standardProcessed =
    branches.get("type-standard").mapValues(order -> processStandard(order));

// Объединить результаты
expressProcessed.merge(standardProcessed)
    .to("processed-orders");
Проверка знанийKnowledge check
Назовите все операции из следующего списка, которые вызывают repartitioning, и объясните почему: map(), mapValues(), filter(), selectKey(), groupByKey(), groupBy(), flatMap(), flatMapValues(). Также объясните, почему repartitioning — это дорогостоящая операция с точки зрения производительности.
ОтветAnswer
Repartitioning вызывают: map() (меняет ключ), selectKey() (явно меняет ключ), groupBy() (группирует по новому ключу), flatMap() (может менять ключ). НЕ вызывают: mapValues() (ключ неизменён), filter() (ключ неизменён), groupByKey() (использует существующий ключ), flatMapValues() (ключ неизменён). Repartitioning дорого потому что: Kafka Streams автоматически создаёт внутренний топик (repartition topic), записывает все записи в него (disk I/O на брокерах + network I/O), затем читает их обратно (снова network I/O). Это удваивает нагрузку на сеть и диск для всего потока данных, проходящего через эту операцию.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Топология Kafka Streams: stream.map((k,v) -> KeyValue.pair(v.getRegion(), v)).groupByKey().count(). Разработчик замечает некорректные счётчики — некоторые регионы недосчитываются. В чём причина?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6