Капстоун: постановка задачи
В предыдущих пятнадцати модулях курса мы разбирали Apache Spark 4.0 изнутри: RDD-модель и DAG Scheduler, shuffle internals и SortShuffleWriter, UnifiedMemoryManager и MemoryStore, Catalyst и Tungsten, AQE и skew join, Structured Streaming и Arrow. Капстоун переводит всё это в практику: перед вами реальный production-пайплайн с реальными симптомами, и ваша задача — найти причины, опираясь именно на знание internals.
Описание пайплайна
Команда аналитики запускает ежедневный ETL-job в 02:00 UTC. Пайплайн читает события за прошедшие сутки из Delta Lake (Parquet-паркет, партицировано по event_date), обогащает их двумя join-операциями, выполняет агрегацию и записывает результат в lakehouse для BI-инструментов.
# Упрощённая схема пайплайна
events = spark.read.format("delta").load("s3://data/events/")
.filter(col("event_date") == yesterday) # ~800M строк / 120 GB
users = spark.read.format("delta").load("s3://data/users/")
.filter(col("country").isin(TARGET_COUNTRIES)) # ~25M строк / 4 GB
campaigns = spark.read.format("delta").load("s3://data/campaigns/")
# ~2000 строк / 80 KB
enriched = events.join(users, "user_id", "left") # shuffle join
.join(campaigns, "campaign_id", "left") # ещё join
result = enriched.groupBy("country", "campaign_id", "event_type") \
.agg(
count("*").alias("event_count"),
countDistinct("user_id").alias("unique_users"),
sum("revenue").alias("total_revenue")
)
result.write.format("delta") \
.mode("overwrite") \
.partitionBy("country") \
.save("s3://data/analytics/daily/")
Кластер: YARN на AWS EMR, 1 driver (r6g.2xlarge: 8 vCPU, 64 GB RAM) + 20 executor-ов (r6g.4xlarge: 16 vCPU, 128 GB RAM каждый). Spark 4.0, Java 17 (G1GC). spark.executor.memory=96g, spark.executor.memoryOverhead=8g, spark.sql.shuffle.partitions=200 (дефолт сохранён при переходе на AQE), spark.executor.cores=8.
Симптомы
В понедельник утром on-call инженер получает два алерта. Первый: задание выполнялось 3 часа 12 минут вместо штатных 38–45 минут. Второй: одно из выполнений той же ночи завершилось с OOM-ошибкой и потребовало ручного перезапуска.
История за последние 30 дней показывает паттерн:
2026-04-22 38 мин OK
2026-04-23 41 мин OK
2026-04-24 39 мин OK
2026-05-01 51 мин OK
2026-05-07 1ч 23м OK
2026-05-12 2ч 08м OK <- запуск после рекламной кампании
2026-05-13 FAILED <- OOM на executor
2026-05-13 3ч 12м OK <- повторный запуск со spark.executor.memoryOverhead=16g
2026-05-20 3ч 47м OK <- сегодня утром
Уже видно нарастающую регрессию, привязанную к событиям бизнеса (рекламные кампании увеличивают объём данных). Но точных причин ещё нет.
Что показывает Spark UI
Сразу бросаются в глаза несколько вещей. Stage 2 имеет медиану 14 минут, но хвост до 47 минут — разброс 3.3x. Stage 3 показывает один task в 62 минуты при медиане 4 минуты — разброс 15x. Это классическая картина data skew. Общий shuffle write Stage 3 = 120 GB, что совпадает с размером events — join не уменьшил объём данных. Stage 5 — аналогичная аномалия.
Строка Shuffle Spill в Stage 3: Shuffle spill (memory): 42 GB, Shuffle spill (disk): 15 GB. Это означает, что какой-то task пытался удержать 42 GB в execution memory перед сбросом на диск — для executor с 96 GB это почти половина всей heap.
Фрагмент executor-лога с OOM
[2026-05-13 03:47:22] ERROR TaskSetManager: Task 45 in stage 3.0 failed 3 times
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.unsafe.types.UTF8String.fromBytes(UTF8String.java:185)
at org.apache.spark.sql.execution.joins.ShuffledHashJoinExec$$anonfun$1
.apply(ShuffledHashJoinExec.scala:142)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1
.apply(SparkPlan.scala:185)
...
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:542)
[2026-05-13 03:47:22] WARN MemoryStore: Not enough space to cache rdd_42_0
in memory! (computed 8.4 GiB so far; 0.0 B remaining for storage)
[2026-05-13 03:47:22] INFO BlockManagerMasterEndpoint: Removing block manager
BlockManagerId(exec-7, 10.0.1.47, 52815, None) from memory
Трассировка ShuffledHashJoinExec говорит, что OOM произошёл внутри hash-table build phase. Предупреждение Not enough space to cache rdd_42_0 указывает на конкуренцию за память между storage pool и execution pool.
Фрагмент explain()-плана
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
SortAggregate(key=[country#12, campaign_id#34, event_type#56],
functions=[count(1), count(distinct user_id#78), sum(revenue#90)])
+- Sort [country#12 ASC NULLS FIRST, campaign_id#34 ASC NULLS FIRST, ...]
+- Exchange hashpartitioning(country#12, campaign_id#34, ..., 200)
+- SortAggregate(key=...)
+- Project [...]
+- ShuffledHashJoin [user_id#78], [user_id#112], LeftOuter, ...
:- Exchange hashpartitioning(user_id#78, 200)
: +- Filter [event_date#11 = 2026-05-12]
: +- FileScan parquet [...]
+- Exchange hashpartitioning(user_id#112, 200)
+- Filter [country#45 IN ('RU','DE','FR',...)]
+- FileScan parquet [...]
Обратите внимание: AQE включён (AdaptiveSparkPlan isFinalPlan=true), но план показывает ShuffledHashJoin вместо SortMergeJoin. Это значит, что AQE переключился на hash join — это хорошо само по себе, но hash join строит hash table в памяти, что усугубляет OOM при skew. Также видно: 200 partitions в Exchange — это то самое дефолтное значение, которое AQE должен был скоррегировать, но не скоррегировал (почему — разберём в уроке 2).
Методология диагностики: от симптома к слою движка
Продуктивная диагностика Spark — это не перебор конфигов наугад. Это движение от симптома к механизму через конкретные слои движка.
Принцип 1 — один слой за раз. Не меняйте одновременно shuffle partitions, broadcast threshold и executor memory. Вы не поймёте, что именно помогло. Двигайтесь методично: сначала определите, какой слой движка является узким местом, потом применяйте исправление.
Принцип 2 — измеряйте механизм, не только время. Время job — итоговый показатель. Но причина может быть в shuffle bytes (сетевое IO), spill bytes (disk IO), GC pause (memory), task outlier (skew). Каждый из них указывает на разный механизм и требует разного исправления.
Принцип 3 — различайте symptom от root cause. OOM — это симптом. Root cause может быть: broadcast table слишком большая и помещена в heap, hash table при skew partition не влезает, storage pool вытесняет execution при кэшировании, driver собирает слишком большой результат через collect(). Разные root causes — разные исправления.
Принцип 4 — знайте дефолты и их смысл. spark.sql.shuffle.partitions=200 — дефолт из эпохи до AQE. spark.sql.autoBroadcastJoinThreshold=10m — порог, который AQE может переопределить. spark.executor.memory=1g — дефолт для тестов, не для production. Знание того, откуда взялось значение, помогает оценить, нужно ли его менять.
Карта уроков капстоуна
Следующие четыре урока разбирают этот пайплайн по слоям:
Урок 2 — диагностика skew и shuffle. Почему Stage 3 имеет outlier task в 62 минуты? Как AQE skew join optimization должен был этому помочь, но не помог и почему? Как читать shuffle metrics в Spark UI.
Урок 3 — дебаг проблем с памятью. Почему произошёл OOM именно в ShuffledHashJoinExec? Как UnifiedMemoryManager распределял память между execution и storage? Как читать executor-логи и GC-метрики.
Урок 4 — оптимизация пайплайна. Применяем исправления: правильная конфигурация AQE, repартиционирование, broadcast join, инспекция whole-stage codegen. Замер «было/стало» для каждого изменения.
Урок 5 — разбор решения и чек-лист senior-инженера. Полный итог: что было не так на каждом слое, итоговый результат (целевые 40 минут, ноль OOM), чек-лист диагностики для future use.
Попробуй сам
Прежде чем переходить к следующему уроку, попробуй самостоятельно сформулировать гипотезы:
- Открой Spark UI любого вашего production job и найди Stage с наибольшим расхождением медианы и максимума task duration. Это первый кандидат на диагностику.
- Запусти
df.explain("formatted")на тяжёлом join и найди строкуExchange hashpartitioning(key, N). Какое N используется? Это дефолт или подобранное значение? - Проверь, включён ли AQE:
spark.sql.adaptive.enabled=true. Если да — смотри, какиеAdaptiveSparkPlanрешения он принял в Physical Plan.
# Команды для Spark UI через REST API
import requests
# Список всех stages с метриками
stages = requests.get(
"http://spark-history-server:18080/api/v1/applications/{appId}/stages"
).json()
# Найти stage с максимальным разбросом task duration
for s in stages:
if s.get("taskMetricDistributions"):
dist = s["taskMetricDistributions"]["executorRunTime"]
median = dist["50"]
max_val = dist["max"]
if median > 0 and max_val / median > 3:
print(f"Stage {s['stageId']}: median={median}ms, max={max_val}ms, "
f"ratio={max_val/median:.1f}x <-- кандидат на skew")
Итоги
Пайплайн имеет три очевидных сигнала из Spark UI: (1) резкий разброс task duration в Stage 3 и Stage 5 указывает на data skew; (2) 15 GB disk spill указывает на проблемы с allocation в execution memory; (3) OOM в ShuffledHashJoinExec — прямое следствие того, что один task получил непропорционально большую партицию при skew. AQE включён, но, судя по плану, не справился со skew. В следующем уроке разберём механизм диагностики skew точно через shuffle internals и AQE internals.
Лабораторная работа
В капстоун-лабораторной вы поднимаете намеренно сломанный пайплайн и проходите весь путь диагностики: находите первопричину по Spark UI и Grafana, формулируете гипотезу и исправляете её. Это сводит воедино все навыки курса в одном production-сценарии.
cd labs/capstone
docker compose up -d
Полное описание и шаги проверки — в labs/capstone/README.md.