Learning Platform
Глоссарий Troubleshooting
Урок 16.01 · 30 мин
Продвинутый
CapstoneДиагностикаSpark UIProductionETL

Капстоун: постановка задачи

В предыдущих пятнадцати модулях курса мы разбирали Apache Spark 4.0 изнутри: RDD-модель и DAG Scheduler, shuffle internals и SortShuffleWriter, UnifiedMemoryManager и MemoryStore, Catalyst и Tungsten, AQE и skew join, Structured Streaming и Arrow. Капстоун переводит всё это в практику: перед вами реальный production-пайплайн с реальными симптомами, и ваша задача — найти причины, опираясь именно на знание internals.


Описание пайплайна

Команда аналитики запускает ежедневный ETL-job в 02:00 UTC. Пайплайн читает события за прошедшие сутки из Delta Lake (Parquet-паркет, партицировано по event_date), обогащает их двумя join-операциями, выполняет агрегацию и записывает результат в lakehouse для BI-инструментов.

# Упрощённая схема пайплайна
events = spark.read.format("delta").load("s3://data/events/")
    .filter(col("event_date") == yesterday)   # ~800M строк / 120 GB

users = spark.read.format("delta").load("s3://data/users/")
    .filter(col("country").isin(TARGET_COUNTRIES))  # ~25M строк / 4 GB

campaigns = spark.read.format("delta").load("s3://data/campaigns/")
    # ~2000 строк / 80 KB

enriched = events.join(users, "user_id", "left")       # shuffle join
                 .join(campaigns, "campaign_id", "left") # ещё join

result = enriched.groupBy("country", "campaign_id", "event_type") \
                 .agg(
                     count("*").alias("event_count"),
                     countDistinct("user_id").alias("unique_users"),
                     sum("revenue").alias("total_revenue")
                 )

result.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("country") \
    .save("s3://data/analytics/daily/")

Кластер: YARN на AWS EMR, 1 driver (r6g.2xlarge: 8 vCPU, 64 GB RAM) + 20 executor-ов (r6g.4xlarge: 16 vCPU, 128 GB RAM каждый). Spark 4.0, Java 17 (G1GC). spark.executor.memory=96g, spark.executor.memoryOverhead=8g, spark.sql.shuffle.partitions=200 (дефолт сохранён при переходе на AQE), spark.executor.cores=8.


Симптомы

В понедельник утром on-call инженер получает два алерта. Первый: задание выполнялось 3 часа 12 минут вместо штатных 38–45 минут. Второй: одно из выполнений той же ночи завершилось с OOM-ошибкой и потребовало ручного перезапуска.

История за последние 30 дней показывает паттерн:

2026-04-22  38 мин   OK
2026-04-23  41 мин   OK
2026-04-24  39 мин   OK
2026-05-01  51 мин   OK
2026-05-07  1ч 23м   OK
2026-05-12  2ч 08м   OK   <- запуск после рекламной кампании
2026-05-13  FAILED        <- OOM на executor
2026-05-13  3ч 12м   OK   <- повторный запуск со spark.executor.memoryOverhead=16g
2026-05-20  3ч 47м   OK   <- сегодня утром

Уже видно нарастающую регрессию, привязанную к событиям бизнеса (рекламные кампании увеличивают объём данных). Но точных причин ещё нет.


Что показывает Spark UI

Spark UI: общий вид завершённого job-а
Job 0 (read events)1 мин 12 сЧтение Parquet с S3: 120 GB, 960 tasks. Параллельное чтение, без shuffle. Время стабильное.
Job 1 (read users)18 сЧтение 4 GB. 32 tasks. Быстро — данные влезают в память.
Stage 2 (shuffle write events)47 минSortShuffle write: 120 GB в 200 partitions. Median task: 14 мин. Max task: 47 мин. Огромный разброс — сигнал skew.
wide dep
Stage 3 (join + shuffle write)1 ч 02 минShuffledHashJoin events x users. Shuffle read: 120 GB. Один task занял 62 мин — критический outlier. 15 GB spill to disk.
Stage 4 (join campaigns)8 минBroadcastHashJoin для campaigns (2000 строк). Быстро — broadcast корректно применён.
Stage 5 (groupBy agg)1 ч 04 минHashAggregate + shuffle. Ещё один outlier task: 64 мин. Shuffle write: 85 GB. Похожий паттерн skew.

Сразу бросаются в глаза несколько вещей. Stage 2 имеет медиану 14 минут, но хвост до 47 минут — разброс 3.3x. Stage 3 показывает один task в 62 минуты при медиане 4 минуты — разброс 15x. Это классическая картина data skew. Общий shuffle write Stage 3 = 120 GB, что совпадает с размером events — join не уменьшил объём данных. Stage 5 — аналогичная аномалия.

