Learning Platform
Глоссарий Troubleshooting
Урок 16.05 · 30 мин
Продвинутый
CapstoneSenior ChecklistProductionДиагностикаInternals

Разбор решения и чек-лист senior

Мы прошли полный цикл: симптомы -> диагностика через Spark UI -> идентификация root cause на уровне internals -> применение точечных исправлений -> верификация результатов. В финальном уроке собираем всё в единую картину, разбираем, что именно пошло не так на каждом слое движка, и формируем чек-лист для будущих инцидентов.


Итоговый результат

До оптимизации (baseline 2026-05-20):
  Total duration:       3 ч 47 мин
  Shuffle spill (disk): 15 GB
  OOM events (7 дней):  1
  Stage 3 max task:     62 мин
  Stage 5 max task:     64 мин

После оптимизации (2026-05-22, те же данные):
  Total duration:       39 мин   (-83%)
  Shuffle spill (disk): 0 GB     (-100%)
  OOM events:           0
  Stage 3 max task:     12 мин   (-81%)
  Stage 5 max task:      8 мин   (-88%)

Три исправления в сумме дали 83% ускорение. Ни одного OOM. Задание снова укладывается в целевые 40 минут — и теперь с margin: даже при следующем вирусном событии (которое может дать 2x текущего объёма) skew protection через AQE удержит время в разумных рамках.


Что было не так на каждом слое

Карта проблем по слоям движка
DAG/Shuffle: дефолтные 200 partitionsСлой 1: DAG / Shuffle. spark.sql.shuffle.partitions=200 — дефолт из pre-AQE эпохи. При 120 GB данных медианная партиция 600 MB — приемлемо, но при skew один outlier достигает 6.7 GB. AQE должен был справиться — но не справился из-за Слоя 2.
AQE: конфликт оптимизацийСлой 2: AQE. OptimizeShuffleWithLocalRead заменил SortMergeJoin на ShuffledHashJoin (users 4 GB < maxShuffledHashJoinLocalMapThreshold). Это деактивировало OptimizeSkewedJoin. Два корректных решения AQE вступили в конфликт: hash join хорош для size, но несовместим со skew protection.
Memory: бесполезный cache()Слой 3: Memory. enriched.cache() без смысла (DataFrame использовался один раз). Захватил до 28.8 GB storage pool. UnifiedMemoryManager балансировал между execution и storage под давлением. Task 45 с 6.7 GB partition не получил достаточно execution memory.
Aggregation: skew в groupBy keysСлой 4: Aggregation. groupBy по hot key (RU, camp_9873, click) = 5.2% строк в одной partition. Без salting aggregation stage дублирует skew проблему join stage. AQE не имеет встроенного skew split для aggregation.

Поучительный паттерн: ни одна из четырёх проблем не была «очевидно неправильным» кодом. Все решения казались разумными:

  • shuffle.partitions=200 — дефолт, рекомендованный до AQE
  • AQE включён с дефолтными настройками — казалось, достаточно
  • enriched.cache() — коллега добавил «для производительности»
  • groupBy без salting — стандартный способ агрегации

Это типично для production-деградации: не одна грубая ошибка, а взаимодействие нескольких «достаточно разумных» решений при изменившемся объёме данных.


Разбор по каждому применённому исправлению

Исправление 1: удаление cache() (-41% времени)

enriched.cache() полезен только при reuse. Если DataFrame используется один раз, cache() создаёт overhead: материализация в MemoryStore, конкуренция за storage pool, eviction при нехватке, GC давление на объекты в storage. Правило простое: cache() нужен только если вы выполняете более одного action на одном DataFrame.

Дополнительный эффект: без cache() storage pool полностью доступен execution pool через UnifiedMemoryManager boundary shifting. Это дало execution tasks в 1.7x больше памяти на ту же hash table.

Исправление 2: принудить SortMergeJoin через maxShuffledHashJoinLocalMapThreshold=0b (-82% времени, Stage 3)

Ключевое знание internals: AQE оптимизации применяются в строгом порядке и могут взаимно исключать друг друга. OptimizeShuffleWithLocalRead (hash join preference) выполняется ДО OptimizeSkewedJoin. Если первый трансформировал план, второй не найдёт SortMergeJoinExec.

