Broadcast join и хинты оптимизатору
Broadcast join — самый быстрый тип join в Spark. Вместо shuffle обеих таблиц, маленькая таблица целиком копируется на каждый executor. Это устраняет shuffle полностью, но работает только для маленьких таблиц.
Как работает Broadcast Join
dimensions
10 MB
facts
100 GB
Executor 0
Executor 1
Executor 2
Executor 3
Broadcast: 30 MB по сети = мгновенно. Facts остаются на месте.
При Broadcast Join:
- Driver собирает маленькую таблицу целиком (или executor вычисляет её)
- Таблица отправляется через broadcast variable на все executors
- Каждый executor выполняет hash lookup локально — без сетевого обмена
from pyspark.sql.functions import broadcast
# Явный broadcast hint
result = fact_table.join(broadcast(dim_country), "country_id")
result.explain()
== Physical Plan ==
*(2) BroadcastHashJoin [country_id], [country_id], Inner, BuildRight
:- *(2) FileScan parquet [fact_table]
+- BroadcastExchange HashedRelationBroadcastMode(List(country_id))
+- *(1) FileScan parquet [dim_country]
Обратите внимание: BroadcastExchange вместо обычного Exchange. Маленькая таблица (dim_country) broadcastится, большая (fact_table) остаётся на месте. Никакого hashpartitioning shuffle.
autoBroadcastJoinThreshold
Spark автоматически выбирает Broadcast Join, если одна из таблиц меньше порога:
# По умолчанию: 10 МБ
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
# '10485760' (10 * 1024 * 1024 байт)
# Увеличить порог для больших dimension tables
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m") # 100 МБ
# Отключить автоматический broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
Почему 10 МБ по умолчанию? Broadcast отправляет таблицу на каждый executor. При 100 executors, broadcast 100 МБ таблицы = 10 ГБ сетевого трафика. Для 10 МБ — всего 1 ГБ. Порог 10 МБ балансирует между устранением shuffle и стоимостью broadcast.
Before (SortMergeJoin, dimension table 50 МБ, порог 10 МБ):
== Physical Plan ==
*(3) SortMergeJoin [country_id], [country_id]
:- *(1) Sort [country_id ASC]
: +- Exchange hashpartitioning(country_id, 200) ← shuffle 500 ГБ
: +- *(1) FileScan parquet [fact_table]
+- *(2) Sort [country_id ASC]
+- Exchange hashpartitioning(country_id, 200) ← shuffle 50 МБ
+- *(2) FileScan parquet [dim_country]
After (BroadcastHashJoin, порог увеличен до 100 МБ):
== Physical Plan ==
*(2) BroadcastHashJoin [country_id], [country_id]
:- *(2) FileScan parquet [fact_table] ← без shuffle!
+- BroadcastExchange
+- *(1) FileScan parquet [dim_country] ← broadcast 50 МБ
Разница: вместо shuffle 500 ГБ + 50 МБ — broadcast 50 МБ. В 10 000 раз меньше данных по сети.
Broadcast vs Shuffle: правило выбора. Broadcast join выгоден, когда одна таблица помещается в память каждого executor. Если executor имеет 4 ГБ heap и broadcast таблица 2 ГБ — это рискованно (50% памяти). Безопасный лимит — broadcast таблица не превышает 20-30% памяти executor.
Когда НЕ broadcast
1. Большая таблица
# ОПАСНО: broadcast 5 ГБ таблицы на 100 executors
result = table_a.join(broadcast(table_b_5gb), "key")
# 5 ГБ * 100 executors = 500 ГБ в сети + 5 ГБ на каждом executor в памяти
# Вероятен OOM
2. Broadcast timeout
# По умолчанию Spark ждёт broadcast 300 секунд
spark.conf.get("spark.sql.broadcastTimeout") # '300'
# Если таблица не собралась за 5 минут -- job падает с ошибкой
# SparkException: Could not execute broadcast in 300 seconds
3. Обе таблицы большие
Если обе таблицы > 100 МБ, SortMergeJoin обычно оптимальнее. Broadcast обеих таблиц — бессмысленно.
Другие Optimizer Hints
Spark поддерживает несколько типов hints для управления физическим планом:
BROADCAST / BROADCASTJOIN / MAPJOIN
# Python API
result = big.join(broadcast(small), "key")
# SQL hint (все три -- синонимы)
spark.sql("""
SELECT /*+ BROADCAST(dim) */ *
FROM fact f
JOIN dim d ON f.key = d.key
""")
SHUFFLE_MERGE / MERGE / MERGEJOIN
Принудительный SortMergeJoin (даже если таблица маленькая):
spark.sql("""
SELECT /*+ MERGE(orders) */ *
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
""")
SHUFFLE_HASH
Принудительный Shuffle Hash Join:
spark.sql("""
SELECT /*+ SHUFFLE_HASH(small_table) */ *
FROM big_table b
JOIN small_table s ON b.key = s.key
""")
Shuffle Hash Join быстрее SortMergeJoin, когда одна сторона значительно меньше, но больше broadcast threshold. Не требует сортировки.
COALESCE и REPARTITION hints
# В SQL запросах
spark.sql("""
SELECT /*+ COALESCE(5) */ * FROM big_table
""")
spark.sql("""
SELECT /*+ REPARTITION(100, key) */ * FROM big_table
""")
Приоритет hints
Если указано несколько conflicting hints, Spark применяет в порядке:
- BROADCAST (наивысший приоритет)
- SHUFFLE_MERGE
- SHUFFLE_HASH
- Автоматический выбор (по autoBroadcastJoinThreshold и preferSortMergeJoin)
# BROADCAST победит, потому что у него наивысший приоритет
spark.sql("""
SELECT /*+ BROADCAST(a), MERGE(a) */ *
FROM table_a a JOIN table_b b ON a.key = b.key
""")
# Результат: BroadcastHashJoin
Hints — это советы, не команды. Spark может проигнорировать hint, если он невыполним. Например, broadcast hint для таблицы размером 50 ГБ будет проигнорирован, если spark.sql.autoBroadcastJoinThreshold = -1 (broadcast отключён). Всегда проверяйте explain() после hint, чтобы убедиться, что Spark применил его.
BroadcastHashJoin vs BroadcastNestedLoopJoin
Spark использует два варианта broadcast:
| BroadcastHashJoin | BroadcastNestedLoopJoin | |
|---|---|---|
| Условие join | Equi-join (a.key = b.key) | Non-equi (a.val > b.threshold) |
| Механика | Hash table lookup (O(1)) | Nested loop (O(N*M)) |
| Производительность | Быстрый | Медленный для больших таблиц |
| В explain() | BroadcastHashJoin | BroadcastNestedLoopJoin |
# Equi-join -> BroadcastHashJoin
result = big.join(broadcast(small), big.key == small.key)
# Non-equi join -> BroadcastNestedLoopJoin
result = big.join(broadcast(small), big.val > small.threshold)
Как broadcast-переменная расходится по кластеру изнутри — разбор на уровне исходников в senior-курсе:
Spark Internals: TorrentBroadcastЧто дальше?
В следующем уроке разберём Data Skew — ситуацию, когда данные распределены неравномерно и один executor обрабатывает в 10-100 раз больше данных, чем остальные. Вы узнаете, как диагностировать skew через Spark UI и как salting и AQE решают эту проблему.