Tungsten: Off-Heap память и UnsafeRow
Проблема: JVM GC на больших данных
Spark выполняется на JVM. Когда executor работает с десятками гигабайт данных, Garbage Collector становится серьёзной проблемой:
- Full GC может блокировать executor на секунды (иногда десятки секунд)
- Частый GC снижает throughput на 10-30%
- GC pauses приводят к timeout heartbeats и потере executors
- Объекты Java имеют значительный overhead:
Integerзанимает 16 байт вместо 4 дляint
До Tungsten (Spark < 1.4) каждая строка данных хранилась как Row — массив Java-объектов. Для таблицы с 10 колонками каждая строка создавала 10+ объектов на JVM heap, и GC должен был отслеживать все эти объекты.
Решение: Project Tungsten
Project Tungsten (появился в Spark 1.4, зрелый с 2.0) — инициатива по радикальному ускорению Spark через три направления:
- Off-heap memory management — управление памятью вне JVM heap через
sun.misc.Unsafe - Cache-aware computation — алгоритмы, оптимизированные для CPU cache
- Code generation — генерация байткода вместо интерпретации (Whole-Stage CodeGen)
В этом уроке мы сфокусируемся на off-heap памяти и формате UnsafeRow.
UnsafeRow: бинарный формат строк
Вместо хранения строк как массивов Java-объектов, Tungsten использует UnsafeRow — компактный бинарный формат, который хранит данные как непрерывный блок байтов.
Структура UnsafeRow
Для строки ("Alice", 30, 100) с схемой (name: String, age: Int, dept_id: Int):
Байт 0-7: Null bitmap = 0x0000000000000000 (нет null значений)
Байт 8-15: name (offset+len) = 0x0000001800000005 (offset=24, length=5)
Байт 16-23: age = 0x000000000000001E (30 в hex)
Байт 24-31: dept_id = 0x0000000000000064 (100 в hex)
Байт 32-36: "Alice" = 0x416C696365 (UTF-8 байты)
Ключевые детали:
- Null bitmap — один бит на колонку. Для 3 колонок нужен 1 байт, но выравнивается до 8 байт (64-bit alignment)
- Fixed-length values — примитивные типы (
Int,Long,Double) хранятся inline, по 8 байт каждый - Variable-length values — строки и массивы хранятся в конце, а в fixed-length region записывается пара (offset, length)
Преимущества UnsafeRow
| Аспект | Java Object Model | UnsafeRow |
|---|---|---|
| Overhead на строку | 16+ байт на объект | 0 (компактный бинарный) |
| Объекты для GC | 10+ на строку | 0 (off-heap) |
| Сравнение значений | .equals() (виртуальный вызов) | memcmp (прямое сравнение байтов) |
| Cache locality | Объекты разбросаны по heap | Непрерывный блок памяти |
| Serialization | Java serialization (медленно) | Не нужна (уже в бинарном формате) |
UnsafeRow — это формат и хранения, и передачи. Когда Spark отправляет данные между executors (shuffle), он отправляет сырые байты UnsafeRow без сериализации/десериализации. Это даёт огромное ускорение shuffle-операций.
Executor Memory Model
Используйте интерактивную диаграмму ниже, чтобы понять, как параметры spark.memory.fraction и spark.memory.storageFraction влияют на распределение памяти executor.
Unified Memory Manager (Spark 1.6+)
Spark 4.0 GA (2025) и 4.1 (текущая) используют Unified Memory Manager — ту же модель с гибкой границей между storage и execution памятью, что и Spark 3.5 LTS:
Правила заимствования
Граница между Storage и Execution — мягкая:
- Storage может заимствовать Execution memory (если Execution свободна)
- Execution может заимствовать Storage memory (если Storage свободна)
- Execution может вытеснить данные из Storage (evict cached blocks)
- Storage не может вытеснить Execution (execution memory имеет приоритет)
Это асимметричное правило критически важно: shuffle и join всегда завершатся, даже ценой вытеснения кэшированных данных. Но кэшированные данные не могут заблокировать shuffle.
Что происходит при нехватке Execution Memory? Когда shuffle или join требуют больше памяти, чем доступно (включая заимствование из Storage), Spark начинает spill-to-disk — записывает промежуточные данные на локальный диск. Это работает, но замедляет операцию в 10-100 раз из-за disk I/O.
Конфигурация Off-Heap памяти
Tungsten поддерживает два режима работы:
On-Heap (по умолчанию)
# Данные хранятся в JVM heap через Unsafe
spark.conf.set("spark.executor.memory", "4g")
spark.conf.set("spark.memory.fraction", "0.6")
Даже в on-heap режиме UnsafeRow используется: данные хранятся как бинарные массивы в JVM heap, а не как отдельные Java-объекты. Это уменьшает количество объектов для GC.
Off-Heap (явное включение)
# Выделяем отдельный off-heap pool
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "2g")
В off-heap режиме Tungsten выделяет память через sun.misc.Unsafe.allocateMemory() вне JVM heap. Эта память полностью невидима для GC.
Когда использовать off-heap:
- Executors с большим heap (>16GB), где GC pauses критичны
- Workloads с интенсивным shuffle (большие join, groupBy на высококардинальных ключах)
- Когда GC tuning исчерпан
UnsafeRow в действии: сравнение значений
Одно из самых мощных преимуществ UnsafeRow — прямое сравнение байтов без десериализации:
// Java Object Model: 3+ виртуальных вызова на сравнение
row1.getString(0).equals(row2.getString(0))
// UnsafeRow: прямое сравнение блоков памяти
Platform.copyMemory(row1.baseObject, row1.baseOffset + field1Offset,
row2.baseObject, row2.baseOffset + field2Offset,
fieldLength)
Для sort и join операций, где миллиарды сравнений — это порядок magnitude разницы в производительности.