Оптимизация shuffle
В Модуле 01 мы разобрали механику shuffle: Map/Reduce side, Sort-Merge Shuffle, сетевой overhead. Теперь перейдём к практике — как находить лишние shuffle в ваших запросах и устранять их.
Как найти shuffle в explain()
Ключевое слово в физическом плане — Exchange. Каждый Exchange в explain() означает shuffle:
# Запрос с двумя shuffle
df_orders = spark.read.parquet("/data/orders/")
df_customers = spark.read.parquet("/data/customers/")
result = df_orders.join(df_customers, "customer_id").groupBy("city").count()
result.explain()
== Physical Plan ==
*(4) HashAggregate(keys=[city], functions=[count(1)])
+- Exchange hashpartitioning(city, 200) ← shuffle #2 (groupBy)
+- *(3) HashAggregate(keys=[city], functions=[partial_count(1)])
+- *(3) SortMergeJoin [customer_id], [customer_id]
:- *(1) Sort [customer_id ASC]
: +- Exchange hashpartitioning(customer_id, 200) ← shuffle #1 (join, left)
: +- *(1) FileScan parquet [orders]
+- *(2) Sort [customer_id ASC]
+- Exchange hashpartitioning(customer_id, 200) ← shuffle #1 (join, right)
+- *(2) FileScan parquet [customers]
Здесь три Exchange — два для join и один для groupBy. Это три перемещения данных по сети.
Как быстро считать shuffle: grep по слову Exchange в выводе explain(). Каждое совпадение — одна shuffle-операция. В Spark UI это видно как граница между stages.
spark.sql.shuffle.partitions
Параметр spark.sql.shuffle.partitions (по умолчанию 200) определяет число партиций после каждого shuffle. Это один из самых важных параметров для настройки:
# По умолчанию 200 -- подходит для ~50 ГБ данных
spark.conf.set("spark.sql.shuffle.partitions", "200")
# Для 1 ГБ -- 200 партиций = 5 МБ на партицию (слишком мелко)
spark.conf.set("spark.sql.shuffle.partitions", "8")
# Для 1 ТБ -- 200 партиций = 5 ГБ на партицию (слишком крупно, OOM)
spark.conf.set("spark.sql.shuffle.partitions", "4000")
Правило: каждая shuffle partition должна содержать 128-256 МБ данных. С AQE (Adaptive Query Execution) это правило менее критично — AQE автоматически объединяет мелкие партиции. Но для Spark без AQE (legacy приложения) или для начальной оценки — это надёжный ориентир.
Анти-паттерн: слишком мало shuffle partitions
# ОПАСНО: 10 партиций для 1 ТБ = 100 ГБ на партицию
spark.conf.set("spark.sql.shuffle.partitions", "10")
# Симптомы:
# - OOM на executors во время shuffle read
# - Длинные GC паузы (> 30 секунд)
# - Straggler tasks (одна партиция значительно больше)
# ПРАВИЛЬНО: рассчитываем на основе объёма данных
data_size_gb = 1000 # 1 ТБ
target_partition_mb = 200 # 200 МБ на партицию
partitions = (data_size_gb * 1024) // target_partition_mb # ~5120
spark.conf.set("spark.sql.shuffle.partitions", str(partitions))
repartition() vs coalesce()
Оба метода меняют число партиций, но механика принципиально отличается:
repartition(N) | coalesce(N) | |
|---|---|---|
| Shuffle | Да (полный) | Нет (narrow dependency) |
| Увеличение партиций | Да | Нет (игнорируется) |
| Уменьшение партиций | Да (равномерно) | Да (неравномерно) |
| Баланс данных | Равномерный | Может быть неравномерным |
# Перед записью: уменьшаем с 200 до 10 файлов
# ПЛОХО: полный shuffle для простого уменьшения
df.repartition(10).write.parquet("/output/")
# ХОРОШО: объединение без shuffle
df.coalesce(10).write.parquet("/output/")
Когда всё же нужен repartition():
# Нужно repartition по конкретному столбцу для оптимизации downstream joins
df.repartition("customer_id").write.parquet("/output/orders_by_customer/")
# Нужно увеличить число партиций (coalesce это не может)
df.repartition(1000)
Устранение shuffle через Column Pruning и Predicate Pushdown
Catalyst optimizer автоматически применяет две ключевые оптимизации:
Column Pruning — читать только нужные столбцы:
# Catalyst автоматически сужает набор столбцов для Parquet/ORC
result = df.select("id", "name").join(other, "id")
# FileScan читает только id, name вместо всех 50 столбцов
Predicate Pushdown — фильтровать до shuffle:
# Before: filter после join (shuffle 100% данных)
result = orders.join(customers, "id").filter(col("amount") > 1000)
== Physical Plan ==
*(3) Filter (amount > 1000)
+- *(3) SortMergeJoin [id], [id]
:- *(1) Sort [id ASC]
: +- Exchange hashpartitioning(id, 200) ← shuffle 100% orders
...
# After: Catalyst pushes filter перед join (shuffle только отфильтрованных)
# Catalyst делает это автоматически, но вы можете написать явно:
filtered = orders.filter(col("amount") > 1000)
result = filtered.join(customers, "id")
== Physical Plan ==
*(3) SortMergeJoin [id], [id]
:- *(1) Sort [id ASC]
: +- Exchange hashpartitioning(id, 200) ← shuffle только отфильтрованных
: +- *(1) Filter (amount > 1000)
: +- *(1) FileScan parquet [orders] ← pushdown в Parquet
...
Предикат сдвинулся под Exchange — Spark сначала фильтрует, потом шаффлит только подходящие записи.
Predicate Pushdown и Parquet. Для Parquet файлов Spark pushes предикаты не просто до shuffle, а в сам reader. Parquet хранит min/max статистику по каждому row group, и Spark пропускает целые row groups, не соответствующие фильтру. Это может сократить чтение с 100 ГБ до 5 ГБ без единой строчки кода оптимизации.
Устранение shuffle через bucketing
Bucketing предварительно распределяет данные по join key при записи. При join по тому же ключу shuffle не нужен:
Before: без bucketing
== Physical Plan ==
*(3) SortMergeJoin [customer_id], [customer_id]
:- *(1) Sort [customer_id ASC]
: +- Exchange hashpartitioning(customer_id, 200) ← shuffle
: +- *(1) FileScan parquet [orders]
+- *(2) Sort [customer_id ASC]
+- Exchange hashpartitioning(customer_id, 200) ← shuffle
+- *(2) FileScan parquet [customers]
After: с bucketing
# Предварительная запись с bucketing
orders.write.bucketBy(16, "customer_id").saveAsTable("orders_bucketed")
customers.write.bucketBy(16, "customer_id").saveAsTable("customers_bucketed")
# Join без shuffle!
result = spark.table("orders_bucketed").join(
spark.table("customers_bucketed"), "customer_id"
)
result.explain()
== Physical Plan ==
*(3) SortMergeJoin [customer_id], [customer_id]
:- *(1) Sort [customer_id ASC]
: +- *(1) FileScan parquet [orders_bucketed] ← без Exchange!
+- *(2) Sort [customer_id ASC]
+- *(2) FileScan parquet [customers_bucketed] ← без Exchange!
Exchange исчез — данные уже распределены по customer_id. Подробнее о bucketing — в следующих уроках.
Как shuffle устроен внутри Spark — на уровне исходников — в курсе Apache Spark Internals:
Spark Internals: shuffle изнутриЧто дальше?
В следующем уроке мы разберём стратегии партиционирования — как правильно выбирать partition key, когда использовать hash vs range partitioning, и как partition pruning сокращает объём сканирования в десятки раз.