Learning Platform
Глоссарий Troubleshooting
Урок 05.02 · 28 мин
Продвинутый
ShuffleManagerSortShuffleManagerMapOutputTrackerShuffleHandleBlockManager

ShuffleManager и MapOutputTracker: реестр shuffle-выходов

В предыдущем уроке мы разобрали shuffle на уровне операций: что вызывает shuffle, что такое sort-merge, как выглядит пара файлов .data + .index. Теперь спускаемся на уровень исходников: кто принимает решение «какой writer использовать», как driver узнаёт, что все map tasks завершились, и как reduce task находит нужные байты среди десятков executor-ов.

Два ключевых участника этой истории — ShuffleManager и MapOutputTracker. Они работают на разных уровнях, но тесно связаны: один управляет lifecycle shuffle-зависимостей, другой хранит карту «где что лежит» и отдаёт её reduce-таскам по запросу.

ShuffleManager: интерфейс и единственная реализация

ShuffleManager — это trait (интерфейс) в пакете org.apache.spark.shuffle. Определён минимально:

// org.apache.spark.shuffle.ShuffleManager
private[spark] trait ShuffleManager {
  def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext,
      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]

  def getReader[K, C](
      handle: ShuffleHandle,
      startMapIndex: Int,
      endMapIndex: Int,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]

  def unregisterShuffle(shuffleId: Int): Boolean

  def shuffleBlockResolver: ShuffleBlockResolver

  def stop(): Unit
}

В Apache Spark 1.x и 2.x существовал конфиг spark.shuffle.manager, позволявший подключать альтернативные реализации (HashShuffleManager был встроен до 2.0). В PR #23707 этот конфиг был удалён, и начиная с Spark 3.0 SortShuffleManager — единственная реализация. Он регистрируется в SparkEnv при старте executor-а и driver-а, живёт в SparkEnv.shuffleManager, и все компоненты получают к нему доступ через SparkEnv.get.shuffleManager.

NOTE

В Spark 4.0 существуют внешние плагины — например, RAPIDS ShuffleManager от NVIDIA. Они реализуют тот же ShuffleManager trait и подключаются через spark.shuffle.manager = com.nvidia.spark.rapids.RapidsShuffleManager. Но это внешние плагины, не встроенная часть Spark.

SortShuffleManager: регистрация shuffle и выбор handle

Когда DAGScheduler создаёт ShuffleMapStage для новой ShuffleDependency, он вызывает shuffleManager.registerShuffle(shuffleId, dependency). Этот метод принимает единственное ключевое решение всего shuffle pipeline: какой тип ShuffleHandle вернуть.

// SortShuffleManager.registerShuffle (упрощённо)
override def registerShuffle[K, V, C](
    shuffleId: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    new BypassMergeSortShuffleHandle[K, V](shuffleId, dependency)
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    new SerializedShuffleHandle[K, V](shuffleId, dependency)
  } else {
    new BaseShuffleHandle(shuffleId, dependency)
  }
}

Возвращённый ShuffleHandle — это не просто метка. Он упаковывается вместе с задачей и отправляется на executor. Там getWriter смотрит на тип handle и создаёт соответствующий writer. Таким образом, решение о типе writer принимается один раз на driver при регистрации shuffle, а не на каждом executor.

Три типа ShuffleHandle:

HandleWriterКогда выбирается
BypassMergeSortShuffleHandleBypassMergeSortShuffleWriterНет map-side combine + partitions <= threshold
SerializedShuffleHandleUnsafeShuffleWriterСериализатор поддерживает relocation + нет combine
BaseShuffleHandleSortShuffleWriterВсе остальные случаи

Подробные критерии выбора — тема следующего урока. Сейчас сосредоточимся на том, что происходит после выбора handle.

Путь ShuffleHandle: от driver до executor

registerShuffle вызывается один раз на driver. Handle сериализуется и отправляется вместе с описанием задачи. Executor десериализует его и вызывает getWriter, который создаёт нужный writer.