Строка Shuffle Spill в Stage 3: Shuffle spill (memory): 42 GB, Shuffle spill (disk): 15 GB. Это означает, что какой-то task пытался удержать 42 GB в execution memory перед сбросом на диск — для executor с 96 GB это почти половина всей heap.


Фрагмент executor-лога с OOM

[2026-05-13 03:47:22] ERROR TaskSetManager: Task 45 in stage 3.0 failed 3 times
java.lang.OutOfMemoryError: Java heap space
  at org.apache.spark.unsafe.types.UTF8String.fromBytes(UTF8String.java:185)
  at org.apache.spark.sql.execution.joins.ShuffledHashJoinExec$$anonfun$1
        .apply(ShuffledHashJoinExec.scala:142)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1
        .apply(SparkPlan.scala:185)
  ...
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:542)

[2026-05-13 03:47:22] WARN MemoryStore: Not enough space to cache rdd_42_0
  in memory! (computed 8.4 GiB so far; 0.0 B remaining for storage)

[2026-05-13 03:47:22] INFO BlockManagerMasterEndpoint: Removing block manager
  BlockManagerId(exec-7, 10.0.1.47, 52815, None) from memory

Трассировка ShuffledHashJoinExec говорит, что OOM произошёл внутри hash-table build phase. Предупреждение Not enough space to cache rdd_42_0 указывает на конкуренцию за память между storage pool и execution pool.


