Learning Platform
Глоссарий Troubleshooting
Урок 16.04 · 35 мин
Продвинутый
AQEBroadcast JoinExplain PlanWhole-Stage CodegenОптимизация

Оптимизация реального пайплайна

В уроках 2 и 3 мы поставили диагноз: data skew в join и aggregation, неработающая AQE skew optimization из-за выбора ShuffledHashJoin, конкурирующий cache() в UnifiedMemoryManager, избыточный disk spill. Теперь применяем исправления методично — по одному, с замерами до и после каждого изменения.

Оптимизация shuffle: практические подходы

Базовые метрики перед оптимизацией (Baseline)

Перед любым изменением фиксируем baseline. Этот шаг обязателен — без него невозможно измерить эффект каждого исправления.

Baseline (прогон 2026-05-20, 02:00 UTC):
  Total duration:     3 ч 47 мин
  Stage 2 (shuffle write): 47 мин (median task 14 мин, max 47 мин)
  Stage 3 (join):          62 мин (median task 4 мин, max 62 мин)
  Stage 5 (aggregation):   64 мин (median task 4 мин, max 64 мин)
  Shuffle spill (disk):    15 GB
  GC time executor-7:      14 мин 32 с (Task 45)
  OOM in last 7 days:      1 раз
  spark.sql.shuffle.partitions: 200
  spark.sql.adaptive.enabled:   true
  enriched.cache():         присутствует

Исправление 1: удаление ненужного cache()

Первое исправление — самое простое и с немедленным эффектом.

# БЫЛО:
enriched = events.join(users, "user_id", "left") \
                 .join(campaigns, "campaign_id", "left") \
                 .cache()    # эта строка создаёт конкуренцию за storage pool

result = enriched.groupBy(...).agg(...)
result.write...

# СТАЛО:
enriched = events.join(users, "user_id", "left") \
                 .join(campaigns, "campaign_id", "left")
# cache() убран

result = enriched.groupBy(...).agg(...)
result.write...

Почему cache() здесь был вреден: enriched используется ровно один раз (для aggregation), а значит cache не даёт никакого reuse-выигрыша. Зато захватывает до 28.8 GB storage pool, уменьшая доступный execution pool.

Правило: cache() оправдан только если DataFrame используется более одного раза в коде. Один action = ноль смысла в кэше.

После удаления cache() и повторного прогона (тест с теми же данными):

После Исправления 1:
  Total duration:     2 ч 14 мин  (-41%)
  Stage 3 (join):     55 мин (max task 55 мин, outlier сохранился)
  Shuffle spill (disk): 8 GB  (-47%)
  GC time executor-7: 9 мин    (-38%)

Уже значительное улучшение. Spill сократился вдвое — execution pool освободился. Но outlier task в Stage 3 всё ещё есть: root cause skew не устранён.


Исправление 2: принудить AQE к SortMergeJoin для skew optimization

Вторая проблема: AQE выбрал ShuffledHashJoin, деактивировав skew protection. Исправление:

spark.conf.set(
    "spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold", 
    "0b"  # запрещаем замену на ShuffledHashJoin
)

Это означает, что AQE больше не будет переключаться на ShuffledHashJoin и сохранит SortMergeJoin. Теперь OptimizeSkewedJoin найдёт SortMergeJoinExec в плане и применит skew split.

Альтернативный подход, появившийся в Spark 4.0 (SPARK-45567):

# Принудить skew optimization даже при ShuffledHashJoin
spark.conf.set(
    "spark.sql.adaptive.forceApplySkewedJoinOptimization",
    "true"
)

Мы выбираем первый вариант (запрет ShuffledHashJoin) как более предсказуемый.

Также явно задаём skew detection пороги (дефолты подходят, но фиксируем их для документации):

spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set(
    "spark.sql.adaptive.skewJoin.skewedPartitionFactor", 
    "5"  # partition skewed если > 5x медианы
)
spark.conf.set(
    "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", 
    "256m"  # и > 256 MB абсолютно
)
spark.conf.set(
    "spark.sql.adaptive.advisoryPartitionSizeInBytes",
    "64m"   # целевой размер подпартиции после split
)

При partition_73 = 6.7 GB и advisory = 64 MB: AQE разобьёт partition_73 на ceil(6700 / 64) = 105 подпартиций. Каждая обрабатывается отдельным task с ~64 MB входных данных.

AQE skew split: partition_73 до и после
Один task: 6.7 GBДо исправления: partition_73 = 6.7 GB идёт в один task SortMergeJoin или ShuffledHashJoin. Task duration: 62 мин. Peak memory: 47 GB. Spill: 15 GB.
AQE skew split
105 tasks: 64 MB каждыйПосле: partition_73 разбита на 105 sub-partitions. Каждая ~64 MB, обрабатывается параллельным task. Максимальный task duration ≈ медиана. Zero spill.

