Learning Platform
Глоссарий Troubleshooting
Урок 08.04 · 35 мин
Продвинутый
Tungstensun.misc.UnsafeMemoryBlockTaskMemoryManagerUnsafeExternalSorterOff-Heap

Tungsten: binary processing и memory management

Мы знаем формат строк (UnsafeRow), мы знаем как Encoder превращает объекты в этот формат. Теперь — инфраструктура, которая управляет самой памятью: как Spark выделяет большие блоки, как task-и запрашивают и освобождают память, и как UnsafeExternalSorter сортирует гигабайты данных не копируя и не десериализуя их.

sun.misc.Unsafe: прямой доступ к памяти

Весь Tungsten строится на sun.misc.Unsafe — internal JVM API, которое предоставляет прямые операции над памятью:

// Ключевые методы Unsafe, используемые Tungsten:
Unsafe unsafe = /* получается через reflection */;

// Выделить native память (off-heap):
long address = unsafe.allocateMemory(1024 * 1024 * 512L);  // 512 MB

// Освободить:
unsafe.freeMemory(address);

// Читать/писать напрямую по адресу:
unsafe.putLong(null, address + 8, 42L);  // baseObject=null для native
long val = unsafe.getLong(null, address + 8);

// Читать из Java array (on-heap):
byte[] arr = new byte[1024];
long base = unsafe.arrayBaseOffset(byte[].class);  // = 16 для byte[]
unsafe.putLong(arr, base + 8, 42L);
long val2 = unsafe.getLong(arr, base + 8);

// Сравнение блоков памяти:
boolean equal = unsafe.arrayEquals(arr1, off1, arr2, off2, len);

// Копирование:
unsafe.copyMemory(srcObj, srcOffset, dstObj, dstOffset, bytes);

Spark оборачивает Unsafe в org.apache.spark.unsafe.Platform — тонкий слой, который добавляет обработку ошибок и обеспечивает совместимость между JDK-версиями (в Java 17+ прямое использование sun.misc.Unsafe потребовало --add-opens):

// Platform.java в Spark 4.0:
public final class Platform {
    private static final Unsafe _UNSAFE;
    
    public static long getLong(Object object, long offset) {
        return _UNSAFE.getLong(object, offset);
    }
    
    public static void putLong(Object object, long offset, long value) {
        _UNSAFE.putLong(object, offset, value);
    }
    
    // Специализированные методы для массивов:
    public static final int BYTE_ARRAY_OFFSET;  // = _UNSAFE.arrayBaseOffset(byte[].class)
    public static final int INT_ARRAY_OFFSET;
    public static final int LONG_ARRAY_OFFSET;
}

Унификация через (baseObject, baseOffset) позволяет одному коду работать с on-heap и off-heap памятью:

  • On-heap: baseObject = byte[], baseOffset = Platform.BYTE_ARRAY_OFFSET + relative_offset
  • Off-heap: baseObject = null, baseOffset = native_address

MemoryBlock: абстракция над блоком памяти

org.apache.spark.unsafe.memory.MemoryBlock — это неделимая единица памяти в Tungsten:

public class MemoryBlock extends MemoryLocation {
    public final long size;
    private int pageNumber;  // для адресации в TaskMemoryManager

    // Создание on-heap блока из long[]:
    public static MemoryBlock fromLongArray(final long[] array) {
        return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
    }
    
    // Создание off-heap блока:
    // baseObject = null, offset = native address
}

MemoryLocation хранит пару (Object obj, long offset). Для off-heap obj = null, для on-heap obj — Java-объект (обычно long[]).

Почему long[] а не byte[] для on-heap? Выравнивание. long[] имеет базовый offset 16 байт на большинстве JVM (с compressed oops), что само по себе выровнено по 8 байтам. byte[] имеет базовый offset 16 байт, но доступ к произвольным позициям может нарушать 8-байтовое выравнивание. Tungsten хочет гарантированное 8-байтовое выравнивание для всех операций.

MemoryAllocator: два режима выделения

