Learning Platform
Глоссарий Troubleshooting
Урок 08.01 · 30 мин
Продвинутый
UnsafeRowInternalRowMemory LayoutTungstenCache Locality

Байтовая раскладка UnsafeRow

Модуль 06 объяснил, что Tungsten переходит на бинарное представление данных и уходит от JVM-объектов. Здесь мы вскроем это представление до последнего байта: как именно устроена каждая строка, какая математика стоит за адресацией полей, и почему такая раскладка не случайна — она напрямую определяет производительность сортировки, перемешивания и агрегации.

InternalRow: контракт без реализации

Всё, что Spark обрабатывает внутри движка — это InternalRow. Это абстрактный базовый класс в пакете org.apache.spark.sql.catalyst.expressions, и у него больше десятка наследников:

  • UnsafeRow — основное бинарное представление для выполнения
  • GenericInternalRow — массив Object[], используется в тестах и некоторых read-path
  • SpecificInternalRow / MutableUnsafeRow — mutable варианты для code generation
  • JoinedRow — объединяет два InternalRow без копирования (логический view)

InternalRow предоставляет унифицированный интерфейс: getInt(ordinal), getLong(ordinal), getUTF8String(ordinal), isNullAt(ordinal). Конкретные реализации отличаются тем, где и как они хранят данные. GenericInternalRow хранит Array[Any] на JVM heap — каждое обращение это распакованный боксинг. UnsafeRow хранит непрерывный бинарный буфер и вычисляет адрес каждого поля математически.

Ключевой момент: Spark никогда не вызывает row.getInt() в hot path вручную. Catalyst генерирует Java-код, который делает это через Platform.getLong(baseObject, baseOffset + fieldOffset) — напрямую, через sun.misc.Unsafe, без virtual dispatch.

Три региона: анатомия UnsafeRow

Каждый UnsafeRow — это ровно три следующих друг за другом блока памяти. Размер каждого блока строго выравнен до 8 байт (64-битное выравнивание), что позволяет CPU читать поля одной инструкцией загрузки.

Структура UnsafeRow в памяти
Null BitmapБитовый массив: один бит на поле. Бит i = 1 означает, что поле i равно NULL. Для строки с 1-64 полями — ровно 8 байт. Для 65-128 полей — 16 байт. Размер всегда кратен 8 и выровнен по 8-байтовой границе.
Fixed-Length RegionПо одному 8-байтовому слоту на каждое поле. Для примитивов (Int, Long, Double) — само значение, выровненное по нулям. Для строк и массивов — упакованные (offset, length): старшие 4 байта offset от начала строки относительно UnsafeRow, младшие 4 байта length в байтах.
Variable-Length RegionТела строк (UTF-8), массивов и вложенных структур. Данные идут без разрывов, каждый объект выровнен до 8 байт. Порядок объектов соответствует порядку полей в схеме. Эта область никогда не имеет фиксированного смещения — она начинается сразу после Fixed-Length Region.
Offset 0UnsafeRow хранит baseObject (Object или null для off-heap) и baseOffset (long). Вся адресная арифметика ведётся относительно baseOffset.
8 * ceil(N/64)N = numFields. Для 3 полей: ceil(3/64)*8 = 8. Для 65 полей: ceil(65/64)*8 = 16. Это константа для данной схемы — компилируется в код.
8*ceil(N/64) + 8*NЭто фиксированный offset для данной схемы, вычисляется один раз при создании UnsafeRow. Переменные данные пишутся сюда sequentially с cursor-ом.

Рассмотрим конкретную схему: (id: Long, name: String, score: Double, tags: Array[String]). Четыре поля. Null bitmap: ceil(4/64) * 8 = 8 байт. Fixed-Length Region: 4 * 8 = 32 байт. Начало Variable-Length Region: смещение 40 байт от начала строки.

Offset  Размер  Содержимое
------  ------  ----------
0       8       Null bitmap: 0x0000000000000000 (все non-null)
8       8       id = 42L  -->  0x000000000000002A
16      8       name (offset|length) -->  0x0000002800000005  (offset=40, len=5)
24      8       score = 3.14  -->  IEEE 754 double bits
32      8       tags (offset|length)  -->  0x0000002D00000028  (offset=45, len=40)
40      8       "Alice" в UTF-8: 0x 41 6C 69 63 65 00 00 00  (padded to 8 bytes)
48      40      сериализованный UnsafeArrayData для tags

Ключевой инсайт: чтение id — это одна инструкция LOAD по адресу baseOffset + 8. Никакого pointer chasing, никакого virtual dispatch. Чтение name — это LOAD по baseOffset + 16, получение packed (offset, length), затем LOAD по baseOffset + offset. Два memory access вместо одного — но оба sequential, и CPU prefetcher справляется.

