UnifiedMemoryManager: исходники и алгоритм
Первый урок модуля дал обзор трёх регионов памяти и принципа soft boundary. Теперь вскроем реализацию. Класс UnifiedMemoryManager в core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala — это около 300 строк Scala, которые управляют каждым байтом между execution и storage. Чтобы правильно диагностировать OOM и тюнить кластер, нужно понимать эти 300 строк на уровне алгоритма.
Иерархия классов: MemoryManager и его наследники
Класс UnifiedMemoryManager наследует от абстрактного MemoryManager:
// core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
abstract class MemoryManager(
conf: SparkConf,
numCores: Int,
onHeapStorageMemory: Long,
onHeapExecutionMemory: Long) extends Logging {
// Два пула on-heap
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
// Два пула off-heap
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
// Абстрактный метод — реализуется в UnifiedMemoryManager
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long
}
До Spark 1.6 существовал StaticMemoryManager с жёсткими границами между execution и storage. Начиная с версии 1.6 он заменён UnifiedMemoryManager, который стал единственной реализацией. В Spark 4.0 StaticMemoryManager полностью удалён из кодовой базы.
Каждый executor создаёт ровно один экземпляр UnifiedMemoryManager. Driver тоже создаёт свой экземпляр — для управления памятью broadcast-переменных и внутренних структур.
Все четыре пула синхронизированы через общий мьютекс в MemoryManager.
Вычисление границ: метод apply()
UnifiedMemoryManager создаётся через companion object. Метод apply() вычисляет начальные размеры пулов:
object UnifiedMemoryManager {
// Захардкоженный reserved memory
private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024L // 300 MiB
def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
val maxMemory = getMaxMemory(conf)
new UnifiedMemoryManager(
conf,
maxHeapMemory = maxMemory,
onHeapStorageRegionSize =
(maxMemory * conf.get(MEMORY_STORAGE_FRACTION)).toLong,
numCores = numCores)
}
private def getMaxMemory(conf: SparkConf): Long = {
val systemMemory = conf.get(TEST_MEMORY)
.getOrElse(Runtime.getRuntime.maxMemory) // -Xmx значение
val reservedMemory = conf.getLong(
"spark.testing.reservedMemory",
RESERVED_SYSTEM_MEMORY_BYTES)
val usableMemory = systemMemory - reservedMemory // heap минус 300 MiB
val memoryFraction = conf.get(MEMORY_FRACTION) // 0.6 по умолчанию
(usableMemory * memoryFraction).toLong
}
}
Итого для executor с 8 GiB heap:
systemMemory = 8 * 1024 * 1024 * 1024 = 8589934592 байт
reservedMemory = 300 * 1024 * 1024 = 314572800 байт
usableMemory = 8589934592 - 314572800 = 8275361792 байт
maxMemory = 8275361792 * 0.6 = 4965217075 байт ~ 4735 MiB
storageRegion = 4965217075 * 0.5 = 2482608537 байт ~ 2367 MiB
executionRegion = maxMemory - storageRegion = 2367 MiB
Обрати внимание: числа в документации и в spark.executor.memory указываются в GiB (1024-кратные), а JVM Runtime.getRuntime.maxMemory работает в байтах.
Алгоритм acquireStorageMemory()
Когда BlockManager хочет закэшировать блок, он вызывает acquireStorageMemory. Метод реализует следующую логику:
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
val (executionPool, storagePool, maxMemory) = memoryMode match {
case MemoryMode.ON_HEAP => (
onHeapExecutionMemoryPool,
onHeapStorageMemoryPool,
maxOnHeapStorageMemory)
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
maxOffHeapStorageMemory)
}
if (numBytes > maxMemory) {
// Блок в принципе не влезет: сразу false, без попыток
logInfo(s"Will not store $blockId as the required space " +
s"($numBytes bytes) exceeds our memory limit ($maxMemory bytes)")
return false
}
if (numBytes > storagePool.memoryFree) {
// В storage не хватает — пробуем занять у execution
val memoryBorrowedFromExecution = Math.min(
executionPool.memoryFree,
numBytes - storagePool.memoryFree)
executionPool.decrementPoolSize(memoryBorrowedFromExecution)
storagePool.incrementPoolSize(memoryBorrowedFromExecution)
}
storagePool.acquireMemory(blockId, numBytes)
}
Ключевые наблюдения:
- Метод synchronized — мьютекс на уровне всего MemoryManager. Конкурентных запросов от разных задач выстраивается очередь.
- Занять у execution можно только свободную execution-память (
executionPool.memoryFree). Память, уже занятую задачами под shuffle, не отбирается. - Если и занять нечего, метод возвращает
false— блок просто не кэшируется (или вытесняет другой блок через LRU, что делается вMemoryStoreдо вызова этого метода).
Алгоритм acquireExecutionMemory() и per-task slots
Execution-память распределяется между задачами (task) справедливо. Логика сложнее:
override private[memory] def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
val (executionPool, storagePool, storageRegionSize, maxMemory) =
memoryMode match { ... }
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
if (extraMemoryNeeded > 0) {
// Попытка занять у storage
val memoryReclaimableFromStorage = math.max(
storagePool.memoryFree,
storagePool.poolSize - storageRegionSize)
// Максимум: свободная storage + то, что сверх storageFraction
if (memoryReclaimableFromStorage > 0) {
val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
storagePool.decrementPoolSize(spaceToReclaim)
executionPool.incrementPoolSize(spaceToReclaim)
}
}
}
executionPool.acquireMemory(numBytes, taskAttemptId, maybeGrowExecutionPool)
}
В отличие от storage, execution использует freeSpaceToShrinkPool — это вызов вытеснения (eviction) из MemoryStore. Если storage занимает больше, чем storageRegionSize (т.е. заняло execution-память), execution может принудительно вернуть эту память, удаляя кэшированные блоки.
Внутри ExecutionMemoryPool.acquireMemory() задача может ждать (через wait() / notifyAll()), пока другие задачи не освободят память:
// Псевдокод ExecutionMemoryPool
while (true) {
val maxMemoryPerTask = poolSize / numActiveTasks
val minMemoryPerTask = poolSize / (2 * numActiveTasks)
if (memoryForTask >= minMemoryPerTask) {
// Задаче хватает, выдаём
return grant
}
// Ждём, пока другая задача не освободит память
lock.wait()
}
Это защита от голодания: если executor запускает 8 задач параллельно, каждая получит долю не меньше poolSize / (2 * 8) = 6.25% пула перед тем как заблокироваться.
Конфигурационные параметры: полная таблица
| Параметр | Дефолт | Описание |
|---|---|---|
spark.memory.fraction | 0.6 | Доля (heap - 300 MiB) под Spark Memory |
spark.memory.storageFraction | 0.5 | Начальная доля Spark Memory под storage |
spark.memory.offHeap.enabled | false | Включить off-heap пул |
spark.memory.offHeap.size | 0 | Размер off-heap пула в байтах |
spark.memory.storageFraction влияет на | — | Только начальную границу; в рантайме граница двигается |
spark.executor.memory | 1g | Полный heap executor (-Xmx) |
spark.driver.memory | 1g | Отдельный heap driver |
spark.executor.memoryOverhead | 10% от memory, min 384 MiB | Overhead вне heap (JVM metaspace, thread stacks, off-heap если не задан явно) |
spark.memory.fraction в Spark 4.0 действительно равен 0.6 — это было исправлено по сравнению с некоторыми документационными ссылками, утверждавшими 0.75. Значение 0.75 встречалось в промежуточных preview-версиях. В стабильном Spark 4.0 дефолт остался 0.6. Проверяй через spark.conf.get("spark.memory.fraction") в рантайме.
Off-heap режим: зачем и как
Off-heap память выделяется через sun.misc.Unsafe.allocateMemory() — за пределами JVM heap. Это принципиально меняет поведение GC:
spark = SparkSession.builder \
.config("spark.memory.offHeap.enabled", "true") \
.config("spark.memory.offHeap.size", "4g") \
.config("spark.executor.memory", "8g") \
.getOrCreate()
С этой конфигурацией создаются четыре пула вместо двух:
onHeapStorageMemoryPool— хранит объекты JVM (deserialized кэш)onHeapExecutionMemoryPool— shuffle/sort буферы на heapoffHeapStorageMemoryPool— хранит сериализованные байты вне heapoffHeapExecutionMemoryPool— UnsafeRow данные для Tungsten sort вне heap
Когда включён off-heap, BlockManager.putBytes() принимает решение в какой пул писать на основе StorageLevel. Уровни MEMORY_ONLY и MEMORY_AND_DISK используют on-heap. Для off-heap нужно явно выбирать через StorageLevel.OFF_HEAP:
from pyspark import StorageLevel
df.persist(StorageLevel.OFF_HEAP)
При включённом off-heap Tungsten-операции (sort, hash join) используют off-heap через UnsafeMemoryAllocator. Кэш по умолчанию остаётся on-heap если не указан StorageLevel.OFF_HEAP.
Когда off-heap действительно помогает
Off-heap решает конкретную проблему: GC-паузы на больших heap. JVM GC (особенно G1GC, который Spark использует по умолчанию) при Full GC сканирует весь heap. При 32 GiB heap и запросах, генерирующих миллионы объектов, Full GC может занять 30-60 секунд, что вызывает потерю heartbeat executor.
Эмпирическое правило: если spark.executor.memory > 16 GiB, рассматривай off-heap. Если executor теряет heartbeat из-за GC (видно в логах как Executor heartbeat timed out), off-heap — приоритетное решение.
# Диагностика: проверь GC metrics в Spark UI -> Executors tab
# Колонки: GC Time, JVM Heap Used
# Если GC Time > 10% от Task Time — есть GC проблема
# Production конфиг для тяжёлых shuffle workloads:
spark = SparkSession.builder \
.config("spark.executor.memory", "16g") \
.config("spark.memory.offHeap.enabled", "true") \
.config("spark.memory.offHeap.size", "8g") \
.config("spark.memory.fraction", "0.7") \
.config("spark.executor.extraJavaOptions",
"-XX:+UseG1GC -XX:G1HeapRegionSize=32m") \
.getOrCreate()
Что смотреть в Spark UI
В Spark UI -> Executors tab две ключевые колонки:
- Storage Memory:
used / max— отображаетstoragePool.memoryUsed / maxOnHeapStorageMemory. Если постоянно близко к 100%, storage вытесняет блоки. - GC Time: агрегированное время GC. Больше 10% от суммарного task time — тревожный сигнал.
В Spark UI -> Storage tab:
- Каждый кэшированный DataFrame/RDD с размером in-memory. Сумма должна совпадать с
Storage Memory usedна вкладке Executors. - Колонка Fraction Cached: если 100% — весь RDD влез. Если 40% — 60% партиций вытеснено или не закэшировано.
Попробуй сам
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import time
spark = SparkSession.builder \
.master("local[4]") \
.config("spark.executor.memory", "2g") \
.config("spark.memory.fraction", "0.6") \
.config("spark.memory.storageFraction", "0.5") \
.appName("umm-internals") \
.getOrCreate()
# Проверяем текущие значения MemoryManager
sc = spark.sparkContext
print("Spark Memory Fraction:", sc.getConf().get("spark.memory.fraction", "0.6"))
print("Storage Fraction:", sc.getConf().get("spark.memory.storageFraction", "0.5"))
# Создаём DataFrame достаточного размера для наблюдения
import random
data = [(i, f"user_{i}", random.randint(20, 60), random.random() * 100000)
for i in range(500_000)]
df = spark.createDataFrame(data, ["id", "name", "age", "salary"])
# Кэшируем on-heap (default)
df_cached = df.cache()
count = df_cached.count() # Материализуем кэш
print(f"Cached rows: {count}")
# Проверяем через RDD.name() — все partition'ы закэшированы?
cached_rdd = df_cached.rdd
print(f"Storage level: {cached_rdd.getStorageLevel()}")
# Второй запрос — должен идти из кэша
t0 = time.time()
result = df_cached.filter(df_cached.age > 40).count()
print(f"From cache: {result} rows in {time.time() - t0:.2f}s")
# Unpersist
df_cached.unpersist()
print("Cache cleared")
Ожидаемый вывод:
Spark Memory Fraction: 0.6
Storage Fraction: 0.5
Cached rows: 500000
Storage level: Disk Memory Deserialized 1x Replicated
From cache: 249890 rows in 0.31s
Cache cleared
Открой Spark UI (http://localhost:4040) -> Storage tab во время выполнения, чтобы увидеть занятую storage memory в режиме реального времени.