MemoryAllocator — интерфейс с двумя реализациями:

// HeapMemoryAllocator: выделяет long[] на JVM heap
public class HeapMemoryAllocator implements MemoryAllocator {
    public MemoryBlock allocate(long size) throws OutOfMemoryError {
        int numWords = (int)((size + 7) / 8);  // округление до 8 байт
        long[] array = new long[numWords];
        MemoryBlock memory = MemoryBlock.fromLongArray(array);
        // Инициализация нулями не нужна — long[] уже инициализирован JVM
        return memory;
    }
}

// UnsafeMemoryAllocator: выделяет native memory через Unsafe
public class UnsafeMemoryAllocator implements MemoryAllocator {
    public MemoryBlock allocate(long size) throws OutOfMemoryError {
        long address = Platform.allocateMemory(size);
        MemoryBlock memory = new MemoryBlock(null, address, size);
        return memory;
    }
    
    public void free(MemoryBlock memory) {
        Platform.freeMemory(memory.getBaseOffset());
    }
}

Выбор аллокатора определяется конфигурацией:

# On-heap Tungsten (дефолт): HeapMemoryAllocator
spark.conf.set("spark.memory.offHeap.enabled", "false")

# Off-heap Tungsten: UnsafeMemoryAllocator
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "4g")

TaskMemoryManager: бухгалтер памяти task-а

Каждый запущенный Spark task получает свой TaskMemoryManager — объект, который отвечает за все выделения памяти этого task-а. Взаимодействие:

MemoryManager (executor-level, глобальный)
    |
    +-- TaskMemoryManager (task-level, один на task)
            |
            +-- MemoryConsumer (конкретный алгоритм: сортировщик, hash map)
                    |
                    +-- MemoryBlock (страница памяти)

TaskMemoryManager работает со страницами (pages) — большими блоками памяти (обычно 64MB–128MB), которые выделяются через MemoryAllocator. Внутри страницы операторы управляют памятью сами через cursor.

Адресация страниц — 64-битная схема: старшие 13 бит — номер страницы (page number, до 8192 страниц), младшие 51 бит — offset внутри страницы. Это позволяет хранить полный адрес в одном long:

// TaskMemoryManager.encodePageNumberAndOffset:
public static long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
    final long pageNumber = page.getPageNumber();
    return (pageNumber << OFFSET_BITS) | offsetInPage;
}

// Декодирование:
public static int decodePageNumber(long pageNumberAndOffset) {
    return (int)(pageNumberAndOffset >>> OFFSET_BITS);
}

public static long decodeOffset(long pageNumberAndOffset) {
    return (pageNumberAndOffset & MASK_LONG_LOWER_51_BITS);
}

Это компактное представление используется в UnsafeInMemorySorter: каждая строка сортируемых данных хранится как пара (encoded_address, sort_key) — два long по 8 байт = 16 байт overhead на строку.

TaskMemoryManager: структура адресации
Биты 63-51Номер страницы памяти. Максимум 2^13 = 8192 страниц на task. При страницах 64MB это максимум 512GB на task — значительно больше практического размера executor.
Биты 50-0Смещение внутри страницы в байтах. 2^51 = 2 петабайта — максимальный размер страницы. На практике страницы 64MB (используются только нижние 26 бит из 51).
Encoded pointer: 8 байт на записьКаждый long в массиве UnsafeInMemorySorter кодирует полный адрес записи. Сортировка массива по sort_key = перемещение указателей, не данных. Данные остаются на месте.
decode
MemoryBlock + offset -> UnsafeRowРеальные данные: UnsafeRow в MemoryBlock. TaskMemoryManager преобразует (pageNumber, offset) в (baseObject, baseOffset) для Platform.getLong.

MemoryConsumer и spill механизм

MemoryConsumer — базовый класс для всех алгоритмов, потребляющих память: UnsafeExternalSorter, BytesToBytesMap (hash map для join), UnsafeHashedRelation.