Null Bitmap: битовая адресация

Исходник UnsafeRow.java содержит:

public static int calculateBitSetWidthInBytes(int numFields) {
    return ((numFields + 63) / 64) * 8;
}

(numFields + 63) / 64 — это целочисленное деление с округлением вверх, то есть ceil(numFields / 64.0). Умноженное на 8 — количество байт для хранения битов группами по 64.

Проверка null:

@Override
public boolean isNullAt(int ordinal) {
    assertIndexIsValid(ordinal);
    return BitSetMethods.isSet(baseObject, baseOffset, ordinal);
}

BitSetMethods.isSet делает:

public static boolean isSet(Object baseObject, long baseOffset, int index) {
    final long mask = 1L << (index & 0x3f); // index % 64
    final long wordOffset = baseOffset + (index >> 6) * 8; // index / 64 * 8
    final long word = Platform.getLong(baseObject, wordOffset);
    return (word & mask) != 0;
}

Три операции: битовый сдвиг, умножение на 8, одна LOAD. Именно такую скорость Spark получает вместо if (obj == null) на Java-объектах.

Запись null при построении строки:

public void setNullAt(int ordinal) {
    assertIndexIsValid(ordinal);
    BitSetMethods.set(baseObject, baseOffset, ordinal);
    // Дополнительно: обнуляем fixed-length слот для предсказуемого поведения
    Platform.putLong(baseObject, getFieldOffset(ordinal), 0);
}

Fixed-Length Region: слоты по 8 байт

Каждое поле занимает ровно 8 байт в Fixed-Length Region, независимо от типа данных:

public long getFieldOffset(int ordinal) {
    return baseOffset + bitSetWidthInBytes + ordinal * 8L;
}

Для Int (4 байта): значение хранится как long с нулями в старших битах. getInt(ordinal) читает 8 байт и берёт нижние 4:

@Override
public int getInt(int ordinal) {
    assertIndexIsValid(ordinal);
    return Platform.getInt(baseObject, getFieldOffset(ordinal));
}

Platform.getInt читает ровно 4 байта — JVM выравнивает memory access по размеру типа. На little-endian (x86) это нижние 4 байта 8-байтового слота, что соответствует маленькому значению int без дополнительных операций.

NOTE

Почему 8 байт на поле, даже для Boolean (1 байт)? Выравнивание. Если поле 3 расположено по неравному адресу, CPU на x86 всё равно прочитает его, но на RISC-архитектурах (ARM, MIPS) неравный доступ бросает SIGBUS. Кроме того, 8-байтовое выравнивание означает, что каждое поле помещается ровно в один кэш-line word, без split access — это важно для атомарности операций на многоядерных системах.

Для Double:

@Override
public double getDouble(int ordinal) {
    assertIndexIsValid(ordinal);
    return Platform.getDouble(baseObject, getFieldOffset(ordinal));
}

IEEE 754 double — это уже 8 байт, так что слот заполняется полностью.

Variable-Length Region: offset+length encoding

Строки, массивы, MapData и вложенные InternalRow не могут поместиться в 8-байтовый слот. Вместо этого в Fixed-Length Region хранится packed reference: старшие 4 байта — offset от начала UnsafeRow (не от baseOffset!), младшие 4 байта — длина в байтах.

@Override
public UTF8String getUTF8String(int ordinal) {
    if (isNullAt(ordinal)) return null;
    final long offsetAndSize = getLong(ordinal);
    final int offset = (int)(offsetAndSize >> 32);
    final int size = (int)offsetAndSize;
    return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
}

Операция >> 32 извлекает старшие 4 байта как offset, cast to int — это младшие 4 байта. UTF8String.fromAddress создаёт view поверх исходной памяти без копирования.

Packed reference: offset+length в одном long
Биты 63-32Смещение в байтах от начала UnsafeRow (от позиции 0, где начинается null bitmap). Максимальный offset = 2^31 - 1 = 2 ГБ. На практике UnsafeRow не превышает сотни мегабайт.
Биты 31-0Длина данных в байтах. Для строки 'Alice' = 5. Для массива из 10 ints = sizeof(UnsafeArrayData header) + 10*8. Нулевая длина (length=0) при non-null означает пустую строку или пустой массив.

Важная деталь: offset отсчитывается от начала строки (позиция 0), а не от начала Variable-Length Region. Это означает, что код, который читает offset, может напрямую прибавить его к baseOffset:

// Правильно: baseOffset + offset_from_row_start
Platform.copyMemory(baseObject, baseOffset + offset, dst, dstOffset, size);

