Learning Platform
Глоссарий Troubleshooting
Урок 08.02 · 30 мин
Продвинутый
EncoderExpressionEncoderDatasetCodegenInternalRowSerialization

Фреймворк Encoder

В предыдущем уроке мы разобрали, как устроен UnsafeRow — бинарный формат внутри движка. Теперь встаёт вопрос: как Spark превращает ваш case class User(id: Long, name: String, score: Double) в этот бинарный формат и обратно? Этот мост называется Encoder. Он определяет, насколько дорого обходятся typed-операции в Dataset[T] по сравнению с DataFrame, и почему UDF-и часто неожиданно дороги.

Dataset и DataFrame: одна реализация, разные типы

В Spark 2.0 DataFrame стал синонимом Dataset[Row]. Обе сущности используют один и тот же движок. Разница — только в типе Scala:

  • Dataset[Row] (aka DataFrame): строки представлены как Row — публичный untyped интерфейс. Движок работает с InternalRow. При вызове .show() или .collect() движок конвертирует InternalRow в Row через RowEncoder.
  • Dataset[T]: строки представлены как T (ваш case class). Движок всё так же работает с InternalRow. При каждом вызове .map, .filter(f: T => Boolean), .collect() — нужна конвертация InternalRow <-> T.

Эта конвертация и есть задача Encoder[T].

Encoder: контракт

Encoder в Spark 4.0 — это трейт org.apache.spark.sql.Encoder[T]:

trait Encoder[T] extends Serializable {
  def schema: StructType
  def clsTag: ClassTag[T]
}

Сам трейт минималистичен — он только описывает схему и тип. Реальная логика конвертации живёт в ExpressionEncoder[T].

ExpressionEncoder: сердце системы

ExpressionEncoder[T] в org.apache.spark.sql.catalyst.encoders содержит два дерева выражений Catalyst:

case class ExpressionEncoder[T](
  objSerializer: Expression,      // T -> InternalRow (сериализация)
  objDeserializer: Expression,    // InternalRow -> T (десериализация)
  clsTag: ClassTag[T]
) extends Encoder[T]

objSerializer — это дерево Catalyst-выражений, описывающее как взять JVM-объект типа T и произвести InternalRow. objDeserializer — обратное: взять InternalRow и построить объект T.

Эти деревья не интерпретируются в runtime — они компилируются в Java-байткод через Janino (встроенный Java-компилятор). Результатом является UnsafeProjection — объект с методом apply(InternalRow): UnsafeRow.

ExpressionEncoder: путь T -> InternalRow -> T
Объект T (JVM heap)Пользовательский объект T: case class, Java bean, или примитив. Живёт на JVM heap. Может содержать nullable поля, вложенные case class-ы, коллекции.
objSerializer (codegen)
InternalRow (Tungsten)InternalRow (UnsafeRow): бинарное представление в Tungsten memory. Нет object overhead, нет GC pressure. Движок Spark работает только с этим представлением.
objDeserializer (codegen)
Новый объект TНовый объект T: создан на JVM heap через сгенерированный конструктор. Каждый вызов .map / .collect / .filter(T => Boolean) создаёт такой объект для каждой строки.
Janino codegenGenerateUnsafeProjection: компилятор, который берёт дерево objSerializer и генерирует Java-класс с методом apply(InternalRow): UnsafeRow. Использует Janino compiler. Происходит один раз при первом использовании Dataset[T].
compile once
UnsafeProjection (JIT-compiled)Сгенерированный UnsafeProjection: Java-класс без virtual dispatch, без reflection. Вызов apply() = несколько LOAD/STORE + арифметика. Throughput порядка миллионов строк/сек на одном ядре.

Как строится ExpressionEncoder: ScalaReflection

Для case class Scala, Encoders.product[T] вызывает ScalaReflection.serializerFor[T] и ScalaReflection.deserializerFor[T]. Эти методы рекурсивно обходят поля типа T через scala.reflect.api.Mirror и строят дерево выражений.

Для case class User(id: Long, name: String, score: Double):

// Примерная структура objSerializer (не реальный код, упрощённое дерево):
CreateNamedStruct(
  "id",    AssertNotNull(Invoke(inputObject, "id", LongType)),
  "name",  StaticInvoke(UTF8String, "fromString", StringType,
              Invoke(inputObject, "name", ObjectType(classOf[String]))),
  "score", AssertNotNull(Invoke(inputObject, "score", DoubleType))
)

Каждый узел дерева — это Catalyst-выражение. Invoke генерирует код для вызова метода на объекте. StaticInvoke генерирует статический вызов. AssertNotNull добавляет проверку и бросает NullPointerException если поле non-nullable оказалось null.

