Joins: глубокое погружение
Join — одна из самых частых и потенциально самых дорогих операций в Spark. Неправильный join может превратить 5-минутный job в 5-часовой (или вызвать OOM). В этом уроке мы разберём все типы join, стратегии их выполнения и дерево решений для выбора оптимальной стратегии.
Типы Join
Подготовим два DataFrame для примеров:
employees = spark.createDataFrame([
(1, "Alice", 10),
(2, "Bob", 20),
(3, "Carol", 10),
(4, "Dave", 30),
], ["emp_id", "name", "dept_id"])
departments = spark.createDataFrame([
(10, "Engineering"),
(20, "Marketing"),
(40, "HR"),
], ["dept_id", "dept_name"])
Inner Join
Возвращает только строки с совпадением в обеих таблицах:
employees.join(departments, "dept_id", "inner").show()
-- Spark SQL эквивалент
SELECT e.*, d.dept_name
FROM employees e
INNER JOIN departments d ON e.dept_id = d.dept_id
+-------+------+----+-----------+
|dept_id|emp_id|name| dept_name|
+-------+------+----+-----------+
| 10| 1|Alice|Engineering|
| 10| 3|Carol|Engineering|
| 20| 2| Bob| Marketing|
+-------+------+----+-----------+
Dave (dept_id=30) и HR (dept_id=40) отсутствуют — для них нет совпадения.
Left Join
Все строки из левой таблицы, совпадения из правой (или NULL):
employees.join(departments, "dept_id", "left").show()
SELECT e.*, d.dept_name
FROM employees e
LEFT JOIN departments d ON e.dept_id = d.dept_id
+-------+------+-----+-----------+
|dept_id|emp_id| name| dept_name|
+-------+------+-----+-----------+
| 10| 1|Alice|Engineering|
| 10| 3|Carol|Engineering|
| 20| 2| Bob| Marketing|
| 30| 4| Dave| NULL|
+-------+------+-----+-----------+
Right Join
Все строки из правой таблицы, совпадения из левой (или NULL):
employees.join(departments, "dept_id", "right").show()
Full Outer Join
Все строки из обеих таблиц:
employees.join(departments, "dept_id", "outer").show()
+-------+------+-----+-----------+
|dept_id|emp_id| name| dept_name|
+-------+------+-----+-----------+
| 10| 1|Alice|Engineering|
| 10| 3|Carol|Engineering|
| 20| 2| Bob| Marketing|
| 30| 4| Dave| NULL|
| 40| NULL| NULL| HR|
+-------+------+-----+-----------+
Cross Join
Декартово произведение — каждая строка левой таблицы с каждой строкой правой:
employees.crossJoin(departments).show()
# 4 * 3 = 12 строк
SELECT * FROM employees CROSS JOIN departments
Left Semi Join
Возвращает строки из левой таблицы, для которых существует совпадение в правой. Колонки правой таблицы не включаются:
employees.join(departments, "dept_id", "left_semi").show()
+-------+------+-----+
|dept_id|emp_id| name|
+-------+------+-----+
| 10| 1|Alice|
| 10| 3|Carol|
| 20| 2| Bob|
+-------+------+-----+
Аналог WHERE EXISTS в SQL:
SELECT e.* FROM employees e
WHERE EXISTS (SELECT 1 FROM departments d WHERE d.dept_id = e.dept_id)
Left Anti Join
Противоположность semi join — строки из левой таблицы, для которых нет совпадения:
employees.join(departments, "dept_id", "left_anti").show()
+-------+------+----+
|dept_id|emp_id|name|
+-------+------+----+
| 30| 4|Dave|
+-------+------+----+
Аналог WHERE NOT EXISTS в SQL.
KnowledgeCheck: В каких случаях left semi join предпочтительнее inner join?
Ответ: Left semi join лучше, когда вам нужны только колонки из левой таблицы и нужно проверить существование совпадения в правой. Semi join (1) не дублирует строки при множественных совпадениях (в отличие от inner join), (2) не передаёт колонки правой таблицы через shuffle, что экономит сеть и память. Типичный сценарий: фильтрация по lookup-таблице.
Стратегии выполнения Join
Spark выбирает физическую стратегию join на основе размера данных и конфигурации. Это решение принимает Catalyst optimizer.
Broadcast Hash Join (BHJ)
Маленькая таблица целиком отправляется (broadcast) на каждый executor:
Driver собирает маленькую таблицу
|
v
Broadcast на все executors
|
v
Каждый executor делает hash lookup
локально -- без shuffle!
Условия: одна из таблиц меньше spark.sql.autoBroadcastJoinThreshold (по умолчанию 10 МБ).
from pyspark.sql.functions import broadcast
# Явный hint для broadcast
result = big_table.join(broadcast(small_table), "key")
Вывод explain() для Broadcast Hash Join:
== Physical Plan ==
*(2) BroadcastHashJoin [dept_id#0], [dept_id#4], Inner, BuildRight
:- *(2) Scan ExistingRDD[emp_id#0, name#1, dept_id#2]
+- BroadcastExchange HashedRelationBroadcastMode(List(dept_id#4))
+- *(1) Scan ExistingRDD[dept_id#4, dept_name#5]
Sort-Merge Join (SMJ)
Обе таблицы сортируются по join key, затем выполняется merge:
Таблица A: sort by key -> shuffle -> merge
\
-> output
/
Таблица B: sort by key -> shuffle -> merge
Условия: обе таблицы большие, ни одна не помещается в broadcast. Это стратегия по умолчанию для больших join.
Вывод explain() для Sort-Merge Join:
== Physical Plan ==
*(3) SortMergeJoin [dept_id#0], [dept_id#4], Inner
:- *(1) Sort [dept_id#0 ASC], false, 0
: +- Exchange hashpartitioning(dept_id#0, 200)
: +- Scan ExistingRDD[emp_id#0, name#1, dept_id#2]
+- *(2) Sort [dept_id#4 ASC], false, 0
+- Exchange hashpartitioning(dept_id#4, 200)
+- Scan ExistingRDD[dept_id#4, dept_name#5]
Обратите внимание на Exchange hashpartitioning — это shuffle. Обе таблицы перераспределяются по join key.
Shuffle Hash Join (SHJ)
Обе таблицы хешируются по join key и перераспределяются. Меньшая сторона строит hash table в памяти:
Условия: одна таблица значительно меньше другой, но больше broadcast threshold. Spark может выбрать SHJ, если spark.sql.join.preferSortMergeJoin=false.
Дерево решений: какую стратегию выберет Spark?
Catalyst optimizer выбирает стратегию в следующем порядке:
- Есть broadcast hint? -> Broadcast Hash Join
- Одна таблица < 10 МБ (autoBroadcastJoinThreshold)? -> Broadcast Hash Join
- Обе таблицы большие, join key сортируемый? -> Sort-Merge Join (по умолчанию)
- preferSortMergeJoin=false и одна сторона buildable? -> Shuffle Hash Join
- Ничего не подходит (например, full outer без equi-condition)? -> Broadcast Nested Loop Join (крайний случай)
| Размер левой | Размер правой | Стратегия | Shuffle? |
|---|---|---|---|
| Любой | < 10 МБ | Broadcast Hash Join | Нет |
| < 10 МБ | Любой | Broadcast Hash Join | Нет |
| > 10 МБ | > 10 МБ | Sort-Merge Join | Да (обе стороны) |
| > 10 МБ | 10-100 МБ | Shuffle Hash Join* | Да (обе стороны) |
*Shuffle Hash Join выбирается только при preferSortMergeJoin=false.
AQE и runtime оптимизация. Adaptive Query Execution (AQE, включён по умолчанию с Spark 3.2) может изменить стратегию join в runtime. Если после shuffle оказывается, что одна сторона значительно меньше ожидаемого, AQE может переключить Sort-Merge на Broadcast Hash Join. Подробнее об AQE — в модуле оптимизации производительности (Phase 65).
Spark 4.0: ANSI mode и join keys. В Spark 4.0 с ANSI mode по умолчанию, неявное приведение типов в join keys вызывает ошибку. Если employees.dept_id — IntegerType, а departments.dept_id — StringType, join упадёт с ошибкой вместо тихого преобразования. Всегда проверяйте типы join keys через printSchema() перед join.
Анти-паттерны в Joins
1. Случайный Cross Join
# ОПАСНО: отсутствие join condition создаёт cross join
# 1M * 1M = 1 триллион строк!
result = table_a.join(table_b) # implicit cross join
# ПРАВИЛЬНО: всегда указывайте join condition
result = table_a.join(table_b, "key")
2. Join по неключевому столбцу
# ОПАСНО: join по неуникальному столбцу = data explosion
# 1000 записей с city="Moscow" * 1000 записей с city="Moscow" = 1M строк
result = users.join(orders, "city")
# ПРАВИЛЬНО: join по уникальному ключу
result = users.join(orders, "user_id")
3. Забытый broadcast для маленьких таблиц
# НЕ ОПТИМАЛЬНО: dimension table 5 МБ, но Spark не знает размер
result = fact_table.join(dim_country, "country_id")
# ОПТИМАЛЬНО: явный broadcast для dimension tables
result = fact_table.join(broadcast(dim_country), "country_id")
Broadcast устраняет shuffle полностью: маленькая таблица отправляется на каждый executor один раз. Для dimension tables (страны, валюты, статусы) это всегда правильный выбор.
KnowledgeCheck: У вас fact-таблица 500 ГБ и dimension-таблица 50 МБ. По умолчанию autoBroadcastJoinThreshold = 10MB. Что произойдёт и как это исправить?
Ответ: Spark выберет Sort-Merge Join, потому что 50 МБ > 10 МБ порога. Обе таблицы будут перераспределены через shuffle. Исправление: (1) увеличить порог: spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m") или (2) использовать hint: fact_table.join(broadcast(dim_table), "key"). Второй вариант предпочтительнее — он явный и не влияет на другие join в приложении.
Что дальше?
В следующем уроке мы разберём groupBy и агрегации — операции, которые всегда вызывают shuffle и требуют понимания распределения данных для эффективной работы.