Learning Platform
Глоссарий Troubleshooting
Урок 04.01 · 14 мин
Средний
ShuffleExchangeRepartitionCoalescespark.sql.shuffle.partitions

Оптимизация shuffle

В Модуле 01 мы разобрали механику shuffle: Map/Reduce side, Sort-Merge Shuffle, сетевой overhead. Теперь перейдём к практике — как находить лишние shuffle в ваших запросах и устранять их.

Как найти shuffle в explain()

Ключевое слово в физическом плане — Exchange. Каждый Exchange в explain() означает shuffle:

# Запрос с двумя shuffle
df_orders = spark.read.parquet("/data/orders/")
df_customers = spark.read.parquet("/data/customers/")
result = df_orders.join(df_customers, "customer_id").groupBy("city").count()
result.explain()
== Physical Plan ==
*(4) HashAggregate(keys=[city], functions=[count(1)])
+- Exchange hashpartitioning(city, 200)          ← shuffle #2 (groupBy)
   +- *(3) HashAggregate(keys=[city], functions=[partial_count(1)])
      +- *(3) SortMergeJoin [customer_id], [customer_id]
         :- *(1) Sort [customer_id ASC]
         :  +- Exchange hashpartitioning(customer_id, 200)  ← shuffle #1 (join, left)
         :     +- *(1) FileScan parquet [orders]
         +- *(2) Sort [customer_id ASC]
            +- Exchange hashpartitioning(customer_id, 200)  ← shuffle #1 (join, right)
               +- *(2) FileScan parquet [customers]

Здесь три Exchange — два для join и один для groupBy. Это три перемещения данных по сети.

TIP

Как быстро считать shuffle: grep по слову Exchange в выводе explain(). Каждое совпадение — одна shuffle-операция. В Spark UI это видно как граница между stages.

spark.sql.shuffle.partitions

Параметр spark.sql.shuffle.partitions (по умолчанию 200) определяет число партиций после каждого shuffle. Это один из самых важных параметров для настройки:

# По умолчанию 200 -- подходит для ~50 ГБ данных
spark.conf.set("spark.sql.shuffle.partitions", "200")

# Для 1 ГБ -- 200 партиций = 5 МБ на партицию (слишком мелко)
spark.conf.set("spark.sql.shuffle.partitions", "8")

# Для 1 ТБ -- 200 партиций = 5 ГБ на партицию (слишком крупно, OOM)
spark.conf.set("spark.sql.shuffle.partitions", "4000")

Правило: каждая shuffle partition должна содержать 128-256 МБ данных. С AQE (Adaptive Query Execution) это правило менее критично — AQE автоматически объединяет мелкие партиции. Но для Spark без AQE (legacy приложения) или для начальной оценки — это надёжный ориентир.

Анти-паттерн: слишком мало shuffle partitions

# ОПАСНО: 10 партиций для 1 ТБ = 100 ГБ на партицию
spark.conf.set("spark.sql.shuffle.partitions", "10")

# Симптомы:
# - OOM на executors во время shuffle read
# - Длинные GC паузы (> 30 секунд)
# - Straggler tasks (одна партиция значительно больше)
# ПРАВИЛЬНО: рассчитываем на основе объёма данных
data_size_gb = 1000  # 1 ТБ
target_partition_mb = 200  # 200 МБ на партицию
partitions = (data_size_gb * 1024) // target_partition_mb  # ~5120
spark.conf.set("spark.sql.shuffle.partitions", str(partitions))

repartition() vs coalesce()

Оба метода меняют число партиций, но механика принципиально отличается:

repartition(N)coalesce(N)
ShuffleДа (полный)Нет (narrow dependency)
Увеличение партицийДаНет (игнорируется)
Уменьшение партицийДа (равномерно)Да (неравномерно)
Баланс данныхРавномерныйМожет быть неравномерным
# Перед записью: уменьшаем с 200 до 10 файлов
# ПЛОХО: полный shuffle для простого уменьшения
df.repartition(10).write.parquet("/output/")

# ХОРОШО: объединение без shuffle
df.coalesce(10).write.parquet("/output/")

Когда всё же нужен repartition():

# Нужно repartition по конкретному столбцу для оптимизации downstream joins
df.repartition("customer_id").write.parquet("/output/orders_by_customer/")

# Нужно увеличить число партиций (coalesce это не может)
df.repartition(1000)

Устранение shuffle через Column Pruning и Predicate Pushdown

Catalyst optimizer автоматически применяет две ключевые оптимизации:

Column Pruning — читать только нужные столбцы:

