Learning Platform
Глоссарий Troubleshooting
Урок 10.05 · 28 мин
Продвинутый
DeltaJoinMultiJoinStreaming JoinState FootprintFlink 2.1Flink 2.2

DeltaJoin и MultiJoin — две главные оптимизации joins в 2.x

State footprint — главная operational боль streaming joins в Flink 1.x. Regular join хранит обе стороны в state: каждый incoming record requires lookup, append to own state, emit matches. Для двух потоков по миллиону событий в день state растёт линейно, упирается в RocksDB capacity, требует TTL для cleanup, влияет на checkpoint duration.

Flink 2.1 (январь 2026) ввёл DeltaJoin как optimization для типичных случаев — стал default в 2.2 (май 2026). Flink 2.2 добавил MultiJoin для каскадных joins без intermediate state. Эти две оптимизации фактически делают streaming joins production-ready на стейтах, которые раньше требовали Flink CEP или custom DataStream код.

В этом уроке разбираемся, как они работают, какие требования к данным и где помещаются в pipeline оптимизатора.

Join-оптимизации в Spark AQE

Проблема regular streaming join

Классический regular join (INNER):

SELECT *
FROM orders o JOIN payments p ON o.order_id = p.order_id;

State в Flink 1.x:
  orders_state: MapState<order_id, List<OrderRow>>
  payments_state: MapState<order_id, List<PaymentRow>>

On new order o:
  orders_state.put(o.order_id, append(o))
  for each p in payments_state.get(o.order_id):
    emit join(o, p)

On new payment p:
  payments_state.put(p.order_id, append(p))
  for each o in orders_state.get(p.order_id):
    emit join(o, p)

Размер state = O(unique_keys) * (avg_orders_per_key + avg_payments_per_key)

Если для каждого order_id есть 1 order и 1 payment, state = 2 × число заказов. Для миллиарда заказов и средний row ~500 байт — state ~1 TB.

Эта линейная масштабируемость state — главная проблема. Решения 1.x:

  1. State TTL — старые записи удаляются, но рискуют пропустить late join.
  2. Interval join — фиксированное временное окно, ограничивает state.
  3. Temporal join — для dimension data.
  4. Lookup join — для small dim tables.

Эти решения работают для определённых паттернов, но regular join для двух больших streams оставался дорогим.

DeltaJoin: идея и motivation

DeltaJoin предложен в FLIP-486 (Flink Improvement Proposal). Базовая идея: если у обоих streams есть primary key и они представлены как UPSERT/CDC streams, можно хранить только одну сторону полностью, а другую — только при необходимости через lookup в source.

Условия для DeltaJoin:
  1. Оба источника — UPSERT или CDC (имеют PK)
  2. Source supports lookup (как dimension table)
     Например: Paimon с lookup, Kafka compacted topic, JDBC, HBase
  3. Join condition включает PK на обеих сторонах (или одной)
  4. Изменения на одной стороне триггерят lookup на другой

Архитектура:

DeltaJoin между orders и payments (оба Paimon):

  orders stream  ----+
                     |
                     v
              [DeltaJoin operator]
                     ^
                     |
  payments stream ---+

On change in orders (e.g. new or updated order O):
  1. Lookup в payments BY join key (через Paimon lookup API)
  2. emit join(O, P) для найденных P

On change in payments (e.g. new or updated payment P):
  1. Lookup в orders BY join key
  2. emit join(O, P) для найденных O

State в operator:
  Только cache for lookups (LRU, bounded)
  НЕ полные orders + payments

Lookup performs:
  Paimon: file-level pruning + LSM lookup ~ms
  Kafka compacted: state from changelog log compaction
  JDBC: SELECT WHERE key = ?
  HBase: get(key)

Главный выигрыш: state footprint от O(N) до O(cache_size). Cache size может быть constant (например, 1 GB), независимо от роста потока.

Regular Join vs DeltaJoin — state architecture
Aspect
Regular Join
DeltaJoin

DeltaJoin: пример работы

Schema:
  orders(order_id PK, user_id, amount, ts)
  payments(payment_id PK, order_id FK, amount, ts)

SQL:
  SELECT o.order_id, o.amount, p.payment_id, p.amount
  FROM orders o JOIN payments p ON o.order_id = p.order_id;

Условие применимости:
  - orders: Paimon table с PK = order_id, lookup support
  - payments: Paimon table с PK = payment_id, lookup support
  - Join key = orders.order_id = payments.order_id
  - При изменении orders нужен lookup в payments by order_id
  - Но payments PK = payment_id, не order_id!
  - DeltaJoin требует, чтобы lookup был по любому индексированному полю
    (Paimon поддерживает secondary indexes / sorted lookup tables)