Данные в Variable-Length Region выровнены до 8 байт. После строки "Alice" (5 байт) добавляется 3 байта паддинга, чтобы следующий объект начинался по кратному 8 адресу.

UnsafeArrayData и вложенные структуры

Массивы внутри UnsafeRow хранятся как UnsafeArrayData. Их структура аналогична: заголовок с числом элементов и null bitmap, затем fixed-length слоты для каждого элемента, затем variable-length данные для строковых элементов.

UnsafeArrayData для Array[String]("hello", null, "world"):
Offset 0:  8 байт — numElements = 3
Offset 8:  8 байт — null bitmap: бит 1 = 1 (второй элемент null)
Offset 16: 8 байт — slot 0: (offset=40, length=5) для "hello"
Offset 24: 8 байт — slot 1: 0L (null, слот не используется)
Offset 32: 8 байт — slot 2: (offset=48, length=5) для "world"
Offset 40: 8 байт — "hello\x00\x00\x00"
Offset 48: 8 байт — "world\x00\x00\x00"

UnsafeMapData состоит из двух UnsafeArrayData: один для ключей, один для значений. Header хранит keyArraySize — смещение между ними.

Cache locality: почему это всё важно

Процессор L1 cache line — обычно 64 байта. Когда Spark сканирует колонку id (Long) в 1 000 000 строк через broadcast hash join или агрегацию, паттерн доступа к памяти определяет всё.

С GenericInternalRow (массив объектов): каждая строка — отдельный Java-объект где-то на heap. Читать id из 1М строк означает 1М случайных memory access — практически гарантированный cache miss на каждой строке. При L1 latency 4ns и L3 latency 40ns это 40 миллисекунд только на промахи кэша для 1М строк.

С UnsafeRow в contiguous buffer: строки лежат одна за другой. Читая строку N, CPU загружает в L1 кусок памяти, который также содержит начало строки N+1 и N+2. Prefetcher CPU предсказывает sequential access и загружает следующие строки заблаговременно. Результат — effective latency близок к L1, то есть ~4ns.

Числа для строки из 5 полей (Long, String, Double, Long, Boolean) с типичной Variable-Length Region 32 байт:

  • Размер строки UnsafeRow: 8 (null bitmap) + 40 (fixed) + 32 (variable) = 80 байт
  • На один L1 cache line (64 байта): почти целая строка с null bitmap и fixed region
  • При sequential scan 1М строк: 80МБ данных — помещается в L3 cache большинства серверных CPU
Cache locality: UnsafeRow vs Java Object
GenericInternalRow: heap fragmentationGenericInternalRow: каждая строка — отдельный объект на JVM heap. Объекты разбросаны по памяти (fragmentated heap). Доступ к row[0].id, row[1].id, row[2].id — три случайных pointer dereference. Cache miss = 40-100ns на строку.
случайный доступ
DRAM: 100-300ns latencyCPU cache miss: данные не в L1/L2/L3. Идём в main memory (DRAM). Latency 100-300ns. При 1M строк = 100-300ms только на cache misses. GC добавляет STW паузы поверх этого.
UnsafeRow: contiguous bufferUnsafeRow buffer: строки лежат непрерывно в памяти. Для сортировки, агрегации, scan — CPU читает последовательно. Hardware prefetcher загружает следующие строки в cache заблаговременно. GC не задействован.
sequential scan
L1/L2: 4-12ns latencyL1/L2 cache hit: данные уже в cache от предыдущей загрузки. Latency 4-12ns. При 1M строк = 4-12ms. В 10-25x быстрее фрагментированного heap. Tungsten называет это 'cache-aware computation'.

Построение UnsafeRow: UnsafeRowWriter

На практике вы никогда не создаёте UnsafeRow вручную. Catalyst codegen создаёт UnsafeRowWriter — класс-builder с методами для записи каждого типа. Сгенерированный код выглядит примерно так:

// Фрагмент сгенерированного кода для схемы (id: Long, name: String, score: Double)
final UnsafeRowWriter writer = new UnsafeRowWriter(3);  // 3 поля
writer.reset();  // обнуляет null bitmap, сбрасывает cursor

// Запись Long: просто пишем 8 байт в fixed slot
writer.write(0, ((Long) value_id));

// Запись String: сначала пишем данные в variable region, потом offset+length в fixed slot
writer.write(1, utf8_name);  // внутри: reserveAdditional + writeUnaligned + setOffset

// Запись Double
writer.write(2, ((Double) value_score));

UnsafeRow result = writer.getRow();

UnsafeRowWriter управляет курсором в variable-length region и обеспечивает выравнивание. getRow() возвращает UnsafeRow, которая view поверх внутреннего буфера writer-а — без копирования.

Сравнение и сортировка без десериализации

