Learning Platform
Глоссарий Troubleshooting
Урок 04.04 · 12 мин
Средний
Broadcast JoinBroadcastHashJoinspark.sql.autoBroadcastJoinThresholdOptimizer Hints

Broadcast join и хинты оптимизатору

Broadcast join — самый быстрый тип join в Spark. Вместо shuffle обеих таблиц, маленькая таблица целиком копируется на каждый executor. Это устраняет shuffle полностью, но работает только для маленьких таблиц.

Как работает Broadcast Join

Broadcast Join vs Shuffle Join
spark.sql.autoBroadcastJoinThreshold = 10 MB

dimensions

10 MB

facts

100 GB

dimensions broadcast на все executors

Executor 0

facts: 25 GB
in-place
10 MB
broadcast
Hash lookup (local)

Executor 1

facts: 25 GB
in-place
10 MB
broadcast
Hash lookup (local)

Executor 2

facts: 25 GB
in-place
10 MB
broadcast
Hash lookup (local)

Executor 3

facts: 25 GB
in-place
10 MB
broadcast
Hash lookup (local)
Network I/O30 MB
Механика10 MB x 3 copies
ShuffleНет
Physical PlanBroadcastHashJoin

Broadcast: 30 MB по сети = мгновенно. Facts остаются на месте.

При Broadcast Join:

  1. Driver собирает маленькую таблицу целиком (или executor вычисляет её)
  2. Таблица отправляется через broadcast variable на все executors
  3. Каждый executor выполняет hash lookup локально — без сетевого обмена
from pyspark.sql.functions import broadcast

# Явный broadcast hint
result = fact_table.join(broadcast(dim_country), "country_id")
result.explain()
== Physical Plan ==
*(2) BroadcastHashJoin [country_id], [country_id], Inner, BuildRight
:- *(2) FileScan parquet [fact_table]
+- BroadcastExchange HashedRelationBroadcastMode(List(country_id))
   +- *(1) FileScan parquet [dim_country]

Обратите внимание: BroadcastExchange вместо обычного Exchange. Маленькая таблица (dim_country) broadcastится, большая (fact_table) остаётся на месте. Никакого hashpartitioning shuffle.

autoBroadcastJoinThreshold

Spark автоматически выбирает Broadcast Join, если одна из таблиц меньше порога:

# По умолчанию: 10 МБ
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
# '10485760' (10 * 1024 * 1024 байт)

# Увеличить порог для больших dimension tables
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m")  # 100 МБ

# Отключить автоматический broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

Почему 10 МБ по умолчанию? Broadcast отправляет таблицу на каждый executor. При 100 executors, broadcast 100 МБ таблицы = 10 ГБ сетевого трафика. Для 10 МБ — всего 1 ГБ. Порог 10 МБ балансирует между устранением shuffle и стоимостью broadcast.

Before (SortMergeJoin, dimension table 50 МБ, порог 10 МБ):

== Physical Plan ==
*(3) SortMergeJoin [country_id], [country_id]
:- *(1) Sort [country_id ASC]
:  +- Exchange hashpartitioning(country_id, 200)  ← shuffle 500 ГБ
:     +- *(1) FileScan parquet [fact_table]
+- *(2) Sort [country_id ASC]
   +- Exchange hashpartitioning(country_id, 200)  ← shuffle 50 МБ
      +- *(2) FileScan parquet [dim_country]

After (BroadcastHashJoin, порог увеличен до 100 МБ):

== Physical Plan ==
*(2) BroadcastHashJoin [country_id], [country_id]
:- *(2) FileScan parquet [fact_table]           ← без shuffle!
+- BroadcastExchange
   +- *(1) FileScan parquet [dim_country]       ← broadcast 50 МБ

Разница: вместо shuffle 500 ГБ + 50 МБ — broadcast 50 МБ. В 10 000 раз меньше данных по сети.

TIP

Broadcast vs Shuffle: правило выбора. Broadcast join выгоден, когда одна таблица помещается в память каждого executor. Если executor имеет 4 ГБ heap и broadcast таблица 2 ГБ — это рискованно (50% памяти). Безопасный лимит — broadcast таблица не превышает 20-30% памяти executor.

Когда НЕ broadcast

1. Большая таблица

# ОПАСНО: broadcast 5 ГБ таблицы на 100 executors
result = table_a.join(broadcast(table_b_5gb), "key")
# 5 ГБ * 100 executors = 500 ГБ в сети + 5 ГБ на каждом executor в памяти
# Вероятен OOM

2. Broadcast timeout

# По умолчанию Spark ждёт broadcast 300 секунд
spark.conf.get("spark.sql.broadcastTimeout")  # '300'

# Если таблица не собралась за 5 минут -- job падает с ошибкой
# SparkException: Could not execute broadcast in 300 seconds

3. Обе таблицы большие