Альтернатива, появившаяся в Spark 4.0: spark.sql.adaptive.forceApplySkewedJoinOptimization=true. Но мы выбрали более conservative вариант: полный запрет ShuffledHashJoin через threshold=0. Для нашего случая (join с 4 GB side) разница в производительности SortMergeJoin vs ShuffledHashJoin при отсутствии skew — незначительна.

Исправление 3: salting aggregation key (-88% времени, Stage 5)

AQE не имеет aggregation skew split (в отличие от join). Salting — классическое решение: добавляем случайный «соль» к ключу агрегации, выполняем partial aggregation с солью, затем финальную агрегацию без соли. Компромисс: countDistinct -> approx_count_distinct с ~1% погрешностью.


Чек-лист senior-инженера: диагностика Spark internals

Этот чек-лист применим к любому медленному или нестабильному Spark-заданию.

Шаг 1: Spark UI первичный осмотр (5 минут)

[ ] Job timeline: есть ли gaps (scheduler overhead, GC pauses)?
[ ] Stage с максимальной длительностью — что именно?
[ ] Stage details: median vs max task duration — разброс > 3x?
[ ] Shuffle spill (disk) > 0 GB?
[ ] GC time > 10% task duration?
[ ] Failed/speculative tasks?

Шаг 2: идентификация паттерна (10 минут)

[ ] Один outlier task (max >> p75) -> data skew
[ ] Все tasks медленные, один executor медленный -> straggler node
[ ] Все tasks медленные равномерно -> not enough resources / GC
[ ] OOM в executor logs -> memory: execution или storage
[ ] OOM в driver logs -> collect(), large broadcast, toPandas()
[ ] FetchFailed в logs -> shuffle fetch failure, executor died

Шаг 3: проверка плана через explain() (10 минут)

df.explain("formatted")
# [ ] SortMergeJoin или ShuffledHashJoin для большого join?
# [ ] BroadcastHashJoin для маленьких таблиц (<10 MB)?
# [ ] PushedFilters присутствуют в FileScan?
# [ ] PartitionFilters присутствуют для partitioned tables?
# [ ] ReadSchema: только нужные колонки?
# [ ] InMemoryRelation: есть ли cache() который используется <=1 раза?
# [ ] Whole-stage codegen (*N): нет ли операторов без звёздочки?

df.explain("extended")
# [ ] Optimized Logical Plan: filter сдвинут вниз?
# [ ] Column pruning применён?

Шаг 4: AQE-конфигурация (5 минут)

[ ] spark.sql.adaptive.enabled=true?
[ ] spark.sql.adaptive.skewJoin.enabled=true?
[ ] maxShuffledHashJoinLocalMapThreshold — не приводит ли к конфликту со skew?
[ ] advisoryPartitionSizeInBytes — 64m-256m для production?
[ ] После прогона: сколько partitions реально создал AQE (из UI)?

Шаг 5: memory-профиль (10 минут при подозрении на memory)

[ ] spark.executor.memory и spark.memory.fraction установлены явно?
[ ] executor.memoryOverhead = 10-15% от executor.memory?
[ ] Нет ли enriched.cache() без reuse?
[ ] При OOM: трассировка в executor-7 logs — где именно OOM?
[ ] GC logs: Full GC чаще чем 1 раз в 10 минут?
[ ] Peak Execution Memory для outlier task (Spark UI task details)?

Шаг 6: проверка данных (5 минут)

# [ ] Распределение join key
df.groupBy("join_key").count().selectExpr(
    "max(count) / percentile_approx(count, 0.5) as skew_ratio"
).show()
# skew_ratio > 100 -> серьёзный skew

# [ ] Распределение groupBy key
df.groupBy("agg_key").count().selectExpr(
    "max(count) * 100.0 / sum(count) as max_pct"
).show()
# max_pct > 5% -> salting нужен

Частые ловушки, которые не очевидны

Ловушка 1: AQE включён, но не работает. AQE применяет оптимизации в порядке приоритета. Если OptimizeShuffleWithLocalRead заменил SortMergeJoin на ShuffledHashJoin, OptimizeSkewedJoin не найдёт что оптимизировать. Всегда проверяй финальный Physical Plan, не верь тому, что AQE «должен был» сделать.

Ловушка 2: cache() замедляет вместо ускорения. Любой cache() без последующего reuse — это чистый overhead. В ETL-пайплайнах (линейный DAG, каждый DataFrame используется один раз) cache() не нужен вообще, если не нужны multiple actions.