public abstract class MemoryConsumer {
    protected final TaskMemoryManager taskMemoryManager;
    protected final long pageSize;
    
    // Запросить страницу: если памяти нет — может вызвать spill у других consumers
    protected MemoryBlock allocatePage(long required) {
        return taskMemoryManager.allocatePage(required, this);
    }
    
    // Этот метод вызывается TaskMemoryManager когда памяти не хватает:
    public abstract long spill(long size, MemoryConsumer trigger) throws IOException;
}

Когда TaskMemoryManager не может выделить запрошенную память (все страницы заняты), он вызывает spill() у существующих потребителей. UnsafeExternalSorter.spill() записывает текущие данные в memory-mapped файл и освобождает страницы. Новый запрос может быть выполнен.

Это graceful degradation: операция не падает с OOM, а замедляется из-за disk I/O. Production-приоритет: сначала spill, потом OOM.

UnsafeExternalSorter: сортировка без десериализации

org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter — один из самых технически изощрённых компонентов Spark. Он сортирует строки произвольного размера, хранящиеся в MemoryBlock-ах, без десериализации содержимого.

Ключевая идея: сортировать нужно не данные, а указатели на данные. Данные (UnsafeRow байты) остаются на месте — перемещается только компактный массив (pointer, key).

Структура UnsafeInMemorySorter:

// Внутренний массив для сортировки: по 2 long на запись
// [encoded_address_0][prefix_key_0][encoded_address_1][prefix_key_1]...
private long[] array;  // LongArray, обёртка над MemoryBlock
private int usableCapacity;  // = array.length / 2

prefix_key — это prefix sort key: первые 8 байт ключа сортировки (или хеш ключа для hash-based сортировки), упакованные в long. При сравнении двух записей сначала сравниваются prefix_key — если они различаются, результат известен без чтения данных. Только при равных prefix-ах идёт полное сравнение через RecordComparator.

Алгоритм сортировки: radix sort

UnsafeInMemorySorter использует radix sort по prefix key:

// RadixSort.sort() из Spark 4.0:
// Сортирует long[] по bits [startByteIndex*8, endByteIndex*8) каждого second element
// (second element = prefix_key в паре [address, prefix_key])
RadixSort.sort(array, currentSortBufferOffset, newSortedBuffer, size,
               0, 7,  // startByteIndex=0, endByteIndex=7 (все 8 байт)
               false, false);

Radix sort — O(n) по времени, не требует сравнений в hot loop. Для 8-байтных ключей Spark делает 8 passes по 1 байту каждый. Каждый pass — простое распределение по 256 buckets.

Когда prefix key недостаточно (несколько записей с одинаковым ключом), вызывается TimSort с полным RecordComparator.

Spill и merge

Когда памяти не хватает, UnsafeExternalSorter.spill():

  1. Создаёт UnsafeSorterSpillWriter
  2. Итерирует по отсортированному UnsafeInMemorySorter
  3. Пишет записи в SpillFile (memory-mapped файл) в отсортированном порядке
  4. Очищает все MemoryBlock страницы, освобождая память
  5. Добавляет SpillFile в список разлитых файлов

После нескольких spill-ов при финальном слиянии UnsafeExternalSorter делает K-way merge:

// K-way merge через priority queue:
// Каждый SpillFile открывает UnsafeSorterSpillReader
// PriorityQueue упорядочивает readers по текущему ключу
// Merge идёт линейно: O(n log k) где k = число spill-файлов
UnsafeExternalSorter: in-memory сортировка и spill
insertRecord(): пишем данные + pointerВходные данные: UnsafeRow поступают из предыдущего оператора. insertRecord() пишет их в текущую MemoryBlock страницу и добавляет (encoded_address, prefix_key) в массив сортировки.
Pointer array (radix sort)Array[(address, prefix_key)]: компактный массив указателей. 16 байт на строку overhead. Для 10M строк = 160MB — значительно меньше самих данных. Radix sort на этом массиве = sort без перемещения данных.
MemoryBlock pages (данные)MemoryBlock pages: реальные UnsafeRow байты. Не перемещаются при сортировке. Могут занимать сотни MB. При spill — сбрасываются на диск в sorted order через pointer array.
Нехватка памяти: spill()OOM риск: TaskMemoryManager не может выделить новую страницу. Вызывает spill() на этом consumer. spill() пишет sorted данные в SpillFile, освобождает MemoryBlock pages.
SpillFile (sorted, disk)SpillFile: memory-mapped файл на локальном диске executor. Записи идут в порядке сортировки. При финальном merge K SpillFile-ов объединяются через priority queue.
Sorted iterator: UnsafeRow без десериализацииИтоговый результат: отсортированные UnsafeRow через iterator. Если не было spill — из памяти. Если был spill — K-way merge из памяти + disk. Десериализации не было ни разу.