Если обе таблицы > 100 МБ, SortMergeJoin обычно оптимальнее. Broadcast обеих таблиц — бессмысленно.

Другие Optimizer Hints

Spark поддерживает несколько типов hints для управления физическим планом:

BROADCAST / BROADCASTJOIN / MAPJOIN

# Python API
result = big.join(broadcast(small), "key")

# SQL hint (все три -- синонимы)
spark.sql("""
    SELECT /*+ BROADCAST(dim) */ *
    FROM fact f
    JOIN dim d ON f.key = d.key
""")

SHUFFLE_MERGE / MERGE / MERGEJOIN

Принудительный SortMergeJoin (даже если таблица маленькая):

spark.sql("""
    SELECT /*+ MERGE(orders) */ *
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id
""")

SHUFFLE_HASH

Принудительный Shuffle Hash Join:

spark.sql("""
    SELECT /*+ SHUFFLE_HASH(small_table) */ *
    FROM big_table b
    JOIN small_table s ON b.key = s.key
""")

Shuffle Hash Join быстрее SortMergeJoin, когда одна сторона значительно меньше, но больше broadcast threshold. Не требует сортировки.

COALESCE и REPARTITION hints

# В SQL запросах
spark.sql("""
    SELECT /*+ COALESCE(5) */ * FROM big_table
""")

spark.sql("""
    SELECT /*+ REPARTITION(100, key) */ * FROM big_table
""")

Приоритет hints

Если указано несколько conflicting hints, Spark применяет в порядке:

  1. BROADCAST (наивысший приоритет)
  2. SHUFFLE_MERGE
  3. SHUFFLE_HASH
  4. Автоматический выбор (по autoBroadcastJoinThreshold и preferSortMergeJoin)
# BROADCAST победит, потому что у него наивысший приоритет
spark.sql("""
    SELECT /*+ BROADCAST(a), MERGE(a) */ *
    FROM table_a a JOIN table_b b ON a.key = b.key
""")
# Результат: BroadcastHashJoin
TIP

Hints — это советы, не команды. Spark может проигнорировать hint, если он невыполним. Например, broadcast hint для таблицы размером 50 ГБ будет проигнорирован, если spark.sql.autoBroadcastJoinThreshold = -1 (broadcast отключён). Всегда проверяйте explain() после hint, чтобы убедиться, что Spark применил его.

BroadcastHashJoin vs BroadcastNestedLoopJoin

Spark использует два варианта broadcast:

BroadcastHashJoinBroadcastNestedLoopJoin
Условие joinEqui-join (a.key = b.key)Non-equi (a.val > b.threshold)
МеханикаHash table lookup (O(1))Nested loop (O(N*M))
ПроизводительностьБыстрыйМедленный для больших таблиц
В explain()BroadcastHashJoinBroadcastNestedLoopJoin
# Equi-join -> BroadcastHashJoin
result = big.join(broadcast(small), big.key == small.key)

# Non-equi join -> BroadcastNestedLoopJoin
result = big.join(broadcast(small), big.val > small.threshold)
Проверка знанийKnowledge check
У вас fact-таблица 500 ГБ и dimension-таблица 50 МБ. autoBroadcastJoinThreshold по умолчанию 10 МБ. Что выберет Spark и как это оптимизировать?
ОтветAnswer
Spark выберет SortMergeJoin, потому что dimension-таблица (50 МБ) превышает порог (10 МБ). Обе таблицы будут перераспределены через shuffle -- 500 ГБ + 50 МБ по сети. Оптимизация: (1) использовать hint broadcast(dim_table) для принудительного broadcast, или (2) увеличить порог: spark.conf.set('spark.sql.autoBroadcastJoinThreshold', '100m'). Первый вариант предпочтительнее -- он явный и не влияет на другие join в приложении.
Проверка знанийKnowledge check
Почему broadcast join 2 ГБ таблицы на кластере со 100 executors может быть опасен?
ОтветAnswer
Broadcast отправляет полную копию таблицы на каждый executor. 2 ГБ * 100 executors = 200 ГБ сетевого трафика для доставки. Кроме того, каждый executor хранит 2 ГБ таблицы в heap memory. Если executor имеет 4 ГБ heap, broadcast займёт 50% памяти, оставляя мало места для обработки данных. Это может привести к OOM, spill-to-disk и длинным GC паузам. Безопасный лимит для broadcast -- 20-30% памяти executor.

Как broadcast-переменная расходится по кластеру изнутри — разбор на уровне исходников в senior-курсе:

Spark Internals: TorrentBroadcast

Что дальше?

В следующем уроке разберём Data Skew — ситуацию, когда данные распределены неравномерно и один executor обрабатывает в 10-100 раз больше данных, чем остальные. Вы узнаете, как диагностировать skew через Spark UI и как salting и AQE решают эту проблему.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Как работает Broadcast Join в Spark?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6