Learning Platform
Глоссарий Troubleshooting
Урок 05.03 · 32 мин
Продвинутый
ShuffleWriterBypassMergeSortShuffleWriterUnsafeShuffleWriterSortShuffleWriterSpillTungsten

Три shuffle writer: выбор алгоритма и форматы файлов

В прошлом уроке мы видели, что SortShuffleManager.registerShuffle возвращает один из трёх ShuffleHandle, и от этого зависит, какой writer будет создан. Пришло время разобрать каждый writer детально: что именно происходит внутри, почему алгоритмы такие разные, и что записывается на диск.

Три writer-а существуют не случайно — они оптимизированы для принципиально разных рабочих нагрузок. Понимание критериев выбора позволяет диагностировать медленный shuffle, а понимание форматов файлов — разобраться с ошибками и corruption.

Дерево решений: какой writer выбирается

SortShuffleManager.registerShuffle проверяет условия строго по порядку:

1. shouldBypassMergeSort?
   - dep.mapSideCombine == false  (нет pre-aggregation)
   - AND numPartitions <= spark.shuffle.sort.bypassMergeThreshold (default: 200)
   -> BypassMergeSortShuffleHandle -> BypassMergeSortShuffleWriter

2. canUseSerializedShuffle?
   - dep.mapSideCombine == false
   - AND serializer.supportsRelocationOfSerializedObjects == true
     (KryoSerializer с auto-reset, или UnsafeRowSerializer из SQL)
   - AND numPartitions <= MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE (16777216)
   -> SerializedShuffleHandle -> UnsafeShuffleWriter

3. fallback:
   -> BaseShuffleHandle -> SortShuffleWriter

Ключевые конфиги:

КонфигПо умолчаниюЧто меняет
spark.shuffle.sort.bypassMergeThreshold200Порог для BypassMergeSortShuffleWriter
spark.shuffle.file.buffer32 КБРазмер буфера при записи .data файла
spark.shuffle.spill.compresstrueСжатие при spill на диск
spark.shuffle.compresstrueСжатие итогового shuffle файла
spark.shuffle.unsafe.file.output.buffer32 КББуфер UnsafeShuffleWriter при слиянии spill

BypassMergeSortShuffleWriter: hash-based для малого числа партиций

BypassMergeSortShuffleWriter — это hash shuffle, переродившийся внутри sort shuffle. Он создаёт по одному временному файлу на каждую output partition, пишет туда данные без сортировки, а в конце объединяет всё в одну пару .data + .index.

// BypassMergeSortShuffleWriter (упрощённо, Java)
public void write(Iterator<Product2<K, V>> records) {
    // Открываем по одному DiskBlockObjectWriter на partition
    for (int i = 0; i < numPartitions; i++) {
        partitionWriters[i] = blockManager.getDiskWriter(
            new TempShuffleBlockId(...),
            output = getFile(shuffleId, mapId, i),
            serializerInstance,
            fileBufferSize,
            writeMetrics);
    }

    // Для каждой записи: hash по partitionerId, пишем в нужный writer
    while (records.hasNext()) {
        Product2<K, V> record = records.next();
        int partitionId = partitioner.getPartition(record._1());
        partitionWriters[partitionId].write(record._1(), record._2());
    }

    // Закрываем все temporary writers
    for (int i = 0; i < numPartitions; i++) {
        partitionWriters[i].commitAndGet();
    }

    // Объединяем N temp-файлов в один .data файл
    // и строим .index файл с offsets
    writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
}

Почему ограничение 200 партиций

Каждый DiskBlockObjectWriter держит открытый FileOutputStream и буфер размером spark.shuffle.file.buffer (32 КБ по умолчанию). При 200 партициях это 200 * 32 КБ = 6.4 МБ памяти только на буферы writer-ов. При 10 000 партициях — 320 МБ. На кластере с 50 map tasks на executor это 16 ГБ только на write-буферы. Поэтому threshold 200 — это разумный практический предел.

Преимущество: нет сортировки

BypassMergeSortShuffleWriter не сортирует данные по ключу внутри партиции. Для операций, где порядок не важен (например, repartition, coalesce, простой groupBy без pre-aggregation через RDD), это значит нулевые затраты CPU на сортировку. Writer просто pipe-ает сериализованные байты напрямую в нужный временный файл.

Формат выходного файла

После слияния получается стандартная пара:

  • shuffle_{shuffleId}_{mapId}_0.data — конкатенация всех partition-файлов
  • shuffle_{shuffleId}_{mapId}_0.index(numPartitions + 1) long-значений, байтовые offsets