Ловушка 3: null-значения в join key создают skew. hash(null) % N = 0 для любого N. Если в join key много null — все они идут в partition 0. Это выглядит как skew, но причина — data quality. Исправление: df.filter(col("join_key").isNotNull()) до join.

Ловушка 4: countDistinct и salting несовместимы точно. Распределённый countDistinct требует, чтобы все строки с одинаковым значением пришли к одному reducer. При salting этот инвариант нарушается. Решение: approx_count_distinct (HyperLogLog) или явная pre-deduplication до join.

Ловушка 5: spark.sql.shuffle.partitions=200 — дефолт для маленьких данных. При объёме > 10 GB или AQE включён этот параметр — отправная точка, а не финальное значение. С AQE используй coalescePartitions.initialPartitionNum=400-1000 и позволь AQE выбрать оптимальное число.

Ловушка 6: executor.memory и memoryOverhead недооценены. spark.executor.memoryOverhead покрывает: pyspark process, JVM overhead, off-heap буферы. При Python UDF или Arrow-сериализации (pandas UDF) overhead должен быть минимум 15-20% от executor.memory. Дефолт 10% недостаточен.


Полная цепочка: от кода к internals

Полная цепочка: код -> Catalyst -> DAG -> Memory -> Execution
DataFrame APIИсходный код: events.join(users, 'user_id').cache().groupBy(...). DataFrame API транслируется в Unresolved Logical Plan через Catalyst Parser.
Catalyst
Optimized Logical PlanCatalyst Optimizer: Column pruning (12 -> 5 cols), Predicate pushdown (event_date filter), Partition pruning. Optimized Logical Plan без cache() overhead.
SparkPlanner
Physical PlanSparkPlanner + AQE: выбор SortMergeJoin (после maxShuffledHashJoinLocalMapThreshold=0b), BroadcastHashJoin для campaigns. Physical Plan с HashAggregate (partial+final).
DAG: 6 stagesDAG Scheduler: Stage 0 (scan events), Stage 1 (scan users), Stage 2 (shuffle events), Stage 3 (SortMergeJoin + skew split partition_73 -> 105 sub-tasks), Stage 4 (broadcast join campaigns), Stage 5 (groupBy salted).
TaskScheduler
Memory: без конкуренцииTask execution: UnifiedMemoryManager без конкуренции cache(). Execution pool полностью доступен. Sub-tasks partition_73 получают 64 MB каждый. Zero spill. G1GC стабильный.
результат
39 мин, 0 OOMИтог: 39 мин вместо 3ч 47мин. Zero OOM. Zero spill. AQE скоррегировал 400 -> 47 partitions для Stage 5. Система устойчива к следующим вирусным кампаниям.

Что мониторить после оптимизации

Задание теперь стабильно, но нужны системные метрики для раннего обнаружения регрессии:

# 1. Алерт на task duration skew ratio
# Spark UI API: если max/p50 > 5 для любой stage -> алерт

# 2. Алерт на disk spill
# Если Shuffle spill (disk) > 1 GB -> предупреждение (скоро может вернуться OOM)

# 3. Алерт на GC time
# GC time > 20% task duration у любого executor -> memory pressure

# 4. Мониторинг объёма входных данных
# Если events > 1.5x от вчерашнего объёма -> ранний алерт (будущий skew)

Особенно важен пункт 4: наша проблема нарастала 30 дней (38 мин -> 3ч 47мин). Мониторинг объёма данных с алертом позволил бы поймать деградацию на этапе 1ч 23мин и исправить задолго до OOM.


Что читать дальше

Капстоун охватил основные слои диагностики, но каждый из них можно углубить:

Shuffle internals глубже: SortShuffleWriter vs UnsafeShuffleWriter selection algorithm, MapOutputTracker epoch mechanism, push-based shuffle (External Shuffle Service 2.0). Модуль 04 курса.

Memory глубже: OffHeapMemoryAllocator и Tungsten в off-heap режиме, MemoryStore eviction policies (LRU vs FIFO), TaskMemoryManager spill coordination между concurrent consumers. Модуль 05 курса.

AQE глубже: DynamicPartitionPruning (как AQE использует runtime statistics для partition pruning при broadcast join), RuntimeFilterPushDown, CoalesceShufflePartitions internal mechanics. Модуль 08 курса.

