Joins
Joins в Kafka Streams — один из самых сложных топиков курса. В отличие от SQL joins, которые работают со статичными снапшотами, потоковые joins работают с бесконечными, асинхронно обновляемыми данными. Это требует понимания co-partitioning, временных окон и семантики каждого типа join.
Co-partitioning: фундаментальное требование
Co-partitioning — ключевое условие для большинства joins в Kafka Streams. Co-partitioned топики должны иметь:
- Одинаковое число партиций — topics A (6 партиций) и B (3 партиции) НЕ co-partitioned
- Одинаковую стратегию партиционирования — одинаковые ключи должны попадать в одинаковые партиции на обоих топиках
- Одинаковый тип ключа — оба топика должны использовать совместимые ключи и SerDes
Почему это необходимо: Kafka Streams назначает партиции N из обоих топиков одному task. Если orders-partition-0 обрабатывается task 0, то customers-partition-0 тоже должна обрабатываться task 0. Это гарантирует, что записи с одним ключом customer_id из обоих топиков встретятся в одном task.
orders (6 партиций)
Топик 'orders': 6 партиций, ключ = customer_id. Запись customer_id='C-1' → partition 2 (hash('C-1') % 6).join()
KStream-KTable join. Требует co-partitioning: одинаковое число партиций И одинаковая стратегия партиционирования по customer_id.customers (3 партиции) — ОШИБКА
Топик 'customers': 3 партиции, ключ = customer_id. Запись customer_id='C-1' → partition 2 (hash('C-1') % 3). НО это другой номер партиции! 6 vs 3 партиций нарушает co-partitioning.Co-partitioning — самое частое место ошибок при join. Если топики ‘orders’ (6 партиций) и ‘customers’ (3 партиции) — join НЕ РАБОТАЕТ. Kafka Streams выбрасывает TopologyException при построении топологии. Исправление: пересоздать один из топиков с правильным числом партиций, или использовать repartition через selectKey().repartition().
KStream-KStream Join (оконный)
Оба потока содержат события. Join находит пары событий из двух потоков, поступившие в пределах заданного временного окна.
// Соединить заказы и платежи в пределах 5 минут
KStream<String, EnrichedOrder> joined = orders.join(
payments,
(order, payment) -> new EnrichedOrder(order, payment),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);
Варианты:
join()— inner join: запись появляется только если нашлась пара в обоих потокахleftJoin()— left outer: запись из левого потока без пары всё равно эмитируется (с null для правой стороны)outerJoin()— full outer: записи из обоих потоков без пар эмитируются отдельно
Как работает под капотом: Kafka Streams буферизует записи из обоих потоков в WindowStore на период timeDifference. Запись из orders в момент T1 остаётся в буфере до T1 + timeDifference. Если запись из payments с тем же ключом приходит в этом окне — join выполняется.
Требования: Co-partitioning обязателен. JoinWindows обязателен (нет non-windowed stream-stream join).
Использование: корреляция заказов и платежей, сопоставление запросов и ответов, fraud detection (клик + транзакция в течение минуты).
KStream-KTable Join (обогащение потока)
Поток событий обогащается текущим значением из changelog-таблицы. Нет временного окна — используется последнее актуальное значение из KTable в момент прихода записи в KStream.
// Обогатить каждый заказ текущим профилем клиента
KStream<String, EnrichedOrder> enriched = orders.join(
customerProfiles,
(order, profile) -> new EnrichedOrder(order, profile.getName(), profile.getTier())
);
Семантика: записи из KTable могут меняться асинхронно. Если профиль клиента обновился, все последующие записи из orders для этого клиента получат новый профиль. Предыдущие записи уже обработаны и не пересчитываются.
Варианты: join() (inner — если KTable не содержит ключ, запись из KStream игнорируется), leftJoin() (left outer — если KTable не содержит ключ, проходит с null значением таблицы).
Требования: Co-partitioning обязателен. Нет временного окна.
Использование: обогащение транзакций профилем пользователя, добавление метаданных к событиям, применение конфигурации к потоку.
KStream-GlobalKTable Join
Аналогично KStream-KTable, но без требования co-partitioning. GlobalKTable реплицирована на все инстансы, поэтому любая запись из KStream может быть обогащена без привязки к партиции.
// Обогатить заказы данными о стране (GlobalKTable из топика с 1 партицией)
KStream<String, EnrichedOrder> enrichedWithCountry = orders.join(
countryGlobalTable,
(orderId, order) -> order.getCountryCode(), // key mapper: извлечь ключ для поиска в GlobalKTable
(order, country) -> new EnrichedOrder(order, country)
);
Отличие от KStream-KTable: join с GlobalKTable принимает keyMapper — функцию, которая извлекает ключ для поиска в GlobalKTable из записи KStream. Это позволяет делать join по любому полю записи, не только по ключу потока.
Trade-off: Весь GlobalKTable загружен в память каждого инстанса. Для небольших справочников (страны, категории, конфигурации) это приемлемо. Для больших таблиц используйте KTable.
KTable-KTable Join (join материализованных таблиц)
Обе стороны — changelog-таблицы. Результат — новая KTable, отражающая текущее join состояние.
// Join таблицы профилей с таблицей настроек
KTable<String, UserView> userView = profiles.join(
preferences,
(profile, prefs) -> new UserView(profile, prefs)
);
Семантика: когда обновляется значение в любой из входных KTable, соответствующая запись в результирующей KTable тоже обновляется. Это непрерывный, реактивный join.
Варианты: join(), leftJoin(), outerJoin().
Требования: Co-partitioning обязателен.
Foreign Key Join (KTable-KTable, Kafka 2.4+)
Революционное дополнение: join двух KTable не по первичному ключу, а по любому полю — без co-partitioning.
// orders keyed by order_id, customers keyed by customer_id
// Join orders с customers по полю order.customer_id
KTable<String, EnrichedOrder> enriched = orders.join(
customers,
order -> order.getCustomerId(), // foreign key extractor
(order, customer) -> new EnrichedOrder(order, customer)
);
Как работает внутри: Kafka Streams создаёт внутренний subscription-механизм. При обновлении orders для key order-1 (с customer_id="cust-42"):
- Внутренний repartition-шаг перераспределяет запись по ключу
customer-42 - Выполняется lookup в
customersKTable по ключуcustomer-42 - Результирующая запись эмитируется в выходную KTable
Преимущество: не нужно co-partitioning по customer_id. Kafka Streams обрабатывает repartitioning внутренне.
Foreign key join (Kafka 2.4+) — единственный join, который не требует co-partitioning по ключу join. Kafka Streams автоматически обрабатывает repartitioning через внутренние подписки. Используйте для join по любому полю, не только по primary key — это устраняет необходимость менять схему партиционирования входных топиков.
Сводная таблица Join типов
KStream A
KStream: поток событий INSERT semantics. Каждая запись — независимое событие.KStream B
KStream: второй поток событий. Для stream-stream join оба должны быть co-partitioned.Stream-Stream (оконный)
Stream-Stream join: WINDOWED. Co-partitioned обязателен. JoinWindows обязателен. Результат: KStream.KStream
Результат: KStream с обогащёнными записями из обоих потоков в пределах временного окна.KStream
KStream: поток событий.KTable (co-partitioned)
KTable: таблица изменений. Co-partitioned с KStream. Хранит текущее значение per key.Stream-Table (enrichment)
Stream-Table join: НЕ windowed. Использует текущее значение KTable. Co-partitioned обязателен. Варианты: join, leftJoin.KStream
Результат: KStream с обогащёнными записями — каждое событие потока дополнено текущим значением из таблицы.KStream (любая партиция)
KStream: поток событий. Любое число партиций.GlobalKTable (нет ограничений)
GlobalKTable: полная копия на каждом инстансе. Co-partitioning НЕ требуется. Нет ограничений на партиции.Stream-GlobalKTable
Stream-GlobalKTable join: НЕ windowed. Принимает keyMapper для извлечения ключа поиска. Нет co-partitioning ограничений.KStream
Результат: KStream. Каждая запись потока обогащена данными из GlobalKTable по извлечённому ключу.| Join | Тип левой | Тип правой | Co-partition | Оконный | Результат |
|---|---|---|---|---|---|
| KStream-KStream | KStream | KStream | Да | Да (JoinWindows) | KStream |
| KStream-KTable | KStream | KTable | Да | Нет | KStream |
| KStream-GlobalKTable | KStream | GlobalKTable | Нет | Нет | KStream |
| KTable-KTable | KTable | KTable | Да | Нет | KTable |
| KTable-KTable (FK) | KTable | KTable | Нет | Нет | KTable |
Исправление Co-partitioning нарушений
Два способа восстановить co-partitioning:
Способ 1: Пересоздать топик с правильным числом партиций. Наиболее чистый, но требует миграции данных.
Способ 2: Repartition через DSL.
// Если customers имеет 3 партиции, а orders — 6
// Repartition customers до 6 партиций
KStream<String, Customer> repartitionedCustomers = builder
.stream("customers")
.selectKey((k, v) -> v.getCustomerId())
.repartition(Repartitioned.with(Serdes.String(), customerSerde)
.withNumberOfPartitions(6));
KTable<String, Customer> customersTable = repartitionedCustomers
.toTable(Materialized.as("customers-store"));