Driver: DAGSchedulerсоздаёт ShuffleMapStage для ShuffleDependencyDAGScheduler.getOrCreateShuffleMapStage вызывает ShuffleDependency.shuffleHandle, который через SparkContext регистрирует shuffle
registerShuffle(shuffleId, dep)
SortShuffleManager.registerShuffle()shouldBypassMergeSort? canUseSerializedShuffle?Метод проверяет dep.mapSideCombine, dep.partitioner.numPartitions, serializer.supportsRelocationOfSerializedObjects
возвращает один из трёх ShuffleHandle
ShuffleHandle (сериализован)упакован в ShuffleMapTask.taskBinaryTaskBinary -- это broadcast переменная, которую все executor-ы получают один раз. Содержит ShuffleHandle + RDD.
Executor: TaskRunner.run()
SortShuffleManager.getWriter(handle, ...)match handle type -> UnsafeShuffleWriter / BypassMergeSortShuffleWriter / SortShuffleWritergetWriter вызывается на каждый map task, но сам handle уже готов -- решение принято раньше.

ShuffleBlockResolver: IndexShuffleBlockResolver

SortShuffleManager использует IndexShuffleBlockResolver — реализацию ShuffleBlockResolver. Это компонент, который переводит логический ShuffleBlockId(shuffleId, mapId, reduceId) в физический путь на диске executor-а.

// IndexShuffleBlockResolver.getBlockData (упрощённо)
def getBlockData(blockId: ShuffleBlockId, dirs: Option[Array[String]]): ManagedBuffer = {
  val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId, dirs)
  val dataFile  = getDataFile(blockId.shuffleId, blockId.mapId, dirs)
  val in        = new DataInputStream(new FileInputStream(indexFile))
  try {
    in.skip(blockId.reduceId * 8L)     // 8 bytes per offset entry
    val offset = in.readLong()
    val nextOffset = in.readLong()
    new FileSegmentManagedBuffer(
      transportConf, dataFile, offset, nextOffset - offset)
  } finally {
    in.close()
  }
}

Файл .index содержит массив из (numPartitions + 1) long-значений. Каждый — байтовый offset в .data-файле. Чтобы прочитать partition reduceId, нужно перейти на позицию reduceId * 8 в index-файле, прочитать два long-а (start и end), и взять соответствующий диапазон байт из .data. Никакой дополнительной структуры — всё линейно.

По умолчанию файлы хранятся в директориях, заданных конфигом spark.local.dir (по умолчанию — системный java.io.tmpdir). Путь: ${spark.local.dir}/blockmgr-UUID/XX/shuffle_{shuffleId}_{mapId}_0.{data|index}, где XX — первые два символа hex-хеша от имени файла.

MapOutputTracker: архитектура master/worker

После того как map task записал shuffle-файлы на диск, он должен сообщить driver-у: «я сделал, вот мои метаданные». Это задача MapOutputTracker.

MapOutputTracker — абстрактный класс в org.apache.spark. Две его реализации живут в разных процессах:

  • MapOutputTrackerMaster — запускается на driver. Хранит реестр всех map-выходов для всех shuffle в JVM. Принимает регистрации map task-ов. Отвечает на запросы executor-ов (через Netty RPC).
  • MapOutputTrackerWorker — запускается на каждом executor. Имеет локальный кэш (ConcurrentHashMap). При cache-miss отправляет RPC-запрос на driver и получает ответ.
MapOutputTracker: master на driver, worker на executor

Одна сторона регистрирует выходы, другая запрашивает их. Кэширование на стороне worker сокращает RPC-трафик.

DRIVER: MapOutputTrackerMastermapStatuses: Map[shuffleId, Array[MapStatus]]mapStatuses хранит массив MapStatus для каждого shuffleId. Индекс в массиве = mapIndex (номер map task). null означает, что task ещё не завершён.
MapOutputTrackerMasterEndpointRPC endpoint: слушает запросы от workerРеализован как Spark RPC endpoint на стандартном порту driver. Принимает GetMapOutputStatuses(shuffleId), отвечает сериализованным массивом MapStatus.
EpochLong, инкрементируется при любом изменении реестраЕсли worker имеет устаревшую epoch — он перечитывает весь реестр для затронутого shuffleId. Это позволяет инвалидировать кэш после speculative execution.
EXECUTOR N: MapOutputTrackerWorkermapStatuses: ConcurrentHashMap[shuffleId, Array[MapStatus]]Локальный кэш. При hit — никаких сетевых вызовов. При miss — RPC к master.
fetching: HashSet[shuffleId]множество в-процессе-загружаемых shuffleIdПредотвращает дублирующие RPC: если два reduce-таска одного shuffleId одновременно промахнулись по кэшу, только один пойдёт за данными, другой подождёт.
trackerEndpoint: RpcEndpointRefссылка на MapOutputTrackerMasterEndpoint driver-аИнициализируется при старте executor: SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerWorker]

