Learning Platform
Глоссарий Troubleshooting
Урок 08.05 · 30 мин
Продвинутый
JoinStream-Stream JoinStream-Table JoinKTable-KTable JoinGlobalKTable JoinCo-partitioningJoinWindowsForeign Key Join

Joins

Joins в Kafka Streams — один из самых сложных топиков курса. В отличие от SQL joins, которые работают со статичными снапшотами, потоковые joins работают с бесконечными, асинхронно обновляемыми данными. Это требует понимания co-partitioning, временных окон и семантики каждого типа join.


Co-partitioning: фундаментальное требование

Co-partitioning — ключевое условие для большинства joins в Kafka Streams. Co-partitioned топики должны иметь:

  1. Одинаковое число партиций — topics A (6 партиций) и B (3 партиции) НЕ co-partitioned
  2. Одинаковую стратегию партиционирования — одинаковые ключи должны попадать в одинаковые партиции на обоих топиках
  3. Одинаковый тип ключа — оба топика должны использовать совместимые ключи и SerDes

Почему это необходимо: Kafka Streams назначает партиции N из обоих топиков одному task. Если orders-partition-0 обрабатывается task 0, то customers-partition-0 тоже должна обрабатываться task 0. Это гарантирует, что записи с одним ключом customer_id из обоих топиков встретятся в одном task.

Co-partitioning: корректная и некорректная конфигурация

orders (6 партиций)

Топик 'orders': 6 партиций, ключ = customer_id. Запись customer_id='C-1' → partition 2 (hash('C-1') % 6).

join()

KStream-KTable join. Требует co-partitioning: одинаковое число партиций И одинаковая стратегия партиционирования по customer_id.

customers (3 партиции) — ОШИБКА

Топик 'customers': 3 партиции, ключ = customer_id. Запись customer_id='C-1' → partition 2 (hash('C-1') % 3). НО это другой номер партиции! 6 vs 3 партиций нарушает co-partitioning.
TopologyException при запуске: Different number of partitions
WARNING

Co-partitioning — самое частое место ошибок при join. Если топики ‘orders’ (6 партиций) и ‘customers’ (3 партиции) — join НЕ РАБОТАЕТ. Kafka Streams выбрасывает TopologyException при построении топологии. Исправление: пересоздать один из топиков с правильным числом партиций, или использовать repartition через selectKey().repartition().


KStream-KStream Join (оконный)

Оба потока содержат события. Join находит пары событий из двух потоков, поступившие в пределах заданного временного окна.

// Соединить заказы и платежи в пределах 5 минут
KStream<String, EnrichedOrder> joined = orders.join(
    payments,
    (order, payment) -> new EnrichedOrder(order, payment),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
    StreamJoined.with(Serdes.String(), orderSerde, paymentSerde)
);

Варианты:

  • join() — inner join: запись появляется только если нашлась пара в обоих потоках
  • leftJoin() — left outer: запись из левого потока без пары всё равно эмитируется (с null для правой стороны)
  • outerJoin() — full outer: записи из обоих потоков без пар эмитируются отдельно

Как работает под капотом: Kafka Streams буферизует записи из обоих потоков в WindowStore на период timeDifference. Запись из orders в момент T1 остаётся в буфере до T1 + timeDifference. Если запись из payments с тем же ключом приходит в этом окне — join выполняется.

Требования: Co-partitioning обязателен. JoinWindows обязателен (нет non-windowed stream-stream join).

Использование: корреляция заказов и платежей, сопоставление запросов и ответов, fraud detection (клик + транзакция в течение минуты).


KStream-KTable Join (обогащение потока)

Поток событий обогащается текущим значением из changelog-таблицы. Нет временного окна — используется последнее актуальное значение из KTable в момент прихода записи в KStream.

// Обогатить каждый заказ текущим профилем клиента
KStream<String, EnrichedOrder> enriched = orders.join(
    customerProfiles,
    (order, profile) -> new EnrichedOrder(order, profile.getName(), profile.getTier())
);

Семантика: записи из KTable могут меняться асинхронно. Если профиль клиента обновился, все последующие записи из orders для этого клиента получат новый профиль. Предыдущие записи уже обработаны и не пересчитываются.

Варианты: join() (inner — если KTable не содержит ключ, запись из KStream игнорируется), leftJoin() (left outer — если KTable не содержит ключ, проходит с null значением таблицы).

Требования: Co-partitioning обязателен. Нет временного окна.

Использование: обогащение транзакций профилем пользователя, добавление метаданных к событиям, применение конфигурации к потоку.


KStream-GlobalKTable Join

Аналогично KStream-KTable, но без требования co-partitioning. GlobalKTable реплицирована на все инстансы, поэтому любая запись из KStream может быть обогащена без привязки к партиции.

// Обогатить заказы данными о стране (GlobalKTable из топика с 1 партицией)
KStream<String, EnrichedOrder> enrichedWithCountry = orders.join(
    countryGlobalTable,
    (orderId, order) -> order.getCountryCode(),  // key mapper: извлечь ключ для поиска в GlobalKTable
    (order, country) -> new EnrichedOrder(order, country)
);

Отличие от KStream-KTable: join с GlobalKTable принимает keyMapper — функцию, которая извлекает ключ для поиска в GlobalKTable из записи KStream. Это позволяет делать join по любому полю записи, не только по ключу потока.

Trade-off: Весь GlobalKTable загружен в память каждого инстанса. Для небольших справочников (страны, категории, конфигурации) это приемлемо. Для больших таблиц используйте KTable.


KTable-KTable Join (join материализованных таблиц)

Обе стороны — changelog-таблицы. Результат — новая KTable, отражающая текущее join состояние.