Фрагмент explain()-плана

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   SortAggregate(key=[country#12, campaign_id#34, event_type#56],
                 functions=[count(1), count(distinct user_id#78), sum(revenue#90)])
   +- Sort [country#12 ASC NULLS FIRST, campaign_id#34 ASC NULLS FIRST, ...]
      +- Exchange hashpartitioning(country#12, campaign_id#34, ..., 200)
         +- SortAggregate(key=...)
            +- Project [...]
               +- ShuffledHashJoin [user_id#78], [user_id#112], LeftOuter, ...
                  :- Exchange hashpartitioning(user_id#78, 200)
                  :  +- Filter [event_date#11 = 2026-05-12]
                  :     +- FileScan parquet [...]
                  +- Exchange hashpartitioning(user_id#112, 200)
                     +- Filter [country#45 IN ('RU','DE','FR',...)]
                        +- FileScan parquet [...]

Обратите внимание: AQE включён (AdaptiveSparkPlan isFinalPlan=true), но план показывает ShuffledHashJoin вместо SortMergeJoin. Это значит, что AQE переключился на hash join — это хорошо само по себе, но hash join строит hash table в памяти, что усугубляет OOM при skew. Также видно: 200 partitions в Exchange — это то самое дефолтное значение, которое AQE должен был скоррегировать, но не скоррегировал (почему — разберём в уроке 2).


Методология диагностики: от симптома к слою движка

Продуктивная диагностика Spark — это не перебор конфигов наугад. Это движение от симптома к механизму через конкретные слои движка.

Методология диагностики
СимптомСимптом: медленный job или OOM. Источник: алерты, пользователи, мониторинг.
Spark UI
Аномальные метрикиSpark UI: Job timeline, Stage details, Task metrics. Находим аномальные stages и tasks. Медиана vs max time, spill, shuffle bytes.
Гипотеза о механизмеСлои движка: DAG + Scheduler (01), Shuffle internals (02), Memory (03), Catalyst + AQE (04). Каждый слой даёт конкретную гипотезу.
explain() + логи
ПодтверждениеВерификация через физический план, executor-логи, GC-логи, heap dump. Исключаем альтернативные гипотезы.
ИсправлениеКонкретное изменение конфига или кода с ожидаемым эффектом. Замер до/после.
до/после
Верификация результатаЗамер результата: время выполнения, spill, OOM-отсутствие. Итерация если нужно.

Принцип 1 — один слой за раз. Не меняйте одновременно shuffle partitions, broadcast threshold и executor memory. Вы не поймёте, что именно помогло. Двигайтесь методично: сначала определите, какой слой движка является узким местом, потом применяйте исправление.

Принцип 2 — измеряйте механизм, не только время. Время job — итоговый показатель. Но причина может быть в shuffle bytes (сетевое IO), spill bytes (disk IO), GC pause (memory), task outlier (skew). Каждый из них указывает на разный механизм и требует разного исправления.

Принцип 3 — различайте symptom от root cause. OOM — это симптом. Root cause может быть: broadcast table слишком большая и помещена в heap, hash table при skew partition не влезает, storage pool вытесняет execution при кэшировании, driver собирает слишком большой результат через collect(). Разные root causes — разные исправления.

Принцип 4 — знайте дефолты и их смысл. spark.sql.shuffle.partitions=200 — дефолт из эпохи до AQE. spark.sql.autoBroadcastJoinThreshold=10m — порог, который AQE может переопределить. spark.executor.memory=1g — дефолт для тестов, не для production. Знание того, откуда взялось значение, помогает оценить, нужно ли его менять.


Карта уроков капстоуна

Следующие четыре урока разбирают этот пайплайн по слоям:

Урок 2 — диагностика skew и shuffle. Почему Stage 3 имеет outlier task в 62 минуты? Как AQE skew join optimization должен был этому помочь, но не помог и почему? Как читать shuffle metrics в Spark UI.

Урок 3 — дебаг проблем с памятью. Почему произошёл OOM именно в ShuffledHashJoinExec? Как UnifiedMemoryManager распределял память между execution и storage? Как читать executor-логи и GC-метрики.

Урок 4 — оптимизация пайплайна. Применяем исправления: правильная конфигурация AQE, repартиционирование, broadcast join, инспекция whole-stage codegen. Замер «было/стало» для каждого изменения.

Урок 5 — разбор решения и чек-лист senior-инженера. Полный итог: что было не так на каждом слое, итоговый результат (целевые 40 минут, ноль OOM), чек-лист диагностики для future use.


Попробуй сам

Прежде чем переходить к следующему уроку, попробуй самостоятельно сформулировать гипотезы:

  1. Открой Spark UI любого вашего production job и найди Stage с наибольшим расхождением медианы и максимума task duration. Это первый кандидат на диагностику.
  2. Запусти df.explain("formatted") на тяжёлом join и найди строку Exchange hashpartitioning(key, N). Какое N используется? Это дефолт или подобранное значение?
  3. Проверь, включён ли AQE: spark.sql.adaptive.enabled=true. Если да — смотри, какие AdaptiveSparkPlan решения он принял в Physical Plan.
# Команды для Spark UI через REST API
import requests

# Список всех stages с метриками
stages = requests.get(
    "http://spark-history-server:18080/api/v1/applications/{appId}/stages"
).json()

# Найти stage с максимальным разбросом task duration
for s in stages:
    if s.get("taskMetricDistributions"):
        dist = s["taskMetricDistributions"]["executorRunTime"]
        median = dist["50"]
        max_val = dist["max"]
        if median > 0 and max_val / median > 3:
            print(f"Stage {s['stageId']}: median={median}ms, max={max_val}ms, "
                  f"ratio={max_val/median:.1f}x  <-- кандидат на skew")

Итоги

Пайплайн имеет три очевидных сигнала из Spark UI: (1) резкий разброс task duration в Stage 3 и Stage 5 указывает на data skew; (2) 15 GB disk spill указывает на проблемы с allocation в execution memory; (3) OOM в ShuffledHashJoinExec — прямое следствие того, что один task получил непропорционально большую партицию при skew. AQE включён, но, судя по плану, не справился со skew. В следующем уроке разберём механизм диагностики skew точно через shuffle internals и AQE internals.

Проверка знанийKnowledge check
В Spark UI Stage 3 показывает: median task duration = 4 мин, max task duration = 62 мин, Shuffle spill (disk) = 15 GB. Что является наиболее вероятной первопричиной этих симптомов, и какой механизм Spark 4.0 должен был предотвратить проблему, но не предотвратил?
ОтветAnswer
Data skew: одна или несколько shuffle partitions содержат непропорционально больше строк, чем остальные. Task, получивший такую partition, работает в 15 раз дольше медианы. Spill возникает потому, что hash table для этой partition не влезает в выделенную execution memory и сбрасывается на диск. Механизм, который должен был помочь — AQE skew join optimization (SPARK-29544): при обнаружении partitions, которые в 5x (spark.sql.adaptive.skewJoin.skewedPartitionFactor) превышают медиану и превышают 256 MB (spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes), AQE разбивает skewed partition на подпартиции и обрабатывает их параллельно. Что именно не сработало — разберём в следующем уроке.

Лабораторная работа

В капстоун-лабораторной вы поднимаете намеренно сломанный пайплайн и проходите весь путь диагностики: находите первопричину по Spark UI и Grafana, формулируете гипотезу и исправляете её. Это сводит воедино все навыки курса в одном production-сценарии.

cd labs/capstone
docker compose up -d

Полное описание и шаги проверки — в labs/capstone/README.md.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Spark UI Stage 3 показывает: median task duration = 4 мин, max task duration = 62 мин, Shuffle Read Size для max task = 6.7 GB при медиане 61 MB. Какой из перечисленных механизмов является наиболее точным объяснением этого паттерна?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5