Для objDeserializer:

// Примерное дерево objDeserializer:
NewInstance(
  cls = classOf[User],
  arguments = Seq(
    GetColumnByOrdinal(0, LongType),                         // id
    Invoke(GetColumnByOrdinal(1, StringType), "toString",    // name
      ObjectType(classOf[String])),
    GetColumnByOrdinal(2, DoubleType)                        // score
  )
)

GetColumnByOrdinal генерирует row.getLong(0), row.getUTF8String(1).toString(), row.getDouble(2). NewInstance генерирует new User(...).

Весь этот граф выражений затем передаётся в GenerateUnsafeProjection.generate(serializer) или GenerateSafeProjection.generate(deserializer), которые превращают его в Java-код и компилируют через Janino.

Сгенерированный код: что реально выполняется

Catalyst codegen для простого User будет выглядеть примерно так:

// Сгенерированный UnsafeProjection для User -> InternalRow
public class GeneratedUnsafeProjection extends UnsafeProjection {
    private UnsafeRowWriter writer = new UnsafeRowWriter(3);

    @Override
    public UnsafeRow apply(InternalRow _i) {
        // _i в данном случае это InternalRow с одним полем: сам объект User
        User obj = (User) _i.get(0, ObjectType(User.class));

        writer.reset();

        // Поле id: Long, non-nullable
        if (obj.id() == null) throw new NullPointerException("id is null");
        writer.write(0, obj.id());  // Platform.putLong(base, offset + 8, value)

        // Поле name: String -> UTF8String
        String name_val = obj.name();
        if (name_val == null) {
            writer.setNullAt(1);
        } else {
            writer.write(1, UTF8String.fromString(name_val));
        }

        // Поле score: Double, non-nullable
        writer.write(2, obj.score());

        return writer.getRow();
    }
}

Никакой рефлексии, никакого instanceof, никакого virtual dispatch в hot path. JIT-компилятор видит прямые вызовы и инлайнит их. Результат — производительность сопоставима с ручным кодом.

Почему typed-операции дороже

Рассмотрим два варианта фильтрации:

// Вариант A: DataFrame (untyped)
df.filter($"score" > 1.0)

// Вариант B: Dataset[User] (typed)
ds.filter(user => user.score > 1.0)

В варианте A Catalyst генерирует код который напрямую читает UnsafeRow.getDouble(2) и сравнивает с 1.0. Нет создания объектов, нет GC.

В варианте B на каждую строку происходит:

  1. Вызов objDeserializer — создание нового объекта User на JVM heap
  2. Вызов лямбды user => user.score > 1.0 на этом объекте
  3. Если лямбда возвращает true, строка остаётся в UnsafeRow формате — объект User больше не нужен и станет мусором

Для миллиарда строк это миллиард созданных и немедленно выброшенных User объектов. GC pressure колоссальный.

WARNING

Это фундаментальная причина, по которой Dataset[T].filter(f: T => Boolean) медленнее DataFrame.filter(Column). Typed лямбды прекрасны для типобезопасности в compile time, но каждая строка требует полной материализации JVM-объекта. В продакшн аналитических workloads предпочитайте Column-based operations там, где это возможно.

Измерение разницы:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("encoder-benchmark").getOrCreate()
import spark.implicits._

case class User(id: Long, name: String, score: Double)

val N = 10_000_000
val data = (1 to N).map(i => User(i.toLong, s"user_$i", i.toDouble / N))
val ds = spark.createDataset(data)
ds.cache()
ds.count() // прогрев кэша

// Typed filter: десериализация каждой строки
val t1 = System.nanoTime()
val r1 = ds.filter(u => u.score > 0.5).count()
val typed_ms = (System.nanoTime() - t1) / 1e6

// DataFrame filter: нет десериализации
val df = ds.toDF()
val t2 = System.nanoTime()
val r2 = df.filter($"score" > 0.5).count()
val df_ms = (System.nanoTime() - t2) / 1e6

println(s"Typed filter: ${typed_ms}ms")
println(s"DataFrame filter: ${df_ms}ms")
println(s"Ratio: ${typed_ms / df_ms}x")
// Типичный результат: typed в 2-5x медленнее при коротких объектах
// При сложных вложенных case class-ах разрыв может достигать 10x

Implicit Encoders: как Spark находит Encoder[T]

В Spark 4.0 import spark.implicits._ импортирует implicit значения Encoder[T] для стандартных типов. Для пользовательских case class Scala компилятор находит implicit через Encoders.product[T] (через shapeless-подобный механизм Scala).

// Явное создание Encoder
val userEncoder: Encoder[User] = Encoders.product[User]

