Learning Platform
Глоссарий Troubleshooting
Урок 03.03 · 16 мин
Средний
JoinBroadcast JoinSort-Merge JoinShuffle Hash JoinCross JoinSemi JoinAnti Join

Joins: глубокое погружение

Join — одна из самых частых и потенциально самых дорогих операций в Spark. Неправильный join может превратить 5-минутный job в 5-часовой (или вызвать OOM). В этом уроке мы разберём все типы join, стратегии их выполнения и дерево решений для выбора оптимальной стратегии.

Типы Join

Подготовим два DataFrame для примеров:

employees = spark.createDataFrame([
    (1, "Alice", 10),
    (2, "Bob", 20),
    (3, "Carol", 10),
    (4, "Dave", 30),
], ["emp_id", "name", "dept_id"])

departments = spark.createDataFrame([
    (10, "Engineering"),
    (20, "Marketing"),
    (40, "HR"),
], ["dept_id", "dept_name"])

Inner Join

Возвращает только строки с совпадением в обеих таблицах:

employees.join(departments, "dept_id", "inner").show()
-- Spark SQL эквивалент
SELECT e.*, d.dept_name
FROM employees e
INNER JOIN departments d ON e.dept_id = d.dept_id
+-------+------+----+-----------+
|dept_id|emp_id|name|  dept_name|
+-------+------+----+-----------+
|     10|     1|Alice|Engineering|
|     10|     3|Carol|Engineering|
|     20|     2| Bob|  Marketing|
+-------+------+----+-----------+

Dave (dept_id=30) и HR (dept_id=40) отсутствуют — для них нет совпадения.

Left Join

Все строки из левой таблицы, совпадения из правой (или NULL):

employees.join(departments, "dept_id", "left").show()
SELECT e.*, d.dept_name
FROM employees e
LEFT JOIN departments d ON e.dept_id = d.dept_id
+-------+------+-----+-----------+
|dept_id|emp_id| name|  dept_name|
+-------+------+-----+-----------+
|     10|     1|Alice|Engineering|
|     10|     3|Carol|Engineering|
|     20|     2|  Bob|  Marketing|
|     30|     4| Dave|       NULL|
+-------+------+-----+-----------+

Right Join

Все строки из правой таблицы, совпадения из левой (или NULL):

employees.join(departments, "dept_id", "right").show()

Full Outer Join

Все строки из обеих таблиц:

employees.join(departments, "dept_id", "outer").show()
+-------+------+-----+-----------+
|dept_id|emp_id| name|  dept_name|
+-------+------+-----+-----------+
|     10|     1|Alice|Engineering|
|     10|     3|Carol|Engineering|
|     20|     2|  Bob|  Marketing|
|     30|     4| Dave|       NULL|
|     40|  NULL| NULL|         HR|
+-------+------+-----+-----------+

Cross Join

Декартово произведение — каждая строка левой таблицы с каждой строкой правой:

employees.crossJoin(departments).show()
# 4 * 3 = 12 строк
SELECT * FROM employees CROSS JOIN departments

Left Semi Join

Возвращает строки из левой таблицы, для которых существует совпадение в правой. Колонки правой таблицы не включаются:

employees.join(departments, "dept_id", "left_semi").show()
+-------+------+-----+
|dept_id|emp_id| name|
+-------+------+-----+
|     10|     1|Alice|
|     10|     3|Carol|
|     20|     2|  Bob|
+-------+------+-----+

Аналог WHERE EXISTS в SQL:

SELECT e.* FROM employees e
WHERE EXISTS (SELECT 1 FROM departments d WHERE d.dept_id = e.dept_id)

Left Anti Join

Противоположность semi join — строки из левой таблицы, для которых нет совпадения:

employees.join(departments, "dept_id", "left_anti").show()
+-------+------+----+
|dept_id|emp_id|name|
+-------+------+----+
|     30|     4|Dave|
+-------+------+----+

Аналог WHERE NOT EXISTS в SQL.

TIP

KnowledgeCheck: В каких случаях left semi join предпочтительнее inner join?

Ответ: Left semi join лучше, когда вам нужны только колонки из левой таблицы и нужно проверить существование совпадения в правой. Semi join (1) не дублирует строки при множественных совпадениях (в отличие от inner join), (2) не передаёт колонки правой таблицы через shuffle, что экономит сеть и память. Типичный сценарий: фильтрация по lookup-таблице.

Стратегии выполнения Join

Spark выбирает физическую стратегию join на основе размера данных и конфигурации. Это решение принимает Catalyst optimizer.

Broadcast Hash Join (BHJ)

Маленькая таблица целиком отправляется (broadcast) на каждый executor:

Driver собирает маленькую таблицу
     |
     v
Broadcast на все executors
     |
     v
Каждый executor делает hash lookup
локально -- без shuffle!

Условия: одна из таблиц меньше spark.sql.autoBroadcastJoinThreshold (по умолчанию 10 МБ).

from pyspark.sql.functions import broadcast

# Явный hint для broadcast
result = big_table.join(broadcast(small_table), "key")

Вывод explain() для Broadcast Hash Join:

