Автоматическое слияние shuffle-партиций
Проблема: 200 партиций для 10 МБ данных
По умолчанию Spark использует 200 партиций для shuffle-операций (spark.sql.shuffle.partitions = 200). Это число было выбрано как компромисс для кластеров среднего размера, но на практике оно редко оптимально.
Представьте: у вас 10 миллионов строк в таблице orders. После фильтра WHERE region = 'Сибирь' осталось 50 000 строк (~2 МБ). При groupBy("city").count() Spark создаст shuffle с 200 партициями для 2 МБ данных:
df_orders = spark.read.parquet("/data/orders/")
result = (
df_orders
.filter(col("region") == "Сибирь") # 10M -> 50K строк
.groupBy("city")
.count()
)
result.explain()
== Physical Plan ==
*(2) HashAggregate(keys=[city], functions=[count(1)])
+- Exchange hashpartitioning(city, 200) # <-- 200 партиций!
+- *(1) HashAggregate(keys=[city], functions=[partial_count(1)])
+- *(1) Filter (region = Сибирь)
+- *(1) FileScan parquet [city, region]
Результат: 200 партиций по ~10 КБ каждая. Из них 180+ будут пустыми или содержать считанные байты. Каждая партиция — это отдельный task, а каждый task имеет накладные расходы на планирование (~15-50 мс). Запуск 200 tasks по 10 КБ — это 3-10 секунд чистого overhead.
Как AQE решает эту проблему
С включённым AQE (spark.sql.adaptive.enabled = true) Spark выполняет shuffle, собирает реальные размеры каждой из 200 партиций, и затем объединяет мелкие партиции в более крупные:
До AQE coalesce (200 партиций):
[0.1KB][0KB][0.5KB][0KB][0KB][1.2KB][0KB]...[0.3KB] # 200 tasks
После AQE coalesce (12 партиций):
[ 2.1MB ][ 1.8MB ][ 2.3MB ]...[ 1.5MB ] # 12 tasks
AQE объединяет соседние партиции до тех пор, пока их суммарный размер не приблизится к advisory partition size (по умолчанию 64 МБ). Для наших 2 МБ данных результат — 1-2 партиции вместо 200.
== Physical Plan == (с AQE)
*(2) HashAggregate(keys=[city], functions=[count(1)])
+- AQEShuffleRead coalesced # <-- AQE coalesced!
+- Exchange hashpartitioning(city, 200)
+- *(1) HashAggregate(keys=[city], functions=[partial_count(1)])
+- *(1) Filter (region = Сибирь)
+- *(1) FileScan parquet [city, region]
Обратите внимание на AQEShuffleRead coalesced в плане — это маркер того, что AQE объединил партиции.
Конфигурация coalesce
Три ключевых параметра управляют объединением:
| Параметр | Значение | Как работает |
|---|---|---|
advisoryPartitionSizeInBytes | 64MB | Целевой размер. AQE объединяет соседние партиции, пока суммарный размер не превысит этот порог |
minPartitionSize | 1MB | Минимальный размер. Партиция не будет меньше этого значения (защита от чрезмерного объединения) |
coalescePartitions.initialPartitionNum | Не задан | Начальное число партиций. Если не задано, используется spark.sql.shuffle.partitions (200) |
# Настройка для конкретного сценария
# Для больших данных (>100 ГБ) увеличьте advisory size:
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
# Для маленьких данных уменьшите min partition size:
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "512KB")
# Начать с большего числа партиций для лучшего параллелизма:
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "400")
Before/After: реальные метрики
Сценарий: groupBy("city").agg(sum("amount")) после фильтра, оставляющего 5% данных.
| Метрика | Без AQE coalesce | С AQE coalesce |
|---|---|---|
| Shuffle партиций | 200 | 8 |
| Средний размер партиции | 0.3 МБ | 7.5 МБ |
| Пустых партиций | 140 | 0 |
| Task scheduling overhead | ~4 сек | ~0.2 сек |
| Общее время stage | 6.2 сек | 1.8 сек |
| Ускорение | — | 3.4x |
Ускорение зависит от соотношения “полезной работы” к “накладным расходам на task scheduling”. Чем мельче партиции, тем больше выигрыш от coalesce.
Как проверить, что AQE coalesce работает?
В Spark UI перейдите на вкладку SQL и найдите ваш запрос. В графе плана выполнения ищите узел AQEShuffleRead. Если рядом написано coalesced, значит AQE объединил партиции. Кликните на узел — увидите число партиций до и после coalesce.
Анти-паттерн: ручная настройка shuffle.partitions
До появления AQE разработчики вручную подбирали spark.sql.shuffle.partitions:
# Анти-паттерн: ручная настройка
spark.conf.set("spark.sql.shuffle.partitions", "12") # Для маленьких данных
# ... но что если этот же код запустится на 100 ГБ?
# 12 партиций = 8.3 ГБ на партицию = OOM!
Проблема: фиксированное число партиций не масштабируется. Значение 12 отлично для 100 МБ, но вызовет OOM на 100 ГБ. Значение 2000 отлично для 1 ТБ, но создаст 2000 пустых tasks для 10 МБ.
Решение с AQE: оставьте shuffle.partitions = 200 (или увеличьте до 400-1000 для больших данных) и позвольте AQE автоматически объединить лишние партиции. AQE адаптируется к реальному объёму данных каждый раз, без ручной настройки.
# Правильный подход: позвольте AQE решать
spark.conf.set("spark.sql.adaptive.enabled", "true")
# Используйте initialPartitionNum для верхней границы:
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "500")
# AQE сам определит оптимальное число
Что дальше?
В следующем уроке мы разберём вторую ключевую оптимизацию AQE — skew join optimization. Если coalesce решает проблему “слишком много мелких партиций”, то skew join решает обратную проблему: “одна партиция в 100 раз больше остальных”.