Learning Platform
Глоссарий Troubleshooting
Урок 07.04 · 18 мин
Продвинутый
Physical PlanSparkPlannerBroadcastHashJoinSortMergeJoinStrategy

Физический план: Стратегии выполнения

От логики к физике: SparkPlanner

После того как Optimizer создал Optimized Logical Plan, SparkPlanner берет на себя ключевую задачу: превратить абстрактный логический план в конкретный физический план с реальными операторами.

Логический план говорит что нужно сделать (join, filter, project). Физический план определяет как это сделать (BroadcastHashJoin, SortMergeJoin, FileScan Parquet).

Стратегии SparkPlanner

SparkPlanner работает через набор стратегий (Strategies) — каждая стратегия умеет преобразовывать определенные логические операторы в физические:

СтратегияЧто обрабатываетРезультат
JoinSelectionLogical JoinBroadcastHashJoinExec, SortMergeJoinExec, ShuffleHashJoinExec
BasicOperatorsFilter, Project, SortFilterExec, ProjectExec, SortExec
FileSourceStrategyScan таблицыFileScan parquet/orc/csv
AggregationAggregateHashAggregateExec, SortAggregateExec
InMemoryScansКэшированные данныеInMemoryTableScanExec

Join-стратегии: ключевой выбор

Выбор join-стратегии — самое важное решение SparkPlanner. От него зависит, будет ли запрос выполняться секунды или часы.

BroadcastHashJoin (BHJ)

Когда: Одна из таблиц достаточно мала, чтобы поместиться в память каждого executor.

Как работает:

  1. Маленькая таблица целиком отправляется (broadcast) на все executors
  2. На каждом executor строится hash-таблица из маленькой таблицы
  3. Большая таблица сканируется, каждая строка ищется в hash-таблице

Порог: spark.sql.autoBroadcastJoinThreshold (по умолчанию 10 MB)

# Увеличить порог broadcast join до 50MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

# Явный broadcast hint
from pyspark.sql.functions import broadcast
result = big_df.join(broadcast(small_df), "key")

Преимущество: Нет shuffle — данные не перемещаются между executors. Это критически важно для производительности.

SortMergeJoin (SMJ)

Когда: Обе таблицы большие, ни одна не помещается в broadcast.

Как работает:

  1. Обе таблицы партиционируются (shuffle) по join-ключу
  2. В каждой партиции данные сортируются по join-ключу
  3. Отсортированные партиции сливаются (merge) — однопроходный алгоритм

Стоимость: Требует shuffle обеих таблиц + сортировку. Это самая “дорогая” стратегия, но единственная, которая работает для любых размеров данных.

# Принудительно выбрать SortMergeJoin
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

ShuffleHashJoin

Когда: Одна таблица значительно меньше другой, но превышает broadcast-порог.

Как работает:

  1. Обе таблицы партиционируются (shuffle) по join-ключу
  2. В каждой партиции строится hash-таблица из меньшей стороны
  3. Большая сторона сканируется через hash-таблицу

Используется реже, так как SortMergeJoin обычно предпочтительнее (лучше работает с disk spill).

CartesianProduct — анти-паттерн

Когда: Join без условия (cross join).

-- ОПАСНО: CartesianProduct
SELECT * FROM employees, departments

-- Размер результата: rows(employees) * rows(departments)
-- 1M * 1K = 1 BILLION строк!
DANGER

CartesianProduct генерирует декартово произведение — каждая строка одной таблицы соединяется с каждой строкой другой. Для таблиц 1M x 1K строк это 1 миллиард строк. Если вы видите CartesianProduct в explain() — это почти всегда ошибка: пропущено условие join. Spark требует явный spark.sql.crossJoin.enabled=true для CartesianProduct.

Чтение Physical Plan из explain()

Продолжим наш сквозной пример:

result = spark.sql("""
    SELECT name, dept_name
    FROM employees e
    JOIN departments d ON e.dept_id = d.dept_id
    WHERE e.age > 30
""")

result.explain()  # Короткая версия -- только Physical Plan
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [name#5, dept_name#12]
   +- BroadcastHashJoin [dept_id#7], [dept_id#11], Inner, BuildRight, false
      :- Filter (isnotnull(dept_id#7) AND (age#6 > 30))
      :  +- LocalTableScan [name#5, age#6, dept_id#7]
      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=42]
         +- LocalTableScan [dept_id#11, dept_name#12]

Разберем ключевые элементы:

  • AdaptiveSparkPlan — обертка Adaptive Query Execution (AQE), которая может изменить план в runtime Spark3.5+
  • BroadcastHashJoin — Spark выбрал broadcast join, потому что departments маленькая таблица
  • BuildRight — hash-таблица строится из правой стороны join (departments)
  • BroadcastExchange — физическая операция отправки таблицы на все executors
  • Filter — фильтр age > 30 уже сдвинут под join (predicate pushdown из Optimizer)
  • LocalTableScan — чтение локальных данных (в нашем примере данные в памяти)

Как определить стратегию join?

В explain() ищите ключевые слова:

Оператор в explain()СтратегияShuffle?
BroadcastHashJoinBroadcastНет
SortMergeJoinSort-MergeДа (обе стороны)
ShuffledHashJoinShuffle-HashДа (обе стороны)
CartesianProductCartesianНет (но O(n*m))
BroadcastNestedLoopJoinBroadcast + nested loopНет

Формат explain(): подробный разбор

# Расширенный формат (Spark 3.0+)
result.explain("formatted")

explain("formatted") показывает план в табличном виде с нумерацией операторов:

== Physical Plan ==
* Project (7)
+- * BroadcastHashJoin Inner BuildRight (6)
   :- * Filter (3)
   :  +- * LocalTableScan (1)
   +- BroadcastExchange (5)
      +- * LocalTableScan (4)

Звёздочки (*) перед операторами означают Whole-Stage CodeGen — эти операторы объединены в один Java-метод (подробнее в уроке 06).

Множественные физические планы

SparkPlanner может сгенерировать несколько физических планов для одного логического плана. Например, для join:

  • Вариант 1: BroadcastHashJoin (если одна сторона < 10MB)
  • Вариант 2: SortMergeJoin (для любых размеров)

Spark выбирает вариант на основе стоимости (cost model): учитываются размеры данных, количество партиций, наличие shuffle.

Spark3.5+ С Adaptive Query Execution (AQE) Spark может изменить стратегию в runtime: если после shuffle оказалось, что одна сторона join маленькая, AQE переключает SortMergeJoin на BroadcastHashJoin. Мы подробно разберем AQE в модуле Performance Tuning.

Проверка знанийKnowledge check
Когда Spark выбирает BroadcastHashJoin вместо SortMergeJoin? Какой порог по умолчанию?
ОтветAnswer
Spark выбирает BroadcastHashJoin, когда одна из сторон join меньше spark.sql.autoBroadcastJoinThreshold (по умолчанию 10 MB). В этом случае маленькая таблица целиком отправляется (broadcast) на все executors, и join выполняется без shuffle. Если обе таблицы превышают порог, Spark использует SortMergeJoin — обе таблицы партиционируются по join-ключу через shuffle, сортируются и сливаются. Порог можно изменить через spark.conf.set() или обойти через broadcast() hint.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Таблица orders (500 ГБ) joinится с таблицей products (8 МБ). spark.sql.autoBroadcastJoinThreshold = 10MB (по умолчанию). Какую join-стратегию выберет SparkPlanner?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6