// Implicit через import
import spark.implicits._
val ds: Dataset[User] = spark.createDataset(data)  // Encoder[User] implicit

Для Java-объектов: Encoders.bean(classOf[UserBean]) использует JavaBeans reflection. Для примитивов: Encoders.LONG, Encoders.STRING и т.д.

В Spark 4.0 появилось улучшение: Dataset API теперь использует implicit Encoder-ы более широко через SparkSession.implicits, и ряд ранее явных вызовов теперь работает без import spark.implicits._ в некоторых контекстах.

Encoder для сложных типов

Case class с Option, вложенными классами и коллекциями:

case class Address(city: String, zip: String)
case class User(id: Long, name: Option[String], address: Address, tags: Seq[String])

// ExpressionEncoder строит рекурсивное дерево:
// - name: Option[String] -> nullable StringType (null если None)
// - address: Address -> StructType("city" -> StringType, "zip" -> StringType)
// - tags: Seq[String] -> ArrayType(StringType)
val encoder = Encoders.product[User]
println(encoder.schema)
// root
//  |-- id: long (nullable = false)
//  |-- name: string (nullable = true)
//  |-- address: struct (nullable = true)
//  |    |-- city: string (nullable = true)
//  |    |-- zip: string (nullable = true)
//  |-- tags: array (nullable = true)
//  |    |-- element: string (containsNull = true)

Вложенный Address сериализуется в вложенный UnsafeRow (как поле variable-length region родительской строки). Seq[String] сериализуется в UnsafeArrayData.

Как Encoder обрабатывает null: AssertNotNull и nullable/non-nullable

Один из самых коварных аспектов ExpressionEncoder — обработка null. В Scala case class поля типа Long или Double по умолчанию non-nullable — они не могут быть null по контракту типа. Но данные из Parquet, CSV или внешних источников могут содержать null в любой колонке.

Когда objDeserializer встречает null в non-nullable поле, генерируется AssertNotNull выражение, которое бросает NullPointerException:

// Схема из файла: id: Long nullable=true
// case class User(id: Long, ...) — Long не может быть null в Scala
val ds = spark.read.parquet("data.parquet").as[User]
ds.show()  // Может упасть с NullPointerException если id содержит null

Сгенерированный код десериализатора:

// Фрагмент сгенерированного objDeserializer:
long field0 = row.getLong(0);
if (row.isNullAt(0)) {
    throw new NullPointerException("Null value appeared in non-nullable field: id (LongType)");
}
// Если поле Optional в case class (Option[Long]) — null допустим
Long field1 = row.isNullAt(1) ? null : row.getLong(1);

Правильный паттерн для nullable данных:

// Вариант 1: Option[T] для nullable полей
case class User(id: Long, email: Option[String], score: Double)
// email = null -> None, email = "x" -> Some("x")

// Вариант 2: явная проверка до as[T]
spark.read.parquet("data.parquet")
  .filter($"id".isNotNull)  // фильтр до as[User]
  .as[User]

// Вариант 3: заполнить null перед типизацией
spark.read.parquet("data.parquet")
  .na.fill(Map("id" -> 0L, "score" -> 0.0))
  .as[User]
WARNING

Паттерн df.as[T] без предварительной обработки null — один из самых частых источников неожиданных NullPointerException в Spark. Проверяйте nullable=true/false в df.schema перед as[CaseClass]. Если схема файла допускает null в поле, которое non-nullable в case class — либо фильтруйте, либо используйте Option[T].

Encoder vs Schema Evolution

ExpressionEncoder не поддерживает schema evolution из коробки. Если вы сохранили данные с case class User(id: Long, name: String) и пытаетесь прочитать с case class User(id: Long, name: String, email: String) — новое поле email не будет найдено, и десериализация упадёт или вернёт null в зависимости от конфигурации.

Для schema evolution при чтении используйте mergeSchema опцию в DataFrameReader + DataFrame + явный cast, или используйте форматы с нативной поддержкой evolution (Avro, Protobuf через соответствующие Spark-интеграции).

RowEncoder: мост между Row и InternalRow

RowEncoder — специальный ExpressionEncoder[Row] для Dataset[Row]. Он конвертирует публичный Row в InternalRow. Именно его вызывает .collect() на DataFrame:

// DataFrame.collect() внутри:
// 1. Каждый executor возвращает Array[InternalRow] (UnsafeRow)
// 2. Driver получает сериализованные bytes
// 3. На driver: bytes -> UnsafeRow -> RowEncoder.deserializer -> Row
val rows: Array[Row] = df.collect()

