Learning Platform
Глоссарий Troubleshooting
Урок 06.04 · 30 мин
Продвинутый
MemoryStoreDiskStoreStorageLevelLRU EvictionUnrolling

MemoryStore и DiskStore: физика хранения блоков

BlockManager — это facade. Физическое хранение делегируется двум классам: MemoryStore хранит блоки в RAM (on-heap или off-heap), DiskStore — на локальном диске. Понимание их алгоритмов — это разница между кэшем, который работает, и кэшем, который молча вытесняется, пока ты смотришь на Fraction Cached: 40% в Spark UI.

StorageLevel: матрица из флагов

StorageLevel — это sealed class с пятью булевыми флагами и целочисленным _replication:

class StorageLevel private (
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)

Из комбинаций получаются стандартные уровни:

УровеньuseDiskuseMemorydeserializedreplicationЧто происходит при вытеснении
MEMORY_ONLYfalsetruetrue1Блок удаляется безвозвратно
MEMORY_AND_DISKtruetruetrue1Спилл на диск
MEMORY_ONLY_SERfalsetruefalse1Блок удаляется (сериализованный)
MEMORY_AND_DISK_SERtruetruefalse1Спилл на диск
DISK_ONLYtruefalsefalse1N/A (сразу на диск)
MEMORY_ONLY_2falsetruetrue2Реплика на другом executor
MEMORY_AND_DISK_2truetruetrue2Реплика на другом executor
OFF_HEAPfalsefalsefalse1Off-heap через Unsafe

Дефолт для df.cache() в Spark — MEMORY_AND_DISK. Для rdd.cache() тоже MEMORY_AND_DISK. Это безопаснее, чем MEMORY_ONLY: при вытеснении партиция пишется на диск, а не исчезает.

WARNING

MEMORY_ONLY_SER экономит RAM (сериализованные данные плотнее JVM-объектов в 2-5x), но требует десериализации при каждом чтении. Для DataFrame это почти всегда проигрыш по скорости: Spark сам хранит данные в columnar формате (UnsafeRow), поэтому MEMORY_ONLY (deserialized) быстрее MEMORY_ONLY_SER.

MemoryStore: структура данных

MemoryStore хранит блоки в LinkedHashMap с accessOrder=true — это встроенный механизм LRU:

// core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
// accessOrder=true: итерирование идёт от least-recently-used к most-recently-used

MemoryEntry — sealed trait с двумя реализациями:

sealed trait MemoryEntry[T] {
  def size: Long
  def memoryMode: MemoryMode
  def classTag: ClassTag[T]
}

// Десериализованный: JVM-объекты в heap
case class DeserializedMemoryEntry[T](
    value: Array[T],        // Массив объектов Row, GenericArrayData, etc.
    size: Long,             // Оценка размера через SizeEstimator
    classTag: ClassTag[T]
) extends MemoryEntry[T] {
  val memoryMode: MemoryMode = MemoryMode.ON_HEAP
}

// Сериализованный: байты (on-heap ByteBuffer или off-heap через Unsafe)
case class SerializedMemoryEntry[T](
    buffer: ChunkedByteBuffer,  // Один или несколько ByteBuffer
    memoryMode: MemoryMode,     // ON_HEAP или OFF_HEAP
    classTag: ClassTag[T]
) extends MemoryEntry[T]

Десериализованное хранение (deserialized=true) — это массив JVM-объектов. При чтении задача получает объекты напрямую без десериализации, но GC видит каждый объект. Сериализованное — это один ChunkedByteBuffer с байтами в формате Kryo или Java serialization. GC видит только один объект (буфер), но при чтении нужна десериализация.

Unrolling: почему нельзя писать блок напрямую

Когда задача кэширует партицию, данные поступают как итератор — строки читаются одна за другой. Нельзя зарезервировать память сразу, потому что размер заранее неизвестен (сколько строк пройдёт фильтр?). Именно для этого существует unrolling:

