Разбор решения и чек-лист senior
Мы прошли полный цикл: симптомы -> диагностика через Spark UI -> идентификация root cause на уровне internals -> применение точечных исправлений -> верификация результатов. В финальном уроке собираем всё в единую картину, разбираем, что именно пошло не так на каждом слое движка, и формируем чек-лист для будущих инцидентов.
Итоговый результат
До оптимизации (baseline 2026-05-20):
Total duration: 3 ч 47 мин
Shuffle spill (disk): 15 GB
OOM events (7 дней): 1
Stage 3 max task: 62 мин
Stage 5 max task: 64 мин
После оптимизации (2026-05-22, те же данные):
Total duration: 39 мин (-83%)
Shuffle spill (disk): 0 GB (-100%)
OOM events: 0
Stage 3 max task: 12 мин (-81%)
Stage 5 max task: 8 мин (-88%)
Три исправления в сумме дали 83% ускорение. Ни одного OOM. Задание снова укладывается в целевые 40 минут — и теперь с margin: даже при следующем вирусном событии (которое может дать 2x текущего объёма) skew protection через AQE удержит время в разумных рамках.
Что было не так на каждом слое
Поучительный паттерн: ни одна из четырёх проблем не была «очевидно неправильным» кодом. Все решения казались разумными:
shuffle.partitions=200— дефолт, рекомендованный до AQE- AQE включён с дефолтными настройками — казалось, достаточно
enriched.cache()— коллега добавил «для производительности»groupByбез salting — стандартный способ агрегации
Это типично для production-деградации: не одна грубая ошибка, а взаимодействие нескольких «достаточно разумных» решений при изменившемся объёме данных.
Разбор по каждому применённому исправлению
Исправление 1: удаление cache() (-41% времени)
enriched.cache() полезен только при reuse. Если DataFrame используется один раз, cache() создаёт overhead: материализация в MemoryStore, конкуренция за storage pool, eviction при нехватке, GC давление на объекты в storage. Правило простое: cache() нужен только если вы выполняете более одного action на одном DataFrame.
Дополнительный эффект: без cache() storage pool полностью доступен execution pool через UnifiedMemoryManager boundary shifting. Это дало execution tasks в 1.7x больше памяти на ту же hash table.
Исправление 2: принудить SortMergeJoin через maxShuffledHashJoinLocalMapThreshold=0b (-82% времени, Stage 3)
Ключевое знание internals: AQE оптимизации применяются в строгом порядке и могут взаимно исключать друг друга. OptimizeShuffleWithLocalRead (hash join preference) выполняется ДО OptimizeSkewedJoin. Если первый трансформировал план, второй не найдёт SortMergeJoinExec.
Альтернатива, появившаяся в Spark 4.0: spark.sql.adaptive.forceApplySkewedJoinOptimization=true. Но мы выбрали более conservative вариант: полный запрет ShuffledHashJoin через threshold=0. Для нашего случая (join с 4 GB side) разница в производительности SortMergeJoin vs ShuffledHashJoin при отсутствии skew — незначительна.
Исправление 3: salting aggregation key (-88% времени, Stage 5)
AQE не имеет aggregation skew split (в отличие от join). Salting — классическое решение: добавляем случайный «соль» к ключу агрегации, выполняем partial aggregation с солью, затем финальную агрегацию без соли. Компромисс: countDistinct -> approx_count_distinct с ~1% погрешностью.
Чек-лист senior-инженера: диагностика Spark internals
Этот чек-лист применим к любому медленному или нестабильному Spark-заданию.
Шаг 1: Spark UI первичный осмотр (5 минут)
[ ] Job timeline: есть ли gaps (scheduler overhead, GC pauses)?
[ ] Stage с максимальной длительностью — что именно?
[ ] Stage details: median vs max task duration — разброс > 3x?
[ ] Shuffle spill (disk) > 0 GB?
[ ] GC time > 10% task duration?
[ ] Failed/speculative tasks?
Шаг 2: идентификация паттерна (10 минут)
[ ] Один outlier task (max >> p75) -> data skew
[ ] Все tasks медленные, один executor медленный -> straggler node
[ ] Все tasks медленные равномерно -> not enough resources / GC
[ ] OOM в executor logs -> memory: execution или storage
[ ] OOM в driver logs -> collect(), large broadcast, toPandas()
[ ] FetchFailed в logs -> shuffle fetch failure, executor died
Шаг 3: проверка плана через explain() (10 минут)
df.explain("formatted")
# [ ] SortMergeJoin или ShuffledHashJoin для большого join?
# [ ] BroadcastHashJoin для маленьких таблиц (<10 MB)?
# [ ] PushedFilters присутствуют в FileScan?
# [ ] PartitionFilters присутствуют для partitioned tables?
# [ ] ReadSchema: только нужные колонки?
# [ ] InMemoryRelation: есть ли cache() который используется <=1 раза?
# [ ] Whole-stage codegen (*N): нет ли операторов без звёздочки?
df.explain("extended")
# [ ] Optimized Logical Plan: filter сдвинут вниз?
# [ ] Column pruning применён?
Шаг 4: AQE-конфигурация (5 минут)
[ ] spark.sql.adaptive.enabled=true?
[ ] spark.sql.adaptive.skewJoin.enabled=true?
[ ] maxShuffledHashJoinLocalMapThreshold — не приводит ли к конфликту со skew?
[ ] advisoryPartitionSizeInBytes — 64m-256m для production?
[ ] После прогона: сколько partitions реально создал AQE (из UI)?
Шаг 5: memory-профиль (10 минут при подозрении на memory)
[ ] spark.executor.memory и spark.memory.fraction установлены явно?
[ ] executor.memoryOverhead = 10-15% от executor.memory?
[ ] Нет ли enriched.cache() без reuse?
[ ] При OOM: трассировка в executor-7 logs — где именно OOM?
[ ] GC logs: Full GC чаще чем 1 раз в 10 минут?
[ ] Peak Execution Memory для outlier task (Spark UI task details)?
Шаг 6: проверка данных (5 минут)
# [ ] Распределение join key
df.groupBy("join_key").count().selectExpr(
"max(count) / percentile_approx(count, 0.5) as skew_ratio"
).show()
# skew_ratio > 100 -> серьёзный skew
# [ ] Распределение groupBy key
df.groupBy("agg_key").count().selectExpr(
"max(count) * 100.0 / sum(count) as max_pct"
).show()
# max_pct > 5% -> salting нужен
Частые ловушки, которые не очевидны
Ловушка 1: AQE включён, но не работает. AQE применяет оптимизации в порядке приоритета. Если OptimizeShuffleWithLocalRead заменил SortMergeJoin на ShuffledHashJoin, OptimizeSkewedJoin не найдёт что оптимизировать. Всегда проверяй финальный Physical Plan, не верь тому, что AQE «должен был» сделать.
Ловушка 2: cache() замедляет вместо ускорения. Любой cache() без последующего reuse — это чистый overhead. В ETL-пайплайнах (линейный DAG, каждый DataFrame используется один раз) cache() не нужен вообще, если не нужны multiple actions.
Ловушка 3: null-значения в join key создают skew. hash(null) % N = 0 для любого N. Если в join key много null — все они идут в partition 0. Это выглядит как skew, но причина — data quality. Исправление: df.filter(col("join_key").isNotNull()) до join.
Ловушка 4: countDistinct и salting несовместимы точно. Распределённый countDistinct требует, чтобы все строки с одинаковым значением пришли к одному reducer. При salting этот инвариант нарушается. Решение: approx_count_distinct (HyperLogLog) или явная pre-deduplication до join.
Ловушка 5: spark.sql.shuffle.partitions=200 — дефолт для маленьких данных. При объёме > 10 GB или AQE включён этот параметр — отправная точка, а не финальное значение. С AQE используй coalescePartitions.initialPartitionNum=400-1000 и позволь AQE выбрать оптимальное число.
Ловушка 6: executor.memory и memoryOverhead недооценены. spark.executor.memoryOverhead покрывает: pyspark process, JVM overhead, off-heap буферы. При Python UDF или Arrow-сериализации (pandas UDF) overhead должен быть минимум 15-20% от executor.memory. Дефолт 10% недостаточен.
Полная цепочка: от кода к internals
Что мониторить после оптимизации
Задание теперь стабильно, но нужны системные метрики для раннего обнаружения регрессии:
# 1. Алерт на task duration skew ratio
# Spark UI API: если max/p50 > 5 для любой stage -> алерт
# 2. Алерт на disk spill
# Если Shuffle spill (disk) > 1 GB -> предупреждение (скоро может вернуться OOM)
# 3. Алерт на GC time
# GC time > 20% task duration у любого executor -> memory pressure
# 4. Мониторинг объёма входных данных
# Если events > 1.5x от вчерашнего объёма -> ранний алерт (будущий skew)
Особенно важен пункт 4: наша проблема нарастала 30 дней (38 мин -> 3ч 47мин). Мониторинг объёма данных с алертом позволил бы поймать деградацию на этапе 1ч 23мин и исправить задолго до OOM.
Что читать дальше
Капстоун охватил основные слои диагностики, но каждый из них можно углубить:
Shuffle internals глубже: SortShuffleWriter vs UnsafeShuffleWriter selection algorithm, MapOutputTracker epoch mechanism, push-based shuffle (External Shuffle Service 2.0). Модуль 04 курса.
Memory глубже: OffHeapMemoryAllocator и Tungsten в off-heap режиме, MemoryStore eviction policies (LRU vs FIFO), TaskMemoryManager spill coordination между concurrent consumers. Модуль 05 курса.
AQE глубже: DynamicPartitionPruning (как AQE использует runtime statistics для partition pruning при broadcast join), RuntimeFilterPushDown, CoalesceShufflePartitions internal mechanics. Модуль 08 курса.
Structured Streaming: всё то же самое, но в micro-batch контексте. State store (RocksDB vs in-memory), checkpoint mechanics, watermark и late data. Модуль 09 курса.
Расширение Spark: если нужна custom оптимизация (например, domain-specific predicate pushdown или кастомный data source), SparkSessionExtensions позволяет инжектировать rules в Catalyst pipeline. Модуль 11 курса.
Попробуй сам
Финальное упражнение капстоуна: применить чек-лист к вашему production-заданию.
# Шаблон диагностики production job
def diagnose_pipeline(df, description="pipeline"):
"""
Senior diagnostic template.
Запустить ДО оптимизации для сбора baseline метрик.
"""
print(f"\n=== Диагностика: {description} ===\n")
# 1. Физический план
print("--- Physical Plan ---")
df.explain("formatted")
# 2. Проверка join keys на skew
# (заменить 'key' на реальный join key)
# df.groupBy("key").count() \
# .selectExpr("max(count)/percentile_approx(count,0.5) as skew_ratio") \
# .show()
# 3. Проверка настроек AQE
print("\n--- AQE Config ---")
for conf in [
"spark.sql.adaptive.enabled",
"spark.sql.adaptive.skewJoin.enabled",
"spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold",
"spark.sql.adaptive.advisoryPartitionSizeInBytes",
"spark.sql.shuffle.partitions",
]:
print(f" {conf} = {spark.conf.get(conf, 'not set')}")
# 4. Проверка cache
plan_str = str(df.queryExecution.executedPlan)
if "InMemoryRelation" in plan_str:
print("\n[WARNING] InMemoryRelation обнаружен в плане. "
"Убедитесь, что cache() используется более одного раза.")
return df
# Использование:
result = diagnose_pipeline(
events.join(users, "user_id").groupBy(...).agg(...),
"daily ETL join + aggregation"
)
result.write...
Итоги всего капстоуна
Мы прошли полный цикл диагностики и оптимизации production Spark-пайплайна, опираясь на знание internals каждого слоя:
- Shuffle internals (модуль 04): понимание HashPartitioning и distribution объяснило, почему один
user_idсоздаёт outlier partition - AQE internals (модуль 08): знание порядка применения оптимизаций (OptimizeShuffleWithLocalRead до OptimizeSkewedJoin) объяснило, почему skew protection не сработала
- Memory internals (модуль 05): понимание UnifiedMemoryManager boundary shifting объяснило связь между cache() и OOM
- Catalyst internals (модуль 06): инспекция через explain() подтвердила column pruning, partition pruning и whole-stage codegen
Главный принцип senior-диагностики: от симптома к механизму, не к конфигу. Знание того, как Spark работает изнутри, позволяет сразу сформулировать правильную гипотезу вместо случайного перебора параметров.