Оптимизация JOIN и подзапросов
JOIN — самая дорогая операция в аналитических запросах. Неправильный порядок соединений или неподходящий алгоритм может увеличить время выполнения на порядки. DataFusion автоматически выбирает стратегию, но понимание этого выбора позволяет писать запросы, которые оптимизатор может эффективно обработать.
Алгоритмы JOIN
DataFusion реализует три алгоритма соединения. Физический оптимизатор выбирает алгоритм на основе размера данных, наличия индексов сортировки и типа предиката.
HashJoinExec
Используется по умолчанию для equi-join (ON a.id = b.id):
EXPLAIN SELECT o.id, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id;
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@1, id@0)]
-- build side (правая сторона, обычно меньшая таблица)
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([id@0], 16)
DataSourceExec: file_groups={...}, format=parquet, projection=[id, name] -- customers
-- probe side (левая сторона)
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([customer_id@1], 16)
DataSourceExec: file_groups={...}, format=parquet, projection=[id, customer_id] -- orders
Partitioned mode — обе стороны перераспределяются по ключу join через RepartitionExec, каждая партиция строит свою хеш-таблицу. Это позволяет параллельное выполнение.
CollectLeft mode — правая сторона собирается в одну партицию (для малых таблиц). Полезно, когда одна сторона значительно меньше другой.
SortMergeJoinExec
Выбирается, когда данные уже отсортированы по ключу join:
-- Если orders и customers отсортированы по id
EXPLAIN SELECT *
FROM orders o
JOIN customers c ON o.customer_id = c.id
ORDER BY o.customer_id;
SortMergeJoin потребляет минимум памяти — обе стороны обрабатываются потоково. DataFusion выбирает его, когда EnforceSorting обнаруживает, что данные уже в нужном порядке или сортировка всё равно нужна для ORDER BY.
NestedLoopJoinExec
Применяется для non-equi соединений:
-- Диапазонный join — нет оператора =
SELECT e.name, s.label
FROM events e
JOIN severity s ON e.level BETWEEN s.min_level AND s.max_level;
NestedLoopJoinExec: join_type=Inner,
filter=level@0 >= min_level@1 AND level@0 <= max_level@2
NestedLoopJoinExec имеет сложность O(N * M). Для таблиц с миллионами строк это неприемлемо. Если видите NestedLoop в EXPLAIN для больших таблиц — перепишите запрос с equi-join предикатом или используйте оконные функции.
Перестановка таблиц (Join Reordering)
Порядок соединений влияет на промежуточные результаты. DataFusion переставляет таблицы для минимизации размера промежуточных данных.
Пример: три таблицы
SELECT *
FROM orders o -- 10M строк
JOIN customers c ON o.customer_id = c.id -- 100K строк
JOIN regions r ON c.region_id = r.id; -- 50 строк
Наивный порядок (как написано): orders JOIN customers → 10M строк промежуточно → JOIN regions.
Оптимальный порядок: customers JOIN regions → 100K строк → JOIN orders. Промежуточный результат значительно меньше.
DataFusion использует эвристики и статистику (когда доступна) для выбора порядка:
- Маленькие таблицы соединяются первыми
- Фильтрованные таблицы имеют приоритет
- Equi-join предпочтительнее cross-join
Build side selection
Для HashJoin оптимизатор выбирает меньшую таблицу как build side (правая сторона):
-- customers (100K) → build side, orders (10M) → probe side
HashJoinExec: mode=Partitioned, join_type=Inner
RepartitionExec: ... -- orders (probe)
RepartitionExec: ... -- customers (build)
Если оптимизатор выбирает неоптимальный порядок (например, из-за отсутствия статистики), вы можете повлиять на порядок через порядок таблиц в запросе или предоставить статистику через TableProvider::statistics().
Декорреляция подзапросов
Коррелированные подзапросы — источник неэффективности: по определению они выполняются для каждой строки внешнего запроса. DataFusion переписывает их в JOIN.
EXISTS → Semi-Join
-- Коррелированный подзапрос
SELECT c.name
FROM customers c
WHERE EXISTS (
SELECT 1 FROM orders o WHERE o.customer_id = c.id AND o.amount > 1000
);
DataFusion декоррелирует в:
LeftSemiJoin: c.id = o.customer_id
TableScan: customers
Filter: o.amount > 1000
TableScan: orders
Semi-join возвращает строку из customers, если хотя бы одно совпадение найдено в orders. В отличие от коррелированного подзапроса, это один проход по обеим таблицам.
IN → Semi-Join
SELECT * FROM products
WHERE category_id IN (SELECT id FROM categories WHERE active = true);
Переписывается аналогично EXISTS в LeftSemiJoin с фильтром active = true на правой стороне.
Скалярный подзапрос → Left Join
SELECT o.id,
(SELECT c.name FROM customers c WHERE c.id = o.customer_id) AS customer_name
FROM orders o;
Декоррелируется в:
Projection: o.id, c.name AS customer_name
LeftJoin: o.customer_id = c.id
TableScan: orders
TableScan: customers
Декорреляция работает для подзапросов с equi-предикатами на колонки внешнего запроса. Подзапросы со сложными выражениями (функции, арифметика) или без корреляции обрабатываются другими правилами.
TopK-оптимизация
Комбинация ORDER BY ... LIMIT N — частый паттерн. DataFusion оптимизирует его через TopK:
SELECT * FROM events ORDER BY timestamp DESC LIMIT 10;
Без оптимизации: отсортировать все строки, взять первые 10.
С TopK: поддерживать heap из 10 элементов, обновлять при нахождении большего значения. Сложность O(N log K) вместо O(N log N), и главное — не нужно хранить весь набор данных в памяти.
GlobalLimitExec: skip=0, fetch=10
SortExec: expr=[timestamp@0 DESC], fetch=10 -- fetch=10 → TopK
DataSourceExec: ...
Обратите внимание на fetch=10 в SortExec — это признак TopK-оптимизации. DataFusion передаёт лимит вниз в sort operator, который использует bounded heap вместо полной сортировки.
Limit pushdown через JOIN
SELECT o.id
FROM orders o
JOIN customers c ON o.customer_id = c.id
ORDER BY o.created_at DESC
LIMIT 5;
Оптимизатор может спустить LIMIT 5 под join, если сортировка по ключу из левой таблицы. Это ограничивает объём данных до join.
Динамические фильтры
Dynamic filters — оптимизация на этапе выполнения (не планирования). При HashJoin DataFusion может вычислить bloom filter по build side и применить его к probe side, отсекая строки до actual join.
SELECT *
FROM large_fact f
JOIN small_dim d ON f.dim_key = d.id
WHERE d.category = 'electronics';
Без динамического фильтра: сканировать весь large_fact и проверять join condition.
С динамическим фильтром:
- Построить хеш-таблицу для
small_dim(отфильтрованная — только ‘electronics’) - Извлечь множество
d.idзначений → создать bloom filter - Применить bloom filter к
f.dim_keyпри сканированииlarge_fact - Строки, не проходящие bloom filter, отбрасываются до join
Динамические фильтры наиболее эффективны, когда dimension-таблица мала и сильно фильтрована, а fact-таблица велика. Этот паттерн типичен для star-schema моделей в аналитических хранилищах.
В DataFusion 51–52 динамическая фильтрация расширена через механизм sideways information passing: probe side может получать фильтры от build side ещё до начала join, что позволяет DataSourceExec пропускать целые row groups при сканировании Parquet-файлов. Это особенно заметно на больших fact-таблицах с высокой селективностью dimension-фильтра.
Диагностика через EXPLAIN
Для анализа выбора алгоритма используйте EXPLAIN:
EXPLAIN SELECT o.id, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE c.region = 'EU';
На что обращать внимание:
- HashJoinExec vs SortMergeJoinExec vs NestedLoopJoinExec — какой алгоритм выбран
- mode=Partitioned vs CollectLeft — стратегия распределения для HashJoin
- RepartitionExec — перераспределение по ключу join (нормально для Partitioned mode)
- Filter placement — фильтр спущен к scan или остаётся над join
- fetch=N в SortExec — признак TopK-оптимизации
-- EXPLAIN ANALYZE показывает реальные числа
EXPLAIN ANALYZE SELECT ...;
-- output_rows на каждом этапе → эффективность фильтрации
-- elapsed_compute → узкое место
Итоги
- DataFusion выбирает между HashJoin (equi, по умолчанию), SortMergeJoin (данные уже отсортированы) и NestedLoop (non-equi)
- Join reordering минимизирует промежуточные результаты — маленькие таблицы соединяются первыми
- Коррелированные подзапросы (EXISTS, IN, скалярные) декоррелируются в Semi-Join или Left Join
- TopK-оптимизация (
ORDER BY + LIMIT) использует bounded heap вместо полной сортировки - Динамические фильтры сокращают probe side через bloom filter из build side
-
EXPLAINиEXPLAIN ANALYZE— основные инструменты диагностики выбора алгоритма