ShuffleManager и MapOutputTracker: реестр shuffle-выходов
В предыдущем уроке мы разобрали shuffle на уровне операций: что вызывает shuffle, что такое sort-merge, как выглядит пара файлов .data + .index. Теперь спускаемся на уровень исходников: кто принимает решение «какой writer использовать», как driver узнаёт, что все map tasks завершились, и как reduce task находит нужные байты среди десятков executor-ов.
Два ключевых участника этой истории — ShuffleManager и MapOutputTracker. Они работают на разных уровнях, но тесно связаны: один управляет lifecycle shuffle-зависимостей, другой хранит карту «где что лежит» и отдаёт её reduce-таскам по запросу.
ShuffleManager: интерфейс и единственная реализация
ShuffleManager — это trait (интерфейс) в пакете org.apache.spark.shuffle. Определён минимально:
// org.apache.spark.shuffle.ShuffleManager
private[spark] trait ShuffleManager {
def registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle
def getWriter[K, V](
handle: ShuffleHandle,
mapId: Long,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]
def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
def unregisterShuffle(shuffleId: Int): Boolean
def shuffleBlockResolver: ShuffleBlockResolver
def stop(): Unit
}
В Apache Spark 1.x и 2.x существовал конфиг spark.shuffle.manager, позволявший подключать альтернативные реализации (HashShuffleManager был встроен до 2.0). В PR #23707 этот конфиг был удалён, и начиная с Spark 3.0 SortShuffleManager — единственная реализация. Он регистрируется в SparkEnv при старте executor-а и driver-а, живёт в SparkEnv.shuffleManager, и все компоненты получают к нему доступ через SparkEnv.get.shuffleManager.
В Spark 4.0 существуют внешние плагины — например, RAPIDS ShuffleManager от NVIDIA. Они реализуют тот же ShuffleManager trait и подключаются через spark.shuffle.manager = com.nvidia.spark.rapids.RapidsShuffleManager. Но это внешние плагины, не встроенная часть Spark.
SortShuffleManager: регистрация shuffle и выбор handle
Когда DAGScheduler создаёт ShuffleMapStage для новой ShuffleDependency, он вызывает shuffleManager.registerShuffle(shuffleId, dependency). Этот метод принимает единственное ключевое решение всего shuffle pipeline: какой тип ShuffleHandle вернуть.
// SortShuffleManager.registerShuffle (упрощённо)
override def registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
new BypassMergeSortShuffleHandle[K, V](shuffleId, dependency)
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
new SerializedShuffleHandle[K, V](shuffleId, dependency)
} else {
new BaseShuffleHandle(shuffleId, dependency)
}
}
Возвращённый ShuffleHandle — это не просто метка. Он упаковывается вместе с задачей и отправляется на executor. Там getWriter смотрит на тип handle и создаёт соответствующий writer. Таким образом, решение о типе writer принимается один раз на driver при регистрации shuffle, а не на каждом executor.
Три типа ShuffleHandle:
| Handle | Writer | Когда выбирается |
|---|---|---|
BypassMergeSortShuffleHandle | BypassMergeSortShuffleWriter | Нет map-side combine + partitions <= threshold |
SerializedShuffleHandle | UnsafeShuffleWriter | Сериализатор поддерживает relocation + нет combine |
BaseShuffleHandle | SortShuffleWriter | Все остальные случаи |
Подробные критерии выбора — тема следующего урока. Сейчас сосредоточимся на том, что происходит после выбора handle.
registerShuffle вызывается один раз на driver. Handle сериализуется и отправляется вместе с описанием задачи. Executor десериализует его и вызывает getWriter, который создаёт нужный writer.
ShuffleBlockResolver: IndexShuffleBlockResolver
SortShuffleManager использует IndexShuffleBlockResolver — реализацию ShuffleBlockResolver. Это компонент, который переводит логический ShuffleBlockId(shuffleId, mapId, reduceId) в физический путь на диске executor-а.
// IndexShuffleBlockResolver.getBlockData (упрощённо)
def getBlockData(blockId: ShuffleBlockId, dirs: Option[Array[String]]): ManagedBuffer = {
val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId, dirs)
val dataFile = getDataFile(blockId.shuffleId, blockId.mapId, dirs)
val in = new DataInputStream(new FileInputStream(indexFile))
try {
in.skip(blockId.reduceId * 8L) // 8 bytes per offset entry
val offset = in.readLong()
val nextOffset = in.readLong()
new FileSegmentManagedBuffer(
transportConf, dataFile, offset, nextOffset - offset)
} finally {
in.close()
}
}
Файл .index содержит массив из (numPartitions + 1) long-значений. Каждый — байтовый offset в .data-файле. Чтобы прочитать partition reduceId, нужно перейти на позицию reduceId * 8 в index-файле, прочитать два long-а (start и end), и взять соответствующий диапазон байт из .data. Никакой дополнительной структуры — всё линейно.
По умолчанию файлы хранятся в директориях, заданных конфигом spark.local.dir (по умолчанию — системный java.io.tmpdir). Путь: ${spark.local.dir}/blockmgr-UUID/XX/shuffle_{shuffleId}_{mapId}_0.{data|index}, где XX — первые два символа hex-хеша от имени файла.
MapOutputTracker: архитектура master/worker
После того как map task записал shuffle-файлы на диск, он должен сообщить driver-у: «я сделал, вот мои метаданные». Это задача MapOutputTracker.
MapOutputTracker — абстрактный класс в org.apache.spark. Две его реализации живут в разных процессах:
MapOutputTrackerMaster— запускается на driver. Хранит реестр всех map-выходов для всех shuffle в JVM. Принимает регистрации map task-ов. Отвечает на запросы executor-ов (через Netty RPC).MapOutputTrackerWorker— запускается на каждом executor. Имеет локальный кэш (ConcurrentHashMap). При cache-miss отправляет RPC-запрос на driver и получает ответ.
Одна сторона регистрирует выходы, другая запрашивает их. Кэширование на стороне worker сокращает RPC-трафик.
Жизненный цикл: от registerShuffle до reduce read
Проследим полный путь от регистрации shuffle до момента, когда reduce task знает, где лежат его данные.
Шаг 1: registerShuffle на driver
Когда DAGScheduler создаёт ShuffleMapStage, он вызывает mapOutputTracker.registerShuffle(shuffleId, numMapTasks). На MapOutputTrackerMaster это создаёт запись:
// MapOutputTrackerMaster.registerShuffle
def registerShuffle(shuffleId: Int, numMaps: Int, numReduces: Int): Unit = {
shuffleStatuses.put(shuffleId, new ShuffleStatus(numMaps, numReduces))
}
ShuffleStatus — это контейнер для массива MapStatus. Изначально все элементы null.
Шаг 2: registerMapOutput после завершения map task
Когда ShuffleMapTask завершается, она возвращает MapStatus — небольшой объект, хранящий BlockManagerId executor-а и размеры всех партиций (в сжатом виде через MapStatus.apply). TaskScheduler передаёт этот результат в DAGScheduler.handleTaskCompletion, который вызывает:
mapOutputTracker.registerMapOutput(shuffleId, mapIndex, status)
// или после завершения всей стадии:
mapOutputTracker.registerMapOutputs(shuffleId, statuses, changeEpoch = true)
После этого mapStatuses(shuffleId)(mapIndex) перестаёт быть null.
Шаг 3: запрос от reduce task
Когда reduce task готов читать данные, BlockStoreShuffleReader вызывает:
val statuses = SparkEnv.get.mapOutputTracker
.getMapSizesByExecutorId(shuffleId, startPartition, endPartition)
На MapOutputTrackerWorker это проверяет локальный кэш. При промахе — RPC к driver:
// MapOutputTrackerWorker.getStatuses (упрощённо)
private def getStatuses(shuffleId: Int): Array[MapStatus] = {
val statuses = mapStatuses.get(shuffleId)
if (statuses != null) return statuses
// cache miss: идём к master
val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
val fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
mapStatuses.put(shuffleId, fetchedStatuses)
fetchedStatuses
}
getMapSizesByExecutorId затем разворачивает полученный Array[MapStatus] в Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])] — список блоков, сгруппированных по executor. Каждый элемент: BlockManagerId (адрес executor), список (ShuffleBlockId, size, mapIndex).
MapStatus: что внутри
MapStatus — это то, что каждый map task возвращает driver-у. Важно понимать, что в нём хранится.
// MapStatus -- sealed trait
sealed trait MapStatus {
def location: BlockManagerId // executor, где лежат файлы
def getSizeForBlock(reduceId: Int): Long // размер partition reduceId в байтах
def getNumNonEmptyBlocks: Int
}
Размеры партиций хранятся не напрямую. CompressedMapStatus (обычный вариант) хранит массив байт, где каждый байт — логарифмически сжатый размер. HighlyCompressedMapStatus используется когда партиций больше 2000: хранит только среднее и список непустых партиций. Это существенно: для shuffle с 10000 partition-ами точные размеры не нужны, нужен только список непустых и их примерные размеры для планирования.
Конфиг spark.shuffle.minNumPartitionsToHighlyCompress (по умолчанию 2000) определяет переключение между реализациями.
Что происходит при потере executor
Одна из главных ролей MapOutputTrackerMaster — обработка сбоев. Когда executor умирает, DAGScheduler.handleExecutorLost вызывает:
mapOutputTracker.removeOutputsOnExecutor(executorId)
Это инвалидирует все MapStatus, которые были зарегистрированы с умершего executor. Соответствующие map tasks будут пересчитаны на других executor-ах. После пересчёта новые MapStatus будут зарегистрированы, и epoch инкрементируется — worker-ы поймут, что их кэш устарел.
Кэш worker-а хранит только массив MapStatus и не знает о epoch. При следующем shuffle read MapOutputTrackerWorker сравнивает свою epoch с той, что пришла в задаче (передаётся как поле ShuffleMapTask). Если локальная epoch отстала, кэш полностью очищается для данного shuffleId.
В production это означает: если у вас часто умирают executor-ы в середине shuffle, reduce tasks будут видеть FetchFailed (не могут скачать блок с умершего executor). DAGScheduler перезапустит не только reduce stage, но и map stage для пострадавших map tasks. При этом в Spark UI вы увидите стадию, помеченную как “resubmitted”. Это не баг — это штатная recovery через MapOutputTracker.
Broadcast вместо RPC: оптимизация для больших shuffle
Для очень больших shuffle (много map tasks, много partitions) сериализованный Array[MapStatus] может быть несколько мегабайт. Отправлять его как RPC-ответ на каждый executor неэффективно. Начиная с Spark 3.0, если размер ответа превышает spark.shuffle.mapOutput.minSizeForBroadcast (по умолчанию 512 КБ), MapOutputTrackerMaster использует BroadcastManager для рассылки статусов как broadcast-переменной.
Это снижает нагрузку на driver: вместо N отдельных RPC-ответов — одна broadcast-переменная, которую executor-ы получают через BitTorrent-протокол Spark.
# Проверить, используется ли broadcast:
# В логах executor смотрите на:
# "Getting broadcast variable X for MapOutputTracker"
# вместо:
# "Fetching map output info from master"
# Настройка порога:
spark.conf.set("spark.shuffle.mapOutput.minSizeForBroadcast", "1m")
Попробуй сам
Чтобы увидеть MapOutputTracker в действии, добавьте в логи уровень DEBUG для конкретных классов:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("mot-debug")
.config("spark.executor.extraJavaOptions",
"-Dlog4j.logger.org.apache.spark.MapOutputTrackerWorker=DEBUG")
.config("spark.driver.extraJavaOptions",
"-Dlog4j.logger.org.apache.spark.MapOutputTrackerMaster=DEBUG")
.getOrCreate())
# Создаём shuffle
df = spark.range(1_000_000).repartition(50)
result = df.groupBy((df.id % 10).alias("bucket")).count()
result.show()
# В логах driver вы увидите:
# MapOutputTrackerMaster: Registering shuffle 0 with 50 map tasks
# MapOutputTrackerMaster: Registered map output for shuffle 0, map 12
# ...
# В логах executor:
# MapOutputTrackerWorker: Fetching outputs for shuffle 0, reduces 0-10
Для более детального анализа смотрите Spark UI -> Stages -> конкретная shuffle stage -> Summary Metrics. Поле “Shuffle Write Time” показывает только время записи; время регистрации в MapOutputTracker входит в “Task Serialization Time” и накладные расходы планировщика.
В Spark UI на вкладке Environment -> Spark Properties вы можете проверить spark.shuffle.mapOutput.minSizeForBroadcast и убедиться, что broadcast threshold настроен разумно для вашего масштаба.