Три shuffle writer: выбор алгоритма и форматы файлов
В прошлом уроке мы видели, что SortShuffleManager.registerShuffle возвращает один из трёх ShuffleHandle, и от этого зависит, какой writer будет создан. Пришло время разобрать каждый writer детально: что именно происходит внутри, почему алгоритмы такие разные, и что записывается на диск.
Три writer-а существуют не случайно — они оптимизированы для принципиально разных рабочих нагрузок. Понимание критериев выбора позволяет диагностировать медленный shuffle, а понимание форматов файлов — разобраться с ошибками и corruption.
Дерево решений: какой writer выбирается
SortShuffleManager.registerShuffle проверяет условия строго по порядку:
1. shouldBypassMergeSort?
- dep.mapSideCombine == false (нет pre-aggregation)
- AND numPartitions <= spark.shuffle.sort.bypassMergeThreshold (default: 200)
-> BypassMergeSortShuffleHandle -> BypassMergeSortShuffleWriter
2. canUseSerializedShuffle?
- dep.mapSideCombine == false
- AND serializer.supportsRelocationOfSerializedObjects == true
(KryoSerializer с auto-reset, или UnsafeRowSerializer из SQL)
- AND numPartitions <= MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE (16777216)
-> SerializedShuffleHandle -> UnsafeShuffleWriter
3. fallback:
-> BaseShuffleHandle -> SortShuffleWriter
Ключевые конфиги:
| Конфиг | По умолчанию | Что меняет |
|---|---|---|
spark.shuffle.sort.bypassMergeThreshold | 200 | Порог для BypassMergeSortShuffleWriter |
spark.shuffle.file.buffer | 32 КБ | Размер буфера при записи .data файла |
spark.shuffle.spill.compress | true | Сжатие при spill на диск |
spark.shuffle.compress | true | Сжатие итогового shuffle файла |
spark.shuffle.unsafe.file.output.buffer | 32 КБ | Буфер UnsafeShuffleWriter при слиянии spill |
BypassMergeSortShuffleWriter: hash-based для малого числа партиций
BypassMergeSortShuffleWriter — это hash shuffle, переродившийся внутри sort shuffle. Он создаёт по одному временному файлу на каждую output partition, пишет туда данные без сортировки, а в конце объединяет всё в одну пару .data + .index.
// BypassMergeSortShuffleWriter (упрощённо, Java)
public void write(Iterator<Product2<K, V>> records) {
// Открываем по одному DiskBlockObjectWriter на partition
for (int i = 0; i < numPartitions; i++) {
partitionWriters[i] = blockManager.getDiskWriter(
new TempShuffleBlockId(...),
output = getFile(shuffleId, mapId, i),
serializerInstance,
fileBufferSize,
writeMetrics);
}
// Для каждой записи: hash по partitionerId, пишем в нужный writer
while (records.hasNext()) {
Product2<K, V> record = records.next();
int partitionId = partitioner.getPartition(record._1());
partitionWriters[partitionId].write(record._1(), record._2());
}
// Закрываем все temporary writers
for (int i = 0; i < numPartitions; i++) {
partitionWriters[i].commitAndGet();
}
// Объединяем N temp-файлов в один .data файл
// и строим .index файл с offsets
writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
}
Почему ограничение 200 партиций
Каждый DiskBlockObjectWriter держит открытый FileOutputStream и буфер размером spark.shuffle.file.buffer (32 КБ по умолчанию). При 200 партициях это 200 * 32 КБ = 6.4 МБ памяти только на буферы writer-ов. При 10 000 партициях — 320 МБ. На кластере с 50 map tasks на executor это 16 ГБ только на write-буферы. Поэтому threshold 200 — это разумный практический предел.
Преимущество: нет сортировки
BypassMergeSortShuffleWriter не сортирует данные по ключу внутри партиции. Для операций, где порядок не важен (например, repartition, coalesce, простой groupBy без pre-aggregation через RDD), это значит нулевые затраты CPU на сортировку. Writer просто pipe-ает сериализованные байты напрямую в нужный временный файл.
Формат выходного файла
После слияния получается стандартная пара:
shuffle_{shuffleId}_{mapId}_0.data— конкатенация всех partition-файловshuffle_{shuffleId}_{mapId}_0.index—(numPartitions + 1)long-значений, байтовые offsets
Каждая partition пишется во временный файл. Финальный шаг: конкатенировать в .data, записать offsets в .index. Сортировки внутри partition нет.
UnsafeShuffleWriter: Tungsten serialized sort
UnsafeShuffleWriter — это «Tungsten shuffle». Он работает с сериализованными байтами напрямую, минуя Java object model. Вместо объектов в памяти хранятся сырые байты плюс 8-байтовый указатель на запись (4 байта partition ID + 4 байта upper bits адреса).
// UnsafeShuffleWriter: ключевая идея (Java)
// Запись:
public void insertRecordIntoSorter(Product2<K, V> record) {
// Сериализуем в буфер (без копирования через Java heap)
serBuffer.reset();
serOutputStream.writeKey(record._1(), OBJECT_CLASS_TAG);
serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
serOutputStream.flush();
final int serializedRecordSize = serBuffer.size();
// ShuffleExternalSorter сохраняет только pointer + partitionId
// Сами байты -- в TaskMemoryManager off-heap буферах
sorter.insertRecord(
serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET,
serializedRecordSize,
partitionId);
}
ShuffleExternalSorter: radix sort над указателями
ShuffleExternalSorter — сердце UnsafeShuffleWriter. Он хранит массив 8-байтовых записей: (partitionId 24 bits | address 40 bits). Этот массив можно сортировать без доступа к фактическим данным — просто radix sort по partition ID.
Когда память заканчивается (контролируется TaskMemoryManager), происходит spill:
- Radix sort текущего массива указателей по partition ID.
- Последовательная запись записей в порядке отсортированных указателей в spill-файл.
- Освобождение памяти.
При финальном слиянии spill-файлов UnsafeShuffleWriter делает merge by partition ID без повторной десериализации — просто конкатенирует уже-отсортированные блоки.
Spill 1: [part0: 500 records][part1: 300 records][part2: 200 records]
Spill 2: [part0: 400 records][part1: 250 records][part2: 350 records]
Merge: [part0: 900 records][part1: 550 records][part2: 550 records]
(просто конкатенация, без sort merge key-by-key)
supportsRelocationOfSerializedObjects: почему это важно
Для того чтобы перемещать сериализованные байты без десериализации, сериализатор должен гарантировать: если байты одной записи скопированы в другое место памяти, запись остаётся валидной. Это называется “relocation”.
JavaSerializer (по умолчанию для RDD API) использует Java object serialization, где объекты могут содержать абсолютные ссылки — такие байты нельзя перемещать. UnsafeRowSerializer (используемый Spark SQL для DataFrames) — бинарный формат с relative offsets, relocation всегда безопасна. KryoSerializer поддерживает relocation только если включён kryo.registrationRequired = false (auto-reset).
Именно поэтому DataFrame shuffle почти всегда использует UnsafeShuffleWriter — SQL layer использует UnsafeRowSerializer by default. RDD API с JavaSerializer — всегда SortShuffleWriter.
Данные хранятся как сырые байты в off-heap буферах. В памяти только массив 8-байтовых указателей. Radix sort по partition ID не трогает данные -- только указатели.
SortShuffleWriter: общий случай с ExternalSorter
SortShuffleWriter — fallback для всех случаев, когда два предыдущих не применимы. Главное отличие: он использует ExternalSorter, который работает с десериализованными Java-объектами и поддерживает map-side combine (pre-aggregation).
// SortShuffleWriter.write (Scala)
override def write(records: Iterator[Product2[K, V]]): Unit = {
sorter = if (dep.mapSideCombine) {
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner),
dep.keyOrdering, dep.serializer)
} else {
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner),
ordering = None, dep.serializer)
}
sorter.insertAll(records)
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
val partitionLengths = sorter.writePartitionedMapOutput(dep.shuffleId, mapId, shuffleBlockResolver, tmp)
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}
ExternalSorter: in-memory буфер + spill
ExternalSorter накапливает записи в PartitionedAppendOnlyMap (если есть combiner) или PartitionedPairBuffer (без combiner). Оба хранятся в JVM heap.
При достижении порога памяти (контролируется spark.shuffle.spill.numElementsForceSpillThreshold, или когда MemoryManager говорит “освободи”) происходит spill:
- Текущий буфер сортируется по
(partitionId, key). - Записывается в временный файл (через
DiskBlockObjectWriter). - Буфер очищается.
При записи финального файла ExternalSorter.writePartitionedMapOutput делает merge-sort всех spill-файлов и оставшегося буфера. Поскольку каждый spill уже отсортирован по (partitionId, key), merge требует только обхода с приоритетной очередью по partitionId.
Когда SortShuffleWriter обязателен
Единственный случай, когда нельзя использовать ни один другой writer — наличие mapSideCombine = true. Это происходит при:
reduceByKeyна уровне RDD APIaggregateByKeyна RDDcombineByKeyна RDD
DataFrame-операции (groupBy().agg()) не используют map-side combine на shuffle уровне (предварительная агрегация реализована отдельным физическим оператором HashAggregateExec перед ShuffleExchangeExec), поэтому для DataFrames SortShuffleWriter нужен редко.
Проверить, какой writer используется, можно через Spark UI -> Stages -> конкретный task -> Task Metrics. Поле “Shuffle Write: Records Written” vs “Shuffle Write: Bytes Written” даёт косвенную информацию. Точнее — включите DEBUG-лог для org.apache.spark.shuffle.sort.SortShuffleManager и ищите “Selecting … writer for shuffle”.
Механика spill: общие принципы
Все три writer-а (UnsafeShuffleWriter через ShuffleExternalSorter, SortShuffleWriter через ExternalSorter) поддерживают spill, но с разной механикой:
| Аспект | UnsafeShuffleWriter | SortShuffleWriter |
|---|---|---|
| Что хранится в памяти | Сырые байты (off-heap) | Java объекты (on-heap) |
| Единица spill | Страницы TaskMemoryManager | Сериализованные записи |
| Сортировка при spill | Radix sort по partitionId (указатели) | Comparison sort по (partitionId, key) |
| Merge | Конкатенация по partition | Merge-sort с приоритетной очередью |
| GC impact | Минимальный (off-heap) | Высокий при большом буфере |
Spill-файлы именуются как temp_shuffle_UUID.tmp и живут в spark.local.dir. После завершения map task они удаляются.
Метрику spill можно наблюдать в Spark UI: “Shuffle Spill (Memory)” — сколько данных было в памяти до спиллинга, “Shuffle Spill (Disk)” — сколько записано на диск (после сжатия, если spark.shuffle.spill.compress = true).
Формат файлов: .data и .index в деталях
Все три writer в итоге создают одинаковый формат через IndexShuffleBlockResolver.writeIndexFileAndCommit:
shuffle_{shuffleId}_{mapId}_0.data:
+-------------------+-------------------+-------------------+
| partition 0 bytes | partition 1 bytes | ... partition P |
+-------------------+-------------------+-------------------+
(может быть compressed блоки)
shuffle_{shuffleId}_{mapId}_0.index:
+--------+--------+--------+--------+--------+
| 0 | len0 | len0+1 | ... | total |
+--------+--------+--------+--------+--------+
(P+1) значений типа Long (64 бит каждое)
Важный нюанс: сжатие применяется на уровне блока партиции, а не на весь файл. Если spark.shuffle.compress = true (по умолчанию), каждый блок партиции независимо сжат кодеком, указанным в spark.io.compression.codec (по умолчанию lz4). Это позволяет читателю распаковывать только нужную партицию.
Атомарность записи
Запись файлов атомарна: сначала пишется во временный файл shuffle_{shuffleId}_{mapId}_0.data.tmp, потом rename(). Если executor падает во время записи — останется временный файл, но не будет partial .data файла. IndexShuffleBlockResolver при старте очищает незавершённые tmp-файлы.
Sequence diagram: полный путь UnsafeShuffleWriter
Попробуй сам
Чтобы наблюдать работу writer-ов в реальном задании:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("writers-demo")
# Снижаем порог для BypassMergeSort, чтобы увидеть его в действии
.config("spark.shuffle.sort.bypassMergeThreshold", "5")
.getOrCreate())
# Случай 1: BypassMergeSortShuffleWriter (4 partitions < threshold 5)
df_small = spark.range(100_000).repartition(4)
df_small.groupBy((F.col("id") % 4)).count().show()
# В Spark UI: Stage с repartition(4) использует BypassMergeSortShuffleWriter
# Случай 2: UnsafeShuffleWriter (DataFrame, много partitions, нет combine)
spark.conf.set("spark.sql.shuffle.partitions", "200")
df_large = spark.range(10_000_000)
df_large.groupBy((F.col("id") % 200)).count().show()
# UnsafeShuffleWriter: UnsafeRowSerializer supports relocation
# Случай 3: SortShuffleWriter (RDD с map-side combine)
rdd = spark.sparkContext.parallelize(range(1_000_000))
pairs = rdd.map(lambda x: (x % 100, 1))
# reduceByKey использует map-side combine -> SortShuffleWriter
result = pairs.reduceByKey(lambda a, b: a + b)
print(result.count())
# Мониторинг spill:
# Spark UI -> Stages -> Task List -> Shuffle Spill (Memory) / (Disk)
# Если Spill > 0: рассмотрите увеличение spark.executor.memory
# или spark.memory.fraction
Чтобы увидеть spill в логах:
# Включите DEBUG для sorter
spark = (SparkSession.builder
.appName("spill-demo")
.config("spark.executor.memory", "512m") # мало памяти, будет spill
.config("spark.executor.extraJavaOptions",
"-Dlog4j.logger.org.apache.spark.util.collection.ExternalSorter=DEBUG")
.getOrCreate())
# В логах:
# ExternalSorter: Spilling in-memory map of X.X MB to disk (iteration 1)
# ExternalSorter: Size of collection before spilling: X MB
Практические техники оптимизации shuffle
Лабораторная работа
В лабораторной вы заставляете Spark выбрать каждый из трёх shuffle writer-ов, заглядываете в реальные shuffle-файлы на диске, провоцируете spill нехваткой памяти и подключаете External Shuffle Service с push-based shuffle. Так теория writer-ов из урока превращается в наблюдаемое поведение.
cd labs/shuffle-lab
docker compose up -d
Полное описание и шаги проверки — в labs/shuffle-lab/README.md.