Learning Platform
Глоссарий Troubleshooting
Урок 16.03 · 35 мин
Продвинутый
OOMUnifiedMemoryManagerGCHeap DumpMemory Diagnostics

Дебаг проблем с памятью

В предыдущем уроке мы определили, что AQE переключился на ShuffledHashJoin, deактивировав skew protection, и один task получил partition_73 = 6.7 GB. Теперь разберём вторую составляющую катастрофы: почему именно произошёл OOM, что происходило в UnifiedMemoryManager, и как правильно читать executor-логи и GC-логи для точной диагностики.

Кэширование и persistence: уровни хранения

Анатомия OOM в ShuffledHashJoinExec

Трассировка из лога OOM:

java.lang.OutOfMemoryError: Java heap space
  at org.apache.spark.unsafe.types.UTF8String.fromBytes(UTF8String.java:185)
  at org.apache.spark.sql.execution.joins.ShuffledHashJoinExec$$anonfun$1
        .apply(ShuffledHashJoinExec.scala:142)

UTF8String.fromBytes — это аллокация объекта в heap. ShuffledHashJoinExec.scala:142 — это build phase: построение hash table из build side (users). Heap space exhausted во время аллокации строки hash table.

Но почему? У executor 96 GB heap, а users-таблица всего 4 GB. Казалось бы, должно хватать.

Ответ в том, что к моменту прихода Task 45 на executor executor уже:

  1. Держал кэш broadcast переменных (campaigns, 80 KB) — незначительно
  2. Обрабатывал параллельно 7 других tasks (8 cores) в той же JVM
  3. Хранил в StorageMemoryPool промежуточные результаты кэшированного RDD

Вот полная картина через UnifiedMemoryManager:

spark.executor.memory = 96 GB
spark.memory.fraction  = 0.6  (default)
spark.memory.storageFraction = 0.5 (default)

Unified Memory Region = 96 * 0.6 = 57.6 GB
  StorageMemoryPool (минимум) = 57.6 * 0.5 = 28.8 GB
  ExecutionMemoryPool (минимум) = 57.6 * 0.5 = 28.8 GB

Reserved Memory = 300 MB (hardcoded)
User Memory = 96 * 0.4 - 0.3 ≈ 38.1 GB (Spark internals + user objects)

Когда Task 45 запрашивает execution memory для hash table, в том же executor выполнялись ещё 7 tasks, каждый держал свою долю execution memory. 8 concurrent tasks делят execution pool на 8 слотов минимум. Но Task 45 пытается занять несравнимо больше своей «доты».


Три вида OOM: как различить

Это принципиальный вопрос диагностики. OOM может произойти в трёх разных местах, и правильное исправление зависит от того, где именно.

Три источника OOM в Spark
Execution OOMExecution OOM: task исчерпал execution memory при sort, hash join, или aggregation. Stack trace: ShuffledHashJoinExec, UnsafeExternalSorter, BytesToBytesMap. Исправление: уменьшить partition size (больше partitions), отключить hash join, включить spill.
Storage OOMStorage OOM: кэшированный RDD/DataFrame не влезает в storage pool и начинает конкурировать с execution. Stack trace: MemoryStore.putBytes, CacheManager.putInBlockManager. Исправление: unpersist() неиспользуемых данных, выбрать MEMORY_AND_DISK вместо MEMORY_ONLY.
Driver OOMDriver OOM: driver собирает слишком большой результат (collect(), broadcast join с большой таблицей, toPandas()). Stack trace: SparkContext.runJob, Dataset.collect. Исправление: не использовать collect() на больших данных, проверить broadcast threshold.

В нашем логе: java.lang.OutOfMemoryError: Java heap space в ShuffledHashJoinExec — это Execution OOM. Предупреждение Not enough space to cache rdd_42_0 — это Storage OOM как вторичный эффект. Обе проблемы произошли в одном executor: сначала storage pool «одолжил» у execution (UnifiedMemoryManager это позволяет), затем execution запросил обратно и оба пула исчерпались.


Чтение executor-лога: полная хронология OOM

Реконструируем события из лога executor-7:

[03:31:15] INFO BlockManager: Acquired 2.1 GiB for block rdd_42_0 (StorageMemory)
# RDD кэшируется в storage pool — это был явный cache() в коде пайплайна

[03:31:18] INFO MemoryStore: Block rdd_42_0 stored as values in memory
  (estimated size 2.1 GiB, free 9.8 GiB)
# После кэширования storage pool занял 2.1 GiB