Важный нюанс: skew split для SortMergeJoin требует, чтобы соответствующие строки из users были дублированы для каждой подпартиции. Если partition_73 из events разбита на 105 частей, каждая часть должна быть joined с полным набором users, у которых hash(user_id) % 200 = 73. Это небольшое увеличение чтения со стороны build (users), но в масштабе 200 MB -> 200 MB * 105 = незначительно на фоне выигрыша.

После Исправления 2 (вместе с Исправлением 1):

После Исправлений 1+2:
  Total duration:     48 мин  (-79% от baseline)
  Stage 3 (join):     11 мин  (max task 12 мин, разброс исчез!)
  Stage 5 (aggregation): 18 мин (всё ещё skew в aggregation keys)
  Shuffle spill (disk): 0 GB
  GC time: < 1 мин на всех executor-ах

Отличный результат для join stage. Но Stage 5 всё ещё 18 минут — aggreagtion skew не устранена.


Исправление 3: устранение aggregation skew через salting

Stage 5 (groupBy("country", "campaign_id", "event_type")) имеет skewed partition для (RU, camp_9873, click) = 5.2% всех строк.

Для aggregation AQE не имеет встроенного skew split (AQE coalesce работает для равномерного уменьшения числа партиций, но не для split skewed). Стандартное решение — salting:

from pyspark.sql.functions import rand, floor, concat_ws, lit

# Добавляем соль к ключу агрегации
SALT_BUCKETS = 20  # разбиваем hot key на 20 частей

enriched_salted = enriched.withColumn(
    "salt",
    (floor(rand() * SALT_BUCKETS)).cast("string")
)

# Первичная агрегация с солью (partial aggregation)
partial_agg = enriched_salted.groupBy(
    "country", "campaign_id", "event_type", "salt"
).agg(
    count("*").alias("event_count_partial"),
    sum("revenue").alias("revenue_partial"),
    # countDistinct не поддерживает salting напрямую — нужен HyperLogLog
    approx_count_distinct("user_id", 0.01).alias("approx_unique_users")
)

# Финальная агрегация — убираем соль
result = partial_agg.groupBy(
    "country", "campaign_id", "event_type"
).agg(
    sum("event_count_partial").alias("event_count"),
    sum("approx_unique_users").alias("unique_users"),  # приближение!
    sum("revenue_partial").alias("total_revenue")
)

Компромисс: countDistinct с salting требует использования approx_count_distinct (HyperLogLog), что даёт погрешность ~1%. Для BI-дашборда это приемлемо. Если нужна точность — альтернатива: repartitionByRange с явным числом партиций, что равномерно распределит ключи, но не устранит skew для одного hot key.

Для нашего case выбираем approx_count_distinct с документированной погрешностью.

После всех трёх исправлений:

После Исправлений 1+2+3:
  Total duration:     41 мин  (-82% от baseline 3ч 47мин)
  Stage 3 (join):     11 мин
  Stage 5 (aggregation): 9 мин
  Shuffle spill (disk): 0 GB
  OOM events: 0

Инспекция физического плана: explain(“formatted”)

После применения исправлений проверяем итоговый план:

result.explain("formatted")
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   HashAggregate(keys=[country#12, campaign_id#34, event_type#56],
                 functions=[count(1), sum(approx_count_distinct(...)), sum(revenue#90)])
   +- Exchange hashpartitioning(country#12, campaign_id#34, ..., 200), ...
      +- HashAggregate(keys=..., functions=...) (partial)
         +- Project [...]
            +- SortMergeJoin [user_id#78], [user_id#112], LeftOuter
               :- Sort [...], false, 0
               :  +- Exchange hashpartitioning(user_id#78, 200), ...
               :     +- Filter [event_date#11 = 2026-05-12]
               :        +- FileScan parquet [...] PushedFilters: [IsNotNull(event_date),
               :                              EqualTo(event_date, 2026-05-12)]
               +- Sort [...], false, 0
                  +- Exchange hashpartitioning(user_id#112, 200), ...
                     +- Filter [country#45 IN ('RU','DE','FR',...)]
                        +- FileScan parquet [...] PushedFilters: [IsNotNull(country),
                                              In(country, [RU,DE,FR,...])]

Что проверяем в плане:

1. PushedFilters присутствуют. EqualTo(event_date, 2026-05-12) и In(country, [...]) — фильтры опускаются к FileScan. Это значит, что Parquet-reader применяет row group filtering и не читает лишние данные.

2. SortMergeJoin вместо ShuffledHashJoin. После нашего исправления maxShuffledHashJoinLocalMapThreshold=0b — AQE сохранил SortMergeJoin. Теперь skew optimization будет работать.

3. Partial HashAggregate. Две фазы: partial aggregation до shuffle + final aggregation после. Это map-side pre-aggregation, значительно снижающая shuffle bytes.

4. Exchange hashpartitioning(…, 200). Пока 200 — это pre-AQE значение. После завершения map phase AQE скоррегирует через CoalesceShufflePartitions. Проверяем: в Spark UI после прогона Stage summary показывает фактическое число shuffle partitions.


Whole-Stage Codegen: как убедиться, что он работает

Whole-Stage CodeGen (WSCG) объединяет несколько операторов в один сгенерированный Java-метод, устраняя виртуальные вызовы между операторами. Проверяем его активность:

result.explain("codegen")
Found 3 WholeStageCodegen subtrees.
== Subtree 1 / 3 (maxMethodSize:65536, method count:3) ==
*(1) Filter (isnotnull(event_date#11) AND (event_date#11 = 2026-05-12))
+- *(1) ColumnarToRow
   +- FileScan parquet [...] PushedFilters: [...]

== Subtree 2 / 3 ==
*(2) HashAggregate (partial)
+- *(2) Project [...]
   +- *(2) SortMergeJoin [...]

== Subtree 3 / 3 ==
*(3) HashAggregate (final)
+- *(3) Sort [...]

Звёздочки *(N) перед оператором означают, что оператор включён в WholeStageCodegen subtree N. Все наши основные операторы в WSCG — хорошо.

Если оператор НЕ в WSCG (нет звёздочки), причины могут быть:

  • UDF (Python UDF полностью ломает WSCG для этой цепочки)
  • Оператор не поддерживает кодогенерацию (например, GenerateExec для explode)
  • spark.sql.codegen.wholeStage=false явно выключен

В нашем плане BroadcastHashJoin для campaigns тоже должен быть в WSCG:

*(4) BroadcastHashJoin [campaign_id#34], [campaign_id#67], LeftOuter, BuildRight
:- ... (probe side)
+- BroadcastExchange HashedRelationBroadcastMode (build side)
   +- FileScan parquet [campaigns...]

BroadcastExchange вне WSCG (нет звёздочки) — это нормально: broadcast exchange выполняется на driver side и не подлежит codegen.


Проверка predicate pushdown: columns pruning и filter pushdown

Убеждаемся, что Catalyst правильно применил predicate pushdown к Parquet-файлам.

# Подробный план через extended
result.explain("extended")

В секции “Optimized Logical Plan”:

Filter (event_date = 2026-05-12)               <- filter присутствует
+- Project [user_id, campaign_id, event_type, revenue]   <- только нужные колонки
   +- Relation [event_date, user_id, campaign_id, event_type,
                session_id, platform, device_type, ip_address,
                latitude, longitude, user_agent, revenue] (parquet)

В “Physical Plan” в FileScan:

FileScan parquet [user_id, campaign_id, event_type, revenue, event_date]
  PartitionFilters: [isnotnull(event_date), (event_date = 2026-05-12)]
  PushedFilters: [IsNotNull(event_date), EqualTo(event_date,2026-05-12)]
  ReadSchema: struct<user_id:string, campaign_id:string, event_type:string,
                     revenue:decimal(10,2), event_date:date>

PartitionFilters — Spark использует partition directory structure для partition pruning (события читаются только из папки event_date=2026-05-12/). ReadSchema — только 5 колонок из 12 оригинальных. Column pruning работает: не читаем session_id, platform, device_type, ip_address, latitude, longitude, user_agent.

Это важно для объёма I/O. 12 колонок -> 5 колонок: примерно 40% от оригинального размера Parquet-файлов. С column pruning читаем ~48 GB вместо 120 GB.

Catalyst оптимизации в итоговом плане
Column Pruning12 -> 5 colCatalyst убирает неиспользуемые колонки (session_id, platform, device_type, ip_address, latitude, longitude, user_agent). Parquet-reader читает только нужные column chunks. I/O снижен с 120 GB до ~48 GB.
Partition Pruning365 days -> 1event_date partitioning: Spark читает только папку event_date=2026-05-12/. Остальные 364 дня не читаются вообще. При таблице без partition pruning чтение было бы 365x больше.
Predicate PushdownPushedFilters: 2EqualTo(event_date) и In(country) опускаются в FileScan. Parquet row group statistics позволяют пропустить row groups без нужных значений. Дополнительная экономия поверх partition pruning.

AQE coalesce shuffle partitions: итоговое число партиций

После прогона с исправлениями смотрим, что AQE сделал с shuffle.partitions=200:

Stage 3 AQE metrics:
  Pre-shuffle partitions: 200
  Post-shuffle coalesced: 47 partitions
  Coalesce ratio: 4.3x
  Reason: mapOutputStatistics showed many small partitions (< advisoryPartitionSizeInBytes)

AQE объединил 200 -> 47 партиций. Это нормально для нашего объёма: 120 GB / 200 = 600 MB на partition, но после column pruning 48 GB / 200 = 240 MB — ближе к advisory 64 MB -> 200 партиций — много. AQE объединяет смежные малые партиции, сохраняя крупные как есть.

Но: если AQE делает coalesce каждый раз, это сигнал, что стоит установить начальное значение правильнее:

# Более подходящий starting point для AQE
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "400")
# AQE начнёт с 400 и coalesce до оптимального

Начать с большего числа partition безопаснее при наличии skew: если AQE разобьёт skewed partition на 105 подпартиций, при исходных 200 это даёт пиковые 200 + 104 = 304 tasks на один stage.


Итоговая конфигурация после всех исправлений

# Итоговые настройки (добавить в spark-defaults.conf или SparkSession)

# AQE: ядро оптимизации
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "400")

# Skew join: принудить SortMergeJoin для skew protection
spark.conf.set(
    "spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold", "0b"
)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set(
    "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m"
)

# Broadcast join: campaigns уже корректно broadcast-ится
# users (4 GB) слишком большой для broadcast -- не меняем
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10m")  # дефолт

# Память: убрали cache(), overhead уже увеличен до 16g
# spark.executor.memory=96g и overhead=16g -- оставляем как есть после инцидента

Попробуй сам

# 1. Проверить, есть ли ненужные cache() в вашем пайплайне
# Используем execution plan для поиска InMemoryRelation
df.queryExecution.executedPlan.foreach(
    lambda p: print(type(p).__name__) if "InMemory" in type(p).__name__ else None
)

# 2. Форсировать план с SortMergeJoin и проверить skew split
spark.conf.set(
    "spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold", "0b"
)
result.explain("formatted")
# Искать: "SortMergeJoin" и "OptimizeSkewedJoin applied" в логах

# 3. Проверить column pruning и partition pruning
result.explain("extended")
# В FileScan смотреть: PartitionFilters, PushedFilters, ReadSchema

# 4. Посмотреть finalized число партиций после AQE coalesce
# Через Spark UI: Stage details -> "Number of Partitions" после завершения
# Или через logs при spark.eventLog.enabled=true

# 5. Benchmark: запустить ДВАЖДЫ с разными настройками и сравнить
import time

configs_to_test = [
    {"spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold": "32m"},  # ShuffledHashJoin
    {"spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold": "0b"},   # SortMergeJoin
]

for config in configs_to_test:
    for k, v in config.items():
        spark.conf.set(k, v)
    start = time.time()
    result = run_pipeline()  # ваш пайплайн
    duration = time.time() - start
    print(f"Config {config}: {duration:.0f}s")

Итоги

Три исправления дали суммарный эффект: 3ч 47мин -> 41 мин (минус 82%). Ноль spill, ноль OOM. Ключевые изменения: (1) удаление бессмысленного cache() освободило storage pool; (2) запрет ShuffledHashJoin восстановил AQE skew protection; (3) salting aggregation key устранил второй skew hotspot. В последнем уроке оформляем полный разбор и создаём чек-лист senior-инженера.

Проверка знанийKnowledge check
После включения spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold=0b план изменился на SortMergeJoin, AQE применил skew split для partition_73. Но теперь Stage 3 имеет 200 + 104 = 304 tasks вместо 200. В чём trade-off, и когда это нежелательно?
ОтветAnswer
Trade-off: 304 tasks vs 200 tasks означает больше task scheduling overhead на driver (DAGScheduler, TaskScheduler). Каждый task — это RPC между driver и executor, сериализация/десериализация TaskDescription. При маленьком кластере (мало executor cores) это приводит к очереди tasks. Нежелательно в трёх случаях: (1) очень маленький кластер (4-8 cores total) — 304 tasks создают большой backlog; (2) задача с очень маленьким объёмом данных, где overhead scheduling > выигрыша от parallelism; (3) при external shuffle service с bottleneck: 304 tasks генерируют больше fetch-запросов. В нашем случае 20 executor * 8 cores = 160 concurrent tasks — 304 задачи выполняются в 2 волны. Scheduling overhead ~5 сек — незначителен на фоне экономии 50 минут от устранения outlier task.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. После установки spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold=0b план изменился: ShuffledHashJoin заменён на SortMergeJoin, AQE применил skew split. Stage 3 изменился: было 200 tasks (max 62 мин), стало 304 tasks (max 12 мин). Как объяснить появление 304 вместо 200 tasks?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5