Learning Platform
Глоссарий Troubleshooting
Урок 05.01 · 20 мин
Продвинутый
ShuffleSort MergeHash ShufflePartitionNetwork I/O

Shuffle: Самая дорогая операция Spark

Shuffle — это процесс перераспределения данных между executors. Каждый раз, когда Spark не может обработать данные в рамках одной партиции, ему нужно перемешать (shuffle) данные по сети. Это самая дорогая операция в Spark — как по времени, так и по ресурсам.

Что вызывает shuffle?

Любая операция, требующая перемещения данных между партициями:

ОперацияПочему shuffle?
groupBy() + agg()Одинаковые ключи должны оказаться на одном executor
join()Записи с одинаковыми join keys должны быть на одном executor
repartition()Явное перераспределение по новым партициям
distinct()Нужно сравнить все записи для нахождения уникальных
sort() / orderBy()Глобальная сортировка требует обмена данными
coalesce() (увеличение)Только при увеличении числа партиций (= repartition)

Механика shuffle: Map и Reduce side

Shuffle состоит из двух фаз:

Shuffle Write (Map side)

Каждый task в предыдущей стадии записывает свои данные в shuffle файлы на локальном диске executor:

Executor 1:
  Task 0 -> shuffle_0_0.data + shuffle_0_0.index
  Task 1 -> shuffle_0_1.data + shuffle_0_1.index

Executor 2:
  Task 2 -> shuffle_0_2.data + shuffle_0_2.index
  Task 3 -> shuffle_0_3.data + shuffle_0_3.index

Каждый shuffle файл содержит данные, отсортированные и разбитые по target partitions. Index файл хранит offsets для быстрого доступа к нужным партициям.

Shuffle Read (Reduce side)

Tasks следующей стадии вытягивают данные, относящиеся к их партиции, со всех executors предыдущей стадии:

Stage 2, Task 0: читает partition 0 данные из:
  - Executor 1 (shuffle_0_0.data, offset для partition 0)
  - Executor 1 (shuffle_0_1.data, offset для partition 0)
  - Executor 2 (shuffle_0_2.data, offset для partition 0)
  - Executor 2 (shuffle_0_3.data, offset для partition 0)

Это означает N * M сетевых запросов: N tasks чтения * M executors записи. Для 200 read tasks и 10 executors это 2000 HTTP-запросов.

Sort-Merge Shuffle (по умолчанию)

С Spark 1.2 по умолчанию используется Sort-Merge Shuffle:

  1. Map side сортирует данные по partition ID и ключу
  2. Записывает в один файл (вместо отдельного файла на каждую partition)
  3. Создаёт index файл с offsets
  4. Reduce side читает нужные offsets и merge-сортирует результаты

Преимущество Sort-Merge: меньше файлов. Старый Hash Shuffle создавал M * R файлов (M map tasks * R reduce partitions), что приводило к миллионам мелких файлов.

spark.sql.shuffle.partitions

Параметр spark.sql.shuffle.partitions (по умолчанию 200) определяет количество партиций после каждого shuffle:

# По умолчанию 200 -- часто слишком много для небольших данных
spark.conf.set("spark.sql.shuffle.partitions", "200")

# Для 1 ГБ данных -- 8-20 партиций достаточно
spark.conf.set("spark.sql.shuffle.partitions", "10")

# Для 1 ТБ данных -- может потребоваться 2000+
spark.conf.set("spark.sql.shuffle.partitions", "2000")

Правило большого пальца: каждая shuffle partition должна содержать 128-256 МБ данных после сжатия. Если меньше — слишком много overhead от task scheduling. Если больше — risk OOM и долгий GC.

Неправильное число shuffle partitions — один из самых распространённых источников проблем с производительностью:

  • Слишком мало (10 для 1 ТБ): каждая partition 100 ГБ, executors получают OOM
  • Слишком много (10000 для 1 ГБ): 10000 tasks по 100 КБ, scheduling overhead доминирует

Минимизация shuffle

1. Broadcast Join вместо Sort-Merge Join

Если одна из таблиц маленькая (< 10 МБ по умолчанию), Spark отправляет её целиком на каждый executor через broadcast. Shuffle не нужен:

from pyspark.sql.functions import broadcast

# Принудительный broadcast маленькой таблицы
result = big_table.join(broadcast(small_table), "key")
# Настройка порога автоматического broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m")  # 100 МБ

2. coalesce() вместо repartition()

Для уменьшения числа партиций используйте coalesce() — он объединяет партиции без shuffle, перемещая данные только внутри executor:

# ПЛОХО: repartition всегда делает full shuffle
df.repartition(10).write.parquet("/output/")

# ХОРОШО: coalesce объединяет партиции без shuffle
df.coalesce(10).write.parquet("/output/")
WARNING

coalesce() работает только для уменьшения числа партиций. Для увеличения используйте repartition()coalesce(1000) на DataFrame с 100 партициями не создаст 1000 партиций.

3. Предварительная фильтрация

Фильтруйте данные до операций с shuffle, чтобы уменьшить объём перемещаемых данных:

# ПЛОХО: shuffle всех данных, потом фильтрация
result = orders.join(customers, "id").filter(orders.amount > 1000)

# ХОРОШО: Catalyst обычно делает это автоматически (predicate pushdown),
# но явная фильтрация до join гарантирует порядок
filtered_orders = orders.filter(orders.amount > 1000)
result = filtered_orders.join(customers, "id")

Сетевой overhead

Shuffle — это не просто перемещение данных. Каждый shuffle включает:

  1. Сериализация данных на map side
  2. Запись на диск (shuffle files)
  3. Сетевой трансфер (HTTP или Netty)
  4. Десериализация на reduce side
  5. Merge-сортировка для Sort-Merge Shuffle

Каждый шаг добавляет latency. Для 100 ГБ shuffle data при 10 Gbit/s сети — это минимум 80 секунд только на передачу, плюс сериализация, диск I/O, и merge.

Проверка знанийKnowledge check
Почему coalesce(N) предпочтительнее repartition(N) при уменьшении числа партиций?
ОтветAnswer
coalesce() объединяет существующие партиции без выполнения full shuffle. Данные перемещаются минимально -- соседние партиции на одном executor объединяются в одну. repartition() всегда выполняет полный shuffle: все данные сериализуются, записываются на диск, передаются по сети и десериализуются. Для уменьшения с 1000 до 100 партиций coalesce избегает перемещения 90% данных.

Что дальше?

В следующем уроке мы разберём Memory Management — как Spark управляет памятью executor, что такое Unified Memory Manager, и как правильно настроить spark.memory.fraction для вашей нагрузки.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какие из следующих операций НЕ вызывают shuffle?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4