TorrentBroadcast: как broadcast variable расходится по кластеру
Broadcast-переменная кажется простой: driver отправляет данные на все executor’ы, executor’ы их используют. Реальность сложнее. Если driver отправлял бы данные каждому executor’у напрямую (push model), то для 500 executor’ов с broadcast 100 MiB потребовалось бы 50 ГБ сетевого трафика с единственного driver’а. На больших кластерах это узкое место убивало бы производительность.
Именно поэтому с версии 2.0 Spark использует TorrentBroadcast — реализацию, вдохновлённую BitTorrent: данные нарезаются на блоки и распространяются peer-to-peer между executor’ами.
Архитектура TorrentBroadcast
Класс TorrentBroadcast находится в core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala. Это единственная реализация Broadcast[T] в Spark — интерфейс BroadcastFactory существует для расширяемости, но на практике всегда создаётся TorrentBroadcastFactory.
// core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
private[spark] class TorrentBroadcast[T: ClassTag](
obj: T, // Объект для рассылки (только на driver при создании)
id: Long // Уникальный broadcast ID
) extends Broadcast[T](id) with Logging {
// Размер куска (default 4 MiB)
private val blockSize = conf.get(BROADCAST_BLOCK_SIZE).toInt
// Количество кусков (вычисляется при нарезке)
private val numBlocks: Int = writeBlocks(obj)
}
Фаза 1: нарезка на блоки (driver)
При вызове sc.broadcast(value) на driver происходит:
private def writeBlocks(value: T): Int = {
import StorageLevel._
// 1. Сохраняем объект целиком у driver (BroadcastBlockId без piece)
SparkEnv.get.blockManager.putSingle(broadcastId, value,
MEMORY_AND_DISK, tellMaster = false)
// tellMaster=false: не регистрируем у BlockManagerMaster — driver сам знает
// 2. Сериализуем в байты
val blocks = TorrentBroadcast.blockifyObject(
value,
blockSize,
SparkEnv.get.serializer,
compressionCodec)
// blockifyObject: serializer.serialize(value) -> Array[ByteBuffer]
// Каждый ByteBuffer = blockSize (4 MiB), последний может быть меньше
// 3. Записываем каждый кусок в BlockManager driver'а
blocks.zipWithIndex.foreach { case (block, i) =>
SparkEnv.get.blockManager.putBytes(
BroadcastBlockId(id, "piece" + i), // "broadcast_0_piece0", etc.
block,
MEMORY_AND_DISK_SER,
tellMaster = true) // Регистрируем у Master — executor'ы найдут
}
blocks.length // numBlocks
}
Обрати внимание на два объекта в памяти driver одновременно:
- Оригинальный объект
valueвMEMORY_AND_DISK(без сериализации) - Сериализованные куски
piece0..pieceNвMEMORY_AND_DISK_SER
Это пиковая нагрузка на driver memory. Для broadcast 1 GiB данных driver временно использует ~2 GiB: оригинальные объекты + сериализованные байты. После записи кусков в BlockManager оригинальный объект можно освободить через GC, но GC это делает не мгновенно.
Фаза 2: получение на executor (BitTorrent механика)
Когда задача на executor’е обращается к broadcastVar.value, вызывается readBroadcastBlock():
private def readBroadcastBlock(): T = {
// 1. Проверяем локальный BlockManager
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
SparkEnv.get.blockManager.getLocalValues(broadcastId) match {
case Some(blockResult) =>
// Уже есть локально (собранный объект из предыдущего task'а)
return blockResult.data.asInstanceOf[Iterator[T]].next()
case None =>
// Нужно получить куски
}
}
// 2. Получаем numBlocks метаданных от driver
// (через SparkContext который уже знает numBlocks при создании broadcast)
// 3. Для каждого куска: запрашиваем у BlockManagerMaster GetLocations
val blocks = new Array[ByteBuffer](numBlocks)
val bm = SparkEnv.get.blockManager
for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
val pieceId = BroadcastBlockId(id, "piece" + pid)
blocks(pid) = bm.getRemoteBytes(pieceId) match {
case Some(b) =>
// Нашли у кого-то (driver или другой executor)
// ВАЖНО: сразу сохраняем кусок локально!
bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true) // Сообщаем Master: теперь я тоже раздаю этот кусок
b
case None =>
throw new SparkException(s"Failed to get $pieceId of $broadcastId")
}
}
// 4. Собираем куски в объект
val obj = TorrentBroadcast.unBlockifyObject[T](blocks, serializer, compressionCodec)
// 5. Сохраняем собранный объект локально (без сериализации — быстрый доступ)
bm.putSingle(broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
obj
}
Ключевой момент в строке tellMaster = true при сохранении куска: executor сразу сообщает BlockManagerMaster, что у него есть этот кусок. Следующий executor, запрашивающий тот же кусок, получит адрес этого executor’а — и скачает с него, а не с driver’а. Это и есть BitTorrent-эффект.
Каждый executor при получении куска регистрирует себя как источник у BlockManagerMaster. Последующие executor'ы скачивают с ближайшего, а не с driver'а.
Broadcast Join: механика на уровне executor
Broadcast join — одна из самых важных оптимизаций Spark SQL. Когда Catalyst выбирает BroadcastHashJoin, маленькая таблица оборачивается в broadcast и рассылается всем executor’ам. Каждый executor строит hash-таблицу локально и обрабатывает свою партицию большой таблицы без shuffle:
# Что пишет разработчик:
result = big_table.join(small_table, "user_id")
# Что делает Spark, если small_table < autoBroadcastJoinThreshold:
# 1. Collect small_table на driver (Action!)
# 2. sc.broadcast(small_table.as_hash_map)
# 3. Каждая задача big_table строит hash-таблицу и joinит локально
Физический план показывает:
*(2) BroadcastHashJoin [user_id#1], [user_id#7], Inner, BuildRight
:- *(2) Filter (isnotnull(user_id#1))
: +- *(2) FileScan parquet big_table
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(user_id#7 as bigint)))
+- *(1) Filter (isnotnull(user_id#7))
+- *(1) FileScan parquet small_table
BroadcastExchange — это оператор, который:
- Выполняет
small_tablescan + filter на driver (или специальном executor) - Собирает результат через
collect() - Вызывает
sc.broadcast(HashedRelation)— строит hash-таблицу и бродкастит
HashedRelation — специализированная hash-таблица для join, оптимизированная Tungsten. Она хранит данные в UnsafeRow-формате и может быть on-heap или off-heap.
spark.sql.autoBroadcastJoinThreshold: как Catalyst принимает решение
# Дефолт: 10 MiB = 10 * 1024 * 1024 байт
spark.conf.get("spark.sql.autoBroadcastJoinThreshold") # "10485760"
# Изменить:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024)) # 100 MiB
# Или в SQL:
spark.sql("SET spark.sql.autoBroadcastJoinThreshold=104857600")
# Отключить auto-broadcast:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
Catalyst проверяет статистику таблицы через LogicalPlanStats.sizeInBytes. Если таблица меньше порога — выбирается BroadcastHashJoin. Если статистики нет (ANALYZE TABLE не выполнялся, внешний источник без метаданных) — Catalyst использует оценки, которые часто завышены, и выбирает SortMergeJoin даже для маленьких таблиц.
# Явное указание broadcast hint — приоритет над autoBroadcastJoinThreshold:
from pyspark.sql.functions import broadcast
result = big_table.join(broadcast(small_table), "user_id")
# В explain(): hint(broadcast) заставит BroadcastHashJoin
# даже если small_table > autoBroadcastJoinThreshold
В Spark 4.0 также работает SQL-синтаксис:
SELECT /*+ BROADCAST(t) */ t.*, t2.name
FROM transactions t JOIN users t2 ON t.user_id = t2.id
Ограничения и риски OOM на driver
Жёсткое ограничение: 8 GiB.
// sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
val BROADCAST_TABLE_MAX_ROWS = 512000000L // 512 млн строк
// + ограничение 8 GiB:
// Cannot broadcast the table that is larger than 8GB
Если таблица > 8 GiB, Spark выбросит SparkException: Cannot broadcast the table. Но серьёзнее — риск OOM на driver задолго до этого предела.
Рассмотрим цепочку памяти при broadcast 1 GiB таблицы:
- На driver:
BroadcastExchangeExecсобирает данные черезcollect(). Данные материализуются в JVM-объектах на driver heap. Размер в памяти может быть в 3-5x больше “сырого” размера данных (JVM object overhead). - Сериализация:
TorrentBroadcast.writeBlocks()сериализует объект. Пиковая память = исходные JVM-объекты + сериализованные байты. - HashedRelation: строится из сериализованных данных. Ещё один экземпляр.
Итого для broadcast 1 GiB: driver может использовать 4-6 GiB памяти в пике.
Driver memory (spark.driver.memory) должна быть достаточной для хранения всех broadcast-переменных одновременно. Если запускается несколько конкурентных broadcast join (сложные запросы с несколькими маленькими таблицами), driver держит все broadcast-объекты одновременно. На кластерах с большим числом параллельных запросов (Spark Thrift Server, многопользовательский режим) это основная причина driver OOM.
# Пример опасного паттерна: несколько broadcast join в одном запросе
result = (orders
.join(broadcast(customers), "customer_id") # customers: 200 MiB
.join(broadcast(products), "product_id") # products: 150 MiB
.join(broadcast(regions), "region_id")) # regions: 50 MiB
# На driver одновременно: 200 + 150 + 50 = 400 MiB + сериализованные копии
# Пиковая нагрузка на driver: ~1.2-2 GiB только для broadcast
Мониторинг broadcast в Spark UI
Jobs tab: найди задачи BroadcastExchange. Время BroadcastExchange включает collect + сериализацию + бродкаст всем executor’ам. Если BroadcastExchange занимает > 30% от времени stage — рассмотри отключение broadcast для этой таблицы.
Executor logs: при получении broadcast куска:
DEBUG BlockManager: Putting block broadcast_0_piece0 in memory
(estimated size 4.0 MiB, free 1234.5 MiB)
INFO TorrentBroadcast: Reading broadcast variable 0 took 0.342 s
Storage tab: broadcast блоки отображаются как broadcast_N. Если ты видишь broadcast_N_piece0, broadcast_N_piece1, etc. — это сериализованные куски TorrentBroadcast. После bc.unpersist(blocking=True) они исчезают.
Production-паттерны
Broadcast join в ETL: правильный размер
# Хорошо: dimension tables < 100 MiB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 104857600) # 100 MiB
# Осторожно: для "средних" таблиц 100 MiB - 1 GiB
# Явный broadcast hint только если уверен в driver memory
result = fact_table.join(broadcast(medium_dim), "key")
# Плохо: broadcast таблицы > 1 GiB
# Используй SortMergeJoin или bucketing вместо broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) # Отключить auto
result = fact_table.join(large_dim.hint("MERGE"), "key") # Явный SMJ hint
Переиспользование broadcast между заданиями
# broadcast создаётся один раз, используется много раз
lookup_bc = spark.sparkContext.broadcast(
spark.table("lookup_table").toPandas().to_dict(orient="records")
)
# Используем в нескольких запросах
for date_partition in date_range:
daily_df = spark.table(f"events_{date_partition}")
result = daily_df.filter(
daily_df.event_type.isin([row['type'] for row in lookup_bc.value])
)
result.write.parquet(f"output/{date_partition}")
# В конце явно освобождаем
lookup_bc.unpersist()
Отладка: когда broadcast не применяется
# Проверяем план
df1.join(df2, "id").explain(True)
# Если видим SortMergeJoin вместо BroadcastHashJoin для маленькой таблицы:
# 1. Проверяем размер
print(f"df2 size estimate: {df2._jdf.queryExecution().analyzed().stats().sizeInBytes()}")
# 2. Если оценка завышена — собираем статистику
spark.sql("ANALYZE TABLE small_table COMPUTE STATISTICS")
# 3. Или используем явный hint
df1.join(broadcast(df2), "id").explain(True)
Попробуй сам
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, col
import time
spark = SparkSession.builder \
.master("local[4]") \
.config("spark.executor.memory", "2g") \
.config("spark.driver.memory", "2g") \
.appName("broadcast-internals") \
.getOrCreate()
# Создаём большую таблицу и маленькую dimension table
import random
n_orders = 1_000_000
n_customers = 10_000
orders = spark.createDataFrame(
[(i, random.randint(1, n_customers), random.random() * 1000)
for i in range(n_orders)],
["order_id", "customer_id", "amount"]
)
customers = spark.createDataFrame(
[(i, f"Customer_{i}", random.choice(["US", "DE", "RU", "CN"]))
for i in range(1, n_customers + 1)],
["customer_id", "name", "country"]
)
# 1. SortMergeJoin (дефолт, если customers > threshold)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
t0 = time.time()
smj_result = orders.join(customers, "customer_id").count()
smj_time = time.time() - t0
print(f"SortMergeJoin: {smj_result} rows in {smj_time:.2f}s")
# 2. BroadcastHashJoin с явным hint
t0 = time.time()
bhj_result = orders.join(broadcast(customers), "customer_id").count()
bhj_time = time.time() - t0
print(f"BroadcastHashJoin: {bhj_result} rows in {bhj_time:.2f}s")
print(f"Speedup: {smj_time/bhj_time:.1f}x")
# Смотрим план
print("\n--- BroadcastHashJoin Plan ---")
orders.join(broadcast(customers), "customer_id").explain(False)
spark.stop()
Ожидаемый вывод (приблизительно):
SortMergeJoin: 1000000 rows in 4.21s
BroadcastHashJoin: 1000000 rows in 1.38s
Speedup: 3.1x
--- BroadcastHashJoin Plan ---
*(2) BroadcastHashJoin [customer_id#1L], [customer_id#5L], Inner, BuildRight
:- *(2) Project [order_id#0L, customer_id#1L, amount#2]
: +- *(2) Filter isnotnull(customer_id#1L)
: +- *(2) Scan ExistingRDD[order_id#0L,customer_id#1L,amount#2]
+- BroadcastExchange HashedRelationBroadcastMode(...)
+- *(1) Filter isnotnull(customer_id#5L)
+- *(1) Scan ExistingRDD[customer_id#5L,name#6,country#7]
Broadcast hints в практике