== Physical Plan ==
*(2) BroadcastHashJoin [dept_id#0], [dept_id#4], Inner, BuildRight
:- *(2) Scan ExistingRDD[emp_id#0, name#1, dept_id#2]
+- BroadcastExchange HashedRelationBroadcastMode(List(dept_id#4))
   +- *(1) Scan ExistingRDD[dept_id#4, dept_name#5]

Sort-Merge Join (SMJ)

Обе таблицы сортируются по join key, затем выполняется merge:

Таблица A: sort by key -> shuffle -> merge
                                        \
                                         -> output
                                        /
Таблица B: sort by key -> shuffle -> merge

Условия: обе таблицы большие, ни одна не помещается в broadcast. Это стратегия по умолчанию для больших join.

Вывод explain() для Sort-Merge Join:

== Physical Plan ==
*(3) SortMergeJoin [dept_id#0], [dept_id#4], Inner
:- *(1) Sort [dept_id#0 ASC], false, 0
:  +- Exchange hashpartitioning(dept_id#0, 200)
:     +- Scan ExistingRDD[emp_id#0, name#1, dept_id#2]
+- *(2) Sort [dept_id#4 ASC], false, 0
   +- Exchange hashpartitioning(dept_id#4, 200)
      +- Scan ExistingRDD[dept_id#4, dept_name#5]

Обратите внимание на Exchange hashpartitioning — это shuffle. Обе таблицы перераспределяются по join key.

Shuffle Hash Join (SHJ)

Обе таблицы хешируются по join key и перераспределяются. Меньшая сторона строит hash table в памяти:

Условия: одна таблица значительно меньше другой, но больше broadcast threshold. Spark может выбрать SHJ, если spark.sql.join.preferSortMergeJoin=false.

Дерево решений: какую стратегию выберет Spark?

Catalyst optimizer выбирает стратегию в следующем порядке:

  1. Есть broadcast hint? -> Broadcast Hash Join
  2. Одна таблица < 10 МБ (autoBroadcastJoinThreshold)? -> Broadcast Hash Join
  3. Обе таблицы большие, join key сортируемый? -> Sort-Merge Join (по умолчанию)
  4. preferSortMergeJoin=false и одна сторона buildable? -> Shuffle Hash Join
  5. Ничего не подходит (например, full outer без equi-condition)? -> Broadcast Nested Loop Join (крайний случай)
Размер левойРазмер правойСтратегияShuffle?
Любой< 10 МБBroadcast Hash JoinНет
< 10 МБЛюбойBroadcast Hash JoinНет
> 10 МБ> 10 МБSort-Merge JoinДа (обе стороны)
> 10 МБ10-100 МБShuffle Hash Join*Да (обе стороны)

*Shuffle Hash Join выбирается только при preferSortMergeJoin=false.

NOTE

AQE и runtime оптимизация. Adaptive Query Execution (AQE, включён по умолчанию с Spark 3.2) может изменить стратегию join в runtime. Если после shuffle оказывается, что одна сторона значительно меньше ожидаемого, AQE может переключить Sort-Merge на Broadcast Hash Join. Подробнее об AQE — в модуле оптимизации производительности (Phase 65).

Spark4.0

Spark 4.0: ANSI mode и join keys. В Spark 4.0 с ANSI mode по умолчанию, неявное приведение типов в join keys вызывает ошибку. Если employees.dept_id — IntegerType, а departments.dept_id — StringType, join упадёт с ошибкой вместо тихого преобразования. Всегда проверяйте типы join keys через printSchema() перед join.

Анти-паттерны в Joins

1. Случайный Cross Join

# ОПАСНО: отсутствие join condition создаёт cross join
# 1M * 1M = 1 триллион строк!
result = table_a.join(table_b)  # implicit cross join

# ПРАВИЛЬНО: всегда указывайте join condition
result = table_a.join(table_b, "key")

2. Join по неключевому столбцу

# ОПАСНО: join по неуникальному столбцу = data explosion
# 1000 записей с city="Moscow" * 1000 записей с city="Moscow" = 1M строк
result = users.join(orders, "city")

# ПРАВИЛЬНО: join по уникальному ключу
result = users.join(orders, "user_id")

3. Забытый broadcast для маленьких таблиц

# НЕ ОПТИМАЛЬНО: dimension table 5 МБ, но Spark не знает размер
result = fact_table.join(dim_country, "country_id")

# ОПТИМАЛЬНО: явный broadcast для dimension tables
result = fact_table.join(broadcast(dim_country), "country_id")

Broadcast устраняет shuffle полностью: маленькая таблица отправляется на каждый executor один раз. Для dimension tables (страны, валюты, статусы) это всегда правильный выбор.

TIP

KnowledgeCheck: У вас fact-таблица 500 ГБ и dimension-таблица 50 МБ. По умолчанию autoBroadcastJoinThreshold = 10MB. Что произойдёт и как это исправить?

Ответ: Spark выберет Sort-Merge Join, потому что 50 МБ > 10 МБ порога. Обе таблицы будут перераспределены через shuffle. Исправление: (1) увеличить порог: spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m") или (2) использовать hint: fact_table.join(broadcast(dim_table), "key"). Второй вариант предпочтительнее — он явный и не влияет на другие join в приложении.

Что дальше?

В следующем уроке мы разберём groupBy и агрегации — операции, которые всегда вызывают shuffle и требуют понимания распределения данных для эффективной работы.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 6. Чем left semi join отличается от inner join?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8