BypassMergeSortShuffleWriter: N temp -> 1 .data + .index

Каждая partition пишется во временный файл. Финальный шаг: конкатенировать в .data, записать offsets в .index. Сортировки внутри partition нет.

temp_shuffle_0_0_0.tmppartition 0: (k1,v1),(k3,v2),...Временный файл на partition 0. Создаётся DiskBlockObjectWriter с буфером 32KB. Сериализован без сортировки.
temp_shuffle_0_0_1.tmppartition 1: (k2,v5),(k8,v3),...Аналогично для partition 1. Каждый файл -- отдельный FileOutputStream.
temp_shuffle_0_0_P.tmppartition P: ...P = numPartitions-1. Итого P+1 временных файлов открыты одновременно.
writeIndexFileAndCommit: конкатенировать все tmp
shuffle_0_0_0.data[part0 bytes][part1 bytes]...[partP bytes]Один непрерывный файл. Партиции идут последовательно, без padding.
shuffle_0_0_0.index[0][len0][len0+len1]...[total](P+1) значений типа Long. index[i] = начало partition i в .data файле. index[P+1] = размер файла.

UnsafeShuffleWriter: Tungsten serialized sort

UnsafeShuffleWriter — это «Tungsten shuffle». Он работает с сериализованными байтами напрямую, минуя Java object model. Вместо объектов в памяти хранятся сырые байты плюс 8-байтовый указатель на запись (4 байта partition ID + 4 байта upper bits адреса).

// UnsafeShuffleWriter: ключевая идея (Java)
// Запись:
public void insertRecordIntoSorter(Product2<K, V> record) {
    // Сериализуем в буфер (без копирования через Java heap)
    serBuffer.reset();
    serOutputStream.writeKey(record._1(), OBJECT_CLASS_TAG);
    serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
    serOutputStream.flush();

    final int serializedRecordSize = serBuffer.size();
    // ShuffleExternalSorter сохраняет только pointer + partitionId
    // Сами байты -- в TaskMemoryManager off-heap буферах
    sorter.insertRecord(
        serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET,
        serializedRecordSize,
        partitionId);
}

ShuffleExternalSorter: radix sort над указателями

ShuffleExternalSorter — сердце UnsafeShuffleWriter. Он хранит массив 8-байтовых записей: (partitionId 24 bits | address 40 bits). Этот массив можно сортировать без доступа к фактическим данным — просто radix sort по partition ID.

Когда память заканчивается (контролируется TaskMemoryManager), происходит spill:

  1. Radix sort текущего массива указателей по partition ID.
  2. Последовательная запись записей в порядке отсортированных указателей в spill-файл.
  3. Освобождение памяти.

При финальном слиянии spill-файлов UnsafeShuffleWriter делает merge by partition ID без повторной десериализации — просто конкатенирует уже-отсортированные блоки.

Spill 1: [part0: 500 records][part1: 300 records][part2: 200 records]
Spill 2: [part0: 400 records][part1: 250 records][part2: 350 records]
Merge:   [part0: 900 records][part1: 550 records][part2: 550 records]
           (просто конкатенация, без sort merge key-by-key)

supportsRelocationOfSerializedObjects: почему это важно

Для того чтобы перемещать сериализованные байты без десериализации, сериализатор должен гарантировать: если байты одной записи скопированы в другое место памяти, запись остаётся валидной. Это называется “relocation”.

JavaSerializer (по умолчанию для RDD API) использует Java object serialization, где объекты могут содержать абсолютные ссылки — такие байты нельзя перемещать. UnsafeRowSerializer (используемый Spark SQL для DataFrames) — бинарный формат с relative offsets, relocation всегда безопасна. KryoSerializer поддерживает relocation только если включён kryo.registrationRequired = false (auto-reset).

Именно поэтому DataFrame shuffle почти всегда использует UnsafeShuffleWriter — SQL layer использует UnsafeRowSerializer by default. RDD API с JavaSerializer — всегда SortShuffleWriter.

UnsafeShuffleWriter: pointer array + off-heap bytes

Данные хранятся как сырые байты в off-heap буферах. В памяти только массив 8-байтовых указателей. Radix sort по partition ID не трогает данные -- только указатели.