Если выполнено:
  Flink планировщик заменяет StreamPhysicalJoin на StreamPhysicalDeltaJoin

Streaming работает так:

  Время T1: новый order arrives: +I(order_id=42, amount=100)
    DeltaJoin:
      1. cache.get(payments, by order_id=42) -> miss
      2. paimon_source.lookup(payments WHERE order_id=42) -> []
      3. emit nothing (нет matching payments yet)
      4. cache.put(payments, key=42, value=[])

  Время T2: новый payment arrives: +I(payment_id=99, order_id=42, amount=100)
    DeltaJoin:
      1. cache.get(orders, by order_id=42) -> potentially in cache
         или paimon_source.lookup(orders WHERE order_id=42) -> [order_id=42, ...]
      2. emit join: +I(order_id=42, amount=100, payment_id=99, amount=100)
      3. cache.put(orders, key=42, value=order)

State в Flink operator ≈ 0
Данные живут в Paimon LSM-tree.

DeltaJoin: правило в оптимизаторе

StreamPhysicalDeltaJoinRule:

  Matches:
    StreamPhysicalJoin
      StreamPhysicalSource(left) where left supports lookup AND has PK
      StreamPhysicalSource(right) where right supports lookup AND has PK
    Join condition includes equi-condition on lookup-capable keys

  Transforms to:
    StreamPhysicalDeltaJoin
      StreamPhysicalSource(left) — но с lookup capability
      StreamPhysicalSource(right) — но с lookup capability

  Cost comparison:
    Regular join cost ~ O(N) state
    DeltaJoin cost ~ O(cache_size) state + O(lookup_count × lookup_latency) IO

  Если DeltaJoin cheaper по cost — выбирается им.
  С Flink 2.2 DeltaJoin — default для подходящих случаев.

Конфиг:

# Flink 2.1 (опт-ин)
table.optimizer.delta-join.strategy: AUTO  # AUTO, FORCE, NONE

# Flink 2.2 (default ON)
# Можно отключить через NONE если нужен старый behavior

# Тюнинг cache
table.optimizer.delta-join.cache.max-size: 100000
table.optimizer.delta-join.cache.ttl: 1h
TIP

DeltaJoin особенно хорошо работает с Paimon — он специально проектировался как lakehouse format для streaming с lookup capability. С Kafka compacted topics работает, но lookup latency выше из-за необходимости обходить partitions. С JDBC — отлично для dimension tables, но не для огромных fact tables.

Когда DeltaJoin не применяется

Сценарии, где DeltaJoin NOT applied:

1. Append-only sources без PK
   Solution: regular join + state TTL

2. Один source не поддерживает lookup
   Solution: regular join

3. Join condition не индексирован в source
   Например, JOIN ON o.user_id = p.user_id, а PK = order_id и payment_id
   Если нет secondary index — нет efficient lookup
   Solution: create lookup index в Paimon или regular join

4. Non-equi join condition
   Например, BETWEEN x AND y, range comparisons
   Solution: interval join или CEP

5. Cardinality высокая (cache miss rate высокий)
   Если 99% lookups идут в Paimon — может быть медленнее
   Cost-based optimizer не выберет DeltaJoin

6. Hot keys (skew)
   Если несколько keys получают 90% records, cache thrashing
   Может быть медленнее regular join

MultiJoin: каскадные joins zero intermediate state

MultiJoin (FLIP-486, выпущен в 2.2) — другая optimization для случая, когда несколько tables соединяются в цепочку:

SELECT *
FROM orders o
JOIN payments p ON o.order_id = p.order_id
JOIN refunds r ON p.payment_id = r.payment_id
JOIN users u ON o.user_id = u.user_id;

Без MultiJoin это 3 отдельных StreamPhysicalJoin, каждый с state:

Plan:
  StreamPhysicalJoin (with users)
    StreamPhysicalExchange (hash by user_id)
      StreamPhysicalJoin (with refunds)
        StreamPhysicalExchange (hash by payment_id)
          StreamPhysicalJoin (orders + payments)
            StreamPhysicalSource(orders)
            StreamPhysicalSource(payments)
        StreamPhysicalSource(refunds)
    StreamPhysicalSource(users)

State usage:
  Join1 (orders+payments): orders + payments tables
  Join2 (+refunds): intermediate (orders+payments) + refunds
  Join3 (+users): intermediate (orders+payments+refunds) + users

Total: 4 × source size + 2 × intermediate sizes
Intermediate state — ОГРОМЕН (rows after each join)

С MultiJoin:

Plan:
  StreamPhysicalMultiJoin (4 inputs)
    StreamPhysicalExchange (hash by orders join keys)
      StreamPhysicalSource(orders)
    StreamPhysicalExchange
      StreamPhysicalSource(payments)
    StreamPhysicalExchange
      StreamPhysicalSource(refunds)
    StreamPhysicalExchange
      StreamPhysicalSource(users)

State usage:
  Только indexes на каждый source: O(orders + payments + refunds + users)
  НЕТ intermediate state (ключевая идея!)

Total: 4 × source size (без умножений на intermediate sizes)

MultiJoin: как работает

MultiJoin operator принимает N входов и поддерживает per-input state как index для join keys. Когда приходит record на любой из входов, operator делает multi-way lookup в state других input’ов и эмитит matching результаты.

MultiJoin для (orders, payments, refunds, users):

  State:
    orders_idx: MapState<order_id, OrderRow>
    payments_idx_by_order: MapState<order_id, PaymentRow>
    payments_idx_by_payment: MapState<payment_id, PaymentRow>  -- secondary
    refunds_idx_by_payment: MapState<payment_id, RefundRow>
    users_idx: MapState<user_id, UserRow>

  On new order O:
    orders_idx.put(O.order_id, O)
    payment_candidates = payments_idx_by_order.get(O.order_id)
    user_candidate = users_idx.get(O.user_id)
    for p in payment_candidates:
      refund_candidates = refunds_idx_by_payment.get(p.payment_id)
      for r in refund_candidates:
        emit (O, p, r, user_candidate)

  On new payment P:
    payments_idx_by_order.put(P.order_id, P)
    payments_idx_by_payment.put(P.payment_id, P)
    order_candidate = orders_idx.get(P.order_id)
    refund_candidates = refunds_idx_by_payment.get(P.payment_id)
    for r in refund_candidates:
      user_candidate = users_idx.get(order_candidate.user_id)
      emit (order_candidate, P, r, user_candidate)

  ...similar для refunds и users

Ключевая идея — никакого intermediate state. Каждый input хранит только свой own rows + indexes. Multi-way join выполняется в operator на лету через cross-lookup.

Regular cascade joins vs MultiJoin
Regular cascade joinsRegular cascade: 3 StreamPhysicalJoin узла, между каждой парой шафл + intermediate state.
state
O+P stateState1: orders + payments tables (~2x size). State2: intermediate(O+P) joined ~ multiplication * refunds.
exchange
(O+P)+R stateState3: previous intermediate + refunds. Может в разы больше original sources.
exchange
all + U stateState4: previous intermediate + users. Mega state.
MultiJoin operatorMultiJoin: один operator с 4 inputs, per-input state с indexes only. NO intermediate state.
indexed state per input
O + P + R + U indexesState: только rows of each source + secondary indexes. Размер пропорционален сумме sources, не их произведению.

MultiJoin: правило и применимость

StreamPhysicalMultiJoinRule:

  Matches:
    Cascade of StreamPhysicalJoin nodes
    Все joins имеют equi-conditions
    Join keys можно "связать" — общий граф ключей

  Transforms:
    Заменяет N-1 StreamPhysicalJoin узлов на 1 StreamPhysicalMultiJoin с N inputs

  Condition:
    Должно быть >2 inputs (для 2 inputs обычный join эффективнее)
    Joins должны быть INNER (для outer joins MultiJoin сложнее, в работе)
    Cost-based: оптимизатор выбирает MultiJoin только если intermediate state
                в cascade joins был бы значительным

Включается в Flink 2.2:

table.optimizer.multi-join.enabled: true  # default true в 2.2

MultiJoin: когда выигрывает

Сценарии, где MultiJoin сильно выигрывает:

1. Star schema joins (fact + multiple dims)
   SELECT *
   FROM fact_orders f
   JOIN dim_users u ON f.user_id = u.id
   JOIN dim_products p ON f.product_id = p.id
   JOIN dim_stores s ON f.store_id = s.id;
   
   Cascade: 3 huge intermediates
   MultiJoin: single op, indexes on each dim

2. Enrichment chains
   SELECT *
   FROM events e
   JOIN session_data sd ON e.session = sd.id
   JOIN user_profile up ON sd.user = up.id
   JOIN org_data od ON up.org = od.id;
   
   Cascade: 3 intermediates each enriched
   MultiJoin: один проход с 4-way

3. Reference data joins
   Joins с большим количеством lookups в reference tables
Сценарии, где MultiJoin НЕ помогает:

1. 2-table joins
   Overhead MultiJoin > обычный join

2. Skewed joins
   Skew распространяется на все inputs

3. Очень разные cardinalities
   Иногда лучше cascade с small + huge сначала
   (cost-based optimizer должен это поймать)

