Learning Platform
Глоссарий Troubleshooting
Урок 06.02 · 28 мин
Продвинутый
UnifiedMemoryManagerExecution MemoryStorage MemoryOff-HeapMemory Fraction

UnifiedMemoryManager: исходники и алгоритм

Первый урок модуля дал обзор трёх регионов памяти и принципа soft boundary. Теперь вскроем реализацию. Класс UnifiedMemoryManager в core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala — это около 300 строк Scala, которые управляют каждым байтом между execution и storage. Чтобы правильно диагностировать OOM и тюнить кластер, нужно понимать эти 300 строк на уровне алгоритма.

Иерархия классов: MemoryManager и его наследники

Класс UnifiedMemoryManager наследует от абстрактного MemoryManager:

// core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
abstract class MemoryManager(
    conf: SparkConf,
    numCores: Int,
    onHeapStorageMemory: Long,
    onHeapExecutionMemory: Long) extends Logging {

  // Два пула on-heap
  protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
  protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)

  // Два пула off-heap
  protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
  protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)

  // Абстрактный метод — реализуется в UnifiedMemoryManager
  def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
  def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long
}

До Spark 1.6 существовал StaticMemoryManager с жёсткими границами между execution и storage. Начиная с версии 1.6 он заменён UnifiedMemoryManager, который стал единственной реализацией. В Spark 4.0 StaticMemoryManager полностью удалён из кодовой базы.

Каждый executor создаёт ровно один экземпляр UnifiedMemoryManager. Driver тоже создаёт свой экземпляр — для управления памятью broadcast-переменных и внутренних структур.

Иерархия памяти в UnifiedMemoryManager (on-heap)

Все четыре пула синхронизированы через общий мьютекс в MemoryManager.

Executor JVM Heap (spark.executor.memory = 8 GiB)Полный heap процесса executorspark.executor.memory — итоговый JVM heap, передаётся в -Xmx. Сюда входят все регионы ниже.
Reserved Memory (300 MiB, жёстко)Для Spark runtime объектовЗахардкожен как RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024. Не конфигурируется без перекомпиляции. Защита от случайного уменьшения spark.memory.fraction до нуля.
Spark Memory: 0.6 * (heap - 300 MiB) = 4661 MiBspark.memory.fraction = 0.6 (по умолчанию)Общий пул, делящийся между StorageMemoryPool и ExecutionMemoryPool. Граница между ними — мягкая: можно занимать друг у друга.
Storage Memory (начальная граница: 50%)= 2330 MiBНачальная граница задана spark.memory.storageFraction = 0.5. Хранит кэшированные RDD/DataFrame партиции, broadcast-блоки. Может занимать свободное execution-пространство.
Execution Memory (начальная граница: 50%)= 2330 MiBShuffle write/read buffers, sort buffers, hash aggregation таблицы. Приоритет над storage: может вытеснять кэшированные блоки. Справедливо делится между задачами через per-task slots.
User Memory (40%)= 3107 MiB(1 - spark.memory.fraction) * (heap - 300 MiB). Для пользовательских объектов: UDF-данные, RDD metadata, Kryo/Java сериализация, внутренние структуры Spark не под управлением MemoryManager.

Вычисление границ: метод apply()

UnifiedMemoryManager создаётся через companion object. Метод apply() вычисляет начальные размеры пулов:

object UnifiedMemoryManager {
  // Захардкоженный reserved memory
  private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024L  // 300 MiB

  def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
    val maxMemory = getMaxMemory(conf)
    new UnifiedMemoryManager(
      conf,
      maxHeapMemory = maxMemory,
      onHeapStorageRegionSize =
        (maxMemory * conf.get(MEMORY_STORAGE_FRACTION)).toLong,
      numCores = numCores)
  }

  private def getMaxMemory(conf: SparkConf): Long = {
    val systemMemory = conf.get(TEST_MEMORY)
      .getOrElse(Runtime.getRuntime.maxMemory)  // -Xmx значение

    val reservedMemory = conf.getLong(
      "spark.testing.reservedMemory",
      RESERVED_SYSTEM_MEMORY_BYTES)

    val usableMemory = systemMemory - reservedMemory  // heap минус 300 MiB
    val memoryFraction = conf.get(MEMORY_FRACTION)    // 0.6 по умолчанию
    (usableMemory * memoryFraction).toLong
  }
}

