Оптимизация реального пайплайна
В уроках 2 и 3 мы поставили диагноз: data skew в join и aggregation, неработающая AQE skew optimization из-за выбора ShuffledHashJoin, конкурирующий cache() в UnifiedMemoryManager, избыточный disk spill. Теперь применяем исправления методично — по одному, с замерами до и после каждого изменения.
Оптимизация shuffle: практические подходыБазовые метрики перед оптимизацией (Baseline)
Перед любым изменением фиксируем baseline. Этот шаг обязателен — без него невозможно измерить эффект каждого исправления.
Baseline (прогон 2026-05-20, 02:00 UTC):
Total duration: 3 ч 47 мин
Stage 2 (shuffle write): 47 мин (median task 14 мин, max 47 мин)
Stage 3 (join): 62 мин (median task 4 мин, max 62 мин)
Stage 5 (aggregation): 64 мин (median task 4 мин, max 64 мин)
Shuffle spill (disk): 15 GB
GC time executor-7: 14 мин 32 с (Task 45)
OOM in last 7 days: 1 раз
spark.sql.shuffle.partitions: 200
spark.sql.adaptive.enabled: true
enriched.cache(): присутствует
Исправление 1: удаление ненужного cache()
Первое исправление — самое простое и с немедленным эффектом.
# БЫЛО:
enriched = events.join(users, "user_id", "left") \
.join(campaigns, "campaign_id", "left") \
.cache() # эта строка создаёт конкуренцию за storage pool
result = enriched.groupBy(...).agg(...)
result.write...
# СТАЛО:
enriched = events.join(users, "user_id", "left") \
.join(campaigns, "campaign_id", "left")
# cache() убран
result = enriched.groupBy(...).agg(...)
result.write...
Почему cache() здесь был вреден: enriched используется ровно один раз (для aggregation), а значит cache не даёт никакого reuse-выигрыша. Зато захватывает до 28.8 GB storage pool, уменьшая доступный execution pool.
Правило: cache() оправдан только если DataFrame используется более одного раза в коде. Один action = ноль смысла в кэше.
После удаления cache() и повторного прогона (тест с теми же данными):
После Исправления 1:
Total duration: 2 ч 14 мин (-41%)
Stage 3 (join): 55 мин (max task 55 мин, outlier сохранился)
Shuffle spill (disk): 8 GB (-47%)
GC time executor-7: 9 мин (-38%)
Уже значительное улучшение. Spill сократился вдвое — execution pool освободился. Но outlier task в Stage 3 всё ещё есть: root cause skew не устранён.
Исправление 2: принудить AQE к SortMergeJoin для skew optimization
Вторая проблема: AQE выбрал ShuffledHashJoin, деактивировав skew protection. Исправление:
spark.conf.set(
"spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold",
"0b" # запрещаем замену на ShuffledHashJoin
)
Это означает, что AQE больше не будет переключаться на ShuffledHashJoin и сохранит SortMergeJoin. Теперь OptimizeSkewedJoin найдёт SortMergeJoinExec в плане и применит skew split.
Альтернативный подход, появившийся в Spark 4.0 (SPARK-45567):
# Принудить skew optimization даже при ShuffledHashJoin
spark.conf.set(
"spark.sql.adaptive.forceApplySkewedJoinOptimization",
"true"
)
Мы выбираем первый вариант (запрет ShuffledHashJoin) как более предсказуемый.
Также явно задаём skew detection пороги (дефолты подходят, но фиксируем их для документации):
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set(
"spark.sql.adaptive.skewJoin.skewedPartitionFactor",
"5" # partition skewed если > 5x медианы
)
spark.conf.set(
"spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes",
"256m" # и > 256 MB абсолютно
)
spark.conf.set(
"spark.sql.adaptive.advisoryPartitionSizeInBytes",
"64m" # целевой размер подпартиции после split
)
При partition_73 = 6.7 GB и advisory = 64 MB: AQE разобьёт partition_73 на ceil(6700 / 64) = 105 подпартиций. Каждая обрабатывается отдельным task с ~64 MB входных данных.
Важный нюанс: skew split для SortMergeJoin требует, чтобы соответствующие строки из users были дублированы для каждой подпартиции. Если partition_73 из events разбита на 105 частей, каждая часть должна быть joined с полным набором users, у которых hash(user_id) % 200 = 73. Это небольшое увеличение чтения со стороны build (users), но в масштабе 200 MB -> 200 MB * 105 = незначительно на фоне выигрыша.
После Исправления 2 (вместе с Исправлением 1):
После Исправлений 1+2:
Total duration: 48 мин (-79% от baseline)
Stage 3 (join): 11 мин (max task 12 мин, разброс исчез!)
Stage 5 (aggregation): 18 мин (всё ещё skew в aggregation keys)
Shuffle spill (disk): 0 GB
GC time: < 1 мин на всех executor-ах
Отличный результат для join stage. Но Stage 5 всё ещё 18 минут — aggreagtion skew не устранена.
Исправление 3: устранение aggregation skew через salting
Stage 5 (groupBy("country", "campaign_id", "event_type")) имеет skewed partition для (RU, camp_9873, click) = 5.2% всех строк.
Для aggregation AQE не имеет встроенного skew split (AQE coalesce работает для равномерного уменьшения числа партиций, но не для split skewed). Стандартное решение — salting:
from pyspark.sql.functions import rand, floor, concat_ws, lit
# Добавляем соль к ключу агрегации
SALT_BUCKETS = 20 # разбиваем hot key на 20 частей
enriched_salted = enriched.withColumn(
"salt",
(floor(rand() * SALT_BUCKETS)).cast("string")
)
# Первичная агрегация с солью (partial aggregation)
partial_agg = enriched_salted.groupBy(
"country", "campaign_id", "event_type", "salt"
).agg(
count("*").alias("event_count_partial"),
sum("revenue").alias("revenue_partial"),
# countDistinct не поддерживает salting напрямую — нужен HyperLogLog
approx_count_distinct("user_id", 0.01).alias("approx_unique_users")
)
# Финальная агрегация — убираем соль
result = partial_agg.groupBy(
"country", "campaign_id", "event_type"
).agg(
sum("event_count_partial").alias("event_count"),
sum("approx_unique_users").alias("unique_users"), # приближение!
sum("revenue_partial").alias("total_revenue")
)
Компромисс: countDistinct с salting требует использования approx_count_distinct (HyperLogLog), что даёт погрешность ~1%. Для BI-дашборда это приемлемо. Если нужна точность — альтернатива: repartitionByRange с явным числом партиций, что равномерно распределит ключи, но не устранит skew для одного hot key.
Для нашего case выбираем approx_count_distinct с документированной погрешностью.
После всех трёх исправлений:
После Исправлений 1+2+3:
Total duration: 41 мин (-82% от baseline 3ч 47мин)
Stage 3 (join): 11 мин
Stage 5 (aggregation): 9 мин
Shuffle spill (disk): 0 GB
OOM events: 0
Инспекция физического плана: explain(“formatted”)
После применения исправлений проверяем итоговый план:
result.explain("formatted")
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
HashAggregate(keys=[country#12, campaign_id#34, event_type#56],
functions=[count(1), sum(approx_count_distinct(...)), sum(revenue#90)])
+- Exchange hashpartitioning(country#12, campaign_id#34, ..., 200), ...
+- HashAggregate(keys=..., functions=...) (partial)
+- Project [...]
+- SortMergeJoin [user_id#78], [user_id#112], LeftOuter
:- Sort [...], false, 0
: +- Exchange hashpartitioning(user_id#78, 200), ...
: +- Filter [event_date#11 = 2026-05-12]
: +- FileScan parquet [...] PushedFilters: [IsNotNull(event_date),
: EqualTo(event_date, 2026-05-12)]
+- Sort [...], false, 0
+- Exchange hashpartitioning(user_id#112, 200), ...
+- Filter [country#45 IN ('RU','DE','FR',...)]
+- FileScan parquet [...] PushedFilters: [IsNotNull(country),
In(country, [RU,DE,FR,...])]
Что проверяем в плане:
1. PushedFilters присутствуют. EqualTo(event_date, 2026-05-12) и In(country, [...]) — фильтры опускаются к FileScan. Это значит, что Parquet-reader применяет row group filtering и не читает лишние данные.
2. SortMergeJoin вместо ShuffledHashJoin. После нашего исправления maxShuffledHashJoinLocalMapThreshold=0b — AQE сохранил SortMergeJoin. Теперь skew optimization будет работать.
3. Partial HashAggregate. Две фазы: partial aggregation до shuffle + final aggregation после. Это map-side pre-aggregation, значительно снижающая shuffle bytes.
4. Exchange hashpartitioning(…, 200). Пока 200 — это pre-AQE значение. После завершения map phase AQE скоррегирует через CoalesceShufflePartitions. Проверяем: в Spark UI после прогона Stage summary показывает фактическое число shuffle partitions.
Whole-Stage Codegen: как убедиться, что он работает
Whole-Stage CodeGen (WSCG) объединяет несколько операторов в один сгенерированный Java-метод, устраняя виртуальные вызовы между операторами. Проверяем его активность:
result.explain("codegen")
Found 3 WholeStageCodegen subtrees.
== Subtree 1 / 3 (maxMethodSize:65536, method count:3) ==
*(1) Filter (isnotnull(event_date#11) AND (event_date#11 = 2026-05-12))
+- *(1) ColumnarToRow
+- FileScan parquet [...] PushedFilters: [...]
== Subtree 2 / 3 ==
*(2) HashAggregate (partial)
+- *(2) Project [...]
+- *(2) SortMergeJoin [...]
== Subtree 3 / 3 ==
*(3) HashAggregate (final)
+- *(3) Sort [...]
Звёздочки *(N) перед оператором означают, что оператор включён в WholeStageCodegen subtree N. Все наши основные операторы в WSCG — хорошо.
Если оператор НЕ в WSCG (нет звёздочки), причины могут быть:
- UDF (Python UDF полностью ломает WSCG для этой цепочки)
- Оператор не поддерживает кодогенерацию (например,
GenerateExecдля explode) spark.sql.codegen.wholeStage=falseявно выключен
В нашем плане BroadcastHashJoin для campaigns тоже должен быть в WSCG:
*(4) BroadcastHashJoin [campaign_id#34], [campaign_id#67], LeftOuter, BuildRight
:- ... (probe side)
+- BroadcastExchange HashedRelationBroadcastMode (build side)
+- FileScan parquet [campaigns...]
BroadcastExchange вне WSCG (нет звёздочки) — это нормально: broadcast exchange выполняется на driver side и не подлежит codegen.
Проверка predicate pushdown: columns pruning и filter pushdown
Убеждаемся, что Catalyst правильно применил predicate pushdown к Parquet-файлам.
# Подробный план через extended
result.explain("extended")
В секции “Optimized Logical Plan”:
Filter (event_date = 2026-05-12) <- filter присутствует
+- Project [user_id, campaign_id, event_type, revenue] <- только нужные колонки
+- Relation [event_date, user_id, campaign_id, event_type,
session_id, platform, device_type, ip_address,
latitude, longitude, user_agent, revenue] (parquet)
В “Physical Plan” в FileScan:
FileScan parquet [user_id, campaign_id, event_type, revenue, event_date]
PartitionFilters: [isnotnull(event_date), (event_date = 2026-05-12)]
PushedFilters: [IsNotNull(event_date), EqualTo(event_date,2026-05-12)]
ReadSchema: struct<user_id:string, campaign_id:string, event_type:string,
revenue:decimal(10,2), event_date:date>
PartitionFilters — Spark использует partition directory structure для partition pruning (события читаются только из папки event_date=2026-05-12/). ReadSchema — только 5 колонок из 12 оригинальных. Column pruning работает: не читаем session_id, platform, device_type, ip_address, latitude, longitude, user_agent.
Это важно для объёма I/O. 12 колонок -> 5 колонок: примерно 40% от оригинального размера Parquet-файлов. С column pruning читаем ~48 GB вместо 120 GB.
AQE coalesce shuffle partitions: итоговое число партиций
После прогона с исправлениями смотрим, что AQE сделал с shuffle.partitions=200:
Stage 3 AQE metrics:
Pre-shuffle partitions: 200
Post-shuffle coalesced: 47 partitions
Coalesce ratio: 4.3x
Reason: mapOutputStatistics showed many small partitions (< advisoryPartitionSizeInBytes)
AQE объединил 200 -> 47 партиций. Это нормально для нашего объёма: 120 GB / 200 = 600 MB на partition, но после column pruning 48 GB / 200 = 240 MB — ближе к advisory 64 MB -> 200 партиций — много. AQE объединяет смежные малые партиции, сохраняя крупные как есть.
Но: если AQE делает coalesce каждый раз, это сигнал, что стоит установить начальное значение правильнее:
# Более подходящий starting point для AQE
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "400")
# AQE начнёт с 400 и coalesce до оптимального
Начать с большего числа partition безопаснее при наличии skew: если AQE разобьёт skewed partition на 105 подпартиций, при исходных 200 это даёт пиковые 200 + 104 = 304 tasks на один stage.
Итоговая конфигурация после всех исправлений
# Итоговые настройки (добавить в spark-defaults.conf или SparkSession)
# AQE: ядро оптимизации
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "400")
# Skew join: принудить SortMergeJoin для skew protection
spark.conf.set(
"spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold", "0b"
)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set(
"spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m"
)
# Broadcast join: campaigns уже корректно broadcast-ится
# users (4 GB) слишком большой для broadcast -- не меняем
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10m") # дефолт
# Память: убрали cache(), overhead уже увеличен до 16g
# spark.executor.memory=96g и overhead=16g -- оставляем как есть после инцидента
Попробуй сам
# 1. Проверить, есть ли ненужные cache() в вашем пайплайне
# Используем execution plan для поиска InMemoryRelation
df.queryExecution.executedPlan.foreach(
lambda p: print(type(p).__name__) if "InMemory" in type(p).__name__ else None
)
# 2. Форсировать план с SortMergeJoin и проверить skew split
spark.conf.set(
"spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold", "0b"
)
result.explain("formatted")
# Искать: "SortMergeJoin" и "OptimizeSkewedJoin applied" в логах
# 3. Проверить column pruning и partition pruning
result.explain("extended")
# В FileScan смотреть: PartitionFilters, PushedFilters, ReadSchema
# 4. Посмотреть finalized число партиций после AQE coalesce
# Через Spark UI: Stage details -> "Number of Partitions" после завершения
# Или через logs при spark.eventLog.enabled=true
# 5. Benchmark: запустить ДВАЖДЫ с разными настройками и сравнить
import time
configs_to_test = [
{"spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold": "32m"}, # ShuffledHashJoin
{"spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold": "0b"}, # SortMergeJoin
]
for config in configs_to_test:
for k, v in config.items():
spark.conf.set(k, v)
start = time.time()
result = run_pipeline() # ваш пайплайн
duration = time.time() - start
print(f"Config {config}: {duration:.0f}s")
Итоги
Три исправления дали суммарный эффект: 3ч 47мин -> 41 мин (минус 82%). Ноль spill, ноль OOM. Ключевые изменения: (1) удаление бессмысленного cache() освободило storage pool; (2) запрет ShuffledHashJoin восстановил AQE skew protection; (3) salting aggregation key устранил второй skew hotspot. В последнем уроке оформляем полный разбор и создаём чек-лист senior-инженера.