Cache-aware computation: Tungsten’s third pillar

Tungsten документирует три направления оптимизации. Первые два (off-heap memory и code generation) мы разбирали в модуле 06. Третье — cache-aware algorithms: переработка классических алгоритмов для лучшей локальности кэша.

Sort: cache-oblivious vs cache-aware

Стандартный sort на объектах: comparator вызывает .compareTo() который читает из двух произвольных мест heap — classic cache-unfriendly access pattern. При сортировке 1M строк = 1M * log2(1M) ≈ 20M comparisons, каждое с высокой вероятностью cache miss.

Tungsten sort: все ключи (prefix_key) в contiguous array. При radix sort — sequential read по всему массиву. При TimSort с prefix — большинство сравнений разрешаются по prefix, без чтения данных. Cache hit rate близок к 100% для prefix comparisons.

Hash join: BytesToBytesMap

BytesToBytesMap — hash map для probe-side в hash join. В отличие от java.util.HashMap, она хранит ключи и значения как contiguous bytes в MemoryBlock, а не как отдельные объекты. Lookup:

  1. Вычислить hash key (на UnsafeRow байтах без десериализации)
  2. Найти bucket в compact array
  3. Сравнить ключ (bytes-to-bytes comparison через Platform.arrayEquals)
  4. Вернуть value (pointer в MemoryBlock)

Нет boxing, нет allocation, нет GC. Каждый bucket — 8 байт (encoded pointer). При 1M entries и load factor 0.7 — структура занимает ~11MB для bucket array, плюс actual data.

Конфигурация и диагностика

# Основные Tungsten-конфиги:
spark.conf.set("spark.memory.fraction", "0.6")  # доля executor memory под Tungsten
spark.conf.set("spark.memory.storageFraction", "0.5")  # из execution memory под cache
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "4g")

# Tungsten code generation:
spark.conf.set("spark.sql.codegen.wholeStage", "true")  # дефолт в Spark 4.0
spark.conf.set("spark.sql.codegen.factoryMode", "CODEGEN_ONLY")  # не fallback на интерпретацию

# Для диагностики spill:
# В Spark UI -> SQL -> Stage -> показывает "spill (memory)" и "spill (disk)" метрики
# В логах executor: "Task N spilled X MB" при verbose GC logs

Метрики spill в Spark UI — важный диагностический сигнал:

  • spill (memory): сколько данных было в памяти перед spill (несжатый размер)
  • spill (disk): сколько занял spill-файл на диске (после сжатия, если включено)

Если spill (memory) >> spill (disk), данные хорошо сжимаются — рассмотрите spark.shuffle.spill.compress=true (дефолт в Spark 4.0).

Если видите spill в каждом stage — executor memory недостаточна или parallelism слишком мала (мало партиций = большие данные на партицию):

# Диагностика: оптимальный размер партиции для sorting
# Правило: partition_size < executor_memory * memory_fraction * 0.5
# При executor_memory=8GB, memory_fraction=0.6: partition < 2.4GB

optimal_partitions = total_data_gb / 0.5  # целевой размер ~500MB на партицию
spark.conf.set("spark.sql.shuffle.partitions", str(int(optimal_partitions)))

