Learning Platform
Глоссарий Troubleshooting
Урок 09.02 · 12 мин
Средний
Coalesce PartitionsadvisoryPartitionSizeInBytesminPartitionSizeSmall Partitions

Автоматическое слияние shuffle-партиций

Проблема: 200 партиций для 10 МБ данных

По умолчанию Spark использует 200 партиций для shuffle-операций (spark.sql.shuffle.partitions = 200). Это число было выбрано как компромисс для кластеров среднего размера, но на практике оно редко оптимально.

Представьте: у вас 10 миллионов строк в таблице orders. После фильтра WHERE region = 'Сибирь' осталось 50 000 строк (~2 МБ). При groupBy("city").count() Spark создаст shuffle с 200 партициями для 2 МБ данных:

df_orders = spark.read.parquet("/data/orders/")
result = (
    df_orders
    .filter(col("region") == "Сибирь")  # 10M -> 50K строк
    .groupBy("city")
    .count()
)
result.explain()
== Physical Plan ==
*(2) HashAggregate(keys=[city], functions=[count(1)])
+- Exchange hashpartitioning(city, 200)     # <-- 200 партиций!
   +- *(1) HashAggregate(keys=[city], functions=[partial_count(1)])
      +- *(1) Filter (region = Сибирь)
         +- *(1) FileScan parquet [city, region]

Результат: 200 партиций по ~10 КБ каждая. Из них 180+ будут пустыми или содержать считанные байты. Каждая партиция — это отдельный task, а каждый task имеет накладные расходы на планирование (~15-50 мс). Запуск 200 tasks по 10 КБ — это 3-10 секунд чистого overhead.

Как AQE решает эту проблему

С включённым AQE (spark.sql.adaptive.enabled = true) Spark выполняет shuffle, собирает реальные размеры каждой из 200 партиций, и затем объединяет мелкие партиции в более крупные:

До AQE coalesce (200 партиций):
[0.1KB][0KB][0.5KB][0KB][0KB][1.2KB][0KB]...[0.3KB]  # 200 tasks

После AQE coalesce (12 партиций):
[  2.1MB  ][  1.8MB  ][  2.3MB  ]...[  1.5MB  ]      # 12 tasks

AQE объединяет соседние партиции до тех пор, пока их суммарный размер не приблизится к advisory partition size (по умолчанию 64 МБ). Для наших 2 МБ данных результат — 1-2 партиции вместо 200.

== Physical Plan == (с AQE)
*(2) HashAggregate(keys=[city], functions=[count(1)])
+- AQEShuffleRead coalesced                      # <-- AQE coalesced!
   +- Exchange hashpartitioning(city, 200)
      +- *(1) HashAggregate(keys=[city], functions=[partial_count(1)])
         +- *(1) Filter (region = Сибирь)
            +- *(1) FileScan parquet [city, region]

Обратите внимание на AQEShuffleRead coalesced в плане — это маркер того, что AQE объединил партиции.

Конфигурация coalesce

Три ключевых параметра управляют объединением:

ПараметрЗначениеКак работает
advisoryPartitionSizeInBytes64MBЦелевой размер. AQE объединяет соседние партиции, пока суммарный размер не превысит этот порог
minPartitionSize1MBМинимальный размер. Партиция не будет меньше этого значения (защита от чрезмерного объединения)
coalescePartitions.initialPartitionNumНе заданНачальное число партиций. Если не задано, используется spark.sql.shuffle.partitions (200)
# Настройка для конкретного сценария
# Для больших данных (>100 ГБ) увеличьте advisory size:
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")

# Для маленьких данных уменьшите min partition size:
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "512KB")

# Начать с большего числа партиций для лучшего параллелизма:
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "400")

Before/After: реальные метрики

Сценарий: groupBy("city").agg(sum("amount")) после фильтра, оставляющего 5% данных.

МетрикаБез AQE coalesceС AQE coalesce
Shuffle партиций2008
Средний размер партиции0.3 МБ7.5 МБ
Пустых партиций1400
Task scheduling overhead~4 сек~0.2 сек
Общее время stage6.2 сек1.8 сек
Ускорение3.4x

Ускорение зависит от соотношения “полезной работы” к “накладным расходам на task scheduling”. Чем мельче партиции, тем больше выигрыш от coalesce.

TIP

Как проверить, что AQE coalesce работает?

В Spark UI перейдите на вкладку SQL и найдите ваш запрос. В графе плана выполнения ищите узел AQEShuffleRead. Если рядом написано coalesced, значит AQE объединил партиции. Кликните на узел — увидите число партиций до и после coalesce.

Анти-паттерн: ручная настройка shuffle.partitions

До появления AQE разработчики вручную подбирали spark.sql.shuffle.partitions:

# Анти-паттерн: ручная настройка
spark.conf.set("spark.sql.shuffle.partitions", "12")  # Для маленьких данных
# ... но что если этот же код запустится на 100 ГБ?
# 12 партиций = 8.3 ГБ на партицию = OOM!

Проблема: фиксированное число партиций не масштабируется. Значение 12 отлично для 100 МБ, но вызовет OOM на 100 ГБ. Значение 2000 отлично для 1 ТБ, но создаст 2000 пустых tasks для 10 МБ.

Решение с AQE: оставьте shuffle.partitions = 200 (или увеличьте до 400-1000 для больших данных) и позвольте AQE автоматически объединить лишние партиции. AQE адаптируется к реальному объёму данных каждый раз, без ручной настройки.

# Правильный подход: позвольте AQE решать
spark.conf.set("spark.sql.adaptive.enabled", "true")
# Используйте initialPartitionNum для верхней границы:
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "500")
# AQE сам определит оптимальное число
Проверка знанийKnowledge check
Почему AQE объединяет соседние партиции, а не произвольные?
ОтветAnswer
Shuffle-партиции уже распределены по hash-ключам. Соседние партиции (по номеру) обрабатываются на тех же executor'ах. Объединение соседних партиций позволяет избежать дополнительного перемещения данных по сети -- данные уже находятся на нужных узлах. Объединение произвольных партиций потребовало бы дополнительного shuffle, что нивелировало бы выигрыш от coalesce.
Проверка знанийKnowledge check
В каком случае стоит вручную задать spark.sql.shuffle.partitions вместо AQE coalesce?
ОтветAnswer
Ручная настройка оправдана только если вы точно знаете, что объём данных стабилен и не меняется между запусками (например, фиксированный batch одного размера). Также AQE coalesce может быть недостаточен, если initialPartitionNum слишком мал для начального параллелизма -- тогда стоит увеличить начальное число. В остальных случаях AQE coalesce предпочтительнее, так как он адаптируется к реальным данным.

Что дальше?

В следующем уроке мы разберём вторую ключевую оптимизацию AQE — skew join optimization. Если coalesce решает проблему “слишком много мелких партиций”, то skew join решает обратную проблему: “одна партиция в 100 раз больше остальных”.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 6. spark.sql.shuffle.partitions = 200, после фильтра осталось 2 МБ данных. Без AQE будет 200 tasks для 2 МБ. Что сделает AQE?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5