BlockManager: распределённое хранилище блоков
Когда задача читает партицию закэшированного DataFrame или получает broadcast-переменную, она не обращается напрямую к диску или памяти executor. Всё это делает BlockManager — key-value хранилище, работающее на каждом узле кластера. Понять BlockManager означает понять, как Spark хранит данные физически: где живут блоки shuffle, как replicated cache попадает на второй executor, почему иногда broadcast-fetch идёт с другого executor, а не с driver.
Архитектура: master и slave
Система BlockManager двухуровневая: на driver запускается BlockManagerMaster, на каждом executor — BlockManager. Взаимодействие через RPC (Netty под капотом):
BlockManagerMaster держит глобальный реестр всех блоков всех executor'ов. Каждый executor регистрирует свой BlockManager при старте и шлёт обновления статусов блоков.
BlockManagerId: идентификатор узла
BlockManagerId — это sealed class с четырьмя полями:
// core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
class BlockManagerId private (
private var executorId_ : String, // "driver", "1", "2", ...
private var host_ : String, // hostname или IP
private var port_ : Int, // Netty port (shuffle service или executor)
private var topologyInfo_ : Option[String] // rack awareness для репликации
) extends Externalizable {
def isDriver: Boolean = {
executorId == SparkContext.DRIVER_IDENTIFIER // "driver"
}
}
BlockManagerId сериализуется в RPC-сообщения. Когда executor падает и рестартует, он получает новый BlockManagerId — прежде всего другой порт. Это означает, что driver считает старые блоки потерянными и переводит зависимые партиции в состояние UNAVAILABLE.
Иерархия BlockId
BlockId — это идентификатор конкретного блока данных. В Spark 4.0 семь основных подтипов:
// Разные типы блоков
RDDBlockId(rddId: Int, splitIndex: Int)
// Например: RDDBlockId(42, 7) = "rdd_42_7"
// Используется для кэшированных RDD/DataFrame партиций
ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int)
// Например: ShuffleBlockId(0, 3, 5) = "shuffle_0_3_5"
// Временный блок shuffle data, удаляется после fetch
ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int)
// Физический файл данных shuffle (без индекса)
ShuffleIndexBlockId(shuffleId: Int, mapId: Long, reduceId: Int)
// Индексный файл shuffle: offset для каждого reduce partition
BroadcastBlockId(broadcastId: Long, field: String = "")
// Например: BroadcastBlockId(0, "piece0") = "broadcast_0_piece0"
// Кусок broadcast-переменной (TorrentBroadcast нарезает на piece0, piece1, ...)
TaskResultBlockId(taskId: Long)
// Результат задачи, превышающий spark.task.maxDirectResultSize (1 MiB)
// Хранится в BlockManager executor, driver забирает через getRemoteBytes
StreamBlockId(streamId: Int, uniqueId: Long)
// Для Spark Streaming / DStream
Строковое представление BlockId.name используется как ключ в реестре BlockManagerMaster. Зная тип блока по его имени (rdd_ / shuffle_ / broadcast_), можно сразу понять, какая операция его создала.
Регистрация BlockManager: протокол инициализации
При старте executor вызывает BlockManager.initialize():
def initialize(appId: String): Unit = {
// 1. Запускаем Netty transport layer для peer-to-peer transfer
blockTransferService.init(this)
externalShuffleClient.foreach { client =>
client.init(appId)
}
// 2. Вычисляем BlockManagerId
val id = BlockManagerId(executorId, blockTransferService.hostName,
blockTransferService.port, conf.get(EXECUTOR_RACK_ID))
// 3. Регистрируемся у Master (RPC вызов)
val idFromMaster = master.registerBlockManager(
id,
diskBlockManager.localDirsString,
maxOnHeapMemory,
maxOffHeapMemory,
storageEndpoint)
// Master возвращает окончательный BlockManagerId (может переписать rack-info)
blockManagerId = if (idFromMaster != null) idFromMaster else id
// 4. Инициализируем shuffleServerId (ExternalShuffleService или сам executor)
shuffleServerId = if (externalShuffleServiceEnabled) {
logInfo("external shuffle service port = " + externalShuffleServicePort)
BlockManagerId(SparkContext.DRIVER_IDENTIFIER, blockTransferService.hostName,
externalShuffleServicePort)
} else {
blockManagerId
}
}
RPC-сообщение RegisterBlockManager идёт к BlockManagerMasterEndpoint на driver. Master сохраняет в blockManagerInfo: HashMap[BlockManagerId, BlockManagerInfo]:
private def register(
idWithoutTopologyInfo: BlockManagerId,
localDirs: Array[String],
maxOnHeapMemMB: Long,
maxOffHeapMemMB: Long,
storageEndpoint: RpcEndpointRef): BlockManagerId = {
val id = BlockManagerId(
idWithoutTopologyInfo.executorId,
idWithoutTopologyInfo.host,
idWithoutTopologyInfo.port,
topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))
if (!blockManagerInfo.contains(id)) {
blockManagerInfo(id) = new BlockManagerInfo(
id,
System.currentTimeMillis(),
maxOnHeapMemMB * 1024 * 1024,
maxOffHeapMemMB * 1024 * 1024,
storageEndpoint,
localDirs)
}
id
}
Запись блока: putBlockData и updateBlockInfo
Когда задача кэширует партицию DataFrame, вызывается BlockManager.putIteratorAsValues() или BlockManager.putBytes(). После физической записи в MemoryStore/DiskStore executor асинхронно сообщает driver:
private def reportBlockStatus(
blockId: BlockId,
status: BlockStatus,
droppedMemorySize: Long = 0L): Unit = {
val needReregister = !tryToReportBlockStatus(blockId, status, droppedMemorySize)
if (needReregister) {
// Master не знает этот executor — нужно перерегистрироваться
logInfo(s"Got told to re-register updating block $blockId")
asyncReregister()
}
}
private def tryToReportBlockStatus(
blockId: BlockId,
status: BlockStatus,
droppedMemorySize: Long = 0L): Boolean = {
// RPC: UpdateBlockInfo (blockId, storageLevel, memSize, diskSize)
master.updateBlockInfo(blockManagerId, blockId, status.storageLevel,
status.memSize, status.diskSize)
}
UpdateBlockInfo — это fire-and-forget с retries. Master обновляет BlockManagerInfo.blocks: JHashMap[BlockId, BlockStatus].
Репликация: алгоритм выбора peer
Для уровней хранения с _2 (например MEMORY_AND_DISK_2) Spark реплицирует блок на другой executor. Выбор peer:
private def replicate(
blockId: BlockId,
data: ChunkedByteBuffer,
level: StorageLevel,
classTag: ClassTag[_],
existingReplicas: Set[BlockManagerId] = Set.empty): Boolean = {
val maxReplicationFailures = conf.get(config.STORAGE_MAX_REPLICATION_FAILURE)
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap,
level.deserialized, 1) // Реплика с replication=1
// Получаем список peer'ов от Master
val peers = master.getPeers(blockManagerId)
.filterNot(existingReplicas.contains)
// BlockReplicationPolicy: по умолчанию RandomBlockReplicationPolicy
// Выбирает peer с учётом rack topology для rack-awareness
val peersForReplication = blockReplicationPolicy.prioritize(
blockManagerId, peers, mutable.HashSet.empty, blockId, numPeersToReplicateTo)
var replicateBlockToFutures = ...
// Параллельная пересылка через BlockTransferService.uploadBlock()
}
Spark поддерживает rack-aware репликацию: TopologyMapper определяет rack для каждого хоста, и BasicBlockReplicationPolicy старается поместить реплику в другой rack. В Kubernetes этот механизм работает через node labels.
Чтение блока: getLocalBytes и getRemoteBytes
Когда задаче нужен блок, порядок поиска:
def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
// 1. Сначала смотрим локально
val local = getLocalValues(blockId)
if (local.isDefined) {
return local
}
// 2. Если нет — идём к удалённым executor'ам
getRemoteValues[T](blockId)
}
private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
// Запрос к Master: GetLocations(blockId) -> List[BlockManagerId]
val locations = master.getLocations(blockId)
for (loc <- locations) {
val data = blockTransferService.fetchBlockSync(
loc.host, loc.port, loc.executorId,
blockId.toString, tempFileManager)
// Десериализация и возврат
}
}
Это peer-to-peer fetch: executor читает блок напрямую с другого executor через Netty, минуя driver. Driver лишь выдаёт адреса (GetLocations). Это критически важно для масштабируемости — тысячи task’ов не создают нагрузку на driver.
Как shuffle использует BlockManager
Shuffle write: SortShuffleWriter записывает данные в ShuffleDataBlockId / ShuffleIndexBlockId через DiskBlockObjectWriter. Блоки не регистрируются у Master (shuffle блоки эфемерны — их жизненный цикл управляется MapOutputTracker).
Shuffle read: BlockStoreShuffleReader запрашивает адреса map-выходов у MapOutputTrackerMaster, затем делает peer-to-peer fetch через BlockTransferService:
// Fetch у remote executor
shuffleClient.fetchBlocks(
address.host, address.port, address.executorId,
blockIds.toArray, blockFetchingListener, tempFileManager)
Если включён External Shuffle Service (spark.shuffle.service.enabled=true), fetch идёт к ExternalShuffleService на NodeManager (YARN) или к standalone shuffle service. Это позволяет executor’ам завершиться до того, как reduce-задачи прочитают shuffle.
Три разных пути через BlockManager. Cache: долгоживущие блоки, регистрируются у Master. Broadcast: распространяются P2P. Shuffle: эфемерные, управляются MapOutputTracker.
Failure modes: что происходит при потере executor
Когда executor теряет heartbeat, HeartbeatReceiver на driver уведомляет BlockManagerMaster.removeExecutor():
def removeExecutor(execId: String): Unit = {
val info = blockManagerIdByExecutor.get(execId)
info.foreach(removeBlockManager)
}
private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
// Удаляем все блоки этого executor из реестра
val info = blockManagerInfo(blockManagerId)
for (blockId <- info.blocks.keys) {
val locations = blockLocations.get(blockId)
locations -= blockManagerId
if (locations.isEmpty) {
blockLocations.remove(blockId)
// Блок потерян — RDD/DF партиции нужно пересчитать
}
}
blockManagerInfo.remove(blockManagerId)
}
После этого DAGScheduler при попытке чтения потерянных блоков получит ошибку и перезапустит stage’ы, которые производили эти блоки. Это объясняет, почему потеря executor вызывает повторное вычисление — не из-за RDD lineage напрямую, а через BlockManager реестр.
Spark UI: что смотреть
Executors tab: колонка Address содержит host:port из BlockManagerId. Если executor переподключился, адрес изменится.
Storage tab: каждый кэшированный RDD/DataFrame с колонками:
- Cached Partitions: сколько партиций в BlockManager (из reachable executors)
- Fraction Cached:
cached / total. Если ниже 100% — часть партиций вытеснена или не помещается - Size in Memory / Size on Disk: физический размер в MemoryStore / DiskStore
В executor logs (или через Spark History Server) ищи строки вида:
Removed broadcast_3_piece0 on exec-2 in memory (size: 4.0 MiB)
Это означает LRU-вытеснение блока — storage memory под давлением.
Попробуй сам
from pyspark.sql import SparkSession
from pyspark import StorageLevel
spark = SparkSession.builder \
.master("local[2]") \
.config("spark.executor.memory", "1g") \
.appName("blockmanager-demo") \
.getOrCreate()
sc = spark.sparkContext
# Кэшируем RDD и смотрим BlockId'ы
rdd = sc.parallelize(range(100_000), numSlices=4)
cached = rdd.cache()
cached.count() # Материализуем
# Проверяем статус через SparkContext (Python API ограничен)
# Но можем посмотреть через Spark UI http://localhost:4040/storage
# Кэшируем с репликацией
rdd2 = sc.parallelize(range(50_000), numSlices=2)
# MEMORY_ONLY_2 = replication factor 2 (нужно 2+ executor'а для работы)
# В local режиме будет только 1 executor, реплика не создастся
cached2 = rdd2.persist(StorageLevel.MEMORY_ONLY_2)
cached2.count()
# Broadcast переменная -> BroadcastBlockId
lookup_table = {"US": "United States", "DE": "Germany", "RU": "Russia"}
bc = sc.broadcast(lookup_table)
# В Storage UI: broadcast_0_piece0 (если таблица < 4 MiB — один piece)
# Используем broadcast в задаче
def map_country(code):
return bc.value.get(code, "Unknown")
data = sc.parallelize(["US", "DE", "XX", "RU"])
result = data.map(map_country).collect()
print(result) # ['United States', 'Germany', 'Unknown', 'Russia']
# Освобождаем broadcast явно
bc.unpersist()
# Теперь Storage UI покажет, что broadcast_0_piece0 удалён
spark.stop()