Жизненный цикл: от registerShuffle до reduce read

Проследим полный путь от регистрации shuffle до момента, когда reduce task знает, где лежат его данные.

Шаг 1: registerShuffle на driver

Когда DAGScheduler создаёт ShuffleMapStage, он вызывает mapOutputTracker.registerShuffle(shuffleId, numMapTasks). На MapOutputTrackerMaster это создаёт запись:

// MapOutputTrackerMaster.registerShuffle
def registerShuffle(shuffleId: Int, numMaps: Int, numReduces: Int): Unit = {
  shuffleStatuses.put(shuffleId, new ShuffleStatus(numMaps, numReduces))
}

ShuffleStatus — это контейнер для массива MapStatus. Изначально все элементы null.

Шаг 2: registerMapOutput после завершения map task

Когда ShuffleMapTask завершается, она возвращает MapStatus — небольшой объект, хранящий BlockManagerId executor-а и размеры всех партиций (в сжатом виде через MapStatus.apply). TaskScheduler передаёт этот результат в DAGScheduler.handleTaskCompletion, который вызывает:

mapOutputTracker.registerMapOutput(shuffleId, mapIndex, status)
// или после завершения всей стадии:
mapOutputTracker.registerMapOutputs(shuffleId, statuses, changeEpoch = true)

После этого mapStatuses(shuffleId)(mapIndex) перестаёт быть null.

Шаг 3: запрос от reduce task

Когда reduce task готов читать данные, BlockStoreShuffleReader вызывает:

val statuses = SparkEnv.get.mapOutputTracker
  .getMapSizesByExecutorId(shuffleId, startPartition, endPartition)

На MapOutputTrackerWorker это проверяет локальный кэш. При промахе — RPC к driver:

// MapOutputTrackerWorker.getStatuses (упрощённо)
private def getStatuses(shuffleId: Int): Array[MapStatus] = {
  val statuses = mapStatuses.get(shuffleId)
  if (statuses != null) return statuses

  // cache miss: идём к master
  val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
  val fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
  mapStatuses.put(shuffleId, fetchedStatuses)
  fetchedStatuses
}

getMapSizesByExecutorId затем разворачивает полученный Array[MapStatus] в Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])] — список блоков, сгруппированных по executor. Каждый элемент: BlockManagerId (адрес executor), список (ShuffleBlockId, size, mapIndex).

MapStatus: что внутри

MapStatus — это то, что каждый map task возвращает driver-у. Важно понимать, что в нём хранится.

// MapStatus -- sealed trait
sealed trait MapStatus {
  def location: BlockManagerId  // executor, где лежат файлы
  def getSizeForBlock(reduceId: Int): Long  // размер partition reduceId в байтах
  def getNumNonEmptyBlocks: Int
}

Размеры партиций хранятся не напрямую. CompressedMapStatus (обычный вариант) хранит массив байт, где каждый байт — логарифмически сжатый размер. HighlyCompressedMapStatus используется когда партиций больше 2000: хранит только среднее и список непустых партиций. Это существенно: для shuffle с 10000 partition-ами точные размеры не нужны, нужен только список непустых и их примерные размеры для планирования.

Конфиг spark.shuffle.minNumPartitionsToHighlyCompress (по умолчанию 2000) определяет переключение между реализациями.

Что происходит при потере executor

Одна из главных ролей MapOutputTrackerMaster — обработка сбоев. Когда executor умирает, DAGScheduler.handleExecutorLost вызывает:

mapOutputTracker.removeOutputsOnExecutor(executorId)

Это инвалидирует все MapStatus, которые были зарегистрированы с умершего executor. Соответствующие map tasks будут пересчитаны на других executor-ах. После пересчёта новые MapStatus будут зарегистрированы, и epoch инкрементируется — worker-ы поймут, что их кэш устарел.