Pointer Array (on-heap)[(part=2, addr=0x1A00), (part=0, addr=0x1B40), (part=1, addr=0x1C20), ...]8 байт на запись: старшие 24 бита = partitionId (max 16M partitions), младшие 40 бит = адрес записи в off-heap.
radix sort по partitionId[(part=0,...),(part=1,...),(part=2,...)]Sort не трогает off-heap данные. Только переставляет 8-байтовые указатели.
Off-heap buffers (TaskMemoryManager)raw serialized bytes: [len][bytes][len][bytes]...Каждая запись: 4-байтовый length prefix + сериализованные байты. UnsafeRowSerializer: binary row без Java header.
Spill: при нехватке памятиflush отсортированный массив в spill файл, освободить буферыSpillInfo хранит File + array[numPartitions] sizes для каждого spill. При merge: итерируемся по spill файлам partition за partition.

SortShuffleWriter: общий случай с ExternalSorter

SortShuffleWriter — fallback для всех случаев, когда два предыдущих не применимы. Главное отличие: он использует ExternalSorter, который работает с десериализованными Java-объектами и поддерживает map-side combine (pre-aggregation).

// SortShuffleWriter.write (Scala)
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner),
      dep.keyOrdering, dep.serializer)
  } else {
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner),
      ordering = None, dep.serializer)
  }
  sorter.insertAll(records)

  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  val partitionLengths = sorter.writePartitionedMapOutput(dep.shuffleId, mapId, shuffleBlockResolver, tmp)
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}

ExternalSorter: in-memory буфер + spill

ExternalSorter накапливает записи в PartitionedAppendOnlyMap (если есть combiner) или PartitionedPairBuffer (без combiner). Оба хранятся в JVM heap.

При достижении порога памяти (контролируется spark.shuffle.spill.numElementsForceSpillThreshold, или когда MemoryManager говорит “освободи”) происходит spill:

  1. Текущий буфер сортируется по (partitionId, key).
  2. Записывается в временный файл (через DiskBlockObjectWriter).
  3. Буфер очищается.

При записи финального файла ExternalSorter.writePartitionedMapOutput делает merge-sort всех spill-файлов и оставшегося буфера. Поскольку каждый spill уже отсортирован по (partitionId, key), merge требует только обхода с приоритетной очередью по partitionId.

Когда SortShuffleWriter обязателен

Единственный случай, когда нельзя использовать ни один другой writer — наличие mapSideCombine = true. Это происходит при:

  • reduceByKey на уровне RDD API
  • aggregateByKey на RDD
  • combineByKey на RDD

DataFrame-операции (groupBy().agg()) не используют map-side combine на shuffle уровне (предварительная агрегация реализована отдельным физическим оператором HashAggregateExec перед ShuffleExchangeExec), поэтому для DataFrames SortShuffleWriter нужен редко.

TIP

Проверить, какой writer используется, можно через Spark UI -> Stages -> конкретный task -> Task Metrics. Поле “Shuffle Write: Records Written” vs “Shuffle Write: Bytes Written” даёт косвенную информацию. Точнее — включите DEBUG-лог для org.apache.spark.shuffle.sort.SortShuffleManager и ищите “Selecting … writer for shuffle”.

Механика spill: общие принципы

Все три writer-а (UnsafeShuffleWriter через ShuffleExternalSorter, SortShuffleWriter через ExternalSorter) поддерживают spill, но с разной механикой:

АспектUnsafeShuffleWriterSortShuffleWriter
Что хранится в памятиСырые байты (off-heap)Java объекты (on-heap)
Единица spillСтраницы TaskMemoryManagerСериализованные записи
Сортировка при spillRadix sort по partitionId (указатели)Comparison sort по (partitionId, key)
MergeКонкатенация по partitionMerge-sort с приоритетной очередью
GC impactМинимальный (off-heap)Высокий при большом буфере

Spill-файлы именуются как temp_shuffle_UUID.tmp и живут в spark.local.dir. После завершения map task они удаляются.

Метрику spill можно наблюдать в Spark UI: “Shuffle Spill (Memory)” — сколько данных было в памяти до спиллинга, “Shuffle Spill (Disk)” — сколько записано на диск (после сжатия, если spark.shuffle.spill.compress = true).

Формат файлов: .data и .index в деталях

Все три writer в итоге создают одинаковый формат через IndexShuffleBlockResolver.writeIndexFileAndCommit:

shuffle_{shuffleId}_{mapId}_0.data:
+-------------------+-------------------+-------------------+
| partition 0 bytes | partition 1 bytes | ... partition P   |
+-------------------+-------------------+-------------------+
  (может быть compressed блоки)

shuffle_{shuffleId}_{mapId}_0.index:
+--------+--------+--------+--------+--------+
|   0    |  len0  | len0+1 |  ...   | total  |
+--------+--------+--------+--------+--------+
  (P+1) значений типа Long (64 бит каждое)

