Learning Platform
Глоссарий Troubleshooting
Урок 06.05 · 28 мин
Продвинутый
TorrentBroadcastBroadcast JoinBitTorrentBlockManagerautoBroadcastJoinThreshold

TorrentBroadcast: как broadcast variable расходится по кластеру

Broadcast-переменная кажется простой: driver отправляет данные на все executor’ы, executor’ы их используют. Реальность сложнее. Если driver отправлял бы данные каждому executor’у напрямую (push model), то для 500 executor’ов с broadcast 100 MiB потребовалось бы 50 ГБ сетевого трафика с единственного driver’а. На больших кластерах это узкое место убивало бы производительность.

Именно поэтому с версии 2.0 Spark использует TorrentBroadcast — реализацию, вдохновлённую BitTorrent: данные нарезаются на блоки и распространяются peer-to-peer между executor’ами.

Архитектура TorrentBroadcast

Класс TorrentBroadcast находится в core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala. Это единственная реализация Broadcast[T] в Spark — интерфейс BroadcastFactory существует для расширяемости, но на практике всегда создаётся TorrentBroadcastFactory.

// core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
private[spark] class TorrentBroadcast[T: ClassTag](
    obj: T,           // Объект для рассылки (только на driver при создании)
    id: Long          // Уникальный broadcast ID
) extends Broadcast[T](id) with Logging {

  // Размер куска (default 4 MiB)
  private val blockSize = conf.get(BROADCAST_BLOCK_SIZE).toInt

  // Количество кусков (вычисляется при нарезке)
  private val numBlocks: Int = writeBlocks(obj)
}

Фаза 1: нарезка на блоки (driver)

При вызове sc.broadcast(value) на driver происходит:

private def writeBlocks(value: T): Int = {
  import StorageLevel._

  // 1. Сохраняем объект целиком у driver (BroadcastBlockId без piece)
  SparkEnv.get.blockManager.putSingle(broadcastId, value,
    MEMORY_AND_DISK, tellMaster = false)
  // tellMaster=false: не регистрируем у BlockManagerMaster — driver сам знает

  // 2. Сериализуем в байты
  val blocks = TorrentBroadcast.blockifyObject(
    value,
    blockSize,
    SparkEnv.get.serializer,
    compressionCodec)
  // blockifyObject: serializer.serialize(value) -> Array[ByteBuffer]
  // Каждый ByteBuffer = blockSize (4 MiB), последний может быть меньше

  // 3. Записываем каждый кусок в BlockManager driver'а
  blocks.zipWithIndex.foreach { case (block, i) =>
    SparkEnv.get.blockManager.putBytes(
      BroadcastBlockId(id, "piece" + i),  // "broadcast_0_piece0", etc.
      block,
      MEMORY_AND_DISK_SER,
      tellMaster = true)   // Регистрируем у Master — executor'ы найдут
  }

  blocks.length  // numBlocks
}

Обрати внимание на два объекта в памяти driver одновременно:

  1. Оригинальный объект value в MEMORY_AND_DISK (без сериализации)
  2. Сериализованные куски piece0..pieceN в MEMORY_AND_DISK_SER

Это пиковая нагрузка на driver memory. Для broadcast 1 GiB данных driver временно использует ~2 GiB: оригинальные объекты + сериализованные байты. После записи кусков в BlockManager оригинальный объект можно освободить через GC, но GC это делает не мгновенно.

Фаза 2: получение на executor (BitTorrent механика)

Когда задача на executor’е обращается к broadcastVar.value, вызывается readBroadcastBlock():

private def readBroadcastBlock(): T = {
  // 1. Проверяем локальный BlockManager
  TorrentBroadcast.synchronized {
    setConf(SparkEnv.get.conf)
    SparkEnv.get.blockManager.getLocalValues(broadcastId) match {
      case Some(blockResult) =>
        // Уже есть локально (собранный объект из предыдущего task'а)
        return blockResult.data.asInstanceOf[Iterator[T]].next()
      case None =>
        // Нужно получить куски
    }
  }

  // 2. Получаем numBlocks метаданных от driver
  // (через SparkContext который уже знает numBlocks при создании broadcast)

  // 3. Для каждого куска: запрашиваем у BlockManagerMaster GetLocations
  val blocks = new Array[ByteBuffer](numBlocks)
  val bm = SparkEnv.get.blockManager

  for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
    val pieceId = BroadcastBlockId(id, "piece" + pid)

    blocks(pid) = bm.getRemoteBytes(pieceId) match {
      case Some(b) =>
        // Нашли у кого-то (driver или другой executor)
        // ВАЖНО: сразу сохраняем кусок локально!
        bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER,
          tellMaster = true)  // Сообщаем Master: теперь я тоже раздаю этот кусок
        b
      case None =>
        throw new SparkException(s"Failed to get $pieceId of $broadcastId")
    }
  }

  // 4. Собираем куски в объект
  val obj = TorrentBroadcast.unBlockifyObject[T](blocks, serializer, compressionCodec)

  // 5. Сохраняем собранный объект локально (без сериализации — быстрый доступ)
  bm.putSingle(broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)

  obj
}