4. Outer joins (пока)
   В 2.2 MultiJoin поддерживает только INNER

DeltaJoin + MultiJoin вместе

В Flink 2.2 оба применяются в pipeline. Сначала идёт MultiJoinRule (merging cascade), потом DeltaJoinRule (replacement с lookup-based для подходящих subjoins).

SQL:
  SELECT * FROM orders o JOIN payments p ON o.id = p.order_id
                          JOIN users u ON o.user = u.id;

После MultiJoinRule:
  StreamPhysicalMultiJoin (3 inputs)
    Source(orders)
    Source(payments)
    Source(users)

Если orders и payments оба Paimon с lookup support:
  После DeltaJoinRule:
  StreamPhysicalMultiJoin с DeltaJoin behavior для (orders, payments)
  и regular state для users (если users — append-only stream)

Combination даёт уменьшение state в десятки раз для типичных streaming SQL workloads.

Реальные числа

Из FLIP-486 motivation и benchmarks (CDC e-commerce workload):

Workload: 1B records per day, 4-way join
  fact_orders × dim_users × dim_products × dim_stores

Flink 1.20 (cascade regular joins):
  State size: ~3 TB (intermediates × scale factors)
  Recovery time: 40+ minutes
  Checkpoint duration: 5+ minutes
  Required TaskManagers: 20+ для I/O bandwidth

Flink 2.2 (MultiJoin + DeltaJoin):
  State size: ~80 GB (indexes per source)
  Recovery time: 30 seconds (lazy load)
  Checkpoint duration: 30 seconds
  Required TaskManagers: 6-8

Cost снижение ~70-80%.

Limitations и pitfalls

DeltaJoin pitfalls:

1. Cache hit rate critical
   Низкий cache hit rate -> мощный source pressure
   Monitor: cache_hit_rate metric

2. Source latency
   Slow Paimon snapshot reads -> slow join
   Monitor: lookup latency p99

3. Schema evolution
   Изменение PK или index requires job rebuild

MultiJoin pitfalls:

1. State per source может быть unbounded без TTL
   Solution: STATE_TTL hint per input

2. Hot keys
   Если одно value join key получает 90% rows — skew
   Все equi-joins страдают, MultiJoin не исключение

3. Plan stability
   Включение MultiJoin меняет ExecNode graph
   Может потребовать новый savepoint (state schema change)

4. Debugging
   Multi-input операторы сложнее дебажить через UI
   Один operator вместо N — единая запись в метриках
WARNING

Миграция с regular joins на DeltaJoin/MultiJoin не savepoint-compatible автоматически. Если в production есть job с regular joins и большим state, прямой апгрейд на Flink 2.2 с включёнными оптимизациями приведёт к incompatible state schema. Решение: либо отключить оптимизации (table.optimizer.delta-join.strategy=NONE), либо мигрировать через bootstrap state.

Чтение source

flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/
  rules/physical/stream/
    StreamPhysicalDeltaJoinRule.scala
    StreamPhysicalMultiJoinRule.scala
  nodes/exec/stream/
    StreamExecDeltaJoin.java
    StreamExecMultiJoin.java
  nodes/physical/stream/
    StreamPhysicalDeltaJoin.scala
    StreamPhysicalMultiJoin.scala

flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/
  delta/DeltaJoinOperator.java
  multi/StreamingMultiJoinOperator.java

flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/
  DeltaJoinCodeGenerator.scala
  MultiJoinCodeGenerator.scala

FLIP документы:
  FLIP-486: Introduce StreamPhysicalDeltaJoin
  FLIP-... : Introduce StreamPhysicalMultiJoin

Migration strategy для production

Шаги для безопасной миграции:

1. Compile plan на 1.20 (текущий job)
   COMPILE PLAN '/savepoints/job_1.20.json' FOR <sql>
   -- compiled plan гарантирует stable execution

2. Upgrade Flink до 2.2 БЕЗ изменений config
   - Compiled plans продолжают работать с regular joins

3. Тестирование на staging
   - Включить delta-join.strategy=AUTO в новом env
   - Запустить новый compile -> увидеть новый план
   - Сравнить production по: throughput, state size, recovery time

4. Decision point:
   - Если выигрыш существенный -> миграция
   - Если нет -> остаться на compiled plan

5. Миграция:
   - Подготовить новый savepoint через bootstrap state (State Processor API)
   - Стоп old job
   - Старт new job from new savepoint

6. Rollback plan:
   - Сохранить старый savepoint
   - Если новый job нестабилен -> возврат на compiled plan
Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. В чём основная идея DeltaJoin и его выигрыш по сравнению с regular streaming join?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5