Важный нюанс: сжатие применяется на уровне блока партиции, а не на весь файл. Если spark.shuffle.compress = true (по умолчанию), каждый блок партиции независимо сжат кодеком, указанным в spark.io.compression.codec (по умолчанию lz4). Это позволяет читателю распаковывать только нужную партицию.

Атомарность записи

Запись файлов атомарна: сначала пишется во временный файл shuffle_{shuffleId}_{mapId}_0.data.tmp, потом rename(). Если executor падает во время записи — останется временный файл, но не будет partial .data файла. IndexShuffleBlockResolver при старте очищает незавершённые tmp-файлы.

Sequence diagram: полный путь UnsafeShuffleWriter

Попробуй сам

Чтобы наблюдать работу writer-ов в реальном задании:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
  .appName("writers-demo")
  # Снижаем порог для BypassMergeSort, чтобы увидеть его в действии
  .config("spark.shuffle.sort.bypassMergeThreshold", "5")
  .getOrCreate())

# Случай 1: BypassMergeSortShuffleWriter (4 partitions < threshold 5)
df_small = spark.range(100_000).repartition(4)
df_small.groupBy((F.col("id") % 4)).count().show()
# В Spark UI: Stage с repartition(4) использует BypassMergeSortShuffleWriter

# Случай 2: UnsafeShuffleWriter (DataFrame, много partitions, нет combine)
spark.conf.set("spark.sql.shuffle.partitions", "200")
df_large = spark.range(10_000_000)
df_large.groupBy((F.col("id") % 200)).count().show()
# UnsafeShuffleWriter: UnsafeRowSerializer supports relocation

# Случай 3: SortShuffleWriter (RDD с map-side combine)
rdd = spark.sparkContext.parallelize(range(1_000_000))
pairs = rdd.map(lambda x: (x % 100, 1))
# reduceByKey использует map-side combine -> SortShuffleWriter
result = pairs.reduceByKey(lambda a, b: a + b)
print(result.count())

# Мониторинг spill:
# Spark UI -> Stages -> Task List -> Shuffle Spill (Memory) / (Disk)
# Если Spill > 0: рассмотрите увеличение spark.executor.memory
# или spark.memory.fraction

Чтобы увидеть spill в логах:

# Включите DEBUG для sorter
spark = (SparkSession.builder
  .appName("spill-demo")
  .config("spark.executor.memory", "512m")  # мало памяти, будет spill
  .config("spark.executor.extraJavaOptions",
          "-Dlog4j.logger.org.apache.spark.util.collection.ExternalSorter=DEBUG")
  .getOrCreate())

# В логах:
# ExternalSorter: Spilling in-memory map of X.X MB to disk (iteration 1)
# ExternalSorter: Size of collection before spilling: X MB
Практические техники оптимизации shuffle
Проверка знанийKnowledge check
Senior data-инженер видит в Spark UI, что стадия groupBy().agg() с 200 shuffle партициями использует очень большой 'Shuffle Spill (Disk)' при практически нулевом 'Shuffle Spill (Memory)'. Какой writer скорее всего работает, и почему spill (Disk) может быть намного больше spill (Memory)?
ОтветAnswer
Для DataFrame groupBy().agg() с 200 партициями используется UnsafeShuffleWriter (UnsafeRowSerializer поддерживает relocation, 200 > bypassMergeThreshold). UnsafeShuffleWriter хранит данные в off-heap буферах TaskMemoryManager. 'Shuffle Spill (Memory)' показывает размер данных в памяти до спиллинга (сырые байты). 'Shuffle Spill (Disk)' показывает размер записанных на диск данных -- он меньше или равен Memory spill при включённом spark.shuffle.spill.compress=true (lz4 по умолчанию). Если Disk намного больше Memory -- это аномалия, требующая проверки сжатия. В норме Disk меньше Memory из-за сжатия. Если spill большой в целом -- рассмотрите увеличение executor памяти или уменьшение spark.sql.shuffle.partitions для снижения параллелизма per-executor.

Лабораторная работа

В лабораторной вы заставляете Spark выбрать каждый из трёх shuffle writer-ов, заглядываете в реальные shuffle-файлы на диске, провоцируете spill нехваткой памяти и подключаете External Shuffle Service с push-based shuffle. Так теория writer-ов из урока превращается в наблюдаемое поведение.

cd labs/shuffle-lab
docker compose up -d

Полное описание и шаги проверки — в labs/shuffle-lab/README.md.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Cluster имеет DataFrame-задание с groupBy().agg() и 500 shuffle-партициями. Какой shuffle writer будет выбран и почему?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4