Shuffle: Самая дорогая операция Spark
Shuffle — это процесс перераспределения данных между executors. Каждый раз, когда Spark не может обработать данные в рамках одной партиции, ему нужно перемешать (shuffle) данные по сети. Это самая дорогая операция в Spark — как по времени, так и по ресурсам.
Что вызывает shuffle?
Любая операция, требующая перемещения данных между партициями:
| Операция | Почему shuffle? |
|---|---|
groupBy() + agg() | Одинаковые ключи должны оказаться на одном executor |
join() | Записи с одинаковыми join keys должны быть на одном executor |
repartition() | Явное перераспределение по новым партициям |
distinct() | Нужно сравнить все записи для нахождения уникальных |
sort() / orderBy() | Глобальная сортировка требует обмена данными |
coalesce() (увеличение) | Только при увеличении числа партиций (= repartition) |
Механика shuffle: Map и Reduce side
Shuffle состоит из двух фаз:
Shuffle Write (Map side)
Каждый task в предыдущей стадии записывает свои данные в shuffle файлы на локальном диске executor:
Executor 1:
Task 0 -> shuffle_0_0.data + shuffle_0_0.index
Task 1 -> shuffle_0_1.data + shuffle_0_1.index
Executor 2:
Task 2 -> shuffle_0_2.data + shuffle_0_2.index
Task 3 -> shuffle_0_3.data + shuffle_0_3.index
Каждый shuffle файл содержит данные, отсортированные и разбитые по target partitions. Index файл хранит offsets для быстрого доступа к нужным партициям.
Shuffle Read (Reduce side)
Tasks следующей стадии вытягивают данные, относящиеся к их партиции, со всех executors предыдущей стадии:
Stage 2, Task 0: читает partition 0 данные из:
- Executor 1 (shuffle_0_0.data, offset для partition 0)
- Executor 1 (shuffle_0_1.data, offset для partition 0)
- Executor 2 (shuffle_0_2.data, offset для partition 0)
- Executor 2 (shuffle_0_3.data, offset для partition 0)
Это означает N * M сетевых запросов: N tasks чтения * M executors записи. Для 200 read tasks и 10 executors это 2000 HTTP-запросов.
Sort-Merge Shuffle (по умолчанию)
С Spark 1.2 по умолчанию используется Sort-Merge Shuffle:
- Map side сортирует данные по partition ID и ключу
- Записывает в один файл (вместо отдельного файла на каждую partition)
- Создаёт index файл с offsets
- Reduce side читает нужные offsets и merge-сортирует результаты
Преимущество Sort-Merge: меньше файлов. Старый Hash Shuffle создавал M * R файлов (M map tasks * R reduce partitions), что приводило к миллионам мелких файлов.
spark.sql.shuffle.partitions
Параметр spark.sql.shuffle.partitions (по умолчанию 200) определяет количество партиций после каждого shuffle:
# По умолчанию 200 -- часто слишком много для небольших данных
spark.conf.set("spark.sql.shuffle.partitions", "200")
# Для 1 ГБ данных -- 8-20 партиций достаточно
spark.conf.set("spark.sql.shuffle.partitions", "10")
# Для 1 ТБ данных -- может потребоваться 2000+
spark.conf.set("spark.sql.shuffle.partitions", "2000")
Правило большого пальца: каждая shuffle partition должна содержать 128-256 МБ данных после сжатия. Если меньше — слишком много overhead от task scheduling. Если больше — risk OOM и долгий GC.
Неправильное число shuffle partitions — один из самых распространённых источников проблем с производительностью:
- Слишком мало (10 для 1 ТБ): каждая partition 100 ГБ, executors получают OOM
- Слишком много (10000 для 1 ГБ): 10000 tasks по 100 КБ, scheduling overhead доминирует
Минимизация shuffle
1. Broadcast Join вместо Sort-Merge Join
Если одна из таблиц маленькая (< 10 МБ по умолчанию), Spark отправляет её целиком на каждый executor через broadcast. Shuffle не нужен:
from pyspark.sql.functions import broadcast
# Принудительный broadcast маленькой таблицы
result = big_table.join(broadcast(small_table), "key")
# Настройка порога автоматического broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m") # 100 МБ
2. coalesce() вместо repartition()
Для уменьшения числа партиций используйте coalesce() — он объединяет партиции без shuffle, перемещая данные только внутри executor:
# ПЛОХО: repartition всегда делает full shuffle
df.repartition(10).write.parquet("/output/")
# ХОРОШО: coalesce объединяет партиции без shuffle
df.coalesce(10).write.parquet("/output/")
coalesce() работает только для уменьшения числа партиций. Для увеличения используйте repartition() — coalesce(1000) на DataFrame с 100 партициями не создаст 1000 партиций.
3. Предварительная фильтрация
Фильтруйте данные до операций с shuffle, чтобы уменьшить объём перемещаемых данных:
# ПЛОХО: shuffle всех данных, потом фильтрация
result = orders.join(customers, "id").filter(orders.amount > 1000)
# ХОРОШО: Catalyst обычно делает это автоматически (predicate pushdown),
# но явная фильтрация до join гарантирует порядок
filtered_orders = orders.filter(orders.amount > 1000)
result = filtered_orders.join(customers, "id")
Сетевой overhead
Shuffle — это не просто перемещение данных. Каждый shuffle включает:
- Сериализация данных на map side
- Запись на диск (shuffle files)
- Сетевой трансфер (HTTP или Netty)
- Десериализация на reduce side
- Merge-сортировка для Sort-Merge Shuffle
Каждый шаг добавляет latency. Для 100 ГБ shuffle data при 10 Gbit/s сети — это минимум 80 секунд только на передачу, плюс сериализация, диск I/O, и merge.
Что дальше?
В следующем уроке мы разберём Memory Management — как Spark управляет памятью executor, что такое Unified Memory Manager, и как правильно настроить spark.memory.fraction для вашей нагрузки.