Learning Platform
Глоссарий Troubleshooting
Урок 07.03 · 18 мин
Продвинутый
HashJoinSortMergeJoinNestedLoopJoinJoinReorderingSubqueryDecorrelationTopKDynamicFilter

Оптимизация JOIN и подзапросов

JOIN — самая дорогая операция в аналитических запросах. Неправильный порядок соединений или неподходящий алгоритм может увеличить время выполнения на порядки. DataFusion автоматически выбирает стратегию, но понимание этого выбора позволяет писать запросы, которые оптимизатор может эффективно обработать.

Алгоритмы JOIN

DataFusion реализует три алгоритма соединения. Физический оптимизатор выбирает алгоритм на основе размера данных, наличия индексов сортировки и типа предиката.

Алгоритмы JOIN в DataFusion
HashJoinExecHash Join: строит хеш-таблицу по правой (меньшей) стороне, затем сканирует левую — оптимален для equi-join
SortMergeJoinExecSort-Merge Join: потоковое слияние двух отсортированных входов — минимальное потребление памяти O(1)
NestedLoopJoinExecNested Loop Join: полный перебор O(N*M) — единственный вариант для non-equi предикатов

HashJoinExec

Используется по умолчанию для equi-join (ON a.id = b.id):

EXPLAIN SELECT o.id, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id;
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, id@0)]
  -- build side (правая сторона, обычно меньшая таблица)
  CoalesceBatchesExec: target_batch_size=8192
    RepartitionExec: partitioning=Hash([id@0], 16)
      DataSourceExec: file_groups={...}, format=parquet, projection=[id, name]   -- customers
  -- probe side (левая сторона)
  CoalesceBatchesExec: target_batch_size=8192
    RepartitionExec: partitioning=Hash([customer_id@1], 16)
      DataSourceExec: file_groups={...}, format=parquet, projection=[id, customer_id]  -- orders

Partitioned mode — обе стороны перераспределяются по ключу join через RepartitionExec, каждая партиция строит свою хеш-таблицу. Это позволяет параллельное выполнение.

CollectLeft mode — правая сторона собирается в одну партицию (для малых таблиц). Полезно, когда одна сторона значительно меньше другой.

SortMergeJoinExec

Выбирается, когда данные уже отсортированы по ключу join:

-- Если orders и customers отсортированы по id
EXPLAIN SELECT *
FROM orders o
JOIN customers c ON o.customer_id = c.id
ORDER BY o.customer_id;

SortMergeJoin потребляет минимум памяти — обе стороны обрабатываются потоково. DataFusion выбирает его, когда EnforceSorting обнаруживает, что данные уже в нужном порядке или сортировка всё равно нужна для ORDER BY.

NestedLoopJoinExec

Применяется для non-equi соединений:

-- Диапазонный join — нет оператора =
SELECT e.name, s.label
FROM events e
JOIN severity s ON e.level BETWEEN s.min_level AND s.max_level;
NestedLoopJoinExec: join_type=Inner,
  filter=level@0 >= min_level@1 AND level@0 <= max_level@2
WARNING

NestedLoopJoinExec имеет сложность O(N * M). Для таблиц с миллионами строк это неприемлемо. Если видите NestedLoop в EXPLAIN для больших таблиц — перепишите запрос с equi-join предикатом или используйте оконные функции.

Перестановка таблиц (Join Reordering)

Порядок соединений влияет на промежуточные результаты. DataFusion переставляет таблицы для минимизации размера промежуточных данных.

Пример: три таблицы

SELECT *
FROM orders o                           -- 10M строк
JOIN customers c ON o.customer_id = c.id  -- 100K строк
JOIN regions r ON c.region_id = r.id;     -- 50 строк

Наивный порядок (как написано): orders JOIN customers → 10M строк промежуточно → JOIN regions.

Оптимальный порядок: customers JOIN regions → 100K строк → JOIN orders. Промежуточный результат значительно меньше.

DataFusion использует эвристики и статистику (когда доступна) для выбора порядка:

  1. Маленькие таблицы соединяются первыми
  2. Фильтрованные таблицы имеют приоритет
  3. Equi-join предпочтительнее cross-join

Build side selection

Для HashJoin оптимизатор выбирает меньшую таблицу как build side (правая сторона):

-- customers (100K) → build side, orders (10M) → probe side
HashJoinExec: mode=Partitioned, join_type=Inner
  RepartitionExec: ...  -- orders (probe)
  RepartitionExec: ...  -- customers (build)
TIP

Если оптимизатор выбирает неоптимальный порядок (например, из-за отсутствия статистики), вы можете повлиять на порядок через порядок таблиц в запросе или предоставить статистику через TableProvider::statistics().

Декорреляция подзапросов

Коррелированные подзапросы — источник неэффективности: по определению они выполняются для каждой строки внешнего запроса. DataFusion переписывает их в JOIN.

EXISTS → Semi-Join

-- Коррелированный подзапрос
SELECT c.name
FROM customers c
WHERE EXISTS (
    SELECT 1 FROM orders o WHERE o.customer_id = c.id AND o.amount > 1000
);

DataFusion декоррелирует в:

LeftSemiJoin: c.id = o.customer_id
  TableScan: customers
  Filter: o.amount > 1000
    TableScan: orders

Semi-join возвращает строку из customers, если хотя бы одно совпадение найдено в orders. В отличие от коррелированного подзапроса, это один проход по обеим таблицам.

IN → Semi-Join

SELECT * FROM products
WHERE category_id IN (SELECT id FROM categories WHERE active = true);

Переписывается аналогично EXISTS в LeftSemiJoin с фильтром active = true на правой стороне.

Скалярный подзапрос → Left Join

SELECT o.id,
       (SELECT c.name FROM customers c WHERE c.id = o.customer_id) AS customer_name
FROM orders o;

Декоррелируется в:

Projection: o.id, c.name AS customer_name
  LeftJoin: o.customer_id = c.id
    TableScan: orders
    TableScan: customers
NOTE

Декорреляция работает для подзапросов с equi-предикатами на колонки внешнего запроса. Подзапросы со сложными выражениями (функции, арифметика) или без корреляции обрабатываются другими правилами.

TopK-оптимизация

Комбинация ORDER BY ... LIMIT N — частый паттерн. DataFusion оптимизирует его через TopK:

SELECT * FROM events ORDER BY timestamp DESC LIMIT 10;

Без оптимизации: отсортировать все строки, взять первые 10.

С TopK: поддерживать heap из 10 элементов, обновлять при нахождении большего значения. Сложность O(N log K) вместо O(N log N), и главное — не нужно хранить весь набор данных в памяти.

GlobalLimitExec: skip=0, fetch=10
  SortExec: expr=[timestamp@0 DESC], fetch=10  -- fetch=10 → TopK
    DataSourceExec: ...

Обратите внимание на fetch=10 в SortExec — это признак TopK-оптимизации. DataFusion передаёт лимит вниз в sort operator, который использует bounded heap вместо полной сортировки.

Limit pushdown через JOIN

SELECT o.id
FROM orders o
JOIN customers c ON o.customer_id = c.id
ORDER BY o.created_at DESC
LIMIT 5;

Оптимизатор может спустить LIMIT 5 под join, если сортировка по ключу из левой таблицы. Это ограничивает объём данных до join.

Динамические фильтры

Dynamic filters — оптимизация на этапе выполнения (не планирования). При HashJoin DataFusion может вычислить bloom filter по build side и применить его к probe side, отсекая строки до actual join.

SELECT *
FROM large_fact f
JOIN small_dim d ON f.dim_key = d.id
WHERE d.category = 'electronics';

Без динамического фильтра: сканировать весь large_fact и проверять join condition.

С динамическим фильтром:

  1. Построить хеш-таблицу для small_dim (отфильтрованная — только ‘electronics’)
  2. Извлечь множество d.id значений → создать bloom filter
  3. Применить bloom filter к f.dim_key при сканировании large_fact
  4. Строки, не проходящие bloom filter, отбрасываются до join
TIP

Динамические фильтры наиболее эффективны, когда dimension-таблица мала и сильно фильтрована, а fact-таблица велика. Этот паттерн типичен для star-schema моделей в аналитических хранилищах.

TIP

В DataFusion 51–52 динамическая фильтрация расширена через механизм sideways information passing: probe side может получать фильтры от build side ещё до начала join, что позволяет DataSourceExec пропускать целые row groups при сканировании Parquet-файлов. Это особенно заметно на больших fact-таблицах с высокой селективностью dimension-фильтра.

Диагностика через EXPLAIN

Для анализа выбора алгоритма используйте EXPLAIN:

EXPLAIN SELECT o.id, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE c.region = 'EU';

На что обращать внимание:

  • HashJoinExec vs SortMergeJoinExec vs NestedLoopJoinExec — какой алгоритм выбран
  • mode=Partitioned vs CollectLeft — стратегия распределения для HashJoin
  • RepartitionExec — перераспределение по ключу join (нормально для Partitioned mode)
  • Filter placement — фильтр спущен к scan или остаётся над join
  • fetch=N в SortExec — признак TopK-оптимизации
-- EXPLAIN ANALYZE показывает реальные числа
EXPLAIN ANALYZE SELECT ...;
-- output_rows на каждом этапе → эффективность фильтрации
-- elapsed_compute → узкое место

Итоги

  • DataFusion выбирает между HashJoin (equi, по умолчанию), SortMergeJoin (данные уже отсортированы) и NestedLoop (non-equi)
  • Join reordering минимизирует промежуточные результаты — маленькие таблицы соединяются первыми
  • Коррелированные подзапросы (EXISTS, IN, скалярные) декоррелируются в Semi-Join или Left Join
  • TopK-оптимизация (ORDER BY + LIMIT) использует bounded heap вместо полной сортировки
  • Динамические фильтры сокращают probe side через bloom filter из build side
  • EXPLAIN и EXPLAIN ANALYZE — основные инструменты диагностики выбора алгоритма

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. При equi-join двух больших таблиц DataFusion по умолчанию выбирает HashJoinExec в mode=Partitioned. Что делает RepartitionExec перед join?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6