Итого для executor с 8 GiB heap:

systemMemory     = 8 * 1024 * 1024 * 1024 = 8589934592 байт
reservedMemory   = 300 * 1024 * 1024      =  314572800 байт
usableMemory     = 8589934592 - 314572800 = 8275361792 байт
maxMemory        = 8275361792 * 0.6       = 4965217075 байт ~ 4735 MiB
storageRegion    = 4965217075 * 0.5       = 2482608537 байт ~ 2367 MiB
executionRegion  = maxMemory - storageRegion = 2367 MiB

Обрати внимание: числа в документации и в spark.executor.memory указываются в GiB (1024-кратные), а JVM Runtime.getRuntime.maxMemory работает в байтах.

Алгоритм acquireStorageMemory()

Когда BlockManager хочет закэшировать блок, он вызывает acquireStorageMemory. Метод реализует следующую логику:

override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {

  val (executionPool, storagePool, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      maxOnHeapStorageMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      maxOffHeapStorageMemory)
  }

  if (numBytes > maxMemory) {
    // Блок в принципе не влезет: сразу false, без попыток
    logInfo(s"Will not store $blockId as the required space " +
      s"($numBytes bytes) exceeds our memory limit ($maxMemory bytes)")
    return false
  }

  if (numBytes > storagePool.memoryFree) {
    // В storage не хватает — пробуем занять у execution
    val memoryBorrowedFromExecution = Math.min(
      executionPool.memoryFree,
      numBytes - storagePool.memoryFree)

    executionPool.decrementPoolSize(memoryBorrowedFromExecution)
    storagePool.incrementPoolSize(memoryBorrowedFromExecution)
  }

  storagePool.acquireMemory(blockId, numBytes)
}

Ключевые наблюдения:

  1. Метод synchronized — мьютекс на уровне всего MemoryManager. Конкурентных запросов от разных задач выстраивается очередь.
  2. Занять у execution можно только свободную execution-память (executionPool.memoryFree). Память, уже занятую задачами под shuffle, не отбирается.
  3. Если и занять нечего, метод возвращает false — блок просто не кэшируется (или вытесняет другой блок через LRU, что делается в MemoryStore до вызова этого метода).

Алгоритм acquireExecutionMemory() и per-task slots

Execution-память распределяется между задачами (task) справедливо. Логика сложнее:

override private[memory] def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {

  val (executionPool, storagePool, storageRegionSize, maxMemory) =
    memoryMode match { ... }

  def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
    if (extraMemoryNeeded > 0) {
      // Попытка занять у storage
      val memoryReclaimableFromStorage = math.max(
        storagePool.memoryFree,
        storagePool.poolSize - storageRegionSize)
      // Максимум: свободная storage + то, что сверх storageFraction

      if (memoryReclaimableFromStorage > 0) {
        val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
          math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
        storagePool.decrementPoolSize(spaceToReclaim)
        executionPool.incrementPoolSize(spaceToReclaim)
      }
    }
  }

  executionPool.acquireMemory(numBytes, taskAttemptId, maybeGrowExecutionPool)
}

В отличие от storage, execution использует freeSpaceToShrinkPool — это вызов вытеснения (eviction) из MemoryStore. Если storage занимает больше, чем storageRegionSize (т.е. заняло execution-память), execution может принудительно вернуть эту память, удаляя кэшированные блоки.

Внутри ExecutionMemoryPool.acquireMemory() задача может ждать (через wait() / notifyAll()), пока другие задачи не освободят память:

// Псевдокод ExecutionMemoryPool
while (true) {
  val maxMemoryPerTask = poolSize / numActiveTasks
  val minMemoryPerTask = poolSize / (2 * numActiveTasks)

  if (memoryForTask >= minMemoryPerTask) {
    // Задаче хватает, выдаём
    return grant
  }
  // Ждём, пока другая задача не освободит память
  lock.wait()
}

Это защита от голодания: если executor запускает 8 задач параллельно, каждая получит долю не меньше poolSize / (2 * 8) = 6.25% пула перед тем как заблокироваться.

Конфигурационные параметры: полная таблица

