Temporal, interval и lookup joins
Join — самая опасная операция в streaming SQL. В batch-SQL вы привыкли, что JOIN просто работает: оптимизатор выбирает hash/merge, считает результат, всё хорошо. В streaming у JOIN четыре разных типа, у каждого свои семантика, требования и риски. Сегодня разберём все четыре, и почему regular join без всяких оговорок — обычно ошибка в production.
Regular join (cartesian risk)
SELECT * FROM A JOIN B ON A.key = B.key без temporal-предикатов — это regular join. Flink семантически точно такой же как в batch: каждая строка A с подходящей строкой B даёт пару.
SELECT
o.order_id,
o.amount,
c.customer_name
FROM orders o
JOIN customers c
ON o.customer_id = c.customer_id;
В чём подвох:
- Чтобы регулярный join работал, Flink держит обе таблицы в state. Для каждой строки A надо знать все B с тем же ключом, и наоборот.
- State растёт линейно от cardinality обеих таблиц. Без TTL — unbounded.
- Изменения в B (новый customer, обновлённый профиль) вызывают retract+emit всех уже виденных матчей. На больших таблицах это лавинный пересчёт.
- Outer join (
LEFT JOIN) только усугубляет: нужно держать ещё и null-padded строки, и менять их статус при появлении match.
Regular join в production без явного state TTL — почти всегда баг. На потоке 10K orders/s и 1M customers через месяц state будет содержать миллиарды записей, RocksDB не справится, job упадёт. Если действительно нужен generic join (новый customer должен соединиться с историческими orders) — настройте state TTL или используйте versioned table + temporal join (см. ниже).
Interval join
Interval join — это join с time-bounded предикатом. Между событиями A и B должно соблюдаться временное соотношение: B.event_time лежит в окне [A.event_time - lower, A.event_time + upper].
SELECT
o.order_id,
s.shipment_id,
o.amount
FROM orders o, shipments s
WHERE
o.order_id = s.order_id
AND s.event_time BETWEEN o.event_time - INTERVAL '5' MINUTE
AND o.event_time + INTERVAL '1' HOUR;
Здесь мы говорим: shipment может произойти от 5 минут до часа после создания order. Flink держит в state только те orders и shipments, которые ещё могут совпасть — после watermark пересекает верхнюю границу, запись чистится.
Когда использовать:
- События в логически связанных потоках, где между ними ожидаемое время задержки.
- Логи запросов и ответов (request, response).
- Транзакция и confirmation.
- Click-through-rate (impression -> click within window).
Interval join требует WATERMARK на обоих источниках. Без watermark cleanup невозможен, state будет расти. Если один из потоков не имеет event-time — это сигнал, что вам нужен temporal или lookup join, а не interval.
Temporal join (FOR SYSTEM_TIME AS OF)
Temporal join — это join потока с versioned table (таблицей, у которой есть история версий по primary key). Семантика: для каждой строки потока вернуть строку versioned table, которая была актуальной на момент event-time потока.
Канонический пример — обогащение orders текущим курсом валюты:
-- Versioned table: курсы валют
CREATE TABLE rates (
currency STRING,
rate DECIMAL(10, 4),
update_time TIMESTAMP_LTZ(3),
WATERMARK FOR update_time AS update_time,
PRIMARY KEY (currency) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'rates',
'format' = 'debezium-json',
...
);
-- Stream: orders
SELECT
o.order_id,
o.amount,
o.currency,
r.rate,
o.amount * r.rate AS amount_usd
FROM orders AS o
JOIN rates FOR SYSTEM_TIME AS OF o.event_time AS r
ON o.currency = r.currency;
Что происходит:
- Order приходит в момент
t = 10:30:00с currency=EUR. - Flink ищет в state rates запись
currency=EUR, актуальную на 10:30:00 (последнееupdate_time <= 10:30:00). - Возвращает join.
Это семантически правильное обогащение: если rate обновился в 10:35, order от 10:30 всё равно получит исторически правильный курс на момент совершения order.
Versioned table требует:
- PRIMARY KEY (
PRIMARY KEY (...) NOT ENFORCED). - WATERMARK для определения “версионности”.
- Источник, поддерживающий update/delete (Debezium CDC, Upsert Kafka, Hudi/Iceberg/Paimon).
Lookup join (FOR SYSTEM_TIME AS OF PROC())
Lookup join — это join с внешним key-value store в processing-time. Для каждой строки потока Flink делает round-trip в external store (через специальный connector) и подкладывает данные.
-- Lookup-source: customers в JDBC БД
CREATE TABLE customers (
customer_id BIGINT,
customer_name STRING,
segment STRING,
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://db:5432/crm',
'table-name' = 'customers',
'lookup.cache.max-rows' = '10000',
'lookup.cache.ttl' = '5 min'
);
-- Stream + lookup-join
SELECT
o.order_id,
o.amount,
c.customer_name,
c.segment
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.customer_id;
Что происходит:
- Order приходит. Flink берёт customer_id.
- Если в LRU-кеше нет — делает query в JDBC (
SELECT * FROM customers WHERE customer_id = ?). - Кладёт в кеш, возвращает join.
Это processing-time join (o.proc_time), а не event-time. То есть мы получаем актуальное на момент обработки значение customer, а не на момент совершения order. Для исторически точного обогащения нужен temporal join (но lookup-source обычно его не поддерживает — нет changelog).
Lookup join делает round-trip за каждый матч (минус кеш). Это латентность и нагрузка на внешнюю БД. На высоких throughput (10K events/s) даже с кешем external store становится bottleneck. Решения: настроить lookup.cache.max-rows побольше, увеличить lookup.cache.ttl, использовать Async I/O (Flink делает это автоматически в lookup join, но настройки parallelism важны). Для очень высокого throughput — рассмотрите temporal join с versioned-таблицей вместо lookup, тогда state Flink сам кеширует.
Сравнительная таблица
| Join | Когда использовать | State | Latency | Risk |
|---|---|---|---|---|
| Regular | Только если cardinality маленькая и TTL настроен | O(A)+O(B) — unbounded по умолчанию | Минимальная | Cartesian, OOM |
| Interval | Время задержки между потоками ограничено | O(rate × interval) | Минимальная | Watermark обязателен |
| Temporal | Enrichment с историей по primary key | O(versions) | Минимальная | Versioned source нужен |
| Lookup | Enrichment с внешней БД, кеш приемлем | O(cache) | External store-bound | Latency, нагрузка на БД |
Production-выбор
Простой decision tree:
- Enrichment редко-обновляемыми справочниками (страны, тарифы, мерчанты)? — Lookup join с большим TTL.
- Enrichment курсами валют, ценами, балансами с историей? — Temporal join + Debezium CDC.
- Корреляция событий с временной зависимостью (request + response, click + impression)? — Interval join.
- Generic join по PK без time-связи? — Regular join с явным state TTL.
Попробуй сам
- Напиши interval join, который сопоставляет page_views и subsequent purchases в окне 30 минут (если за 30 мин после view был purchase того же продукта тем же user_id).
- Объясни, почему temporal join требует, чтобы versioned-таблица имела PRIMARY KEY. Что произойдёт, если PK не указать?
- Lookup join с lookup.cache.ttl=5min: что произойдёт, если customer в JDBC обновился, но event приходит через 2 минуты после update?