Физический план: Стратегии выполнения
От логики к физике: SparkPlanner
После того как Optimizer создал Optimized Logical Plan, SparkPlanner берет на себя ключевую задачу: превратить абстрактный логический план в конкретный физический план с реальными операторами.
Логический план говорит что нужно сделать (join, filter, project). Физический план определяет как это сделать (BroadcastHashJoin, SortMergeJoin, FileScan Parquet).
Стратегии SparkPlanner
SparkPlanner работает через набор стратегий (Strategies) — каждая стратегия умеет преобразовывать определенные логические операторы в физические:
| Стратегия | Что обрабатывает | Результат |
|---|---|---|
JoinSelection | Logical Join | BroadcastHashJoinExec, SortMergeJoinExec, ShuffleHashJoinExec |
BasicOperators | Filter, Project, Sort | FilterExec, ProjectExec, SortExec |
FileSourceStrategy | Scan таблицы | FileScan parquet/orc/csv |
Aggregation | Aggregate | HashAggregateExec, SortAggregateExec |
InMemoryScans | Кэшированные данные | InMemoryTableScanExec |
Join-стратегии: ключевой выбор
Выбор join-стратегии — самое важное решение SparkPlanner. От него зависит, будет ли запрос выполняться секунды или часы.
BroadcastHashJoin (BHJ)
Когда: Одна из таблиц достаточно мала, чтобы поместиться в память каждого executor.
Как работает:
- Маленькая таблица целиком отправляется (broadcast) на все executors
- На каждом executor строится hash-таблица из маленькой таблицы
- Большая таблица сканируется, каждая строка ищется в hash-таблице
Порог: spark.sql.autoBroadcastJoinThreshold (по умолчанию 10 MB)
# Увеличить порог broadcast join до 50MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)
# Явный broadcast hint
from pyspark.sql.functions import broadcast
result = big_df.join(broadcast(small_df), "key")
Преимущество: Нет shuffle — данные не перемещаются между executors. Это критически важно для производительности.
SortMergeJoin (SMJ)
Когда: Обе таблицы большие, ни одна не помещается в broadcast.
Как работает:
- Обе таблицы партиционируются (shuffle) по join-ключу
- В каждой партиции данные сортируются по join-ключу
- Отсортированные партиции сливаются (merge) — однопроходный алгоритм
Стоимость: Требует shuffle обеих таблиц + сортировку. Это самая “дорогая” стратегия, но единственная, которая работает для любых размеров данных.
# Принудительно выбрать SortMergeJoin
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
ShuffleHashJoin
Когда: Одна таблица значительно меньше другой, но превышает broadcast-порог.
Как работает:
- Обе таблицы партиционируются (shuffle) по join-ключу
- В каждой партиции строится hash-таблица из меньшей стороны
- Большая сторона сканируется через hash-таблицу
Используется реже, так как SortMergeJoin обычно предпочтительнее (лучше работает с disk spill).
CartesianProduct — анти-паттерн
Когда: Join без условия (cross join).
-- ОПАСНО: CartesianProduct
SELECT * FROM employees, departments
-- Размер результата: rows(employees) * rows(departments)
-- 1M * 1K = 1 BILLION строк!
CartesianProduct генерирует декартово произведение — каждая строка одной таблицы соединяется с каждой строкой другой. Для таблиц 1M x 1K строк это 1 миллиард строк. Если вы видите CartesianProduct в explain() — это почти всегда ошибка: пропущено условие join. Spark требует явный spark.sql.crossJoin.enabled=true для CartesianProduct.
Чтение Physical Plan из explain()
Продолжим наш сквозной пример:
result = spark.sql("""
SELECT name, dept_name
FROM employees e
JOIN departments d ON e.dept_id = d.dept_id
WHERE e.age > 30
""")
result.explain() # Короткая версия -- только Physical Plan
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [name#5, dept_name#12]
+- BroadcastHashJoin [dept_id#7], [dept_id#11], Inner, BuildRight, false
:- Filter (isnotnull(dept_id#7) AND (age#6 > 30))
: +- LocalTableScan [name#5, age#6, dept_id#7]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=42]
+- LocalTableScan [dept_id#11, dept_name#12]
Разберем ключевые элементы:
- AdaptiveSparkPlan — обертка Adaptive Query Execution (AQE), которая может изменить план в runtime Spark3.5+
- BroadcastHashJoin — Spark выбрал broadcast join, потому что
departmentsмаленькая таблица - BuildRight — hash-таблица строится из правой стороны join (departments)
- BroadcastExchange — физическая операция отправки таблицы на все executors
- Filter — фильтр
age > 30уже сдвинут под join (predicate pushdown из Optimizer) - LocalTableScan — чтение локальных данных (в нашем примере данные в памяти)
Как определить стратегию join?
В explain() ищите ключевые слова:
| Оператор в explain() | Стратегия | Shuffle? |
|---|---|---|
BroadcastHashJoin | Broadcast | Нет |
SortMergeJoin | Sort-Merge | Да (обе стороны) |
ShuffledHashJoin | Shuffle-Hash | Да (обе стороны) |
CartesianProduct | Cartesian | Нет (но O(n*m)) |
BroadcastNestedLoopJoin | Broadcast + nested loop | Нет |
Формат explain(): подробный разбор
# Расширенный формат (Spark 3.0+)
result.explain("formatted")
explain("formatted") показывает план в табличном виде с нумерацией операторов:
== Physical Plan ==
* Project (7)
+- * BroadcastHashJoin Inner BuildRight (6)
:- * Filter (3)
: +- * LocalTableScan (1)
+- BroadcastExchange (5)
+- * LocalTableScan (4)
Звёздочки (*) перед операторами означают Whole-Stage CodeGen — эти операторы объединены в один Java-метод (подробнее в уроке 06).
Множественные физические планы
SparkPlanner может сгенерировать несколько физических планов для одного логического плана. Например, для join:
- Вариант 1: BroadcastHashJoin (если одна сторона < 10MB)
- Вариант 2: SortMergeJoin (для любых размеров)
Spark выбирает вариант на основе стоимости (cost model): учитываются размеры данных, количество партиций, наличие shuffle.
Spark3.5+ С Adaptive Query Execution (AQE) Spark может изменить стратегию в runtime: если после shuffle оказалось, что одна сторона join маленькая, AQE переключает SortMergeJoin на BroadcastHashJoin. Мы подробно разберем AQE в модуле Performance Tuning.