Learning Platform
Глоссарий Troubleshooting
Урок 13.04 · 25 мин
Средний
JoinTemporal JoinInterval JoinLookup JoinFOR SYSTEM_TIME AS OFVersioned Table

Temporal, interval и lookup joins

Join — самая опасная операция в streaming SQL. В batch-SQL вы привыкли, что JOIN просто работает: оптимизатор выбирает hash/merge, считает результат, всё хорошо. В streaming у JOIN четыре разных типа, у каждого свои семантика, требования и риски. Сегодня разберём все четыре, и почему regular join без всяких оговорок — обычно ошибка в production.


Regular join (cartesian risk)

SELECT * FROM A JOIN B ON A.key = B.key без temporal-предикатов — это regular join. Flink семантически точно такой же как в batch: каждая строка A с подходящей строкой B даёт пару.

SELECT
  o.order_id,
  o.amount,
  c.customer_name
FROM orders o
JOIN customers c
  ON o.customer_id = c.customer_id;

В чём подвох:

  • Чтобы регулярный join работал, Flink держит обе таблицы в state. Для каждой строки A надо знать все B с тем же ключом, и наоборот.
  • State растёт линейно от cardinality обеих таблиц. Без TTL — unbounded.
  • Изменения в B (новый customer, обновлённый профиль) вызывают retract+emit всех уже виденных матчей. На больших таблицах это лавинный пересчёт.
  • Outer join (LEFT JOIN) только усугубляет: нужно держать ещё и null-padded строки, и менять их статус при появлении match.
WARNING

Regular join в production без явного state TTL — почти всегда баг. На потоке 10K orders/s и 1M customers через месяц state будет содержать миллиарды записей, RocksDB не справится, job упадёт. Если действительно нужен generic join (новый customer должен соединиться с историческими orders) — настройте state TTL или используйте versioned table + temporal join (см. ниже).

Join internals: как Flink реализует state для разных типов join
Сравнение четырёх типов join
Regular JOINGeneric join без time-предикатов. State держит обе таблицы. Cartesian-risk: cardinality(A) x cardinality(B).
StateПолные таблицы A и B в state. Без TTL растёт неограниченно.
Interval JOINJoin с time-bounded предикатом. State держит только окно за интервал, потом cleanup. Underlying — KeyedCoProcessFunction с timers.
StateState ограничен размером временного окна. Cleanup автоматический по watermark.
Temporal JOINJoin с versioned table FOR SYSTEM_TIME AS OF. State держит historical versions B. Подходит для enrichment по 'актуальному на момент event' значению.
StateВерсионная история B в state. С TTL — ограничен старейшей живой версией.
Lookup JOINJoin с внешним key-value store через connector. State минимальный (только кеш). Каждый матч — round-trip в external store.
StateТолько LRU-кеш. Latency limited by external store.

Interval join

Interval join — это join с time-bounded предикатом. Между событиями A и B должно соблюдаться временное соотношение: B.event_time лежит в окне [A.event_time - lower, A.event_time + upper].

SELECT
  o.order_id,
  s.shipment_id,
  o.amount
FROM orders o, shipments s
WHERE
  o.order_id = s.order_id
  AND s.event_time BETWEEN o.event_time - INTERVAL '5' MINUTE
                       AND o.event_time + INTERVAL '1' HOUR;

Здесь мы говорим: shipment может произойти от 5 минут до часа после создания order. Flink держит в state только те orders и shipments, которые ещё могут совпасть — после watermark пересекает верхнюю границу, запись чистится.

Когда использовать:

  • События в логически связанных потоках, где между ними ожидаемое время задержки.
  • Логи запросов и ответов (request, response).
  • Транзакция и confirmation.
  • Click-through-rate (impression -> click within window).
TIP

Interval join требует WATERMARK на обоих источниках. Без watermark cleanup невозможен, state будет расти. Если один из потоков не имеет event-time — это сигнал, что вам нужен temporal или lookup join, а не interval.


Temporal join (FOR SYSTEM_TIME AS OF)

Temporal join — это join потока с versioned table (таблицей, у которой есть история версий по primary key). Семантика: для каждой строки потока вернуть строку versioned table, которая была актуальной на момент event-time потока.

Канонический пример — обогащение orders текущим курсом валюты:

-- Versioned table: курсы валют
CREATE TABLE rates (
  currency STRING,
  rate DECIMAL(10, 4),
  update_time TIMESTAMP_LTZ(3),
  WATERMARK FOR update_time AS update_time,
  PRIMARY KEY (currency) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'rates',
  'format' = 'debezium-json',
  ...
);

-- Stream: orders
SELECT
  o.order_id,
  o.amount,
  o.currency,
  r.rate,
  o.amount * r.rate AS amount_usd
FROM orders AS o
JOIN rates FOR SYSTEM_TIME AS OF o.event_time AS r
  ON o.currency = r.currency;

Что происходит:

  1. Order приходит в момент t = 10:30:00 с currency=EUR.
  2. Flink ищет в state rates запись currency=EUR, актуальную на 10:30:00 (последнее update_time <= 10:30:00).
  3. Возвращает join.

