Tungsten: binary processing и memory management
Мы знаем формат строк (UnsafeRow), мы знаем как Encoder превращает объекты в этот формат. Теперь — инфраструктура, которая управляет самой памятью: как Spark выделяет большие блоки, как task-и запрашивают и освобождают память, и как UnsafeExternalSorter сортирует гигабайты данных не копируя и не десериализуя их.
sun.misc.Unsafe: прямой доступ к памяти
Весь Tungsten строится на sun.misc.Unsafe — internal JVM API, которое предоставляет прямые операции над памятью:
// Ключевые методы Unsafe, используемые Tungsten:
Unsafe unsafe = /* получается через reflection */;
// Выделить native память (off-heap):
long address = unsafe.allocateMemory(1024 * 1024 * 512L); // 512 MB
// Освободить:
unsafe.freeMemory(address);
// Читать/писать напрямую по адресу:
unsafe.putLong(null, address + 8, 42L); // baseObject=null для native
long val = unsafe.getLong(null, address + 8);
// Читать из Java array (on-heap):
byte[] arr = new byte[1024];
long base = unsafe.arrayBaseOffset(byte[].class); // = 16 для byte[]
unsafe.putLong(arr, base + 8, 42L);
long val2 = unsafe.getLong(arr, base + 8);
// Сравнение блоков памяти:
boolean equal = unsafe.arrayEquals(arr1, off1, arr2, off2, len);
// Копирование:
unsafe.copyMemory(srcObj, srcOffset, dstObj, dstOffset, bytes);
Spark оборачивает Unsafe в org.apache.spark.unsafe.Platform — тонкий слой, который добавляет обработку ошибок и обеспечивает совместимость между JDK-версиями (в Java 17+ прямое использование sun.misc.Unsafe потребовало --add-opens):
// Platform.java в Spark 4.0:
public final class Platform {
private static final Unsafe _UNSAFE;
public static long getLong(Object object, long offset) {
return _UNSAFE.getLong(object, offset);
}
public static void putLong(Object object, long offset, long value) {
_UNSAFE.putLong(object, offset, value);
}
// Специализированные методы для массивов:
public static final int BYTE_ARRAY_OFFSET; // = _UNSAFE.arrayBaseOffset(byte[].class)
public static final int INT_ARRAY_OFFSET;
public static final int LONG_ARRAY_OFFSET;
}
Унификация через (baseObject, baseOffset) позволяет одному коду работать с on-heap и off-heap памятью:
- On-heap:
baseObject = byte[],baseOffset = Platform.BYTE_ARRAY_OFFSET + relative_offset - Off-heap:
baseObject = null,baseOffset = native_address
MemoryBlock: абстракция над блоком памяти
org.apache.spark.unsafe.memory.MemoryBlock — это неделимая единица памяти в Tungsten:
public class MemoryBlock extends MemoryLocation {
public final long size;
private int pageNumber; // для адресации в TaskMemoryManager
// Создание on-heap блока из long[]:
public static MemoryBlock fromLongArray(final long[] array) {
return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
}
// Создание off-heap блока:
// baseObject = null, offset = native address
}
MemoryLocation хранит пару (Object obj, long offset). Для off-heap obj = null, для on-heap obj — Java-объект (обычно long[]).
Почему long[] а не byte[] для on-heap? Выравнивание. long[] имеет базовый offset 16 байт на большинстве JVM (с compressed oops), что само по себе выровнено по 8 байтам. byte[] имеет базовый offset 16 байт, но доступ к произвольным позициям может нарушать 8-байтовое выравнивание. Tungsten хочет гарантированное 8-байтовое выравнивание для всех операций.
MemoryAllocator: два режима выделения
MemoryAllocator — интерфейс с двумя реализациями:
// HeapMemoryAllocator: выделяет long[] на JVM heap
public class HeapMemoryAllocator implements MemoryAllocator {
public MemoryBlock allocate(long size) throws OutOfMemoryError {
int numWords = (int)((size + 7) / 8); // округление до 8 байт
long[] array = new long[numWords];
MemoryBlock memory = MemoryBlock.fromLongArray(array);
// Инициализация нулями не нужна — long[] уже инициализирован JVM
return memory;
}
}
// UnsafeMemoryAllocator: выделяет native memory через Unsafe
public class UnsafeMemoryAllocator implements MemoryAllocator {
public MemoryBlock allocate(long size) throws OutOfMemoryError {
long address = Platform.allocateMemory(size);
MemoryBlock memory = new MemoryBlock(null, address, size);
return memory;
}
public void free(MemoryBlock memory) {
Platform.freeMemory(memory.getBaseOffset());
}
}
Выбор аллокатора определяется конфигурацией:
# On-heap Tungsten (дефолт): HeapMemoryAllocator
spark.conf.set("spark.memory.offHeap.enabled", "false")
# Off-heap Tungsten: UnsafeMemoryAllocator
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "4g")
TaskMemoryManager: бухгалтер памяти task-а
Каждый запущенный Spark task получает свой TaskMemoryManager — объект, который отвечает за все выделения памяти этого task-а. Взаимодействие:
MemoryManager (executor-level, глобальный)
|
+-- TaskMemoryManager (task-level, один на task)
|
+-- MemoryConsumer (конкретный алгоритм: сортировщик, hash map)
|
+-- MemoryBlock (страница памяти)
TaskMemoryManager работает со страницами (pages) — большими блоками памяти (обычно 64MB–128MB), которые выделяются через MemoryAllocator. Внутри страницы операторы управляют памятью сами через cursor.
Адресация страниц — 64-битная схема: старшие 13 бит — номер страницы (page number, до 8192 страниц), младшие 51 бит — offset внутри страницы. Это позволяет хранить полный адрес в одном long:
// TaskMemoryManager.encodePageNumberAndOffset:
public static long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
final long pageNumber = page.getPageNumber();
return (pageNumber << OFFSET_BITS) | offsetInPage;
}
// Декодирование:
public static int decodePageNumber(long pageNumberAndOffset) {
return (int)(pageNumberAndOffset >>> OFFSET_BITS);
}
public static long decodeOffset(long pageNumberAndOffset) {
return (pageNumberAndOffset & MASK_LONG_LOWER_51_BITS);
}
Это компактное представление используется в UnsafeInMemorySorter: каждая строка сортируемых данных хранится как пара (encoded_address, sort_key) — два long по 8 байт = 16 байт overhead на строку.
MemoryConsumer и spill механизм
MemoryConsumer — базовый класс для всех алгоритмов, потребляющих память: UnsafeExternalSorter, BytesToBytesMap (hash map для join), UnsafeHashedRelation.
public abstract class MemoryConsumer {
protected final TaskMemoryManager taskMemoryManager;
protected final long pageSize;
// Запросить страницу: если памяти нет — может вызвать spill у других consumers
protected MemoryBlock allocatePage(long required) {
return taskMemoryManager.allocatePage(required, this);
}
// Этот метод вызывается TaskMemoryManager когда памяти не хватает:
public abstract long spill(long size, MemoryConsumer trigger) throws IOException;
}
Когда TaskMemoryManager не может выделить запрошенную память (все страницы заняты), он вызывает spill() у существующих потребителей. UnsafeExternalSorter.spill() записывает текущие данные в memory-mapped файл и освобождает страницы. Новый запрос может быть выполнен.
Это graceful degradation: операция не падает с OOM, а замедляется из-за disk I/O. Production-приоритет: сначала spill, потом OOM.
UnsafeExternalSorter: сортировка без десериализации
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter — один из самых технически изощрённых компонентов Spark. Он сортирует строки произвольного размера, хранящиеся в MemoryBlock-ах, без десериализации содержимого.
Ключевая идея: сортировать нужно не данные, а указатели на данные. Данные (UnsafeRow байты) остаются на месте — перемещается только компактный массив (pointer, key).
Структура UnsafeInMemorySorter:
// Внутренний массив для сортировки: по 2 long на запись
// [encoded_address_0][prefix_key_0][encoded_address_1][prefix_key_1]...
private long[] array; // LongArray, обёртка над MemoryBlock
private int usableCapacity; // = array.length / 2
prefix_key — это prefix sort key: первые 8 байт ключа сортировки (или хеш ключа для hash-based сортировки), упакованные в long. При сравнении двух записей сначала сравниваются prefix_key — если они различаются, результат известен без чтения данных. Только при равных prefix-ах идёт полное сравнение через RecordComparator.
Алгоритм сортировки: radix sort
UnsafeInMemorySorter использует radix sort по prefix key:
// RadixSort.sort() из Spark 4.0:
// Сортирует long[] по bits [startByteIndex*8, endByteIndex*8) каждого second element
// (second element = prefix_key в паре [address, prefix_key])
RadixSort.sort(array, currentSortBufferOffset, newSortedBuffer, size,
0, 7, // startByteIndex=0, endByteIndex=7 (все 8 байт)
false, false);
Radix sort — O(n) по времени, не требует сравнений в hot loop. Для 8-байтных ключей Spark делает 8 passes по 1 байту каждый. Каждый pass — простое распределение по 256 buckets.
Когда prefix key недостаточно (несколько записей с одинаковым ключом), вызывается TimSort с полным RecordComparator.
Spill и merge
Когда памяти не хватает, UnsafeExternalSorter.spill():
- Создаёт
UnsafeSorterSpillWriter - Итерирует по отсортированному
UnsafeInMemorySorter - Пишет записи в
SpillFile(memory-mapped файл) в отсортированном порядке - Очищает все
MemoryBlockстраницы, освобождая память - Добавляет
SpillFileв список разлитых файлов
После нескольких spill-ов при финальном слиянии UnsafeExternalSorter делает K-way merge:
// K-way merge через priority queue:
// Каждый SpillFile открывает UnsafeSorterSpillReader
// PriorityQueue упорядочивает readers по текущему ключу
// Merge идёт линейно: O(n log k) где k = число spill-файлов
Cache-aware computation: Tungsten’s third pillar
Tungsten документирует три направления оптимизации. Первые два (off-heap memory и code generation) мы разбирали в модуле 06. Третье — cache-aware algorithms: переработка классических алгоритмов для лучшей локальности кэша.
Sort: cache-oblivious vs cache-aware
Стандартный sort на объектах: comparator вызывает .compareTo() который читает из двух произвольных мест heap — classic cache-unfriendly access pattern. При сортировке 1M строк = 1M * log2(1M) ≈ 20M comparisons, каждое с высокой вероятностью cache miss.
Tungsten sort: все ключи (prefix_key) в contiguous array. При radix sort — sequential read по всему массиву. При TimSort с prefix — большинство сравнений разрешаются по prefix, без чтения данных. Cache hit rate близок к 100% для prefix comparisons.
Hash join: BytesToBytesMap
BytesToBytesMap — hash map для probe-side в hash join. В отличие от java.util.HashMap, она хранит ключи и значения как contiguous bytes в MemoryBlock, а не как отдельные объекты. Lookup:
- Вычислить hash key (на UnsafeRow байтах без десериализации)
- Найти bucket в compact array
- Сравнить ключ (bytes-to-bytes comparison через Platform.arrayEquals)
- Вернуть value (pointer в MemoryBlock)
Нет boxing, нет allocation, нет GC. Каждый bucket — 8 байт (encoded pointer). При 1M entries и load factor 0.7 — структура занимает ~11MB для bucket array, плюс actual data.
Конфигурация и диагностика
# Основные Tungsten-конфиги:
spark.conf.set("spark.memory.fraction", "0.6") # доля executor memory под Tungsten
spark.conf.set("spark.memory.storageFraction", "0.5") # из execution memory под cache
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "4g")
# Tungsten code generation:
spark.conf.set("spark.sql.codegen.wholeStage", "true") # дефолт в Spark 4.0
spark.conf.set("spark.sql.codegen.factoryMode", "CODEGEN_ONLY") # не fallback на интерпретацию
# Для диагностики spill:
# В Spark UI -> SQL -> Stage -> показывает "spill (memory)" и "spill (disk)" метрики
# В логах executor: "Task N spilled X MB" при verbose GC logs
Метрики spill в Spark UI — важный диагностический сигнал:
spill (memory): сколько данных было в памяти перед spill (несжатый размер)spill (disk): сколько занял spill-файл на диске (после сжатия, если включено)
Если spill (memory) >> spill (disk), данные хорошо сжимаются — рассмотрите spark.shuffle.spill.compress=true (дефолт в Spark 4.0).
Если видите spill в каждом stage — executor memory недостаточна или parallelism слишком мала (мало партиций = большие данные на партицию):
# Диагностика: оптимальный размер партиции для sorting
# Правило: partition_size < executor_memory * memory_fraction * 0.5
# При executor_memory=8GB, memory_fraction=0.6: partition < 2.4GB
optimal_partitions = total_data_gb / 0.5 # целевой размер ~500MB на партицию
spark.conf.set("spark.sql.shuffle.partitions", str(int(optimal_partitions)))
Tungsten в контексте Project Photon (Delta Lake / Databricks)
Это расширение контекста: Databricks разработали Photon — vectorized native execution engine, написанный на C++. Photon обходит JVM полностью и работает с теми же UnsafeRow данными через JNI, получая преимущества SIMD-инструкций (AVX-512) и дополнительно снижая overhead JVM boundary.
В open-source Apache Spark 4.0 Photon недоступен, но архитектурный принцип тот же: бинарное представление данных (UnsafeRow) и off-heap memory делают возможной как JVM-оптимизацию (Tungsten), так и native-оптимизацию (Photon) без изменения формата данных.
Попробуй сам
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc
spark = SparkSession.builder \
.appName("tungsten-demo") \
.config("spark.memory.offHeap.enabled", "false") \
.config("spark.sql.autoBroadcastJoinThreshold", "-1") \
.getOrCreate()
# Генерируем данные для наблюдения за сортировкой
import random
random.seed(42)
N = 1_000_000
data = [(i, f"name_{random.randint(0, 100000)}", float(random.randint(0, 1000)))
for i in range(N)]
df = spark.createDataFrame(data, ["id", "name", "value"])
# 1. Сортировка: смотрим в Spark UI метрики spill
print("=== Sort (observe in Spark UI for spill metrics) ===")
df.orderBy("name").write.format("noop").mode("overwrite").save()
# 2. SortMergeJoin: два UnsafeExternalSorter
df2 = df.withColumnRenamed("id", "id2").withColumnRenamed("name", "key")
df_for_join = df.withColumnRenamed("name", "key")
print("=== SortMergeJoin (two external sorters) ===")
joined = df_for_join.join(df2, "key")
joined.explain() # должен показать SortMergeJoin
# 3. Aggrегация: BytesToBytesMap в действии
print("=== GroupBy aggregation (BytesToBytesMap hash map) ===")
df.groupBy("name").count().explain("extended")
# В Physical Plan: HashAggregate -> partial, затем Exchange, затем HashAggregate -> final
# 4. Посмотреть метрики через SparkContext
sc = spark.sparkContext
print(f"Executors: {len(sc.statusTracker().getExecutorInfos())}")
Откройте Spark UI на порту 4040 (или 4041/4042 если уже занят). Для job с сортировкой перейдите в SQL -> последний job -> посмотрите на метрики задач: spill (memory), spill (disk). Если spark.executor.memory небольшой (например 1GB) и данных много — spill будет виден.