DeltaJoin и MultiJoin — две главные оптимизации joins в 2.x
State footprint — главная operational боль streaming joins в Flink 1.x. Regular join хранит обе стороны в state: каждый incoming record requires lookup, append to own state, emit matches. Для двух потоков по миллиону событий в день state растёт линейно, упирается в RocksDB capacity, требует TTL для cleanup, влияет на checkpoint duration.
Flink 2.1 (январь 2026) ввёл DeltaJoin как optimization для типичных случаев — стал default в 2.2 (май 2026). Flink 2.2 добавил MultiJoin для каскадных joins без intermediate state. Эти две оптимизации фактически делают streaming joins production-ready на стейтах, которые раньше требовали Flink CEP или custom DataStream код.
В этом уроке разбираемся, как они работают, какие требования к данным и где помещаются в pipeline оптимизатора.
Join-оптимизации в Spark AQEПроблема regular streaming join
Классический regular join (INNER):
SELECT *
FROM orders o JOIN payments p ON o.order_id = p.order_id;
State в Flink 1.x:
orders_state: MapState<order_id, List<OrderRow>>
payments_state: MapState<order_id, List<PaymentRow>>
On new order o:
orders_state.put(o.order_id, append(o))
for each p in payments_state.get(o.order_id):
emit join(o, p)
On new payment p:
payments_state.put(p.order_id, append(p))
for each o in orders_state.get(p.order_id):
emit join(o, p)
Размер state = O(unique_keys) * (avg_orders_per_key + avg_payments_per_key)
Если для каждого order_id есть 1 order и 1 payment, state = 2 × число заказов. Для миллиарда заказов и средний row ~500 байт — state ~1 TB.
Эта линейная масштабируемость state — главная проблема. Решения 1.x:
- State TTL — старые записи удаляются, но рискуют пропустить late join.
- Interval join — фиксированное временное окно, ограничивает state.
- Temporal join — для dimension data.
- Lookup join — для small dim tables.
Эти решения работают для определённых паттернов, но regular join для двух больших streams оставался дорогим.
DeltaJoin: идея и motivation
DeltaJoin предложен в FLIP-486 (Flink Improvement Proposal). Базовая идея: если у обоих streams есть primary key и они представлены как UPSERT/CDC streams, можно хранить только одну сторону полностью, а другую — только при необходимости через lookup в source.
Условия для DeltaJoin:
1. Оба источника — UPSERT или CDC (имеют PK)
2. Source supports lookup (как dimension table)
Например: Paimon с lookup, Kafka compacted topic, JDBC, HBase
3. Join condition включает PK на обеих сторонах (или одной)
4. Изменения на одной стороне триггерят lookup на другой
Архитектура:
DeltaJoin между orders и payments (оба Paimon):
orders stream ----+
|
v
[DeltaJoin operator]
^
|
payments stream ---+
On change in orders (e.g. new or updated order O):
1. Lookup в payments BY join key (через Paimon lookup API)
2. emit join(O, P) для найденных P
On change in payments (e.g. new or updated payment P):
1. Lookup в orders BY join key
2. emit join(O, P) для найденных O
State в operator:
Только cache for lookups (LRU, bounded)
НЕ полные orders + payments
Lookup performs:
Paimon: file-level pruning + LSM lookup ~ms
Kafka compacted: state from changelog log compaction
JDBC: SELECT WHERE key = ?
HBase: get(key)
Главный выигрыш: state footprint от O(N) до O(cache_size). Cache size может быть constant (например, 1 GB), независимо от роста потока.
DeltaJoin: пример работы
Schema:
orders(order_id PK, user_id, amount, ts)
payments(payment_id PK, order_id FK, amount, ts)
SQL:
SELECT o.order_id, o.amount, p.payment_id, p.amount
FROM orders o JOIN payments p ON o.order_id = p.order_id;
Условие применимости:
- orders: Paimon table с PK = order_id, lookup support
- payments: Paimon table с PK = payment_id, lookup support
- Join key = orders.order_id = payments.order_id
- При изменении orders нужен lookup в payments by order_id
- Но payments PK = payment_id, не order_id!
- DeltaJoin требует, чтобы lookup был по любому индексированному полю
(Paimon поддерживает secondary indexes / sorted lookup tables)
Если выполнено:
Flink планировщик заменяет StreamPhysicalJoin на StreamPhysicalDeltaJoin
Streaming работает так:
Время T1: новый order arrives: +I(order_id=42, amount=100)
DeltaJoin:
1. cache.get(payments, by order_id=42) -> miss
2. paimon_source.lookup(payments WHERE order_id=42) -> []
3. emit nothing (нет matching payments yet)
4. cache.put(payments, key=42, value=[])
Время T2: новый payment arrives: +I(payment_id=99, order_id=42, amount=100)
DeltaJoin:
1. cache.get(orders, by order_id=42) -> potentially in cache
или paimon_source.lookup(orders WHERE order_id=42) -> [order_id=42, ...]
2. emit join: +I(order_id=42, amount=100, payment_id=99, amount=100)
3. cache.put(orders, key=42, value=order)
State в Flink operator ≈ 0
Данные живут в Paimon LSM-tree.
DeltaJoin: правило в оптимизаторе
StreamPhysicalDeltaJoinRule:
Matches:
StreamPhysicalJoin
StreamPhysicalSource(left) where left supports lookup AND has PK
StreamPhysicalSource(right) where right supports lookup AND has PK
Join condition includes equi-condition on lookup-capable keys
Transforms to:
StreamPhysicalDeltaJoin
StreamPhysicalSource(left) — но с lookup capability
StreamPhysicalSource(right) — но с lookup capability
Cost comparison:
Regular join cost ~ O(N) state
DeltaJoin cost ~ O(cache_size) state + O(lookup_count × lookup_latency) IO
Если DeltaJoin cheaper по cost — выбирается им.
С Flink 2.2 DeltaJoin — default для подходящих случаев.
Конфиг:
# Flink 2.1 (опт-ин)
table.optimizer.delta-join.strategy: AUTO # AUTO, FORCE, NONE
# Flink 2.2 (default ON)
# Можно отключить через NONE если нужен старый behavior
# Тюнинг cache
table.optimizer.delta-join.cache.max-size: 100000
table.optimizer.delta-join.cache.ttl: 1h
DeltaJoin особенно хорошо работает с Paimon — он специально проектировался как lakehouse format для streaming с lookup capability. С Kafka compacted topics работает, но lookup latency выше из-за необходимости обходить partitions. С JDBC — отлично для dimension tables, но не для огромных fact tables.
Когда DeltaJoin не применяется
Сценарии, где DeltaJoin NOT applied:
1. Append-only sources без PK
Solution: regular join + state TTL
2. Один source не поддерживает lookup
Solution: regular join
3. Join condition не индексирован в source
Например, JOIN ON o.user_id = p.user_id, а PK = order_id и payment_id
Если нет secondary index — нет efficient lookup
Solution: create lookup index в Paimon или regular join
4. Non-equi join condition
Например, BETWEEN x AND y, range comparisons
Solution: interval join или CEP
5. Cardinality высокая (cache miss rate высокий)
Если 99% lookups идут в Paimon — может быть медленнее
Cost-based optimizer не выберет DeltaJoin
6. Hot keys (skew)
Если несколько keys получают 90% records, cache thrashing
Может быть медленнее regular join
MultiJoin: каскадные joins zero intermediate state
MultiJoin (FLIP-486, выпущен в 2.2) — другая optimization для случая, когда несколько tables соединяются в цепочку:
SELECT *
FROM orders o
JOIN payments p ON o.order_id = p.order_id
JOIN refunds r ON p.payment_id = r.payment_id
JOIN users u ON o.user_id = u.user_id;
Без MultiJoin это 3 отдельных StreamPhysicalJoin, каждый с state:
Plan:
StreamPhysicalJoin (with users)
StreamPhysicalExchange (hash by user_id)
StreamPhysicalJoin (with refunds)
StreamPhysicalExchange (hash by payment_id)
StreamPhysicalJoin (orders + payments)
StreamPhysicalSource(orders)
StreamPhysicalSource(payments)
StreamPhysicalSource(refunds)
StreamPhysicalSource(users)
State usage:
Join1 (orders+payments): orders + payments tables
Join2 (+refunds): intermediate (orders+payments) + refunds
Join3 (+users): intermediate (orders+payments+refunds) + users
Total: 4 × source size + 2 × intermediate sizes
Intermediate state — ОГРОМЕН (rows after each join)
С MultiJoin:
Plan:
StreamPhysicalMultiJoin (4 inputs)
StreamPhysicalExchange (hash by orders join keys)
StreamPhysicalSource(orders)
StreamPhysicalExchange
StreamPhysicalSource(payments)
StreamPhysicalExchange
StreamPhysicalSource(refunds)
StreamPhysicalExchange
StreamPhysicalSource(users)
State usage:
Только indexes на каждый source: O(orders + payments + refunds + users)
НЕТ intermediate state (ключевая идея!)
Total: 4 × source size (без умножений на intermediate sizes)
MultiJoin: как работает
MultiJoin operator принимает N входов и поддерживает per-input state как index для join keys. Когда приходит record на любой из входов, operator делает multi-way lookup в state других input’ов и эмитит matching результаты.
MultiJoin для (orders, payments, refunds, users):
State:
orders_idx: MapState<order_id, OrderRow>
payments_idx_by_order: MapState<order_id, PaymentRow>
payments_idx_by_payment: MapState<payment_id, PaymentRow> -- secondary
refunds_idx_by_payment: MapState<payment_id, RefundRow>
users_idx: MapState<user_id, UserRow>
On new order O:
orders_idx.put(O.order_id, O)
payment_candidates = payments_idx_by_order.get(O.order_id)
user_candidate = users_idx.get(O.user_id)
for p in payment_candidates:
refund_candidates = refunds_idx_by_payment.get(p.payment_id)
for r in refund_candidates:
emit (O, p, r, user_candidate)
On new payment P:
payments_idx_by_order.put(P.order_id, P)
payments_idx_by_payment.put(P.payment_id, P)
order_candidate = orders_idx.get(P.order_id)
refund_candidates = refunds_idx_by_payment.get(P.payment_id)
for r in refund_candidates:
user_candidate = users_idx.get(order_candidate.user_id)
emit (order_candidate, P, r, user_candidate)
...similar для refunds и users
Ключевая идея — никакого intermediate state. Каждый input хранит только свой own rows + indexes. Multi-way join выполняется в operator на лету через cross-lookup.
MultiJoin: правило и применимость
StreamPhysicalMultiJoinRule:
Matches:
Cascade of StreamPhysicalJoin nodes
Все joins имеют equi-conditions
Join keys можно "связать" — общий граф ключей
Transforms:
Заменяет N-1 StreamPhysicalJoin узлов на 1 StreamPhysicalMultiJoin с N inputs
Condition:
Должно быть >2 inputs (для 2 inputs обычный join эффективнее)
Joins должны быть INNER (для outer joins MultiJoin сложнее, в работе)
Cost-based: оптимизатор выбирает MultiJoin только если intermediate state
в cascade joins был бы значительным
Включается в Flink 2.2:
table.optimizer.multi-join.enabled: true # default true в 2.2
MultiJoin: когда выигрывает
Сценарии, где MultiJoin сильно выигрывает:
1. Star schema joins (fact + multiple dims)
SELECT *
FROM fact_orders f
JOIN dim_users u ON f.user_id = u.id
JOIN dim_products p ON f.product_id = p.id
JOIN dim_stores s ON f.store_id = s.id;
Cascade: 3 huge intermediates
MultiJoin: single op, indexes on each dim
2. Enrichment chains
SELECT *
FROM events e
JOIN session_data sd ON e.session = sd.id
JOIN user_profile up ON sd.user = up.id
JOIN org_data od ON up.org = od.id;
Cascade: 3 intermediates each enriched
MultiJoin: один проход с 4-way
3. Reference data joins
Joins с большим количеством lookups в reference tables
Сценарии, где MultiJoin НЕ помогает:
1. 2-table joins
Overhead MultiJoin > обычный join
2. Skewed joins
Skew распространяется на все inputs
3. Очень разные cardinalities
Иногда лучше cascade с small + huge сначала
(cost-based optimizer должен это поймать)
4. Outer joins (пока)
В 2.2 MultiJoin поддерживает только INNER
DeltaJoin + MultiJoin вместе
В Flink 2.2 оба применяются в pipeline. Сначала идёт MultiJoinRule (merging cascade), потом DeltaJoinRule (replacement с lookup-based для подходящих subjoins).
SQL:
SELECT * FROM orders o JOIN payments p ON o.id = p.order_id
JOIN users u ON o.user = u.id;
После MultiJoinRule:
StreamPhysicalMultiJoin (3 inputs)
Source(orders)
Source(payments)
Source(users)
Если orders и payments оба Paimon с lookup support:
После DeltaJoinRule:
StreamPhysicalMultiJoin с DeltaJoin behavior для (orders, payments)
и regular state для users (если users — append-only stream)
Combination даёт уменьшение state в десятки раз для типичных streaming SQL workloads.
Реальные числа
Из FLIP-486 motivation и benchmarks (CDC e-commerce workload):
Workload: 1B records per day, 4-way join
fact_orders × dim_users × dim_products × dim_stores
Flink 1.20 (cascade regular joins):
State size: ~3 TB (intermediates × scale factors)
Recovery time: 40+ minutes
Checkpoint duration: 5+ minutes
Required TaskManagers: 20+ для I/O bandwidth
Flink 2.2 (MultiJoin + DeltaJoin):
State size: ~80 GB (indexes per source)
Recovery time: 30 seconds (lazy load)
Checkpoint duration: 30 seconds
Required TaskManagers: 6-8
Cost снижение ~70-80%.
Limitations и pitfalls
DeltaJoin pitfalls:
1. Cache hit rate critical
Низкий cache hit rate -> мощный source pressure
Monitor: cache_hit_rate metric
2. Source latency
Slow Paimon snapshot reads -> slow join
Monitor: lookup latency p99
3. Schema evolution
Изменение PK или index requires job rebuild
MultiJoin pitfalls:
1. State per source может быть unbounded без TTL
Solution: STATE_TTL hint per input
2. Hot keys
Если одно value join key получает 90% rows — skew
Все equi-joins страдают, MultiJoin не исключение
3. Plan stability
Включение MultiJoin меняет ExecNode graph
Может потребовать новый savepoint (state schema change)
4. Debugging
Multi-input операторы сложнее дебажить через UI
Один operator вместо N — единая запись в метриках
Миграция с regular joins на DeltaJoin/MultiJoin не savepoint-compatible автоматически. Если в production есть job с regular joins и большим state, прямой апгрейд на Flink 2.2 с включёнными оптимизациями приведёт к incompatible state schema. Решение: либо отключить оптимизации (table.optimizer.delta-join.strategy=NONE), либо мигрировать через bootstrap state.
Чтение source
flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/
rules/physical/stream/
StreamPhysicalDeltaJoinRule.scala
StreamPhysicalMultiJoinRule.scala
nodes/exec/stream/
StreamExecDeltaJoin.java
StreamExecMultiJoin.java
nodes/physical/stream/
StreamPhysicalDeltaJoin.scala
StreamPhysicalMultiJoin.scala
flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/
delta/DeltaJoinOperator.java
multi/StreamingMultiJoinOperator.java
flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/
DeltaJoinCodeGenerator.scala
MultiJoinCodeGenerator.scala
FLIP документы:
FLIP-486: Introduce StreamPhysicalDeltaJoin
FLIP-... : Introduce StreamPhysicalMultiJoin
Migration strategy для production
Шаги для безопасной миграции:
1. Compile plan на 1.20 (текущий job)
COMPILE PLAN '/savepoints/job_1.20.json' FOR <sql>
-- compiled plan гарантирует stable execution
2. Upgrade Flink до 2.2 БЕЗ изменений config
- Compiled plans продолжают работать с regular joins
3. Тестирование на staging
- Включить delta-join.strategy=AUTO в новом env
- Запустить новый compile -> увидеть новый план
- Сравнить production по: throughput, state size, recovery time
4. Decision point:
- Если выигрыш существенный -> миграция
- Если нет -> остаться на compiled plan
5. Миграция:
- Подготовить новый savepoint через bootstrap state (State Processor API)
- Стоп old job
- Старт new job from new savepoint
6. Rollback plan:
- Сохранить старый savepoint
- Если новый job нестабилен -> возврат на compiled plan