// Join таблицы профилей с таблицей настроек
KTable<String, UserView> userView = profiles.join(
    preferences,
    (profile, prefs) -> new UserView(profile, prefs)
);

Семантика: когда обновляется значение в любой из входных KTable, соответствующая запись в результирующей KTable тоже обновляется. Это непрерывный, реактивный join.

Варианты: join(), leftJoin(), outerJoin().

Требования: Co-partitioning обязателен.


Foreign Key Join (KTable-KTable, Kafka 2.4+)

Революционное дополнение: join двух KTable не по первичному ключу, а по любому полю — без co-partitioning.

// orders keyed by order_id, customers keyed by customer_id
// Join orders с customers по полю order.customer_id
KTable<String, EnrichedOrder> enriched = orders.join(
    customers,
    order -> order.getCustomerId(),  // foreign key extractor
    (order, customer) -> new EnrichedOrder(order, customer)
);

Как работает внутри: Kafka Streams создаёт внутренний subscription-механизм. При обновлении orders для key order-1customer_id="cust-42"):

  1. Внутренний repartition-шаг перераспределяет запись по ключу customer-42
  2. Выполняется lookup в customers KTable по ключу customer-42
  3. Результирующая запись эмитируется в выходную KTable

Преимущество: не нужно co-partitioning по customer_id. Kafka Streams обрабатывает repartitioning внутренне.

TIP

Foreign key join (Kafka 2.4+) — единственный join, который не требует co-partitioning по ключу join. Kafka Streams автоматически обрабатывает repartitioning через внутренние подписки. Используйте для join по любому полю, не только по primary key — это устраняет необходимость менять схему партиционирования входных топиков.


Сводная таблица Join типов

Четыре типа Join: входы, ограничения, выходы

KStream A

KStream: поток событий INSERT semantics. Каждая запись — независимое событие.

KStream B

KStream: второй поток событий. Для stream-stream join оба должны быть co-partitioned.

Stream-Stream (оконный)

Stream-Stream join: WINDOWED. Co-partitioned обязателен. JoinWindows обязателен. Результат: KStream.

KStream

Результат: KStream с обогащёнными записями из обоих потоков в пределах временного окна.

KStream

KStream: поток событий.

KTable (co-partitioned)

KTable: таблица изменений. Co-partitioned с KStream. Хранит текущее значение per key.

Stream-Table (enrichment)

Stream-Table join: НЕ windowed. Использует текущее значение KTable. Co-partitioned обязателен. Варианты: join, leftJoin.

KStream

Результат: KStream с обогащёнными записями — каждое событие потока дополнено текущим значением из таблицы.

KStream (любая партиция)

KStream: поток событий. Любое число партиций.

GlobalKTable (нет ограничений)

GlobalKTable: полная копия на каждом инстансе. Co-partitioning НЕ требуется. Нет ограничений на партиции.

Stream-GlobalKTable

Stream-GlobalKTable join: НЕ windowed. Принимает keyMapper для извлечения ключа поиска. Нет co-partitioning ограничений.

KStream

Результат: KStream. Каждая запись потока обогащена данными из GlobalKTable по извлечённому ключу.
JoinТип левойТип правойCo-partitionОконныйРезультат
KStream-KStreamKStreamKStreamДаДа (JoinWindows)KStream
KStream-KTableKStreamKTableДаНетKStream
KStream-GlobalKTableKStreamGlobalKTableНетНетKStream
KTable-KTableKTableKTableДаНетKTable
KTable-KTable (FK)KTableKTableНетНетKTable

Исправление Co-partitioning нарушений

Два способа восстановить co-partitioning:

Способ 1: Пересоздать топик с правильным числом партиций. Наиболее чистый, но требует миграции данных.

Способ 2: Repartition через DSL.

// Если customers имеет 3 партиции, а orders — 6
// Repartition customers до 6 партиций
KStream<String, Customer> repartitionedCustomers = builder
    .stream("customers")
    .selectKey((k, v) -> v.getCustomerId())
    .repartition(Repartitioned.with(Serdes.String(), customerSerde)
        .withNumberOfPartitions(6));

KTable<String, Customer> customersTable = repartitionedCustomers
    .toTable(Materialized.as("customers-store"));
Проверка знанийKnowledge check
У вас KStream заказов (keyed by order_id, 12 партиций) и KTable клиентов (keyed by customer_id, 6 партиций). Вы хотите обогатить каждый заказ именем клиента через customer_id из записи заказа. Какой тип join использовать, и какие проблемы нужно решить? Предложите конкретное решение.
ОтветAnswer
Проблема 1: ключи разные. orders keyed by order_id, customers keyed by customer_id. Для KStream-KTable join нужно, чтобы оба топика были keyed by одному ключу (customer_id). Проблема 2: число партиций разное (12 vs 6) — нарушение co-partitioning. Решение A (Foreign Key Join): если использовать KTable-KTable, можно применить FK join: orders.toTable().join(customers, order -> order.getCustomerId(), joiner). Kafka Streams обрабатывает repartitioning внутренне. Нет co-partitioning проблем. Решение B (KStream-KTable): изменить ключ потока заказов через selectKey((k,v) -> v.getCustomerId()), что вызывает repartitioning до 6 партиций. Затем стандартный KStream-KTable join. Но: orders необходимо перепартиционировать (repartition topic создаётся автоматически). Решение C (GlobalKTable): если 'customers' небольшой — загрузить как GlobalKTable. Нет co-partitioning ограничений, join через keyMapper = order -> order.getCustomerId().

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что такое co-partitioning в контексте Kafka Streams joins, и какие три условия должны быть выполнены?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6