def putIteratorAsValues[T](
    blockId: BlockId,
    values: Iterator[T],
    classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {

  // 1. Резервируем начальный unroll memory (spark.storage.unrollMemoryThreshold = 1 MiB)
  var unrollMemoryUsedByThisBlock = 0L
  val initialMemoryThreshold = unrollMemoryThreshold  // 1 MiB
  keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, ON_HEAP)

  val vector = new SizeTrackingVector[T]  // Хранит объекты, отслеживает размер

  while (values.hasNext && keepUnrolling) {
    vector += values.next()  // Добавляем объект

    // 2. Периодически (каждые 16 элементов) проверяем и докупаем память
    if (vector.length % memoryCheckPeriod == 0) {
      val currentSize = vector.estimateSize()
      if (currentSize >= memoryThreshold) {
        val amountToRequest = 2 * currentSize - memoryThreshold
        keepUnrolling = reserveUnrollMemoryForThisTask(
          blockId, amountToRequest, MemoryMode.ON_HEAP)
        if (keepUnrolling) {
          memoryThreshold = currentSize + amountToRequest
        }
      }
    }
  }

  if (keepUnrolling) {
    // 3. Успех: конвертируем unroll memory -> storage memory
    val entry = new DeserializedMemoryEntry[T](vector.toArray, ...)
    val size = entry.size
    def transferUnrollToStorage(amount: Long): Unit = {
      unrollMemoryMap(taskAttemptId) -= amount
      storageMemoryUsed += amount  // Не меняем MemoryManager счётчики
    }
    entries.synchronized {
      transferUnrollToStorage(unrollMemoryUsedByThisBlock)
      entries.put(blockId, entry)
    }
    Right(size)  // Успех
  } else {
    // 4. Не хватило памяти: возвращаем частично unroll'енный итератор
    // BlockManager может попробовать записать на диск
    Left(new PartiallyUnrolledIterator(this, ON_HEAP,
      unrollMemoryUsedByThisBlock, unrolled = vector.iterator,
      rest = values))
  }
}

Если unrolling не удался (вернул Left), BlockManager пробует записать блок на диск (если useDisk=true в StorageLevel). Если и диска нет — блок просто не кэшируется, задача продолжает без кэша.

Unrolling: жизненный цикл занятия памяти

Unroll memory временно занимается из storage pool. После успешного unrolling конвертируется в storage entry. При провале освобождается.

1. reserveUnrollMemory(1 MiB)Начальный запрос из storage poolspark.storage.unrollMemoryThreshold (default 1 MiB). Если storage pool занят, вытесняет LRU блоки чтобы освободить место для unrolling.
2. Цикл: SizeTrackingVector.add() каждые 16 строкПроверка размера и запрос дополнительной памятиSizeEstimator.estimate() использует рефлексию для оценки объектного графа. Точность ~90-95%: не видит внутренности ByteBuffer и off-heap. Запрос дополнительной памяти удваивает текущий порог.
Успех: конвертация unroll -> storageentries.put(blockId, entry)Unroll memory переводится в категорию storage без физического перемещения данных — только обновление счётчиков в MemoryManager.
Провал: PartiallyUnrolledIteratorВозврат Left с частично прочитанными даннымиBlockManager пробует DiskStore.put(). Если StorageLevel не включает диск — блок теряется. Задача продолжает работу, читая данные из источника повторно при следующем обращении.

LRU-вытеснение: алгоритм evictBlocksToFreeSpace

Когда нужно освободить место (для unrolling нового блока или по требованию ExecutionMemoryPool), вызывается evictBlocksToFreeSpace:

def evictBlocksToFreeSpace(
    blockId: Option[BlockId],
    space: Long,
    memoryMode: MemoryMode): Long = {

  var freedMemory = 0L
  val rddToAdd = blockId.flatMap(getRddId)  // RDD ID нового блока

  // Итерируем entries от LRU (least recently used) к MRU
  // LinkedHashMap(accessOrder=true) гарантирует этот порядок
  val iterator = entries.entrySet().iterator()

  while (freedMemory < space && iterator.hasNext) {
    val pair = iterator.next()
    val blockId = pair.getKey
    val entry = pair.getValue

    // Не вытесняем блок того же RDD что кэшируем — circulareviction
    if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
      if (entry.memoryMode == memoryMode) {
        iterator.remove()  // Удаляем из entries

        // Если блок принадлежит BroadcastBlockId — не спиллим, просто удаляем
        // Если RDDBlockId и useDisk=true — спиллим на диск через BlockManager

        val freed = entry.size
        freedMemory += freed
      }
    }
  }

  freedMemory
}