[03:44:22] INFO TaskMemoryManager: Task 38 trying to acquire 512.0 MiB
[03:44:22] INFO UnifiedMemoryManager: Acquiring 512.0 MiB for execution
  (executionPool.memoryUsed=24.1 GiB, storagePool.memoryUsed=2.1 GiB)
# 7 tasks работают параллельно, execution pool постепенно заполняется

[03:46:55] WARN UnifiedMemoryManager: No enough execution memory, 
  shrinking storage pool by 1.2 GiB for task 45
# Execution pool запрашивает больше. UnifiedMemoryManager вытесняет storage.

[03:46:55] WARN MemoryStore: Block rdd_42_0 evicted with size 2.1 GiB
  due to memory pressure
# Кэш вытеснен. rdd_42_0 потерян — при следующем обращении пересчитается.

[03:47:01] INFO TaskMemoryManager: Task 45 peak: 47.2 GiB
# Task 45 занял почти всю execution memory

[03:47:10] INFO GCMetrics: GC pause 8.2 s (G1 Full GC)
# G1 Full GC — давление достигло предела

[03:47:22] ERROR TaskRunner: Task 45 failed
java.lang.OutOfMemoryError: Java heap space
  at UTF8String.fromBytes

Хронология: (1) явный cache() захватил 2.1 GB storage; (2) 7+1 concurrent tasks заполняли execution pool; (3) UnifiedMemoryManager попытался вытеснить storage ради execution; (4) Task 45 с 6.7 GB partition всё равно не влез; (5) G1 Full GC пытался освободить место; (6) OOM при аллокации очередной строки hash table.


Чтение GC-логов: как интерпретировать

Включаем детальный GC-лог для диагностики:

spark.executor.extraJavaOptions=-verbose:gc -Xloggc:/tmp/gc.log
  -XX:+PrintGCDetails -XX:+PrintGCDateStamps
  -XX:+PrintGCTimeStamps -XX:+PrintHeapAtGC

Из gc.log за инцидентный день:

2026-05-13T03:31:15.234: [GC pause (G1 Young) (initial-mark)
  Heap before GC:  Eden: 3.2G(4G) Survivors: 0.8G Old: 51.2G(88G)
  Heap after GC:   Eden: 0G(4G)   Survivors: 1.1G Old: 53.8G(88G)
  Pause time: 0.82 s]

2026-05-13T03:46:58.112: [Full GC (Allocation Failure)
  Heap before GC:  Eden: 0G(4G) Survivors: 0G Old: 91.2G(96G)
  Heap after GC:   Eden: 0G(4G) Survivors: 0G Old: 87.4G(96G)
  Pause time: 8.24 s]     <- Full GC, 8 секунд
  
2026-05-13T03:47:10.441: [Full GC (Allocation Failure)
  Heap before GC:  Eden: 0G(4G) Survivors: 0G Old: 94.1G(96G)
  Heap after GC:   Eden: 0G(4G) Survivors: 0G Old: 93.8G(96G)
  Pause time: 9.12 s]     <- Второй Full GC, 9 секунд
  
2026-05-13T03:47:22.891: [Full GC (Allocation Failure)
  Heap before GC:  Old: 95.9G(96G) ...
  java.lang.OutOfMemoryError: Java heap space]

Ключевые паттерны: (1) Old Gen близок к 96 GB capacity (95.9/96 = 99.9%). (2) Full GC не освобождает значимый объём (94.1 -> 93.8 GB после 9 секунд GC). (3) Allocation Failure — JVM не может аллоцировать даже маленький объект. Это терминальное состояние.

Нормальный GC-паттерн для Spark executor: Young GC каждые несколько секунд (0.1–0.3 s pause), Old Gen стабильна или растёт медленно, Full GC — редко и коротко. В нашем случае Old Gen за 15 минут заполнилась до 99% — это не нормальный рост.


Heap dump: что занимало память

При следующем запуске добавляем:

spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError
  -XX:HeapDumpPath=/mnt/dumps/

Анализ heap dump через Eclipse Memory Analyzer (MAT) или jmap:

# jmap -histo <pid> (из ещё живого executor перед OOM)
 num     #instances         #bytes  class name