Одно из самых мощных свойств UnsafeRow — строки можно сравнивать и сортировать без десериализации.

Для сортировки по Long-ключу Catalyst генерирует:

// Сравнение двух UnsafeRow по первому полю (Long)
long left_key = Platform.getLong(left.baseObject, left.baseOffset + 8L);
long right_key = Platform.getLong(right.baseObject, right.baseOffset + 8L);
// Long comparison: одна инструкция CMP
result = Long.compare(left_key, right_key);

Для сравнения строк UTF8String:

// Lexicographic compare: Platform.arrayEquals или bytewise SIMD
UTF8String.compareToUnsafe(
    left.baseObject, left.baseOffset + left_str_offset, left_str_len,
    right.baseObject, right.baseOffset + right_str_offset, right_str_len
)

Внутри UTF8String.compareToUnsafe использует Platform.getLong для сравнения 8 байт за раз, что соответствует SIMD-подобной обработке на уровне Java.

Для shuffle: вместо сериализации UnsafeRow в byte[], Spark напрямую копирует её бинарное содержимое через Platform.copyMemory. Нет десериализации на стороне отправителя, нет сериализации на стороне получателя — только memcpy.

Граница между on-heap и off-heap

UnsafeRow одинаково работает с on-heap и off-heap памятью за счёт пары (baseObject, baseOffset):

  • On-heap: baseObject = byte[] (или другой Java array), baseOffset = Platform.BYTE_ARRAY_OFFSET (обычно 16 для 64-bit JVM с compressed oops). Platform.getLong(byte[], 16 + fieldOffset, ...) читает байты из массива.
  • Off-heap: baseObject = null, baseOffset = raw native address (результат Unsafe.allocateMemory()). Platform.getLong(null, address + fieldOffset, ...) читает из native memory.

Этот дизайн позволяет UnsafeExternalSorter прозрачно работать с обоими режимами. MemoryBlock — абстракция над обоими случаями:

// MemoryBlock: обёртка над (baseObject, offset, length)
public final class MemoryBlock extends MemoryLocation {
    public final long size;
    // on-heap: obj != null, off-heap: obj == null, offset = native address
}

Попробуй сам

Следующий PySpark-код позволяет увидеть размер UnsafeRow в действии через SizeEstimator и RDD.mapPartitions:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType

spark = SparkSession.builder \
    .appName("unsaferow-layout") \
    .config("spark.memory.offHeap.enabled", "false") \
    .getOrCreate()

schema = StructType([
    StructField("id", LongType()),
    StructField("name", StringType()),
    StructField("score", DoubleType()),
])

# Строки с разными длинами строки name
rows = [(1, "A", 1.0), (2, "Alice", 2.0), (3, "Alexander", 3.0)]
df = spark.createDataFrame(rows, schema)

# Посмотреть физический план с кодогенерацией
df.filter("score > 1.0").explain("codegen")

# Проверить, что Spark использует UnsafeRow через QueryExecution
from pyspark.sql.functions import col
plan = df._jdf.queryExecution().executedPlan()
print(plan.toString())

# Размер строк в памяти (байт)
# Для (1, "A", 1.0):   8 + 24 + (1+7) = 40 байт
# Для (2, "Alice", 1.0): 8 + 24 + (5+3) = 40 байт
# Для (3, "Alexander", 1.0): 8 + 24 + (9+7) = 48 байт
# null bitmap = 8, fixed region = 3*8 = 24
# variable region = длина_строки, выровненная до 8
print("Expected row sizes: 40, 40, 48 bytes")

Чтобы увидеть реальное использование Unsafe, включите spark.eventLog.enabled и посмотрите метрики peakExecutionMemory в Spark UI. Разница между GenericInternalRow и UnsafeRow особенно заметна на большом количестве строк с длинными строковыми полями.

Проверка знанийKnowledge check
UnsafeRow содержит 70 полей. Каков размер Null Bitmap в байтах, и почему именно столько?
ОтветAnswer
Размер Null Bitmap = ceil(70 / 64) * 8 = 2 * 8 = 16 байт. Формула calculateBitSetWidthInBytes(numFields) = ((numFields + 63) / 64) * 8. Для 70 полей нужно 70 бит. 64 бита помещаются в одно 8-байтовое слово, но для 70 нужно два слова — итого 128 бит / 16 байт. Выравнивание до 8 байт гарантирует, что Fixed-Length Region начинается по 8-байтовой границе, что критично для правильной работы Platform.getLong на всех архитектурах.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. UnsafeRow для схемы (a: Int, b: String, c: Double, d: Long, e: Boolean) содержит 5 полей. Каков размер Fixed-Length Region в байтах?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4