Дебаг проблем с памятью
В предыдущем уроке мы определили, что AQE переключился на ShuffledHashJoin, deактивировав skew protection, и один task получил partition_73 = 6.7 GB. Теперь разберём вторую составляющую катастрофы: почему именно произошёл OOM, что происходило в UnifiedMemoryManager, и как правильно читать executor-логи и GC-логи для точной диагностики.
Анатомия OOM в ShuffledHashJoinExec
Трассировка из лога OOM:
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.unsafe.types.UTF8String.fromBytes(UTF8String.java:185)
at org.apache.spark.sql.execution.joins.ShuffledHashJoinExec$$anonfun$1
.apply(ShuffledHashJoinExec.scala:142)
UTF8String.fromBytes — это аллокация объекта в heap. ShuffledHashJoinExec.scala:142 — это build phase: построение hash table из build side (users). Heap space exhausted во время аллокации строки hash table.
Но почему? У executor 96 GB heap, а users-таблица всего 4 GB. Казалось бы, должно хватать.
Ответ в том, что к моменту прихода Task 45 на executor executor уже:
- Держал кэш broadcast переменных (campaigns, 80 KB) — незначительно
- Обрабатывал параллельно 7 других tasks (8 cores) в той же JVM
- Хранил в StorageMemoryPool промежуточные результаты кэшированного RDD
Вот полная картина через UnifiedMemoryManager:
spark.executor.memory = 96 GB
spark.memory.fraction = 0.6 (default)
spark.memory.storageFraction = 0.5 (default)
Unified Memory Region = 96 * 0.6 = 57.6 GB
StorageMemoryPool (минимум) = 57.6 * 0.5 = 28.8 GB
ExecutionMemoryPool (минимум) = 57.6 * 0.5 = 28.8 GB
Reserved Memory = 300 MB (hardcoded)
User Memory = 96 * 0.4 - 0.3 ≈ 38.1 GB (Spark internals + user objects)
Когда Task 45 запрашивает execution memory для hash table, в том же executor выполнялись ещё 7 tasks, каждый держал свою долю execution memory. 8 concurrent tasks делят execution pool на 8 слотов минимум. Но Task 45 пытается занять несравнимо больше своей «доты».
Три вида OOM: как различить
Это принципиальный вопрос диагностики. OOM может произойти в трёх разных местах, и правильное исправление зависит от того, где именно.
В нашем логе: java.lang.OutOfMemoryError: Java heap space в ShuffledHashJoinExec — это Execution OOM. Предупреждение Not enough space to cache rdd_42_0 — это Storage OOM как вторичный эффект. Обе проблемы произошли в одном executor: сначала storage pool «одолжил» у execution (UnifiedMemoryManager это позволяет), затем execution запросил обратно и оба пула исчерпались.
Чтение executor-лога: полная хронология OOM
Реконструируем события из лога executor-7:
[03:31:15] INFO BlockManager: Acquired 2.1 GiB for block rdd_42_0 (StorageMemory)
# RDD кэшируется в storage pool — это был явный cache() в коде пайплайна
[03:31:18] INFO MemoryStore: Block rdd_42_0 stored as values in memory
(estimated size 2.1 GiB, free 9.8 GiB)
# После кэширования storage pool занял 2.1 GiB
[03:44:22] INFO TaskMemoryManager: Task 38 trying to acquire 512.0 MiB
[03:44:22] INFO UnifiedMemoryManager: Acquiring 512.0 MiB for execution
(executionPool.memoryUsed=24.1 GiB, storagePool.memoryUsed=2.1 GiB)
# 7 tasks работают параллельно, execution pool постепенно заполняется
[03:46:55] WARN UnifiedMemoryManager: No enough execution memory,
shrinking storage pool by 1.2 GiB for task 45
# Execution pool запрашивает больше. UnifiedMemoryManager вытесняет storage.
[03:46:55] WARN MemoryStore: Block rdd_42_0 evicted with size 2.1 GiB
due to memory pressure
# Кэш вытеснен. rdd_42_0 потерян — при следующем обращении пересчитается.
[03:47:01] INFO TaskMemoryManager: Task 45 peak: 47.2 GiB
# Task 45 занял почти всю execution memory
[03:47:10] INFO GCMetrics: GC pause 8.2 s (G1 Full GC)
# G1 Full GC — давление достигло предела
[03:47:22] ERROR TaskRunner: Task 45 failed
java.lang.OutOfMemoryError: Java heap space
at UTF8String.fromBytes
Хронология: (1) явный cache() захватил 2.1 GB storage; (2) 7+1 concurrent tasks заполняли execution pool; (3) UnifiedMemoryManager попытался вытеснить storage ради execution; (4) Task 45 с 6.7 GB partition всё равно не влез; (5) G1 Full GC пытался освободить место; (6) OOM при аллокации очередной строки hash table.
Чтение GC-логов: как интерпретировать
Включаем детальный GC-лог для диагностики:
spark.executor.extraJavaOptions=-verbose:gc -Xloggc:/tmp/gc.log
-XX:+PrintGCDetails -XX:+PrintGCDateStamps
-XX:+PrintGCTimeStamps -XX:+PrintHeapAtGC
Из gc.log за инцидентный день:
2026-05-13T03:31:15.234: [GC pause (G1 Young) (initial-mark)
Heap before GC: Eden: 3.2G(4G) Survivors: 0.8G Old: 51.2G(88G)
Heap after GC: Eden: 0G(4G) Survivors: 1.1G Old: 53.8G(88G)
Pause time: 0.82 s]
2026-05-13T03:46:58.112: [Full GC (Allocation Failure)
Heap before GC: Eden: 0G(4G) Survivors: 0G Old: 91.2G(96G)
Heap after GC: Eden: 0G(4G) Survivors: 0G Old: 87.4G(96G)
Pause time: 8.24 s] <- Full GC, 8 секунд
2026-05-13T03:47:10.441: [Full GC (Allocation Failure)
Heap before GC: Eden: 0G(4G) Survivors: 0G Old: 94.1G(96G)
Heap after GC: Eden: 0G(4G) Survivors: 0G Old: 93.8G(96G)
Pause time: 9.12 s] <- Второй Full GC, 9 секунд
2026-05-13T03:47:22.891: [Full GC (Allocation Failure)
Heap before GC: Old: 95.9G(96G) ...
java.lang.OutOfMemoryError: Java heap space]
Ключевые паттерны: (1) Old Gen близок к 96 GB capacity (95.9/96 = 99.9%). (2) Full GC не освобождает значимый объём (94.1 -> 93.8 GB после 9 секунд GC). (3) Allocation Failure — JVM не может аллоцировать даже маленький объект. Это терминальное состояние.
Нормальный GC-паттерн для Spark executor: Young GC каждые несколько секунд (0.1–0.3 s pause), Old Gen стабильна или растёт медленно, Full GC — редко и коротко. В нашем случае Old Gen за 15 минут заполнилась до 99% — это не нормальный рост.
Heap dump: что занимало память
При следующем запуске добавляем:
spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/mnt/dumps/
Анализ heap dump через Eclipse Memory Analyzer (MAT) или jmap:
# jmap -histo <pid> (из ещё живого executor перед OOM)
num #instances #bytes class name
----------------------------------------------
1: 89234576 8567123488 [B <- byte arrays (UnsafeRow data)
2: 4523891 3128344312 org.apache.spark.unsafe.types.UTF8String
3: 2341234 1872987200 org.apache.spark.sql.catalyst.InternalRow
4: 891234 892345678 java.util.HashMap$Node <- hash table entries
5: 234567 456234567 org.apache.spark.sql.execution.UnsafeRow
[B (byte arrays) на 8.5 GB — это сырые данные строк в UnsafeRow представлении. UTF8String на 3.1 GB — decoded string columns. HashMap$Node на 892 MB — элементы hash table в ShuffledHashJoinExec.buildHashedRelation.
Эти числа подтверждают: hash table build для partition_73 = 6.7 GB raw data -> после десериализации в UnsafeRow + UTF8String + HashMap overhead занимает ~13–15 GB в heap. При 8 concurrent tasks это превышает доступный execution pool.
Storage OOM vs Execution OOM: детальный разбор
Второй лог из предыдущего урока: Not enough space to cache rdd_42_0 in memory! (computed 8.4 GiB so far; 0.0 B remaining for storage).
Откуда взялся rdd_42_0? Смотрим код пайплайна — там нет явного cache(). Но есть неявный: при events.filter(col("event_date") == yesterday) Spark в режиме spark.sql.adaptive.enabled=true может материализовать промежуточный результат через ShuffleExchangeExec, который внутри использует BlockManager.
Правильный вопрос: есть ли где-то явный cache() или persist() в коде? Проверяем:
# В production-коде нашлась строка (добавлена коллегой «для ускорения»)
enriched = events.join(users, "user_id", "left") \
.join(campaigns, "campaign_id", "left") \
.cache() # <-- вот оно
result = enriched.groupBy(...).agg(...)
enriched.cache() — это MEMORY_ONLY persistence. При первом action (write) Spark материализует enriched и кладёт в storage pool. enriched = join результат, ~60 GB сжатого (120 GB raw × 0.5 compression ratio для Spark in-memory). 60 GB не влезает в storage pool 28.8 GB, поэтому начинается частичное кэширование и eviction.
Эффект: storage pool пытается захватить доступную память, вытесняя execution. Execution отвечает тем же. В итоге оба пула «перетягивают канат», GC работает интенсивно, реального выигрыша от cache() нет — данные всё равно пересчитываются при следующем чтении после eviction.
Driver OOM: отдельный вектор проблемы
В нашем случае driver OOM не произошёл, но стоит разобрать, как его диагностировать. Driver лог хранится отдельно от executor-логов. Симптомы:
[driver] ERROR SparkContext: Error running job
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.rdd.RDD.collect(RDD.scala:992)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3567)
Типичные причины driver OOM: (1) df.collect() на большом DataFrame; (2) broadcast join с таблицей больше spark.sql.autoBroadcastJoinThreshold на driver side; (3) toPandas() без лимита; (4) collect в loop для итеративного алгоритма.
Диагностика: driver heap dump через jmap -dump:format=b,file=/tmp/driver.hprof <driver-pid>. Обычно показывает одну огромную коллекцию Java-объектов (List или Array) с сотнями миллионов элементов.
Конфиг для предотвращения: spark.driver.memory=4g (default = 1g — слишком мало для production), spark.driver.memoryOverhead=1g.
Практическое разграничение: execution OOM vs spill
Не каждый spill — это проблема. Spill — это механизм graceful degradation: когда execution memory исчерпывается, Spark сбрасывает часть данных на диск и продолжает работу. OOM — это когда даже spill не помогает.
Разграничение через логи:
SPILL (нормальная деградация):
[INFO] ExternalSorter: Spilling in-memory map of 1.2 GiB to disk
(1 time so far)
[INFO] DiskBlockManager: Created temp shuffle file: /mnt/spark/blockmgr-...
# Job продолжается медленнее, но продолжается
OOM (катастрофа):
[WARN] TaskSetManager: Lost task 45.0 in stage 3.0 (TID 1234):
java.lang.OutOfMemoryError: Java heap space
# Task упал, потребуется retry или job failure
Когда OOM неизбежен: если один partition содержит данных больше, чем executionMemory / (concurrent tasks per executor), и эти данные не поддерживают spill (например, broadcast relation, которая должна целиком помещаться в память), то OOM неизбежен вне зависимости от настроек.
В нашем случае ShuffledHashJoin с BuildRight строит hash table из users-подмножества, соответствующего partition_73 events. При partition_73 = 6.7 GB events, matching users могут быть несколько сотен тысяч строк — это ~200 MB. Но проблема в том, что LongToUnsafeRowMap для build side занимает 3-5x от raw data size из-за hash table overhead и Java object overhead. 200 MB -> 600 MB–1 GB. Это управляемо. Но probe side (6.7 GB events) сам по себе тоже нужен в памяти при сортировке. Итоговое давление — пиковое 47 GB.
Попробуй сам
# 1. Проверить наличие неявных cache() в плане
df.explain("formatted")
# Ищем InMemoryRelation — это означает, что промежуточный результат кэшируется
# 2. Проверить текущее использование памяти через SparkContext
sc = spark.sparkContext
for executor in sc.statusTracker().getExecutorInfos():
print(f"Executor {executor.executorId()}: "
f"maxMem={executor.maxMemory()//1024//1024} MB, "
f"usedMem={executor.memoryUsed()//1024//1024} MB")
# 3. Посмотреть storage memory через StorageLevel
for rdd_info in sc.statusTracker().getRDDInfos():
if rdd_info.storageLevel.useMemory:
print(f"RDD {rdd_info.id}: {rdd_info.memSize//1024//1024} MB in memory")
# 4. Включить детальный memory logging для диагностики
spark.conf.set("spark.memory.offHeap.enabled", "false")
spark.sparkContext.setLogLevel("DEBUG")
# Теперь UnifiedMemoryManager.acquireExecutionMemory будет логировать каждый acquire
# 5. Оценить нужный execution memory per task для нашего join
# partition_73 = 6.7 GB raw
# hash table overhead ~3x: 6.7 * 3 = 20.1 GB нужно для одного task
# При 8 concurrent tasks: 20.1 * 8 = 160 GB -- больше всего executor heap!
# Вывод: нужно уменьшить partition size через больше partitions
print(f"Нужно partitions: {int(120 * 1024 / 200)}") # если хотим <200 MB/partition
Итоги
OOM в нашем пайплайне — совместный результат трёх факторов: (1) skewed partition_73 = 6.7 GB дала одному task нагрузку, требующую ~47 GB execution memory; (2) явный enriched.cache() захватил часть storage pool, уменьшив доступный execution; (3) 8 concurrent tasks делили execution pool, и Task 45 не получил достаточно. UnifiedMemoryManager попытался балансировать через boundary shifting, но масштаб проблемы превысил возможности любого балансировщика. В следующем уроке применим конкретные исправления для всех трёх уровней.