ПараметрДефолтОписание
spark.memory.fraction0.6Доля (heap - 300 MiB) под Spark Memory
spark.memory.storageFraction0.5Начальная доля Spark Memory под storage
spark.memory.offHeap.enabledfalseВключить off-heap пул
spark.memory.offHeap.size0Размер off-heap пула в байтах
spark.memory.storageFraction влияет наТолько начальную границу; в рантайме граница двигается
spark.executor.memory1gПолный heap executor (-Xmx)
spark.driver.memory1gОтдельный heap driver
spark.executor.memoryOverhead10% от memory, min 384 MiBOverhead вне heap (JVM metaspace, thread stacks, off-heap если не задан явно)
WARNING

spark.memory.fraction в Spark 4.0 действительно равен 0.6 — это было исправлено по сравнению с некоторыми документационными ссылками, утверждавшими 0.75. Значение 0.75 встречалось в промежуточных preview-версиях. В стабильном Spark 4.0 дефолт остался 0.6. Проверяй через spark.conf.get("spark.memory.fraction") в рантайме.

Off-heap режим: зачем и как

Off-heap память выделяется через sun.misc.Unsafe.allocateMemory() — за пределами JVM heap. Это принципиально меняет поведение GC:

spark = SparkSession.builder \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "4g") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

С этой конфигурацией создаются четыре пула вместо двух:

  • onHeapStorageMemoryPool — хранит объекты JVM (deserialized кэш)
  • onHeapExecutionMemoryPool — shuffle/sort буферы на heap
  • offHeapStorageMemoryPool — хранит сериализованные байты вне heap
  • offHeapExecutionMemoryPool — UnsafeRow данные для Tungsten sort вне heap

Когда включён off-heap, BlockManager.putBytes() принимает решение в какой пул писать на основе StorageLevel. Уровни MEMORY_ONLY и MEMORY_AND_DISK используют on-heap. Для off-heap нужно явно выбирать через StorageLevel.OFF_HEAP:

from pyspark import StorageLevel

df.persist(StorageLevel.OFF_HEAP)
On-heap vs Off-heap: что куда идёт

При включённом off-heap Tungsten-операции (sort, hash join) используют off-heap через UnsafeMemoryAllocator. Кэш по умолчанию остаётся on-heap если не указан StorageLevel.OFF_HEAP.

ON-HEAP (по умолчанию)Управляется JVM GCJVM выделяет из -Xmx heap. GC отслеживает объекты. При Full GC — stop-the-world пауза.
StorageMemoryPool: кэш RDD/DFdeserialized Row objectscache() и persist(MEMORY_ONLY) хранят JVM-объекты. Быстрый доступ без десериализации, но высокое GC давление.
ExecutionMemoryPool: shuffle буферыPartitionedAppendOnlyMap, ExternalSorterБуферы for shuffle write, sort, hash aggregation. Занимают heap, видны GC.
OFF-HEAP (если включён)Через sun.misc.Unsafe, вне GCUnsafe.allocateMemory() выделяет нативную память. GC не видит эти байты. Ручное управление жизненным циклом через MemoryConsumer.free().
StorageMemoryPool: сериализованные блокиpersist(StorageLevel.OFF_HEAP)Сериализованные байты блока хранятся в off-heap ChunkedByteBuffer. Нет GC overhead, но требует сериализации/десериализации при каждом доступе.
ExecutionMemoryPool: Tungsten UnsafeRowUnsafeExternalSorter, BytesToBytesMapTungsten-оптимизированные структуры данных хранят UnsafeRow как непрерывные байты. GC не мешает sort и hash join даже на больших объёмах.

Когда off-heap действительно помогает

Off-heap решает конкретную проблему: GC-паузы на больших heap. JVM GC (особенно G1GC, который Spark использует по умолчанию) при Full GC сканирует весь heap. При 32 GiB heap и запросах, генерирующих миллионы объектов, Full GC может занять 30-60 секунд, что вызывает потерю heartbeat executor.

Эмпирическое правило: если spark.executor.memory > 16 GiB, рассматривай off-heap. Если executor теряет heartbeat из-за GC (видно в логах как Executor heartbeat timed out), off-heap — приоритетное решение.

# Диагностика: проверь GC metrics в Spark UI -> Executors tab
# Колонки: GC Time, JVM Heap Used
# Если GC Time > 10% от Task Time — есть GC проблема

