Learning Platform
Глоссарий Troubleshooting
Урок 06.03 · 30 мин
Продвинутый
BlockManagerBlockManagerMasterBlockIdReplicationStorage

BlockManager: распределённое хранилище блоков

Когда задача читает партицию закэшированного DataFrame или получает broadcast-переменную, она не обращается напрямую к диску или памяти executor. Всё это делает BlockManager — key-value хранилище, работающее на каждом узле кластера. Понять BlockManager означает понять, как Spark хранит данные физически: где живут блоки shuffle, как replicated cache попадает на второй executor, почему иногда broadcast-fetch идёт с другого executor, а не с driver.

Архитектура: master и slave

Система BlockManager двухуровневая: на driver запускается BlockManagerMaster, на каждом executorBlockManager. Взаимодействие через RPC (Netty под капотом):

Топология BlockManager: driver и executors

BlockManagerMaster держит глобальный реестр всех блоков всех executor'ов. Каждый executor регистрирует свой BlockManager при старте и шлёт обновления статусов блоков.

DriverBlockManagerMasterЕдинственный на кластер. Держит BlockManagerMasterEndpoint — RPC endpoint для всех операций с метаданными. BlockManagerInfo: Map[BlockManagerId -> {blocks, lastSeenMs, maxOnHeapMem}].
BlockManagerMasterEndpointRPC endpoint, принимает регистрации и status updatesОбрабатывает RegisterBlockManager, UpdateBlockInfo, GetLocations, RemoveBlock. Весь реестр в памяти driver: если driver OOM — теряем маршрутизацию блоков.
Executor 1BlockManager (id: exec-1)BlockManagerId = (executorId, host, port). Уникальный идентификатор. При реконнекте выдаётся новый BlockManagerId — старые блоки считаются потерянными.
MemoryStore + DiskStoreФизическое хранилище блоковMemoryStore: блоки в JVM heap или off-heap. DiskStore: блоки на локальном диске в spark.local.dir. Оба работают через единый интерфейс BlockStore.
Executor 2BlockManager (id: exec-2)Peer для репликации. При StorageLevel._2 (MEMORY_ONLY_2, MEMORY_AND_DISK_2) executor 1 пушит копию блока на executor 2 через BlockTransferService.
MemoryStore + DiskStoreФизическое хранилище блоковНезависимое хранилище. Executor 2 получает реплику от executor 1, регистрирует блок у Master.
Executor NBlockManager (id: exec-N)При broadcast fetch: executor сначала ищет блок локально, потом у случайного peer, потом у driver. TorrentBroadcast использует этот механизм для BitTorrent-подобного распространения.
MemoryStore + DiskStoreФизическое хранилище блоковBlockTransferService (Netty) слушает на порту, выделенном при регистрации. Порт передаётся в BlockManagerId.

BlockManagerId: идентификатор узла

BlockManagerId — это sealed class с четырьмя полями:

// core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
class BlockManagerId private (
    private var executorId_ : String,  // "driver", "1", "2", ...
    private var host_ : String,        // hostname или IP
    private var port_ : Int,           // Netty port (shuffle service или executor)
    private var topologyInfo_ : Option[String]  // rack awareness для репликации
) extends Externalizable {

  def isDriver: Boolean = {
    executorId == SparkContext.DRIVER_IDENTIFIER  // "driver"
  }
}

BlockManagerId сериализуется в RPC-сообщения. Когда executor падает и рестартует, он получает новый BlockManagerId — прежде всего другой порт. Это означает, что driver считает старые блоки потерянными и переводит зависимые партиции в состояние UNAVAILABLE.

Иерархия BlockId

BlockId — это идентификатор конкретного блока данных. В Spark 4.0 семь основных подтипов:

// Разные типы блоков
RDDBlockId(rddId: Int, splitIndex: Int)
// Например: RDDBlockId(42, 7) = "rdd_42_7"
// Используется для кэшированных RDD/DataFrame партиций

ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int)
// Например: ShuffleBlockId(0, 3, 5) = "shuffle_0_3_5"
// Временный блок shuffle data, удаляется после fetch

ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int)
// Физический файл данных shuffle (без индекса)

ShuffleIndexBlockId(shuffleId: Int, mapId: Long, reduceId: Int)
// Индексный файл shuffle: offset для каждого reduce partition

BroadcastBlockId(broadcastId: Long, field: String = "")
// Например: BroadcastBlockId(0, "piece0") = "broadcast_0_piece0"
// Кусок broadcast-переменной (TorrentBroadcast нарезает на piece0, piece1, ...)

TaskResultBlockId(taskId: Long)
// Результат задачи, превышающий spark.task.maxDirectResultSize (1 MiB)
// Хранится в BlockManager executor, driver забирает через getRemoteBytes

StreamBlockId(streamId: Int, uniqueId: Long)
// Для Spark Streaming / DStream

Строковое представление BlockId.name используется как ключ в реестре BlockManagerMaster. Зная тип блока по его имени (rdd_ / shuffle_ / broadcast_), можно сразу понять, какая операция его создала.

Регистрация BlockManager: протокол инициализации

При старте executor вызывает BlockManager.initialize():

def initialize(appId: String): Unit = {
  // 1. Запускаем Netty transport layer для peer-to-peer transfer
  blockTransferService.init(this)
  externalShuffleClient.foreach { client =>
    client.init(appId)
  }

  // 2. Вычисляем BlockManagerId
  val id = BlockManagerId(executorId, blockTransferService.hostName,
    blockTransferService.port, conf.get(EXECUTOR_RACK_ID))

  // 3. Регистрируемся у Master (RPC вызов)
  val idFromMaster = master.registerBlockManager(
    id,
    diskBlockManager.localDirsString,
    maxOnHeapMemory,
    maxOffHeapMemory,
    storageEndpoint)
  // Master возвращает окончательный BlockManagerId (может переписать rack-info)

  blockManagerId = if (idFromMaster != null) idFromMaster else id

  // 4. Инициализируем shuffleServerId (ExternalShuffleService или сам executor)
  shuffleServerId = if (externalShuffleServiceEnabled) {
    logInfo("external shuffle service port = " + externalShuffleServicePort)
    BlockManagerId(SparkContext.DRIVER_IDENTIFIER, blockTransferService.hostName,
      externalShuffleServicePort)
  } else {
    blockManagerId
  }
}

RPC-сообщение RegisterBlockManager идёт к BlockManagerMasterEndpoint на driver. Master сохраняет в blockManagerInfo: HashMap[BlockManagerId, BlockManagerInfo]:

private def register(
    idWithoutTopologyInfo: BlockManagerId,
    localDirs: Array[String],
    maxOnHeapMemMB: Long,
    maxOffHeapMemMB: Long,
    storageEndpoint: RpcEndpointRef): BlockManagerId = {

  val id = BlockManagerId(
    idWithoutTopologyInfo.executorId,
    idWithoutTopologyInfo.host,
    idWithoutTopologyInfo.port,
    topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))

  if (!blockManagerInfo.contains(id)) {
    blockManagerInfo(id) = new BlockManagerInfo(
      id,
      System.currentTimeMillis(),
      maxOnHeapMemMB * 1024 * 1024,
      maxOffHeapMemMB * 1024 * 1024,
      storageEndpoint,
      localDirs)
  }

  id
}

Запись блока: putBlockData и updateBlockInfo

Когда задача кэширует партицию DataFrame, вызывается BlockManager.putIteratorAsValues() или BlockManager.putBytes(). После физической записи в MemoryStore/DiskStore executor асинхронно сообщает driver:

private def reportBlockStatus(
    blockId: BlockId,
    status: BlockStatus,
    droppedMemorySize: Long = 0L): Unit = {

  val needReregister = !tryToReportBlockStatus(blockId, status, droppedMemorySize)
  if (needReregister) {
    // Master не знает этот executor — нужно перерегистрироваться
    logInfo(s"Got told to re-register updating block $blockId")
    asyncReregister()
  }
}

private def tryToReportBlockStatus(
    blockId: BlockId,
    status: BlockStatus,
    droppedMemorySize: Long = 0L): Boolean = {
  // RPC: UpdateBlockInfo (blockId, storageLevel, memSize, diskSize)
  master.updateBlockInfo(blockManagerId, blockId, status.storageLevel,
    status.memSize, status.diskSize)
}

UpdateBlockInfo — это fire-and-forget с retries. Master обновляет BlockManagerInfo.blocks: JHashMap[BlockId, BlockStatus].

Репликация: алгоритм выбора peer

Для уровней хранения с _2 (например MEMORY_AND_DISK_2) Spark реплицирует блок на другой executor. Выбор peer:

private def replicate(
    blockId: BlockId,
    data: ChunkedByteBuffer,
    level: StorageLevel,
    classTag: ClassTag[_],
    existingReplicas: Set[BlockManagerId] = Set.empty): Boolean = {

  val maxReplicationFailures = conf.get(config.STORAGE_MAX_REPLICATION_FAILURE)
  val tLevel = StorageLevel(
    level.useDisk, level.useMemory, level.useOffHeap,
    level.deserialized, 1)  // Реплика с replication=1

  // Получаем список peer'ов от Master
  val peers = master.getPeers(blockManagerId)
    .filterNot(existingReplicas.contains)

  // BlockReplicationPolicy: по умолчанию RandomBlockReplicationPolicy
  // Выбирает peer с учётом rack topology для rack-awareness
  val peersForReplication = blockReplicationPolicy.prioritize(
    blockManagerId, peers, mutable.HashSet.empty, blockId, numPeersToReplicateTo)

  var replicateBlockToFutures = ...
  // Параллельная пересылка через BlockTransferService.uploadBlock()
}

Spark поддерживает rack-aware репликацию: TopologyMapper определяет rack для каждого хоста, и BasicBlockReplicationPolicy старается поместить реплику в другой rack. В Kubernetes этот механизм работает через node labels.

Чтение блока: getLocalBytes и getRemoteBytes

Когда задаче нужен блок, порядок поиска:

def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
  // 1. Сначала смотрим локально
  val local = getLocalValues(blockId)
  if (local.isDefined) {
    return local
  }
  // 2. Если нет — идём к удалённым executor'ам
  getRemoteValues[T](blockId)
}

private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
  // Запрос к Master: GetLocations(blockId) -> List[BlockManagerId]
  val locations = master.getLocations(blockId)

  for (loc <- locations) {
    val data = blockTransferService.fetchBlockSync(
      loc.host, loc.port, loc.executorId,
      blockId.toString, tempFileManager)
    // Десериализация и возврат
  }
}

Это peer-to-peer fetch: executor читает блок напрямую с другого executor через Netty, минуя driver. Driver лишь выдаёт адреса (GetLocations). Это критически важно для масштабируемости — тысячи task’ов не создают нагрузку на driver.

Как shuffle использует BlockManager

Shuffle write: SortShuffleWriter записывает данные в ShuffleDataBlockId / ShuffleIndexBlockId через DiskBlockObjectWriter. Блоки не регистрируются у Master (shuffle блоки эфемерны — их жизненный цикл управляется MapOutputTracker).

Shuffle read: BlockStoreShuffleReader запрашивает адреса map-выходов у MapOutputTrackerMaster, затем делает peer-to-peer fetch через BlockTransferService:

// Fetch у remote executor
shuffleClient.fetchBlocks(
  address.host, address.port, address.executorId,
  blockIds.toArray, blockFetchingListener, tempFileManager)