Кэш worker-а хранит только массив MapStatus и не знает о epoch. При следующем shuffle read MapOutputTrackerWorker сравнивает свою epoch с той, что пришла в задаче (передаётся как поле ShuffleMapTask). Если локальная epoch отстала, кэш полностью очищается для данного shuffleId.

WARNING

В production это означает: если у вас часто умирают executor-ы в середине shuffle, reduce tasks будут видеть FetchFailed (не могут скачать блок с умершего executor). DAGScheduler перезапустит не только reduce stage, но и map stage для пострадавших map tasks. При этом в Spark UI вы увидите стадию, помеченную как “resubmitted”. Это не баг — это штатная recovery через MapOutputTracker.

Broadcast вместо RPC: оптимизация для больших shuffle

Для очень больших shuffle (много map tasks, много partitions) сериализованный Array[MapStatus] может быть несколько мегабайт. Отправлять его как RPC-ответ на каждый executor неэффективно. Начиная с Spark 3.0, если размер ответа превышает spark.shuffle.mapOutput.minSizeForBroadcast (по умолчанию 512 КБ), MapOutputTrackerMaster использует BroadcastManager для рассылки статусов как broadcast-переменной.

Это снижает нагрузку на driver: вместо N отдельных RPC-ответов — одна broadcast-переменная, которую executor-ы получают через BitTorrent-протокол Spark.

# Проверить, используется ли broadcast:
# В логах executor смотрите на:
# "Getting broadcast variable X for MapOutputTracker"
# вместо:
# "Fetching map output info from master"

# Настройка порога:
spark.conf.set("spark.shuffle.mapOutput.minSizeForBroadcast", "1m")

Попробуй сам

Чтобы увидеть MapOutputTracker в действии, добавьте в логи уровень DEBUG для конкретных классов:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
  .appName("mot-debug")
  .config("spark.executor.extraJavaOptions",
          "-Dlog4j.logger.org.apache.spark.MapOutputTrackerWorker=DEBUG")
  .config("spark.driver.extraJavaOptions",
          "-Dlog4j.logger.org.apache.spark.MapOutputTrackerMaster=DEBUG")
  .getOrCreate())

# Создаём shuffle
df = spark.range(1_000_000).repartition(50)
result = df.groupBy((df.id % 10).alias("bucket")).count()
result.show()

# В логах driver вы увидите:
# MapOutputTrackerMaster: Registering shuffle 0 with 50 map tasks
# MapOutputTrackerMaster: Registered map output for shuffle 0, map 12
# ...
# В логах executor:
# MapOutputTrackerWorker: Fetching outputs for shuffle 0, reduces 0-10

Для более детального анализа смотрите Spark UI -> Stages -> конкретная shuffle stage -> Summary Metrics. Поле “Shuffle Write Time” показывает только время записи; время регистрации в MapOutputTracker входит в “Task Serialization Time” и накладные расходы планировщика.

В Spark UI на вкладке Environment -> Spark Properties вы можете проверить spark.shuffle.mapOutput.minSizeForBroadcast и убедиться, что broadcast threshold настроен разумно для вашего масштаба.

Оптимизация shuffle: практические техники
Проверка знанийKnowledge check
Map task на executor A завершился и зарегистрировал MapStatus в MapOutputTrackerMaster. Затем executor A упал. Reduce task на executor B попытался прочитать shuffle блок с executor A и получил FetchFailed. Что произойдёт дальше с точки зрения MapOutputTracker?
ОтветAnswer
DAGScheduler вызовет mapOutputTracker.removeOutputsOnExecutor(executorId A), что инвалидирует все MapStatus от executor A. Epoch инкрементируется. DAGScheduler пометит пострадавшие map tasks как FAILED и добавит их в очередь на повторное выполнение (на другом executor). После завершения повторных map tasks будут зарегистрированы новые MapStatus с новым BlockManagerId. Reduce tasks увидят инкрементированную epoch и очистят свой локальный кэш MapOutputTrackerWorker для данного shuffleId перед следующим чтением.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Когда именно SortShuffleManager принимает решение о типе ShuffleHandle (BypassMergeSort / Serialized / Base)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4