----------------------------------------------
   1:      89234576    8567123488  [B              <- byte arrays (UnsafeRow data)
   2:       4523891    3128344312  org.apache.spark.unsafe.types.UTF8String
   3:       2341234    1872987200  org.apache.spark.sql.catalyst.InternalRow
   4:        891234     892345678  java.util.HashMap$Node  <- hash table entries
   5:        234567     456234567  org.apache.spark.sql.execution.UnsafeRow

[B (byte arrays) на 8.5 GB — это сырые данные строк в UnsafeRow представлении. UTF8String на 3.1 GB — decoded string columns. HashMap$Node на 892 MB — элементы hash table в ShuffledHashJoinExec.buildHashedRelation.

Эти числа подтверждают: hash table build для partition_73 = 6.7 GB raw data -> после десериализации в UnsafeRow + UTF8String + HashMap overhead занимает ~13–15 GB в heap. При 8 concurrent tasks это превышает доступный execution pool.


Storage OOM vs Execution OOM: детальный разбор

Второй лог из предыдущего урока: Not enough space to cache rdd_42_0 in memory! (computed 8.4 GiB so far; 0.0 B remaining for storage).

Откуда взялся rdd_42_0? Смотрим код пайплайна — там нет явного cache(). Но есть неявный: при events.filter(col("event_date") == yesterday) Spark в режиме spark.sql.adaptive.enabled=true может материализовать промежуточный результат через ShuffleExchangeExec, который внутри использует BlockManager.

Правильный вопрос: есть ли где-то явный cache() или persist() в коде? Проверяем:

# В production-коде нашлась строка (добавлена коллегой «для ускорения»)
enriched = events.join(users, "user_id", "left") \
                 .join(campaigns, "campaign_id", "left") \
                 .cache()    # <-- вот оно

result = enriched.groupBy(...).agg(...)

enriched.cache() — это MEMORY_ONLY persistence. При первом action (write) Spark материализует enriched и кладёт в storage pool. enriched = join результат, ~60 GB сжатого (120 GB raw × 0.5 compression ratio для Spark in-memory). 60 GB не влезает в storage pool 28.8 GB, поэтому начинается частичное кэширование и eviction.

Эффект: storage pool пытается захватить доступную память, вытесняя execution. Execution отвечает тем же. В итоге оба пула «перетягивают канат», GC работает интенсивно, реального выигрыша от cache() нет — данные всё равно пересчитываются при следующем чтении после eviction.

UnifiedMemoryManager: конкуренция при cache() + heavy join
Unified Memory57.6 GB96 GB * 0.6 = 57.6 GB. Делится между execution и storage динамически. Reserved 300 MB + User 38.1 GB не входят.
Storage (после cache)28.8 GB занятоenriched.cache() пытается занять 60 GB. UnifiedMemoryManager ограничивает: storage не может занять больше unified pool. Занято всё 28.8 GB, остальное evicted.
конкурирует
Execution (Task 45)0 GB свободно8 tasks * нужно ~6 GB каждый = 48 GB. Unified pool = 57.6 GB. Storage занял 28.8 GB. Execution = 57.6 - 28.8 = 28.8 GB на 8 tasks. Task 45 нужно 13+ GB. Получает spill.

Driver OOM: отдельный вектор проблемы

В нашем случае driver OOM не произошёл, но стоит разобрать, как его диагностировать. Driver лог хранится отдельно от executor-логов. Симптомы:

[driver] ERROR SparkContext: Error running job
java.lang.OutOfMemoryError: Java heap space
  at org.apache.spark.rdd.RDD.collect(RDD.scala:992)
  at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3567)

Типичные причины driver OOM: (1) df.collect() на большом DataFrame; (2) broadcast join с таблицей больше spark.sql.autoBroadcastJoinThreshold на driver side; (3) toPandas() без лимита; (4) collect в loop для итеративного алгоритма.

Диагностика: driver heap dump через jmap -dump:format=b,file=/tmp/driver.hprof <driver-pid>. Обычно показывает одну огромную коллекцию Java-объектов (List или Array) с сотнями миллионов элементов.

Конфиг для предотвращения: spark.driver.memory=4g (default = 1g — слишком мало для production), spark.driver.memoryOverhead=1g.


Практическое разграничение: execution OOM vs spill

Не каждый spill — это проблема. Spill — это механизм graceful degradation: когда execution memory исчерпывается, Spark сбрасывает часть данных на диск и продолжает работу. OOM — это когда даже spill не помогает.

Разграничение через логи:

SPILL (нормальная деградация):
[INFO] ExternalSorter: Spilling in-memory map of 1.2 GiB to disk
  (1 time so far)
[INFO] DiskBlockManager: Created temp shuffle file: /mnt/spark/blockmgr-...
# Job продолжается медленнее, но продолжается

OOM (катастрофа):
[WARN] TaskSetManager: Lost task 45.0 in stage 3.0 (TID 1234):
  java.lang.OutOfMemoryError: Java heap space
# Task упал, потребуется retry или job failure

Когда OOM неизбежен: если один partition содержит данных больше, чем executionMemory / (concurrent tasks per executor), и эти данные не поддерживают spill (например, broadcast relation, которая должна целиком помещаться в память), то OOM неизбежен вне зависимости от настроек.

В нашем случае ShuffledHashJoin с BuildRight строит hash table из users-подмножества, соответствующего partition_73 events. При partition_73 = 6.7 GB events, matching users могут быть несколько сотен тысяч строк — это ~200 MB. Но проблема в том, что LongToUnsafeRowMap для build side занимает 3-5x от raw data size из-за hash table overhead и Java object overhead. 200 MB -> 600 MB–1 GB. Это управляемо. Но probe side (6.7 GB events) сам по себе тоже нужен в памяти при сортировке. Итоговое давление — пиковое 47 GB.


Попробуй сам

# 1. Проверить наличие неявных cache() в плане
df.explain("formatted")
# Ищем InMemoryRelation — это означает, что промежуточный результат кэшируется

# 2. Проверить текущее использование памяти через SparkContext
sc = spark.sparkContext
for executor in sc.statusTracker().getExecutorInfos():
    print(f"Executor {executor.executorId()}: "
          f"maxMem={executor.maxMemory()//1024//1024} MB, "
          f"usedMem={executor.memoryUsed()//1024//1024} MB")

# 3. Посмотреть storage memory через StorageLevel
for rdd_info in sc.statusTracker().getRDDInfos():
    if rdd_info.storageLevel.useMemory:
        print(f"RDD {rdd_info.id}: {rdd_info.memSize//1024//1024} MB in memory")

# 4. Включить детальный memory logging для диагностики
spark.conf.set("spark.memory.offHeap.enabled", "false")
spark.sparkContext.setLogLevel("DEBUG")
# Теперь UnifiedMemoryManager.acquireExecutionMemory будет логировать каждый acquire

# 5. Оценить нужный execution memory per task для нашего join
# partition_73 = 6.7 GB raw
# hash table overhead ~3x: 6.7 * 3 = 20.1 GB нужно для одного task
# При 8 concurrent tasks: 20.1 * 8 = 160 GB -- больше всего executor heap!
# Вывод: нужно уменьшить partition size через больше partitions
print(f"Нужно partitions: {int(120 * 1024 / 200)}")  # если хотим <200 MB/partition

Итоги

OOM в нашем пайплайне — совместный результат трёх факторов: (1) skewed partition_73 = 6.7 GB дала одному task нагрузку, требующую ~47 GB execution memory; (2) явный enriched.cache() захватил часть storage pool, уменьшив доступный execution; (3) 8 concurrent tasks делили execution pool, и Task 45 не получил достаточно. UnifiedMemoryManager попытался балансировать через boundary shifting, но масштаб проблемы превысил возможности любого балансировщика. В следующем уроке применим конкретные исправления для всех трёх уровней.

Проверка знанийKnowledge check
Executor имеет spark.executor.memory=96g, spark.memory.fraction=0.6, spark.memory.storageFraction=0.5, spark.executor.cores=8. В коде есть enriched.cache() (MEMORY_ONLY). При выполнении Stage 3 (8 concurrent tasks) один task пытается построить hash table размером 47 GB. Объясните, почему OOM произошёл несмотря на то, что unified memory pool = 57.6 GB, что больше 47 GB.
ОтветAnswer
57.6 GB делится между ВСЕМИ 8 concurrent tasks одновременно. Task 45 не может захватить все 57.6 GB в одиночку: TaskMemoryManager отдаёт памяти пропорционально между active tasks. Кроме того, enriched.cache() занял ~28.8 GB в storage pool (половину unified). Execution pool = 57.6 - 28.8 = 28.8 GB на 8 tasks = 3.6 GB на task в базовом распределении. Task 45 запросил гораздо больше. UnifiedMemoryManager попытался освободить storage через eviction cache, но освобождённые 28.8 GB всё равно недостаточны: 28.8 + (28.8 - 7 * минимум других tasks) составляет меньше 47 GB. G1GC был перегружен, Full GC не освобождал Old Gen — большинство объектов task 45 были 'живыми' (hash table в активном использовании). В итоге исчерпание heap при аллокации очередной строки UTF8String.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Executor лог показывает последовательность: (1) Block rdd_42_0 stored as values in memory (2.1 GiB); (2) UnifiedMemoryManager shrinking storage pool by 1.2 GiB for task 45; (3) Block rdd_42_0 evicted; (4) Task 45 peak: 47.2 GiB; (5) OOM. Какой механизм UnifiedMemoryManager объясняет шаги 2-3?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5