MemoryStore и DiskStore: физика хранения блоков
BlockManager — это facade. Физическое хранение делегируется двум классам: MemoryStore хранит блоки в RAM (on-heap или off-heap), DiskStore — на локальном диске. Понимание их алгоритмов — это разница между кэшем, который работает, и кэшем, который молча вытесняется, пока ты смотришь на Fraction Cached: 40% в Spark UI.
StorageLevel: матрица из флагов
StorageLevel — это sealed class с пятью булевыми флагами и целочисленным _replication:
class StorageLevel private (
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
Из комбинаций получаются стандартные уровни:
| Уровень | useDisk | useMemory | deserialized | replication | Что происходит при вытеснении |
|---|---|---|---|---|---|
MEMORY_ONLY | false | true | true | 1 | Блок удаляется безвозвратно |
MEMORY_AND_DISK | true | true | true | 1 | Спилл на диск |
MEMORY_ONLY_SER | false | true | false | 1 | Блок удаляется (сериализованный) |
MEMORY_AND_DISK_SER | true | true | false | 1 | Спилл на диск |
DISK_ONLY | true | false | false | 1 | N/A (сразу на диск) |
MEMORY_ONLY_2 | false | true | true | 2 | Реплика на другом executor |
MEMORY_AND_DISK_2 | true | true | true | 2 | Реплика на другом executor |
OFF_HEAP | false | false | false | 1 | Off-heap через Unsafe |
Дефолт для df.cache() в Spark — MEMORY_AND_DISK. Для rdd.cache() тоже MEMORY_AND_DISK. Это безопаснее, чем MEMORY_ONLY: при вытеснении партиция пишется на диск, а не исчезает.
MEMORY_ONLY_SER экономит RAM (сериализованные данные плотнее JVM-объектов в 2-5x), но требует десериализации при каждом чтении. Для DataFrame это почти всегда проигрыш по скорости: Spark сам хранит данные в columnar формате (UnsafeRow), поэтому MEMORY_ONLY (deserialized) быстрее MEMORY_ONLY_SER.
MemoryStore: структура данных
MemoryStore хранит блоки в LinkedHashMap с accessOrder=true — это встроенный механизм LRU:
// core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
// accessOrder=true: итерирование идёт от least-recently-used к most-recently-used
MemoryEntry — sealed trait с двумя реализациями:
sealed trait MemoryEntry[T] {
def size: Long
def memoryMode: MemoryMode
def classTag: ClassTag[T]
}
// Десериализованный: JVM-объекты в heap
case class DeserializedMemoryEntry[T](
value: Array[T], // Массив объектов Row, GenericArrayData, etc.
size: Long, // Оценка размера через SizeEstimator
classTag: ClassTag[T]
) extends MemoryEntry[T] {
val memoryMode: MemoryMode = MemoryMode.ON_HEAP
}
// Сериализованный: байты (on-heap ByteBuffer или off-heap через Unsafe)
case class SerializedMemoryEntry[T](
buffer: ChunkedByteBuffer, // Один или несколько ByteBuffer
memoryMode: MemoryMode, // ON_HEAP или OFF_HEAP
classTag: ClassTag[T]
) extends MemoryEntry[T]
Десериализованное хранение (deserialized=true) — это массив JVM-объектов. При чтении задача получает объекты напрямую без десериализации, но GC видит каждый объект. Сериализованное — это один ChunkedByteBuffer с байтами в формате Kryo или Java serialization. GC видит только один объект (буфер), но при чтении нужна десериализация.
Unrolling: почему нельзя писать блок напрямую
Когда задача кэширует партицию, данные поступают как итератор — строки читаются одна за другой. Нельзя зарезервировать память сразу, потому что размер заранее неизвестен (сколько строк пройдёт фильтр?). Именно для этого существует unrolling:
def putIteratorAsValues[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
// 1. Резервируем начальный unroll memory (spark.storage.unrollMemoryThreshold = 1 MiB)
var unrollMemoryUsedByThisBlock = 0L
val initialMemoryThreshold = unrollMemoryThreshold // 1 MiB
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, ON_HEAP)
val vector = new SizeTrackingVector[T] // Хранит объекты, отслеживает размер
while (values.hasNext && keepUnrolling) {
vector += values.next() // Добавляем объект
// 2. Периодически (каждые 16 элементов) проверяем и докупаем память
if (vector.length % memoryCheckPeriod == 0) {
val currentSize = vector.estimateSize()
if (currentSize >= memoryThreshold) {
val amountToRequest = 2 * currentSize - memoryThreshold
keepUnrolling = reserveUnrollMemoryForThisTask(
blockId, amountToRequest, MemoryMode.ON_HEAP)
if (keepUnrolling) {
memoryThreshold = currentSize + amountToRequest
}
}
}
}
if (keepUnrolling) {
// 3. Успех: конвертируем unroll memory -> storage memory
val entry = new DeserializedMemoryEntry[T](vector.toArray, ...)
val size = entry.size
def transferUnrollToStorage(amount: Long): Unit = {
unrollMemoryMap(taskAttemptId) -= amount
storageMemoryUsed += amount // Не меняем MemoryManager счётчики
}
entries.synchronized {
transferUnrollToStorage(unrollMemoryUsedByThisBlock)
entries.put(blockId, entry)
}
Right(size) // Успех
} else {
// 4. Не хватило памяти: возвращаем частично unroll'енный итератор
// BlockManager может попробовать записать на диск
Left(new PartiallyUnrolledIterator(this, ON_HEAP,
unrollMemoryUsedByThisBlock, unrolled = vector.iterator,
rest = values))
}
}
Если unrolling не удался (вернул Left), BlockManager пробует записать блок на диск (если useDisk=true в StorageLevel). Если и диска нет — блок просто не кэшируется, задача продолжает без кэша.
Unroll memory временно занимается из storage pool. После успешного unrolling конвертируется в storage entry. При провале освобождается.
LRU-вытеснение: алгоритм evictBlocksToFreeSpace
Когда нужно освободить место (для unrolling нового блока или по требованию ExecutionMemoryPool), вызывается evictBlocksToFreeSpace:
def evictBlocksToFreeSpace(
blockId: Option[BlockId],
space: Long,
memoryMode: MemoryMode): Long = {
var freedMemory = 0L
val rddToAdd = blockId.flatMap(getRddId) // RDD ID нового блока
// Итерируем entries от LRU (least recently used) к MRU
// LinkedHashMap(accessOrder=true) гарантирует этот порядок
val iterator = entries.entrySet().iterator()
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
val entry = pair.getValue
// Не вытесняем блок того же RDD что кэшируем — circulareviction
if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
if (entry.memoryMode == memoryMode) {
iterator.remove() // Удаляем из entries
// Если блок принадлежит BroadcastBlockId — не спиллим, просто удаляем
// Если RDDBlockId и useDisk=true — спиллим на диск через BlockManager
val freed = entry.size
freedMemory += freed
}
}
}
freedMemory
}
Ключевой момент: итерация по LinkedHashMap(accessOrder=true) начинается с наименее недавно использованного блока. Каждый вызов entries.get(blockId) обновляет позицию блока в этой очереди. Поэтому часто читаемые блоки остаются в MRU-конце и не вытесняются.
Ограничение anti-circular: блок того же RDD не вытесняет сам себя. Если ты кэшируешь RDD#42 partition 7 и в памяти нет места, Spark не будет вытеснять RDD#42 partition 3 — это привело бы к бесконечному вытеснению при обходе партиций одного RDD.
Что происходит при MEMORY_AND_DISK вытеснении
Если StorageLevel включает useDisk=true, вытесненный блок пишется на диск через DiskStore:
// В BlockManager.dropFromMemory():
if (level.useDisk && !diskStore.contains(blockId)) {
// Сериализуем и пишем на диск
val data = entry match {
case DeserializedMemoryEntry(values, _, ct) =>
serializerManager.dataSerializeWithExplicitClassTag(blockId, values.iterator, ct)
case SerializedMemoryEntry(buffer, _, _) =>
buffer
}
diskStore.put(blockId) { channel =>
val buffer = new ChunkedByteBufferOutputStream(...)
data.writeFully(channel)
}
}
После записи на диск, при следующем чтении этого блока BlockManager будет читать из DiskStore — это медленнее (I/O), но блок не нужно пересчитывать.
DiskStore: файловая система и spark.local.dir
DiskStore хранит каждый блок как отдельный файл:
// core/src/main/scala/org/apache/spark/storage/DiskStore.scala
class DiskStore(conf: SparkConf, diskManager: DiskBlockManager, ...) {
def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
val file = diskManager.getFile(blockId) // Путь в spark.local.dir
val out = new CountingWritableChannel(openForWrite(file))
try {
writeFunc(out)
} finally {
out.close()
}
val size = out.count
blockSizes.put(blockId, size)
}
}
DiskBlockManager управляет файловой структурой в spark.local.dir:
spark.local.dir/
blockmgr-<uuid>/
0c/ # Hash-subdirectory (0x0 - 0xff)
rdd_42_7 # RDDBlockId(42, 7) — кэшированная партиция
shuffle_0_3_5.data # ShuffleDataBlockId
shuffle_0_3.index # ShuffleIndexBlockId
Субдиректории 00/ - ff/ — это 256 hash-bucket’ов для равномерного распределения файлов и предотвращения inode проблем при большом числе файлов. Имя файла — blockId.name.
Настрой spark.local.dir на быстрый NVMe SSD (желательно несколько дисков через запятую). Spark будет round-robin распределять файлы между дисками. В облаке (AWS EMR, GCP Dataproc) используй instance store (NVMe) вместо EBS для spark.local.dir. Разница в производительности shuffle и spill — 5-10x.
Production: наблюдение через Spark UI и логи
В Spark UI -> Storage tab смотри:
Size in MemoryvsSize on Disk: еслиSize on Diskненулевой — были spill-вытесненияFraction Cachedниже 100%: часть партиций вытеснена без записи на диск (MEMORY_ONLY) или ещё не кэширована
В executor логах ищи:
# Успешный unrolling
DEBUG MemoryStore: Block rdd_42_7 stored as values in memory
(estimated size 45.3 MiB, free 234.1 MiB)
# Вытеснение при новом unrolling
INFO MemoryStore: Memory use = 2.1 GiB (blocks) + 234.1 MiB (scratch
space shared with 3 task(s)) = 2.4 GiB. Target local block size: 512.0 MiB
INFO MemoryStore: Block rdd_18_3 evicted from memory (size: 78.3 MiB,
free: 0.0 B)
DEBUG BlockManager: Putting block rdd_18_3 on disk
# Провальный unrolling
WARN BlockManager: Not enough space to cache rdd_42_11 in memory!
(computed 512.0 MiB so far, but only 312.0 MiB free after evicting)
Попробуй сам
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import time
spark = SparkSession.builder \
.master("local[2]") \
.config("spark.executor.memory", "512m") \
.config("spark.memory.fraction", "0.6") \
.config("spark.memory.storageFraction", "0.5") \
.appName("storage-levels") \
.getOrCreate()
sc = spark.sparkContext
# Генерируем данные: ~100 MiB RDD
data = list(range(2_000_000))
rdd = sc.parallelize(data, numSlices=4)
# 1. MEMORY_ONLY: при нехватке памяти партиции теряются
t0 = time.time()
mem_only = rdd.persist(StorageLevel.MEMORY_ONLY)
print(f"MEMORY_ONLY count: {mem_only.count()}, time: {time.time()-t0:.2f}s")
# Второй count — из кэша (если влезло)
t0 = time.time()
print(f"MEMORY_ONLY 2nd: {mem_only.count()}, time: {time.time()-t0:.2f}s")
mem_only.unpersist()
# 2. MEMORY_AND_DISK_SER: сериализованный, спилл на диск
t0 = time.time()
mem_disk_ser = rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
print(f"MEM_DISK_SER count: {mem_disk_ser.count()}, time: {time.time()-t0:.2f}s")
t0 = time.time()
print(f"MEM_DISK_SER 2nd: {mem_disk_ser.count()}, time: {time.time()-t0:.2f}s")
mem_disk_ser.unpersist()
# 3. DISK_ONLY: всегда на диске, медленно
t0 = time.time()
disk_only = rdd.persist(StorageLevel.DISK_ONLY)
print(f"DISK_ONLY count: {disk_only.count()}, time: {time.time()-t0:.2f}s")
t0 = time.time()
print(f"DISK_ONLY 2nd: {disk_only.count()}, time: {time.time()-t0:.2f}s")
disk_only.unpersist()
spark.stop()
Ожидаемое поведение (на малой памяти):
MEMORY_ONLY count: 2000000, time: 2.14s
MEMORY_ONLY 2nd: 2000000, time: 0.42s # Из кэша
MEM_DISK_SER count: 2000000, time: 2.38s
MEM_DISK_SER 2nd: 2000000, time: 1.21s # Частично с диска
DISK_ONLY count: 2000000, time: 2.91s
DISK_ONLY 2nd: 2000000, time: 1.89s # Всегда с диска