Если включён External Shuffle Service (spark.shuffle.service.enabled=true), fetch идёт к ExternalShuffleService на NodeManager (YARN) или к standalone shuffle service. Это позволяет executor’ам завершиться до того, как reduce-задачи прочитают shuffle.

Жизненный цикл блоков: cache, broadcast, shuffle

Три разных пути через BlockManager. Cache: долгоживущие блоки, регистрируются у Master. Broadcast: распространяются P2P. Shuffle: эфемерные, управляются MapOutputTracker.

Cache (RDDBlockId)persist() / cache()Создаётся при первом action после cache(). Хранится до unpersist() или вытеснения LRU. Регистрируется у BlockManagerMaster через UpdateBlockInfo.
MemoryStore / DiskStoreПо StorageLevelMEMORY_ONLY: только RAM. MEMORY_AND_DISK: RAM + спилл на диск. При вытеснении execution memory — удаляется или пишется на диск.
GetLocations(blockId)Master отдаёт список executor'овЕсли блок реплицирован (MEMORY_ONLY_2), Master вернёт два executor ID. Reader выбирает ближайший по rack-topology.
Broadcast (BroadcastBlockId)sc.broadcast() -> TorrentBroadcastDriver нарезает на pieces (4 MiB каждый). Piece0..PieceN хранятся у driver как BroadcastBlockId(id, 'piece0'). Executor'ы забирают случайные куски и отдают другим — P2P распространение.
BlockManagerMaster tracks piecesКаждый piece — отдельный BlockIdGetLocations(BroadcastBlockId(0, 'piece3')) вернёт executor'ы, у которых есть этот кусок. Постепенно все executor'ы накапливают все pieces.
sc.broadcast cleanupunpersist() удаляет со всех узловRemoveBroadcast RPC: Master шлёт RemoveBlock всем executor'ам через storageEndpoint. Без явного unpersist() broadcast живёт до конца SparkContext.
Shuffle (ShuffleBlockId)SortShuffleWriter -> DiskBlockObjectWriterНЕ регистрируется у BlockManagerMaster. Адреса управляются MapOutputTracker. DiskBlockObjectWriter пишет в spark.local.dir через DiskBlockManager.
ExternalShuffleServicespark.shuffle.service.enabledЕсли включён, shuffle файлы остаются доступны даже после завершения executor. NodeManager читает из тех же spark.local.dir. Критично для dynamic allocation.
Cleanup после fetchMapOutputTracker.unregisterShuffle()После завершения всех reduce-задач stage'а, ShuffleMapStage уведомляет DAGScheduler, который вызывает unregisterShuffle. BlockManagerMaster шлёт RemoveShuffle на все executor'ы.

Failure modes: что происходит при потере executor

Когда executor теряет heartbeat, HeartbeatReceiver на driver уведомляет BlockManagerMaster.removeExecutor():

def removeExecutor(execId: String): Unit = {
  val info = blockManagerIdByExecutor.get(execId)
  info.foreach(removeBlockManager)
}

private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
  // Удаляем все блоки этого executor из реестра
  val info = blockManagerInfo(blockManagerId)
  for (blockId <- info.blocks.keys) {
    val locations = blockLocations.get(blockId)
    locations -= blockManagerId
    if (locations.isEmpty) {
      blockLocations.remove(blockId)
      // Блок потерян — RDD/DF партиции нужно пересчитать
    }
  }
  blockManagerInfo.remove(blockManagerId)
}

После этого DAGScheduler при попытке чтения потерянных блоков получит ошибку и перезапустит stage’ы, которые производили эти блоки. Это объясняет, почему потеря executor вызывает повторное вычисление — не из-за RDD lineage напрямую, а через BlockManager реестр.

Spark UI: что смотреть

Executors tab: колонка Address содержит host:port из BlockManagerId. Если executor переподключился, адрес изменится.