Это семантически правильное обогащение: если rate обновился в 10:35, order от 10:30 всё равно получит исторически правильный курс на момент совершения order.

Versioned table требует:

  • PRIMARY KEY (PRIMARY KEY (...) NOT ENFORCED).
  • WATERMARK для определения “версионности”.
  • Источник, поддерживающий update/delete (Debezium CDC, Upsert Kafka, Hudi/Iceberg/Paimon).

Lookup join (FOR SYSTEM_TIME AS OF PROC())

Lookup join — это join с внешним key-value store в processing-time. Для каждой строки потока Flink делает round-trip в external store (через специальный connector) и подкладывает данные.

-- Lookup-source: customers в JDBC БД
CREATE TABLE customers (
  customer_id BIGINT,
  customer_name STRING,
  segment STRING,
  PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://db:5432/crm',
  'table-name' = 'customers',
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '5 min'
);

-- Stream + lookup-join
SELECT
  o.order_id,
  o.amount,
  c.customer_name,
  c.segment
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.customer_id;

Что происходит:

  1. Order приходит. Flink берёт customer_id.
  2. Если в LRU-кеше нет — делает query в JDBC (SELECT * FROM customers WHERE customer_id = ?).
  3. Кладёт в кеш, возвращает join.

Это processing-time join (o.proc_time), а не event-time. То есть мы получаем актуальное на момент обработки значение customer, а не на момент совершения order. Для исторически точного обогащения нужен temporal join (но lookup-source обычно его не поддерживает — нет changelog).

WARNING

Lookup join делает round-trip за каждый матч (минус кеш). Это латентность и нагрузка на внешнюю БД. На высоких throughput (10K events/s) даже с кешем external store становится bottleneck. Решения: настроить lookup.cache.max-rows побольше, увеличить lookup.cache.ttl, использовать Async I/O (Flink делает это автоматически в lookup join, но настройки parallelism важны). Для очень высокого throughput — рассмотрите temporal join с versioned-таблицей вместо lookup, тогда state Flink сам кеширует.


Сравнительная таблица

JoinКогда использоватьStateLatencyRisk
RegularТолько если cardinality маленькая и TTL настроенO(A)+O(B) — unbounded по умолчаниюМинимальнаяCartesian, OOM
IntervalВремя задержки между потоками ограниченоO(rate × interval)МинимальнаяWatermark обязателен
TemporalEnrichment с историей по primary keyO(versions)МинимальнаяVersioned source нужен
LookupEnrichment с внешней БД, кеш приемлемO(cache)External store-boundLatency, нагрузка на БД

Production-выбор

Простой decision tree:

  1. Enrichment редко-обновляемыми справочниками (страны, тарифы, мерчанты)? — Lookup join с большим TTL.
  2. Enrichment курсами валют, ценами, балансами с историей? — Temporal join + Debezium CDC.
  3. Корреляция событий с временной зависимостью (request + response, click + impression)? — Interval join.
  4. Generic join по PK без time-связи? — Regular join с явным state TTL.

Попробуй сам

  1. Напиши interval join, который сопоставляет page_views и subsequent purchases в окне 30 минут (если за 30 мин после view был purchase того же продукта тем же user_id).
  2. Объясни, почему temporal join требует, чтобы versioned-таблица имела PRIMARY KEY. Что произойдёт, если PK не указать?
  3. Lookup join с lookup.cache.ttl=5min: что произойдёт, если customer в JDBC обновился, но event приходит через 2 минуты после update?
Проверка знанийKnowledge check
Команда написала SELECT o.*, c.name FROM orders o JOIN customers c ON o.customer_id = c.customer_id, где orders — поток 5K/s из Kafka, а customers — таблица в Postgres с 10 миллионами строк, обновляемая 100 раз в день. После 2 недель в production job падает с OOM, state size превысил 50 GB. Какой тип join был использован, что не так и какой правильный выбор?
ОтветAnswer
Был использован regular join. Это семантически generic equi-join, для которого Flink держит обе таблицы в state без TTL. Customers (10M строк) и orders за 2 недели (~6 млрд событий) накапливаются в RocksDB. Это и есть unbounded state. Правильный выбор зависит от семантики: (1) если нужны актуальные данные customer на момент обработки order и обновления customer редкие — lookup join с JDBC connector и LRU-кешем (lookup.cache.max-rows=100K, ttl=10min). Это уменьшит state до размера кеша, нагрузка на Postgres ограничена частотой кеш-промахов; (2) если нужны исторически правильные данные на момент совершения order — temporal join с versioned table из Debezium CDC топика customers (нужен CDC pipeline из Postgres в Kafka, что — другой проект). Regular join можно оставить, если установить state TTL (table.exec.state.ttl=30d) и принять, что орifs старше 30 дней не получат update, если customer изменился позже.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Запрос SELECT * FROM orders JOIN customers ON orders.customer_id = customers.customer_id запускается без temporal-предикатов. Что произойдёт со state через несколько месяцев в production?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5