# Production конфиг для тяжёлых shuffle workloads:
spark = SparkSession.builder \
    .config("spark.executor.memory", "16g") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "8g") \
    .config("spark.memory.fraction", "0.7") \
    .config("spark.executor.extraJavaOptions",
            "-XX:+UseG1GC -XX:G1HeapRegionSize=32m") \
    .getOrCreate()

Что смотреть в Spark UI

В Spark UI -> Executors tab две ключевые колонки:

  • Storage Memory: used / max — отображает storagePool.memoryUsed / maxOnHeapStorageMemory. Если постоянно близко к 100%, storage вытесняет блоки.
  • GC Time: агрегированное время GC. Больше 10% от суммарного task time — тревожный сигнал.

В Spark UI -> Storage tab:

  • Каждый кэшированный DataFrame/RDD с размером in-memory. Сумма должна совпадать с Storage Memory used на вкладке Executors.
  • Колонка Fraction Cached: если 100% — весь RDD влез. Если 40% — 60% партиций вытеснено или не закэшировано.

Попробуй сам

from pyspark.sql import SparkSession
from pyspark import StorageLevel
import time

spark = SparkSession.builder \
    .master("local[4]") \
    .config("spark.executor.memory", "2g") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.memory.storageFraction", "0.5") \
    .appName("umm-internals") \
    .getOrCreate()

# Проверяем текущие значения MemoryManager
sc = spark.sparkContext
print("Spark Memory Fraction:", sc.getConf().get("spark.memory.fraction", "0.6"))
print("Storage Fraction:", sc.getConf().get("spark.memory.storageFraction", "0.5"))

# Создаём DataFrame достаточного размера для наблюдения
import random
data = [(i, f"user_{i}", random.randint(20, 60), random.random() * 100000)
        for i in range(500_000)]
df = spark.createDataFrame(data, ["id", "name", "age", "salary"])

# Кэшируем on-heap (default)
df_cached = df.cache()
count = df_cached.count()  # Материализуем кэш
print(f"Cached rows: {count}")

# Проверяем через RDD.name() — все partition'ы закэшированы?
cached_rdd = df_cached.rdd
print(f"Storage level: {cached_rdd.getStorageLevel()}")

# Второй запрос — должен идти из кэша
t0 = time.time()
result = df_cached.filter(df_cached.age > 40).count()
print(f"From cache: {result} rows in {time.time() - t0:.2f}s")

# Unpersist
df_cached.unpersist()
print("Cache cleared")

Ожидаемый вывод:

Spark Memory Fraction: 0.6
Storage Fraction: 0.5
Cached rows: 500000
Storage level: Disk Memory Deserialized 1x Replicated
From cache: 249890 rows in 0.31s
Cache cleared

Открой Spark UI (http://localhost:4040) -> Storage tab во время выполнения, чтобы увидеть занятую storage memory в режиме реального времени.

Проверка знанийKnowledge check
На executor с 16 GiB heap запущены 8 задач одновременно. spark.memory.fraction=0.6, spark.memory.storageFraction=0.5, off-heap выключен. Задача A пытается занять 2.5 GiB для shuffle sort, но execution pool почти заполнен. Storage pool занят кэшем на 4 GiB (всё выше storageFraction). Что сделает UnifiedMemoryManager?
ОтветAnswer
UnifiedMemoryManager попытается освободить место в три шага. Шаг 1: проверит свободную память в execution pool — если есть свободные байты, выдаст оттуда. Шаг 2: вычислит 'reclaimable from storage' как max(storagePool.memoryFree, storagePool.poolSize - storageRegionSize). Так как storage занял память сверх своего storageFraction (занял из execution), execution имеет право вернуть этот излишек — вызывает freeSpaceToShrinkPool(), что вытесняет LRU блоки из MemoryStore. Шаг 3: если вытеснения хватает — задача A получает память и продолжает. Если нет — задача A блокируется через lock.wait() в ExecutionMemoryPool, ожидая пока другие 7 задач не освободят своё execution memory. Блок защищён: нижние storageFraction*sparkMemory не вытесняются execution — только излишек сверх этой границы.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. В UnifiedMemoryManager.acquireStorageMemory() executor хочет закэшировать блок 800 MiB. StorageMemoryPool.memoryFree = 300 MiB, ExecutionMemoryPool.memoryFree = 600 MiB. Что сделает метод?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5