Storage tab: каждый кэшированный RDD/DataFrame с колонками:

  • Cached Partitions: сколько партиций в BlockManager (из reachable executors)
  • Fraction Cached: cached / total. Если ниже 100% — часть партиций вытеснена или не помещается
  • Size in Memory / Size on Disk: физический размер в MemoryStore / DiskStore

В executor logs (или через Spark History Server) ищи строки вида:

Removed broadcast_3_piece0 on exec-2 in memory (size: 4.0 MiB)

Это означает LRU-вытеснение блока — storage memory под давлением.

Попробуй сам

from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = SparkSession.builder \
    .master("local[2]") \
    .config("spark.executor.memory", "1g") \
    .appName("blockmanager-demo") \
    .getOrCreate()

sc = spark.sparkContext

# Кэшируем RDD и смотрим BlockId'ы
rdd = sc.parallelize(range(100_000), numSlices=4)
cached = rdd.cache()
cached.count()  # Материализуем

# Проверяем статус через SparkContext (Python API ограничен)
# Но можем посмотреть через Spark UI http://localhost:4040/storage

# Кэшируем с репликацией
rdd2 = sc.parallelize(range(50_000), numSlices=2)
# MEMORY_ONLY_2 = replication factor 2 (нужно 2+ executor'а для работы)
# В local режиме будет только 1 executor, реплика не создастся
cached2 = rdd2.persist(StorageLevel.MEMORY_ONLY_2)
cached2.count()

# Broadcast переменная -> BroadcastBlockId
lookup_table = {"US": "United States", "DE": "Germany", "RU": "Russia"}
bc = sc.broadcast(lookup_table)
# В Storage UI: broadcast_0_piece0 (если таблица < 4 MiB — один piece)

# Используем broadcast в задаче
def map_country(code):
    return bc.value.get(code, "Unknown")

data = sc.parallelize(["US", "DE", "XX", "RU"])
result = data.map(map_country).collect()
print(result)  # ['United States', 'Germany', 'Unknown', 'Russia']

# Освобождаем broadcast явно
bc.unpersist()
# Теперь Storage UI покажет, что broadcast_0_piece0 удалён

spark.stop()
Проверка знанийKnowledge check
В производственном кластере executor E3 упал в середине stage. Этот executor хранил партиции кэшированного DataFrame (MEMORY_ONLY) и 3 shuffle map-output файла для текущего stage. Опиши, что именно произойдёт с каждым типом блока и какие механизмы Spark это обрабатывают.
ОтветAnswer
Для кэшированного DataFrame (RDDBlockId): BlockManagerMaster.removeBlockManager() удаляет все RDDBlockId, принадлежавшие E3, из реестра locations. Если блок не реплицирован (MEMORY_ONLY без _2), он считается потерянным. При следующем обращении к этим партициям RDD executor получит BlockNotFoundException, DAGScheduler запустит повторное вычисление потерянных партиций с нуля по RDD lineage. Для shuffle map-output (ShuffleBlockId): MapOutputTrackerMaster получит уведомление о потере E3 и пометит соответствующие map-output как недоступные. Reduce-задачи, пытающиеся fetch с E3, получат FetchFailedException. DAGScheduler перезапустит соответствующий ShuffleMapStage, но только те map-задачи, output которых был на E3 — не весь stage полностью. После успешного повтора новые shuffle файлы регистрируются на оставшихся executor'ах. Ключевое различие: RDD cache пересчитывается по lineage (нужен полный DAG), shuffle пересчитывается только для потерянных partitions (точечный повтор). Именно поэтому shuffle-heavy задания с падающими executor'ами замедляются меньше, чем задания с большим объёмом кэшированных данных без репликации.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Задача на Executor 4 обращается к BroadcastBlockId(7, 'piece2'). Этот кусок есть у Executor 2 и у Driver. Как BlockManager принимает решение, откуда получить данные?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5