Structured Streaming: всё то же самое, но в micro-batch контексте. State store (RocksDB vs in-memory), checkpoint mechanics, watermark и late data. Модуль 09 курса.

Расширение Spark: если нужна custom оптимизация (например, domain-specific predicate pushdown или кастомный data source), SparkSessionExtensions позволяет инжектировать rules в Catalyst pipeline. Модуль 11 курса.


Попробуй сам

Финальное упражнение капстоуна: применить чек-лист к вашему production-заданию.

# Шаблон диагностики production job

def diagnose_pipeline(df, description="pipeline"):
    """
    Senior diagnostic template.
    Запустить ДО оптимизации для сбора baseline метрик.
    """
    print(f"\n=== Диагностика: {description} ===\n")
    
    # 1. Физический план
    print("--- Physical Plan ---")
    df.explain("formatted")
    
    # 2. Проверка join keys на skew
    # (заменить 'key' на реальный join key)
    # df.groupBy("key").count() \
    #   .selectExpr("max(count)/percentile_approx(count,0.5) as skew_ratio") \
    #   .show()
    
    # 3. Проверка настроек AQE
    print("\n--- AQE Config ---")
    for conf in [
        "spark.sql.adaptive.enabled",
        "spark.sql.adaptive.skewJoin.enabled",
        "spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold",
        "spark.sql.adaptive.advisoryPartitionSizeInBytes",
        "spark.sql.shuffle.partitions",
    ]:
        print(f"  {conf} = {spark.conf.get(conf, 'not set')}")
    
    # 4. Проверка cache
    plan_str = str(df.queryExecution.executedPlan)
    if "InMemoryRelation" in plan_str:
        print("\n[WARNING] InMemoryRelation обнаружен в плане. "
              "Убедитесь, что cache() используется более одного раза.")
    
    return df

# Использование:
result = diagnose_pipeline(
    events.join(users, "user_id").groupBy(...).agg(...),
    "daily ETL join + aggregation"
)
result.write...

Итоги всего капстоуна

Мы прошли полный цикл диагностики и оптимизации production Spark-пайплайна, опираясь на знание internals каждого слоя:

  • Shuffle internals (модуль 04): понимание HashPartitioning и distribution объяснило, почему один user_id создаёт outlier partition
  • AQE internals (модуль 08): знание порядка применения оптимизаций (OptimizeShuffleWithLocalRead до OptimizeSkewedJoin) объяснило, почему skew protection не сработала
  • Memory internals (модуль 05): понимание UnifiedMemoryManager boundary shifting объяснило связь между cache() и OOM
  • Catalyst internals (модуль 06): инспекция через explain() подтвердила column pruning, partition pruning и whole-stage codegen

Главный принцип senior-диагностики: от симптома к механизму, не к конфигу. Знание того, как Spark работает изнутри, позволяет сразу сформулировать правильную гипотезу вместо случайного перебора параметров.

Проверка знанийKnowledge check
Коллега предлагает: 'Давай просто увеличим spark.executor.memory с 96g до 192g — тогда точно не будет OOM'. Как senior-инженер объясни, почему это не решает root cause, и что произойдёт с пайплайном через следующий месяц при дальнейшем росте данных?
ОтветAnswer
Увеличение executor.memory откладывает проблему, а не решает root cause. Root cause — data skew: один user_id генерирует 5.6% всех строк. При 192g heap task 45 (partition_73 = 6.7 GB) перестанет падать с OOM сегодня. Но данные растут: через месяц при очередной вирусной кампании partition_73 может достичь 15-20 GB, и снова будет OOM. Это линейная гонка с данными. Правильное решение — устранить skew: (1) AQE skew split через SortMergeJoin разбивает skewed partition на подпартиции независимо от абсолютного размера; (2) в худшем случае skew ratio остаётся константным при росте данных — задание масштабируется линейно. Увеличение памяти может быть оправдано как краткосрочный mitigation пока готовится proper fix, но не как постоянное решение. Кроме того, 192g executor требует узлы с ещё большим RAM, что увеличивает стоимость кластера.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Senior-инженер применяет чек-лист диагностики к новому заданию. Spark UI Stage 7: median task = 2 мин, max task = 2.1 мин, Shuffle Read Size max = 68 MB (медиана = 61 MB), GC Time = 8 с. При этом задание выполняется 3 часа вместо ожидаемых 30 минут. Что показывает чек-лист?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5