Байтовая раскладка UnsafeRow
Модуль 06 объяснил, что Tungsten переходит на бинарное представление данных и уходит от JVM-объектов. Здесь мы вскроем это представление до последнего байта: как именно устроена каждая строка, какая математика стоит за адресацией полей, и почему такая раскладка не случайна — она напрямую определяет производительность сортировки, перемешивания и агрегации.
InternalRow: контракт без реализации
Всё, что Spark обрабатывает внутри движка — это InternalRow. Это абстрактный базовый класс в пакете org.apache.spark.sql.catalyst.expressions, и у него больше десятка наследников:
UnsafeRow— основное бинарное представление для выполненияGenericInternalRow— массивObject[], используется в тестах и некоторых read-pathSpecificInternalRow/MutableUnsafeRow— mutable варианты для code generationJoinedRow— объединяет дваInternalRowбез копирования (логический view)
InternalRow предоставляет унифицированный интерфейс: getInt(ordinal), getLong(ordinal), getUTF8String(ordinal), isNullAt(ordinal). Конкретные реализации отличаются тем, где и как они хранят данные. GenericInternalRow хранит Array[Any] на JVM heap — каждое обращение это распакованный боксинг. UnsafeRow хранит непрерывный бинарный буфер и вычисляет адрес каждого поля математически.
Ключевой момент: Spark никогда не вызывает row.getInt() в hot path вручную. Catalyst генерирует Java-код, который делает это через Platform.getLong(baseObject, baseOffset + fieldOffset) — напрямую, через sun.misc.Unsafe, без virtual dispatch.
Три региона: анатомия UnsafeRow
Каждый UnsafeRow — это ровно три следующих друг за другом блока памяти. Размер каждого блока строго выравнен до 8 байт (64-битное выравнивание), что позволяет CPU читать поля одной инструкцией загрузки.
Рассмотрим конкретную схему: (id: Long, name: String, score: Double, tags: Array[String]). Четыре поля. Null bitmap: ceil(4/64) * 8 = 8 байт. Fixed-Length Region: 4 * 8 = 32 байт. Начало Variable-Length Region: смещение 40 байт от начала строки.
Offset Размер Содержимое
------ ------ ----------
0 8 Null bitmap: 0x0000000000000000 (все non-null)
8 8 id = 42L --> 0x000000000000002A
16 8 name (offset|length) --> 0x0000002800000005 (offset=40, len=5)
24 8 score = 3.14 --> IEEE 754 double bits
32 8 tags (offset|length) --> 0x0000002D00000028 (offset=45, len=40)
40 8 "Alice" в UTF-8: 0x 41 6C 69 63 65 00 00 00 (padded to 8 bytes)
48 40 сериализованный UnsafeArrayData для tags
Ключевой инсайт: чтение id — это одна инструкция LOAD по адресу baseOffset + 8. Никакого pointer chasing, никакого virtual dispatch. Чтение name — это LOAD по baseOffset + 16, получение packed (offset, length), затем LOAD по baseOffset + offset. Два memory access вместо одного — но оба sequential, и CPU prefetcher справляется.
Null Bitmap: битовая адресация
Исходник UnsafeRow.java содержит:
public static int calculateBitSetWidthInBytes(int numFields) {
return ((numFields + 63) / 64) * 8;
}
(numFields + 63) / 64 — это целочисленное деление с округлением вверх, то есть ceil(numFields / 64.0). Умноженное на 8 — количество байт для хранения битов группами по 64.
Проверка null:
@Override
public boolean isNullAt(int ordinal) {
assertIndexIsValid(ordinal);
return BitSetMethods.isSet(baseObject, baseOffset, ordinal);
}
BitSetMethods.isSet делает:
public static boolean isSet(Object baseObject, long baseOffset, int index) {
final long mask = 1L << (index & 0x3f); // index % 64
final long wordOffset = baseOffset + (index >> 6) * 8; // index / 64 * 8
final long word = Platform.getLong(baseObject, wordOffset);
return (word & mask) != 0;
}
Три операции: битовый сдвиг, умножение на 8, одна LOAD. Именно такую скорость Spark получает вместо if (obj == null) на Java-объектах.
Запись null при построении строки:
public void setNullAt(int ordinal) {
assertIndexIsValid(ordinal);
BitSetMethods.set(baseObject, baseOffset, ordinal);
// Дополнительно: обнуляем fixed-length слот для предсказуемого поведения
Platform.putLong(baseObject, getFieldOffset(ordinal), 0);
}
Fixed-Length Region: слоты по 8 байт
Каждое поле занимает ровно 8 байт в Fixed-Length Region, независимо от типа данных:
public long getFieldOffset(int ordinal) {
return baseOffset + bitSetWidthInBytes + ordinal * 8L;
}
Для Int (4 байта): значение хранится как long с нулями в старших битах. getInt(ordinal) читает 8 байт и берёт нижние 4:
@Override
public int getInt(int ordinal) {
assertIndexIsValid(ordinal);
return Platform.getInt(baseObject, getFieldOffset(ordinal));
}
Platform.getInt читает ровно 4 байта — JVM выравнивает memory access по размеру типа. На little-endian (x86) это нижние 4 байта 8-байтового слота, что соответствует маленькому значению int без дополнительных операций.
Почему 8 байт на поле, даже для Boolean (1 байт)? Выравнивание. Если поле 3 расположено по неравному адресу, CPU на x86 всё равно прочитает его, но на RISC-архитектурах (ARM, MIPS) неравный доступ бросает SIGBUS. Кроме того, 8-байтовое выравнивание означает, что каждое поле помещается ровно в один кэш-line word, без split access — это важно для атомарности операций на многоядерных системах.
Для Double:
@Override
public double getDouble(int ordinal) {
assertIndexIsValid(ordinal);
return Platform.getDouble(baseObject, getFieldOffset(ordinal));
}
IEEE 754 double — это уже 8 байт, так что слот заполняется полностью.
Variable-Length Region: offset+length encoding
Строки, массивы, MapData и вложенные InternalRow не могут поместиться в 8-байтовый слот. Вместо этого в Fixed-Length Region хранится packed reference: старшие 4 байта — offset от начала UnsafeRow (не от baseOffset!), младшие 4 байта — длина в байтах.
@Override
public UTF8String getUTF8String(int ordinal) {
if (isNullAt(ordinal)) return null;
final long offsetAndSize = getLong(ordinal);
final int offset = (int)(offsetAndSize >> 32);
final int size = (int)offsetAndSize;
return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
}
Операция >> 32 извлекает старшие 4 байта как offset, cast to int — это младшие 4 байта. UTF8String.fromAddress создаёт view поверх исходной памяти без копирования.
Важная деталь: offset отсчитывается от начала строки (позиция 0), а не от начала Variable-Length Region. Это означает, что код, который читает offset, может напрямую прибавить его к baseOffset:
// Правильно: baseOffset + offset_from_row_start
Platform.copyMemory(baseObject, baseOffset + offset, dst, dstOffset, size);
Данные в Variable-Length Region выровнены до 8 байт. После строки "Alice" (5 байт) добавляется 3 байта паддинга, чтобы следующий объект начинался по кратному 8 адресу.
UnsafeArrayData и вложенные структуры
Массивы внутри UnsafeRow хранятся как UnsafeArrayData. Их структура аналогична: заголовок с числом элементов и null bitmap, затем fixed-length слоты для каждого элемента, затем variable-length данные для строковых элементов.
UnsafeArrayData для Array[String]("hello", null, "world"):
Offset 0: 8 байт — numElements = 3
Offset 8: 8 байт — null bitmap: бит 1 = 1 (второй элемент null)
Offset 16: 8 байт — slot 0: (offset=40, length=5) для "hello"
Offset 24: 8 байт — slot 1: 0L (null, слот не используется)
Offset 32: 8 байт — slot 2: (offset=48, length=5) для "world"
Offset 40: 8 байт — "hello\x00\x00\x00"
Offset 48: 8 байт — "world\x00\x00\x00"
UnsafeMapData состоит из двух UnsafeArrayData: один для ключей, один для значений. Header хранит keyArraySize — смещение между ними.
Cache locality: почему это всё важно
Процессор L1 cache line — обычно 64 байта. Когда Spark сканирует колонку id (Long) в 1 000 000 строк через broadcast hash join или агрегацию, паттерн доступа к памяти определяет всё.
С GenericInternalRow (массив объектов): каждая строка — отдельный Java-объект где-то на heap. Читать id из 1М строк означает 1М случайных memory access — практически гарантированный cache miss на каждой строке. При L1 latency 4ns и L3 latency 40ns это 40 миллисекунд только на промахи кэша для 1М строк.
С UnsafeRow в contiguous buffer: строки лежат одна за другой. Читая строку N, CPU загружает в L1 кусок памяти, который также содержит начало строки N+1 и N+2. Prefetcher CPU предсказывает sequential access и загружает следующие строки заблаговременно. Результат — effective latency близок к L1, то есть ~4ns.
Числа для строки из 5 полей (Long, String, Double, Long, Boolean) с типичной Variable-Length Region 32 байт:
- Размер строки UnsafeRow: 8 (null bitmap) + 40 (fixed) + 32 (variable) = 80 байт
- На один L1 cache line (64 байта): почти целая строка с null bitmap и fixed region
- При sequential scan 1М строк: 80МБ данных — помещается в L3 cache большинства серверных CPU
Построение UnsafeRow: UnsafeRowWriter
На практике вы никогда не создаёте UnsafeRow вручную. Catalyst codegen создаёт UnsafeRowWriter — класс-builder с методами для записи каждого типа. Сгенерированный код выглядит примерно так:
// Фрагмент сгенерированного кода для схемы (id: Long, name: String, score: Double)
final UnsafeRowWriter writer = new UnsafeRowWriter(3); // 3 поля
writer.reset(); // обнуляет null bitmap, сбрасывает cursor
// Запись Long: просто пишем 8 байт в fixed slot
writer.write(0, ((Long) value_id));
// Запись String: сначала пишем данные в variable region, потом offset+length в fixed slot
writer.write(1, utf8_name); // внутри: reserveAdditional + writeUnaligned + setOffset
// Запись Double
writer.write(2, ((Double) value_score));
UnsafeRow result = writer.getRow();
UnsafeRowWriter управляет курсором в variable-length region и обеспечивает выравнивание. getRow() возвращает UnsafeRow, которая view поверх внутреннего буфера writer-а — без копирования.
Сравнение и сортировка без десериализации
Одно из самых мощных свойств UnsafeRow — строки можно сравнивать и сортировать без десериализации.
Для сортировки по Long-ключу Catalyst генерирует:
// Сравнение двух UnsafeRow по первому полю (Long)
long left_key = Platform.getLong(left.baseObject, left.baseOffset + 8L);
long right_key = Platform.getLong(right.baseObject, right.baseOffset + 8L);
// Long comparison: одна инструкция CMP
result = Long.compare(left_key, right_key);
Для сравнения строк UTF8String:
// Lexicographic compare: Platform.arrayEquals или bytewise SIMD
UTF8String.compareToUnsafe(
left.baseObject, left.baseOffset + left_str_offset, left_str_len,
right.baseObject, right.baseOffset + right_str_offset, right_str_len
)
Внутри UTF8String.compareToUnsafe использует Platform.getLong для сравнения 8 байт за раз, что соответствует SIMD-подобной обработке на уровне Java.
Для shuffle: вместо сериализации UnsafeRow в byte[], Spark напрямую копирует её бинарное содержимое через Platform.copyMemory. Нет десериализации на стороне отправителя, нет сериализации на стороне получателя — только memcpy.
Граница между on-heap и off-heap
UnsafeRow одинаково работает с on-heap и off-heap памятью за счёт пары (baseObject, baseOffset):
- On-heap:
baseObject=byte[](или другой Java array),baseOffset=Platform.BYTE_ARRAY_OFFSET(обычно 16 для 64-bit JVM с compressed oops).Platform.getLong(byte[], 16 + fieldOffset, ...)читает байты из массива. - Off-heap:
baseObject = null,baseOffset= raw native address (результатUnsafe.allocateMemory()).Platform.getLong(null, address + fieldOffset, ...)читает из native memory.
Этот дизайн позволяет UnsafeExternalSorter прозрачно работать с обоими режимами. MemoryBlock — абстракция над обоими случаями:
// MemoryBlock: обёртка над (baseObject, offset, length)
public final class MemoryBlock extends MemoryLocation {
public final long size;
// on-heap: obj != null, off-heap: obj == null, offset = native address
}
Попробуй сам
Следующий PySpark-код позволяет увидеть размер UnsafeRow в действии через SizeEstimator и RDD.mapPartitions:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType
spark = SparkSession.builder \
.appName("unsaferow-layout") \
.config("spark.memory.offHeap.enabled", "false") \
.getOrCreate()
schema = StructType([
StructField("id", LongType()),
StructField("name", StringType()),
StructField("score", DoubleType()),
])
# Строки с разными длинами строки name
rows = [(1, "A", 1.0), (2, "Alice", 2.0), (3, "Alexander", 3.0)]
df = spark.createDataFrame(rows, schema)
# Посмотреть физический план с кодогенерацией
df.filter("score > 1.0").explain("codegen")
# Проверить, что Spark использует UnsafeRow через QueryExecution
from pyspark.sql.functions import col
plan = df._jdf.queryExecution().executedPlan()
print(plan.toString())
# Размер строк в памяти (байт)
# Для (1, "A", 1.0): 8 + 24 + (1+7) = 40 байт
# Для (2, "Alice", 1.0): 8 + 24 + (5+3) = 40 байт
# Для (3, "Alexander", 1.0): 8 + 24 + (9+7) = 48 байт
# null bitmap = 8, fixed region = 3*8 = 24
# variable region = длина_строки, выровненная до 8
print("Expected row sizes: 40, 40, 48 bytes")
Чтобы увидеть реальное использование Unsafe, включите spark.eventLog.enabled и посмотрите метрики peakExecutionMemory в Spark UI. Разница между GenericInternalRow и UnsafeRow особенно заметна на большом количестве строк с длинными строковыми полями.