# Catalyst автоматически сужает набор столбцов для Parquet/ORC
result = df.select("id", "name").join(other, "id")
# FileScan читает только id, name вместо всех 50 столбцов

Predicate Pushdown — фильтровать до shuffle:

# Before: filter после join (shuffle 100% данных)
result = orders.join(customers, "id").filter(col("amount") > 1000)
== Physical Plan ==
*(3) Filter (amount > 1000)
+- *(3) SortMergeJoin [id], [id]
   :- *(1) Sort [id ASC]
   :  +- Exchange hashpartitioning(id, 200)    ← shuffle 100% orders
   ...
# After: Catalyst pushes filter перед join (shuffle только отфильтрованных)
# Catalyst делает это автоматически, но вы можете написать явно:
filtered = orders.filter(col("amount") > 1000)
result = filtered.join(customers, "id")
== Physical Plan ==
*(3) SortMergeJoin [id], [id]
:- *(1) Sort [id ASC]
:  +- Exchange hashpartitioning(id, 200)    ← shuffle только отфильтрованных
:     +- *(1) Filter (amount > 1000)
:        +- *(1) FileScan parquet [orders]   ← pushdown в Parquet
...

Предикат сдвинулся под Exchange — Spark сначала фильтрует, потом шаффлит только подходящие записи.

TIP

Predicate Pushdown и Parquet. Для Parquet файлов Spark pushes предикаты не просто до shuffle, а в сам reader. Parquet хранит min/max статистику по каждому row group, и Spark пропускает целые row groups, не соответствующие фильтру. Это может сократить чтение с 100 ГБ до 5 ГБ без единой строчки кода оптимизации.

Устранение shuffle через bucketing

Bucketing предварительно распределяет данные по join key при записи. При join по тому же ключу shuffle не нужен:

Before: без bucketing

== Physical Plan ==
*(3) SortMergeJoin [customer_id], [customer_id]
:- *(1) Sort [customer_id ASC]
:  +- Exchange hashpartitioning(customer_id, 200)   ← shuffle
:     +- *(1) FileScan parquet [orders]
+- *(2) Sort [customer_id ASC]
   +- Exchange hashpartitioning(customer_id, 200)   ← shuffle
      +- *(2) FileScan parquet [customers]

After: с bucketing

# Предварительная запись с bucketing
orders.write.bucketBy(16, "customer_id").saveAsTable("orders_bucketed")
customers.write.bucketBy(16, "customer_id").saveAsTable("customers_bucketed")

# Join без shuffle!
result = spark.table("orders_bucketed").join(
    spark.table("customers_bucketed"), "customer_id"
)
result.explain()
== Physical Plan ==
*(3) SortMergeJoin [customer_id], [customer_id]
:- *(1) Sort [customer_id ASC]
:  +- *(1) FileScan parquet [orders_bucketed]     ← без Exchange!
+- *(2) Sort [customer_id ASC]
   +- *(2) FileScan parquet [customers_bucketed]   ← без Exchange!

Exchange исчез — данные уже распределены по customer_id. Подробнее о bucketing — в следующих уроках.

Проверка знанийKnowledge check
В физическом плане explain() вы видите три Exchange оператора. Что это означает для производительности?
ОтветAnswer
Три Exchange означают три shuffle операции -- три полных перераспределения данных по сети. Каждый shuffle включает сериализацию, запись на диск, сетевой трансфер и десериализацию. Для оптимизации нужно определить, какие Exchange можно устранить: через broadcast join (для маленьких таблиц), bucketing (для повторяющихся join по одному ключу), или coalesce вместо repartition (для уменьшения партиций).
Проверка знанийKnowledge check
Когда следует использовать repartition() вместо coalesce()?
ОтветAnswer
repartition() необходим в двух случаях: (1) увеличение числа партиций -- coalesce не может увеличивать; (2) перераспределение по конкретному столбцу для оптимизации downstream операций. Например, repartition('customer_id') перед записью гарантирует равномерное распределение по join key. Во всех остальных случаях (уменьшение партиций перед записью) используйте coalesce -- он не делает shuffle.

Как shuffle устроен внутри Spark — на уровне исходников — в курсе Apache Spark Internals:

Spark Internals: shuffle изнутри

Что дальше?

В следующем уроке мы разберём стратегии партиционирования — как правильно выбирать partition key, когда использовать hash vs range partitioning, и как partition pruning сокращает объём сканирования в десятки раз.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 6. Какое ключевое слово в выводе explain() указывает на shuffle-операцию?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6