Ключевой момент: итерация по LinkedHashMap(accessOrder=true) начинается с наименее недавно использованного блока. Каждый вызов entries.get(blockId) обновляет позицию блока в этой очереди. Поэтому часто читаемые блоки остаются в MRU-конце и не вытесняются.

Ограничение anti-circular: блок того же RDD не вытесняет сам себя. Если ты кэшируешь RDD#42 partition 7 и в памяти нет места, Spark не будет вытеснять RDD#42 partition 3 — это привело бы к бесконечному вытеснению при обходе партиций одного RDD.

Что происходит при MEMORY_AND_DISK вытеснении

Если StorageLevel включает useDisk=true, вытесненный блок пишется на диск через DiskStore:

// В BlockManager.dropFromMemory():
if (level.useDisk && !diskStore.contains(blockId)) {
  // Сериализуем и пишем на диск
  val data = entry match {
    case DeserializedMemoryEntry(values, _, ct) =>
      serializerManager.dataSerializeWithExplicitClassTag(blockId, values.iterator, ct)
    case SerializedMemoryEntry(buffer, _, _) =>
      buffer
  }
  diskStore.put(blockId) { channel =>
    val buffer = new ChunkedByteBufferOutputStream(...)
    data.writeFully(channel)
  }
}

После записи на диск, при следующем чтении этого блока BlockManager будет читать из DiskStore — это медленнее (I/O), но блок не нужно пересчитывать.

DiskStore: файловая система и spark.local.dir

DiskStore хранит каждый блок как отдельный файл:

// core/src/main/scala/org/apache/spark/storage/DiskStore.scala
class DiskStore(conf: SparkConf, diskManager: DiskBlockManager, ...) {

  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
    val file = diskManager.getFile(blockId)  // Путь в spark.local.dir
    val out = new CountingWritableChannel(openForWrite(file))
    try {
      writeFunc(out)
    } finally {
      out.close()
    }
    val size = out.count
    blockSizes.put(blockId, size)
  }
}

DiskBlockManager управляет файловой структурой в spark.local.dir:

spark.local.dir/
  blockmgr-<uuid>/
    0c/  # Hash-subdirectory (0x0 - 0xff)
      rdd_42_7    # RDDBlockId(42, 7) — кэшированная партиция
    shuffle_0_3_5.data  # ShuffleDataBlockId
    shuffle_0_3.index   # ShuffleIndexBlockId

Субдиректории 00/ - ff/ — это 256 hash-bucket’ов для равномерного распределения файлов и предотвращения inode проблем при большом числе файлов. Имя файла — blockId.name.

TIP

Настрой spark.local.dir на быстрый NVMe SSD (желательно несколько дисков через запятую). Spark будет round-robin распределять файлы между дисками. В облаке (AWS EMR, GCP Dataproc) используй instance store (NVMe) вместо EBS для spark.local.dir. Разница в производительности shuffle и spill — 5-10x.

Production: наблюдение через Spark UI и логи

В Spark UI -> Storage tab смотри:

  • Size in Memory vs Size on Disk: если Size on Disk ненулевой — были spill-вытеснения
  • Fraction Cached ниже 100%: часть партиций вытеснена без записи на диск (MEMORY_ONLY) или ещё не кэширована

В executor логах ищи:

# Успешный unrolling
DEBUG MemoryStore: Block rdd_42_7 stored as values in memory
  (estimated size 45.3 MiB, free 234.1 MiB)

# Вытеснение при новом unrolling
INFO MemoryStore: Memory use = 2.1 GiB (blocks) + 234.1 MiB (scratch
  space shared with 3 task(s)) = 2.4 GiB. Target local block size: 512.0 MiB
INFO MemoryStore: Block rdd_18_3 evicted from memory (size: 78.3 MiB,
  free: 0.0 B)
DEBUG BlockManager: Putting block rdd_18_3 on disk

# Провальный unrolling
WARN BlockManager: Not enough space to cache rdd_42_11 in memory!
  (computed 512.0 MiB so far, but only 312.0 MiB free after evicting)

Попробуй сам