Tungsten в контексте Project Photon (Delta Lake / Databricks)

Это расширение контекста: Databricks разработали Photon — vectorized native execution engine, написанный на C++. Photon обходит JVM полностью и работает с теми же UnsafeRow данными через JNI, получая преимущества SIMD-инструкций (AVX-512) и дополнительно снижая overhead JVM boundary.

В open-source Apache Spark 4.0 Photon недоступен, но архитектурный принцип тот же: бинарное представление данных (UnsafeRow) и off-heap memory делают возможной как JVM-оптимизацию (Tungsten), так и native-оптимизацию (Photon) без изменения формата данных.

Попробуй сам

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc

spark = SparkSession.builder \
    .appName("tungsten-demo") \
    .config("spark.memory.offHeap.enabled", "false") \
    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
    .getOrCreate()

# Генерируем данные для наблюдения за сортировкой
import random
random.seed(42)

N = 1_000_000
data = [(i, f"name_{random.randint(0, 100000)}", float(random.randint(0, 1000)))
        for i in range(N)]

df = spark.createDataFrame(data, ["id", "name", "value"])

# 1. Сортировка: смотрим в Spark UI метрики spill
print("=== Sort (observe in Spark UI for spill metrics) ===")
df.orderBy("name").write.format("noop").mode("overwrite").save()

# 2. SortMergeJoin: два UnsafeExternalSorter
df2 = df.withColumnRenamed("id", "id2").withColumnRenamed("name", "key")
df_for_join = df.withColumnRenamed("name", "key")

print("=== SortMergeJoin (two external sorters) ===")
joined = df_for_join.join(df2, "key")
joined.explain()  # должен показать SortMergeJoin

# 3. Aggrегация: BytesToBytesMap в действии
print("=== GroupBy aggregation (BytesToBytesMap hash map) ===")
df.groupBy("name").count().explain("extended")
# В Physical Plan: HashAggregate -> partial, затем Exchange, затем HashAggregate -> final

# 4. Посмотреть метрики через SparkContext
sc = spark.sparkContext
print(f"Executors: {len(sc.statusTracker().getExecutorInfos())}")

Откройте Spark UI на порту 4040 (или 4041/4042 если уже занят). Для job с сортировкой перейдите в SQL -> последний job -> посмотрите на метрики задач: spill (memory), spill (disk). Если spark.executor.memory небольшой (например 1GB) и данных много — spill будет виден.

Проверка знанийKnowledge check
Spark job с сортировкой 10GB данных показывает в Spark UI: spill (memory) = 8GB, spill (disk) = 2GB. Executor настроен на 4GB памяти. Предложите три конкретных изменения конфигурации для устранения spill, объясните механизм каждого.
ОтветAnswer
Проблема: UnsafeExternalSorter не вмещает данные партиции в execution memory (4GB * 0.6 fraction * ~0.5 = ~1.2GB доступно под сортировку одной партиции). Решение 1: увеличить spark.executor.memory до 8-16GB — напрямую увеличивает execution memory pool, TaskMemoryManager может выделить больше MemoryBlock страниц перед необходимостью spill. Решение 2: увеличить spark.sql.shuffle.partitions (дефолт 200) до 400-800 — меньше данных на партицию, каждый UnsafeExternalSorter работает с меньшим объёмом. 10GB / 400 партиций = 25MB на партицию, легко помещается в память. Решение 3: включить spark.memory.offHeap.enabled=true с spark.memory.offHeap.size=4g — Tungsten получает дополнительный off-heap пул, невидимый для GC. TaskMemoryManager может выделять MemoryBlock из native memory, что особенно эффективно когда JVM heap под давлением GC. Дополнительно: spark.shuffle.spill.compress=true (дефолт) — если spill неизбежен, хотя бы минимизировать disk I/O через сжатие.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. TaskMemoryManager кодирует адрес записи в один long: старшие 13 бит = pageNumber, младшие 51 бит = offset. Сколько максимальных страниц может иметь один task, и какой максимальный размер одной страницы?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4