Ключевой момент в строке tellMaster = true при сохранении куска: executor сразу сообщает BlockManagerMaster, что у него есть этот кусок. Следующий executor, запрашивающий тот же кусок, получит адрес этого executor’а — и скачает с него, а не с driver’а. Это и есть BitTorrent-эффект.

TorrentBroadcast: распространение 3-кускового объекта (6 executor'ов)

Каждый executor при получении куска регистрирует себя как источник у BlockManagerMaster. Последующие executor'ы скачивают с ближайшего, а не с driver'а.

Driverpiece0, piece1, piece2 -> BlockManagerНарезает broadcast-объект на 3 куска по 4 MiB. Регистрирует все три у BlockManagerMaster. Источник кусков для первых executor'ов.
Executor 1скачивает piece0,1,2 с driverПервый executor всегда идёт к driver. После получения каждого куска регистрирует себя у BlockManagerMaster как источник.
Executor 2piece0 с Exec1, piece1 с Exec1, piece2 с driverMaster возвращает [Exec1, Driver] для piece0 и piece1. Exec2 скачивает с Exec1 (ближе). Driver разгружается.
Executor 3piece0 с Exec1, piece1 с Exec2, piece2 с Exec1Exec3 видит оба источника для каждого куска. Итерирует случайно. Нагрузка распределяется между Exec1, Exec2, Driver.
Executor 4, 5, 6скачивают куски у Exec1-3 (не у driver)При 6 executor'ах и 3 кусках BitTorrent-эффект полностью работает: driver отдал каждый кусок 1-2 раза, остальное — P2P между executor'ами.

Broadcast Join: механика на уровне executor

Broadcast join — одна из самых важных оптимизаций Spark SQL. Когда Catalyst выбирает BroadcastHashJoin, маленькая таблица оборачивается в broadcast и рассылается всем executor’ам. Каждый executor строит hash-таблицу локально и обрабатывает свою партицию большой таблицы без shuffle:

# Что пишет разработчик:
result = big_table.join(small_table, "user_id")

# Что делает Spark, если small_table < autoBroadcastJoinThreshold:
# 1. Collect small_table на driver (Action!)
# 2. sc.broadcast(small_table.as_hash_map)
# 3. Каждая задача big_table строит hash-таблицу и joinит локально

Физический план показывает:

*(2) BroadcastHashJoin [user_id#1], [user_id#7], Inner, BuildRight
:- *(2) Filter (isnotnull(user_id#1))
:  +- *(2) FileScan parquet big_table
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(user_id#7 as bigint)))
   +- *(1) Filter (isnotnull(user_id#7))
      +- *(1) FileScan parquet small_table

BroadcastExchange — это оператор, который:

  1. Выполняет small_table scan + filter на driver (или специальном executor)
  2. Собирает результат через collect()
  3. Вызывает sc.broadcast(HashedRelation) — строит hash-таблицу и бродкастит

HashedRelation — специализированная hash-таблица для join, оптимизированная Tungsten. Она хранит данные в UnsafeRow-формате и может быть on-heap или off-heap.

spark.sql.autoBroadcastJoinThreshold: как Catalyst принимает решение

# Дефолт: 10 MiB = 10 * 1024 * 1024 байт
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")  # "10485760"

# Изменить:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))  # 100 MiB
# Или в SQL:
spark.sql("SET spark.sql.autoBroadcastJoinThreshold=104857600")

# Отключить auto-broadcast:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

Catalyst проверяет статистику таблицы через LogicalPlanStats.sizeInBytes. Если таблица меньше порога — выбирается BroadcastHashJoin. Если статистики нет (ANALYZE TABLE не выполнялся, внешний источник без метаданных) — Catalyst использует оценки, которые часто завышены, и выбирает SortMergeJoin даже для маленьких таблиц.

# Явное указание broadcast hint — приоритет над autoBroadcastJoinThreshold:
from pyspark.sql.functions import broadcast

result = big_table.join(broadcast(small_table), "user_id")
# В explain(): hint(broadcast) заставит BroadcastHashJoin
# даже если small_table > autoBroadcastJoinThreshold

В Spark 4.0 также работает SQL-синтаксис:

SELECT /*+ BROADCAST(t) */ t.*, t2.name
FROM transactions t JOIN users t2 ON t.user_id = t2.id

Ограничения и риски OOM на driver

Жёсткое ограничение: 8 GiB.

// sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
val BROADCAST_TABLE_MAX_ROWS = 512000000L  // 512 млн строк
// + ограничение 8 GiB:
// Cannot broadcast the table that is larger than 8GB

Если таблица > 8 GiB, Spark выбросит SparkException: Cannot broadcast the table. Но серьёзнее — риск OOM на driver задолго до этого предела.

Рассмотрим цепочку памяти при broadcast 1 GiB таблицы:

  1. На driver: BroadcastExchangeExec собирает данные через collect(). Данные материализуются в JVM-объектах на driver heap. Размер в памяти может быть в 3-5x больше “сырого” размера данных (JVM object overhead).
  2. Сериализация: TorrentBroadcast.writeBlocks() сериализует объект. Пиковая память = исходные JVM-объекты + сериализованные байты.
  3. HashedRelation: строится из сериализованных данных. Ещё один экземпляр.

Итого для broadcast 1 GiB: driver может использовать 4-6 GiB памяти в пике.

DANGER

Driver memory (spark.driver.memory) должна быть достаточной для хранения всех broadcast-переменных одновременно. Если запускается несколько конкурентных broadcast join (сложные запросы с несколькими маленькими таблицами), driver держит все broadcast-объекты одновременно. На кластерах с большим числом параллельных запросов (Spark Thrift Server, многопользовательский режим) это основная причина driver OOM.

# Пример опасного паттерна: несколько broadcast join в одном запросе
result = (orders
    .join(broadcast(customers), "customer_id")   # customers: 200 MiB
    .join(broadcast(products), "product_id")     # products: 150 MiB
    .join(broadcast(regions), "region_id"))      # regions: 50 MiB
# На driver одновременно: 200 + 150 + 50 = 400 MiB + сериализованные копии
# Пиковая нагрузка на driver: ~1.2-2 GiB только для broadcast

Мониторинг broadcast в Spark UI

Jobs tab: найди задачи BroadcastExchange. Время BroadcastExchange включает collect + сериализацию + бродкаст всем executor’ам. Если BroadcastExchange занимает > 30% от времени stage — рассмотри отключение broadcast для этой таблицы.

Executor logs: при получении broadcast куска:

DEBUG BlockManager: Putting block broadcast_0_piece0 in memory
  (estimated size 4.0 MiB, free 1234.5 MiB)
INFO TorrentBroadcast: Reading broadcast variable 0 took 0.342 s

Storage tab: broadcast блоки отображаются как broadcast_N. Если ты видишь broadcast_N_piece0, broadcast_N_piece1, etc. — это сериализованные куски TorrentBroadcast. После bc.unpersist(blocking=True) они исчезают.

Production-паттерны

Broadcast join в ETL: правильный размер

# Хорошо: dimension tables < 100 MiB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 104857600)  # 100 MiB

# Осторожно: для "средних" таблиц 100 MiB - 1 GiB
# Явный broadcast hint только если уверен в driver memory
result = fact_table.join(broadcast(medium_dim), "key")

# Плохо: broadcast таблицы > 1 GiB
# Используй SortMergeJoin или bucketing вместо broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  # Отключить auto
result = fact_table.join(large_dim.hint("MERGE"), "key")  # Явный SMJ hint

Переиспользование broadcast между заданиями

# broadcast создаётся один раз, используется много раз
lookup_bc = spark.sparkContext.broadcast(
    spark.table("lookup_table").toPandas().to_dict(orient="records")
)

# Используем в нескольких запросах
for date_partition in date_range:
    daily_df = spark.table(f"events_{date_partition}")
    result = daily_df.filter(
        daily_df.event_type.isin([row['type'] for row in lookup_bc.value])
    )
    result.write.parquet(f"output/{date_partition}")

# В конце явно освобождаем
lookup_bc.unpersist()

Отладка: когда broadcast не применяется

# Проверяем план
df1.join(df2, "id").explain(True)

# Если видим SortMergeJoin вместо BroadcastHashJoin для маленькой таблицы:
# 1. Проверяем размер
print(f"df2 size estimate: {df2._jdf.queryExecution().analyzed().stats().sizeInBytes()}")
# 2. Если оценка завышена — собираем статистику
spark.sql("ANALYZE TABLE small_table COMPUTE STATISTICS")
# 3. Или используем явный hint
df1.join(broadcast(df2), "id").explain(True)

Попробуй сам

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, col
import time

spark = SparkSession.builder \
    .master("local[4]") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .appName("broadcast-internals") \
    .getOrCreate()

# Создаём большую таблицу и маленькую dimension table
import random
n_orders = 1_000_000
n_customers = 10_000

orders = spark.createDataFrame(
    [(i, random.randint(1, n_customers), random.random() * 1000)
     for i in range(n_orders)],
    ["order_id", "customer_id", "amount"]
)

customers = spark.createDataFrame(
    [(i, f"Customer_{i}", random.choice(["US", "DE", "RU", "CN"]))
     for i in range(1, n_customers + 1)],
    ["customer_id", "name", "country"]
)

# 1. SortMergeJoin (дефолт, если customers > threshold)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
t0 = time.time()
smj_result = orders.join(customers, "customer_id").count()
smj_time = time.time() - t0
print(f"SortMergeJoin: {smj_result} rows in {smj_time:.2f}s")

# 2. BroadcastHashJoin с явным hint
t0 = time.time()
bhj_result = orders.join(broadcast(customers), "customer_id").count()
bhj_time = time.time() - t0
print(f"BroadcastHashJoin: {bhj_result} rows in {bhj_time:.2f}s")
print(f"Speedup: {smj_time/bhj_time:.1f}x")

# Смотрим план
print("\n--- BroadcastHashJoin Plan ---")
orders.join(broadcast(customers), "customer_id").explain(False)

spark.stop()

Ожидаемый вывод (приблизительно):

SortMergeJoin: 1000000 rows in 4.21s
BroadcastHashJoin: 1000000 rows in 1.38s
Speedup: 3.1x

--- BroadcastHashJoin Plan ---
*(2) BroadcastHashJoin [customer_id#1L], [customer_id#5L], Inner, BuildRight
:- *(2) Project [order_id#0L, customer_id#1L, amount#2]
:  +- *(2) Filter isnotnull(customer_id#1L)
:     +- *(2) Scan ExistingRDD[order_id#0L,customer_id#1L,amount#2]
+- BroadcastExchange HashedRelationBroadcastMode(...)
   +- *(1) Filter isnotnull(customer_id#5L)
      +- *(1) Scan ExistingRDD[customer_id#5L,name#6,country#7]
Broadcast hints в практике
Проверка знанийKnowledge check
Кластер из 200 executor'ов. Broadcast-переменная содержит DataFrame размером 400 MiB. spark.broadcast.blockSize = 4 MiB, driver memory = 4 GiB. Оцени: (1) сколько кусков нарезает TorrentBroadcast, (2) пиковое использование памяти на driver, (3) сколько раз driver отправит данные по сети, если у executor'ов нет кэша.
ОтветAnswer
(1) Количество кусков: 400 MiB / 4 MiB = 100 кусков (piece0..piece99). Каждый кусок регистрируется у BlockManagerMaster как BroadcastBlockId(id, 'pieceN'). (2) Пиковая память на driver: оригинальный объект в JVM-объектах (400 MiB * 3-4x object overhead = ~1.2-1.6 GiB) + сериализованные куски (400 MiB) + HashedRelation структура если это broadcast join (ещё ~400-600 MiB). Итого пик: 2-2.5 GiB из 4 GiB driver memory. Опасно: если одновременно выполняется другой broadcast join — driver рискует OOM. (3) Сетевые передачи: без BitTorrent эффекта driver бы отправил 400 MiB * 200 = 80 GiB. С TorrentBroadcast: первый executor скачивает все 100 кусков с driver (400 MiB). Следующие executor'ы скачивают куски у первого executor'а или у driver. В идеальном случае каждый из 100 кусков driver отдаёт 1-2 раза (первым получателям), остальные распространяются P2P. Реально driver отправит ~2-5x * 400 MiB = 800 MiB - 2 GiB вместо 80 GiB. Выигрыш в сетевом трафике на driver: 40-100x.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. TorrentBroadcast.writeBlocks() нарезает объект на куски и записывает их в BlockManager driver'а с StorageLevel MEMORY_AND_DISK_SER и tellMaster=true. Почему именно tellMaster=true для кусков, но tellMaster=false для оригинального объекта?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5