from pyspark.sql import SparkSession
from pyspark import StorageLevel
import time

spark = SparkSession.builder \
    .master("local[2]") \
    .config("spark.executor.memory", "512m") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.memory.storageFraction", "0.5") \
    .appName("storage-levels") \
    .getOrCreate()

sc = spark.sparkContext

# Генерируем данные: ~100 MiB RDD
data = list(range(2_000_000))
rdd = sc.parallelize(data, numSlices=4)

# 1. MEMORY_ONLY: при нехватке памяти партиции теряются
t0 = time.time()
mem_only = rdd.persist(StorageLevel.MEMORY_ONLY)
print(f"MEMORY_ONLY count: {mem_only.count()}, time: {time.time()-t0:.2f}s")
# Второй count — из кэша (если влезло)
t0 = time.time()
print(f"MEMORY_ONLY 2nd: {mem_only.count()}, time: {time.time()-t0:.2f}s")
mem_only.unpersist()

# 2. MEMORY_AND_DISK_SER: сериализованный, спилл на диск
t0 = time.time()
mem_disk_ser = rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
print(f"MEM_DISK_SER count: {mem_disk_ser.count()}, time: {time.time()-t0:.2f}s")
t0 = time.time()
print(f"MEM_DISK_SER 2nd: {mem_disk_ser.count()}, time: {time.time()-t0:.2f}s")
mem_disk_ser.unpersist()

# 3. DISK_ONLY: всегда на диске, медленно
t0 = time.time()
disk_only = rdd.persist(StorageLevel.DISK_ONLY)
print(f"DISK_ONLY count: {disk_only.count()}, time: {time.time()-t0:.2f}s")
t0 = time.time()
print(f"DISK_ONLY 2nd: {disk_only.count()}, time: {time.time()-t0:.2f}s")
disk_only.unpersist()

spark.stop()

Ожидаемое поведение (на малой памяти):

MEMORY_ONLY count: 2000000, time: 2.14s
MEMORY_ONLY 2nd: 2000000, time: 0.42s  # Из кэша
MEM_DISK_SER count: 2000000, time: 2.38s
MEM_DISK_SER 2nd: 2000000, time: 1.21s  # Частично с диска
DISK_ONLY count: 2000000, time: 2.91s
DISK_ONLY 2nd: 2000000, time: 1.89s    # Всегда с диска
Проверка знанийKnowledge check
Spark-задание кэширует DataFrame df1 (800 MiB) с уровнем MEMORY_AND_DISK. Потом кэширует df2 (1.2 GiB) с уровнем MEMORY_ONLY. Storage memory pool = 1 GiB. Опиши, что произойдёт с блоками df1 и df2, и почему результат может удивить разработчика.
ОтветAnswer
Storage memory pool = 1 GiB. df1 кэшируется первым: 800 MiB помещается целиком (Fraction Cached: 100%). Затем df2 пытается занять 1.2 GiB при свободных 200 MiB. MemoryStore запускает evictBlocksToFreeSpace(): итерирует LRU, начиная с блоков df1 (они были закэшированы раньше, значит LRU). Вытесняет блоки df1 один за другим. Поскольку df1 имеет MEMORY_AND_DISK, вытесненные блоки записываются на диск в DiskStore. После вытеснения 800 MiB блоков df1 освобождается 800 MiB + 200 MiB = 1 GiB, но df2 нужно 1.2 GiB — места всё ещё не хватает. Unrolling df2 провалится (1.2 GiB превышает 1 GiB pool), часть партиций df2 не попадёт в кэш (Fraction Cached ниже 100%). Сюрприз: df1 с MEMORY_AND_DISK вытеснен на диск, а df2 с MEMORY_ONLY частично некэширован. Разработчик ожидал, что MEMORY_AND_DISK 'безопаснее', но df1 оказался вытеснен именно потому, что был закэширован раньше (LRU) — независимо от StorageLevel. StorageLevel определяет только что делать при вытеснении (писать на диск или удалить), но не защищает от вытеснения.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. MemoryStore использует LinkedHashMap(accessOrder=true) для хранения блоков. Executor читает блок B три раза, блок A один раз, блок C два раза. При следующем LRU eviction какой блок будет вытеснен первым?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5