Row — это публичный интерфейс с объектами: row.getLong(0) возвращает Long (boxed), row.getString(1)String. InternalRow.getLong(0) — это уже unboxed примитив из UnsafeRow. RowEncoder.deserializer проводит распаковку: (Long) Platform.getLong(...) с boxing при конвертации в Row.

Именно поэтому .collect() всегда немного дороже, чем кажется: помимо сетевой передачи, driver материализует каждую строку в Row объект через RowEncoder. Для аналитики на больших результатах лучше использовать .show(n), write.parquet(path), или .toPandas() с Arrow — они работают с бинарным форматом дольше.

При создании DataFrame из коллекции Row:

val schema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
val rows = Seq(Row(1L, "Alice"), Row(2L, "Bob"))
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
// Внутри: RowEncoder.objSerializer конвертирует Row -> InternalRow на каждом executor

PySpark и Encoder: Arrow-мост

В PySpark Encoder работает иначе. Python-объекты не имеют Scala/Java типов, поэтому PySpark использует Arrow IPC format как промежуточное представление между Python и JVM.

# PySpark DataFrame.collect() упрощённо:
# 1. JVM executor: UnsafeRow -> Arrow RecordBatch (через ArrowColumnVector)
# 2. Сериализация Arrow bytes в Python процесс
# 3. Python: pyarrow.RecordBatch -> pandas DataFrame или Python list

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("encoder-demo").getOrCreate()

df = spark.range(5).toDF("id")
# .collect() использует Arrow если spark.sql.execution.arrow.pyspark.enabled=true
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pandas_df = df.toPandas()  # эффективная конвертация через Arrow

Это объясняет, почему Pandas UDFs (@pandas_udf) значительно быстрее row-level Python UDFs: они передают целые батчи через Arrow вместо сериализации каждой строки по отдельности.

Попробуй сам

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("encoder-exploration") \
    .config("spark.sql.codegen.wholeStage", "true") \
    .getOrCreate()

# 1. Создать Dataset и посмотреть сгенерированный код
data = [(1, "Alice", 3.14), (2, "Bob", 2.71), (3, "Charlie", 1.41)]
df = spark.createDataFrame(data, ["id", "name", "score"])

# 2. Типизированный фильтр vs Column фильтр — explain показывает разницу
print("=== Column filter (no deserialization) ===")
df.filter(col("score") > 2.0).explain("codegen")

# 3. Посмотреть Encoder schema
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType
schema = StructType([
    StructField("id", LongType(), False),
    StructField("name", StringType(), True),
    StructField("score", DoubleType(), False),
])
df2 = spark.createDataFrame(data, schema)
print("Schema:", df2.schema)

# 4. Проверить, что toDF() не добавляет сериализацию
print("=== Physical plan: filter on DataFrame ===")
df2.filter("score > 2.0").explain("extended")
# В Physical Plan не будет DeserializeToObject — данные остаются в InternalRow

Чтобы увидеть момент, когда появляется десериализация, в Scala напишите:

case class User(id: Long, name: String, score: Double)
val ds = spark.createDataset(data)(Encoders.product[User])
// Этот план БУДЕТ содержать DeserializeToObject и SerializeFromObject
ds.filter(u => u.score > 2.0).explain("extended")

Найдите в выводе узлы DeserializeToObject и SerializeFromObject — это и есть точки конвертации между InternalRow и T.

Проверка знанийKnowledge check
Вы написали ds.filter(user => user.name.startsWith("A")).count() на Dataset[User] с 500 миллионами строк. Профайлер показывает, что 60% времени уходит на GC. Объясните механизм, предложите решение без потери типобезопасности.
ОтветAnswer
На каждую из 500M строк ExpressionEncoder.objDeserializer создаёт новый объект User на JVM heap. После проверки user.name.startsWith("A") объект сразу становится мусором — GC давление огромное. Решение 1 (самое эффективное): переписать на DataFrame Column API — df.filter(col("name").startsWith("A")).count(). Нет создания объектов, нет GC. Решение 2 (без потери типобезопасности в read-path): использовать ds.select("name").as[String].filter(_.startsWith("A")).count() — десериализуется только одно поле String вместо всего User, объекты меньше, GC давление снижается в разы. Решение 3: переключиться на spark.memory.offHeap.enabled=true и увеличить G1GC settings — паллиатив, не устраняет причину. Корневая причина: typed lambda в Spark — это не zero-cost abstraction, каждая строка требует полной материализации объекта.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. ExpressionEncoder[T] содержит два поля: objSerializer и objDeserializer. Что именно происходит при вызове ds.filter(user => user.score > 2.0).count() на Dataset[User] с 1 миллиардом строк?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4