Фреймворк Encoder
В предыдущем уроке мы разобрали, как устроен UnsafeRow — бинарный формат внутри движка. Теперь встаёт вопрос: как Spark превращает ваш case class User(id: Long, name: String, score: Double) в этот бинарный формат и обратно? Этот мост называется Encoder. Он определяет, насколько дорого обходятся typed-операции в Dataset[T] по сравнению с DataFrame, и почему UDF-и часто неожиданно дороги.
Dataset и DataFrame: одна реализация, разные типы
В Spark 2.0 DataFrame стал синонимом Dataset[Row]. Обе сущности используют один и тот же движок. Разница — только в типе Scala:
Dataset[Row](aka DataFrame): строки представлены какRow— публичный untyped интерфейс. Движок работает сInternalRow. При вызове.show()или.collect()движок конвертируетInternalRowвRowчерезRowEncoder.Dataset[T]: строки представлены какT(ваш case class). Движок всё так же работает сInternalRow. При каждом вызове.map,.filter(f: T => Boolean),.collect()— нужна конвертацияInternalRow <-> T.
Эта конвертация и есть задача Encoder[T].
Encoder: контракт
Encoder в Spark 4.0 — это трейт org.apache.spark.sql.Encoder[T]:
trait Encoder[T] extends Serializable {
def schema: StructType
def clsTag: ClassTag[T]
}
Сам трейт минималистичен — он только описывает схему и тип. Реальная логика конвертации живёт в ExpressionEncoder[T].
ExpressionEncoder: сердце системы
ExpressionEncoder[T] в org.apache.spark.sql.catalyst.encoders содержит два дерева выражений Catalyst:
case class ExpressionEncoder[T](
objSerializer: Expression, // T -> InternalRow (сериализация)
objDeserializer: Expression, // InternalRow -> T (десериализация)
clsTag: ClassTag[T]
) extends Encoder[T]
objSerializer — это дерево Catalyst-выражений, описывающее как взять JVM-объект типа T и произвести InternalRow. objDeserializer — обратное: взять InternalRow и построить объект T.
Эти деревья не интерпретируются в runtime — они компилируются в Java-байткод через Janino (встроенный Java-компилятор). Результатом является UnsafeProjection — объект с методом apply(InternalRow): UnsafeRow.
Как строится ExpressionEncoder: ScalaReflection
Для case class Scala, Encoders.product[T] вызывает ScalaReflection.serializerFor[T] и ScalaReflection.deserializerFor[T]. Эти методы рекурсивно обходят поля типа T через scala.reflect.api.Mirror и строят дерево выражений.
Для case class User(id: Long, name: String, score: Double):
// Примерная структура objSerializer (не реальный код, упрощённое дерево):
CreateNamedStruct(
"id", AssertNotNull(Invoke(inputObject, "id", LongType)),
"name", StaticInvoke(UTF8String, "fromString", StringType,
Invoke(inputObject, "name", ObjectType(classOf[String]))),
"score", AssertNotNull(Invoke(inputObject, "score", DoubleType))
)
Каждый узел дерева — это Catalyst-выражение. Invoke генерирует код для вызова метода на объекте. StaticInvoke генерирует статический вызов. AssertNotNull добавляет проверку и бросает NullPointerException если поле non-nullable оказалось null.
Для objDeserializer:
// Примерное дерево objDeserializer:
NewInstance(
cls = classOf[User],
arguments = Seq(
GetColumnByOrdinal(0, LongType), // id
Invoke(GetColumnByOrdinal(1, StringType), "toString", // name
ObjectType(classOf[String])),
GetColumnByOrdinal(2, DoubleType) // score
)
)
GetColumnByOrdinal генерирует row.getLong(0), row.getUTF8String(1).toString(), row.getDouble(2). NewInstance генерирует new User(...).
Весь этот граф выражений затем передаётся в GenerateUnsafeProjection.generate(serializer) или GenerateSafeProjection.generate(deserializer), которые превращают его в Java-код и компилируют через Janino.
Сгенерированный код: что реально выполняется
Catalyst codegen для простого User будет выглядеть примерно так:
// Сгенерированный UnsafeProjection для User -> InternalRow
public class GeneratedUnsafeProjection extends UnsafeProjection {
private UnsafeRowWriter writer = new UnsafeRowWriter(3);
@Override
public UnsafeRow apply(InternalRow _i) {
// _i в данном случае это InternalRow с одним полем: сам объект User
User obj = (User) _i.get(0, ObjectType(User.class));
writer.reset();
// Поле id: Long, non-nullable
if (obj.id() == null) throw new NullPointerException("id is null");
writer.write(0, obj.id()); // Platform.putLong(base, offset + 8, value)
// Поле name: String -> UTF8String
String name_val = obj.name();
if (name_val == null) {
writer.setNullAt(1);
} else {
writer.write(1, UTF8String.fromString(name_val));
}
// Поле score: Double, non-nullable
writer.write(2, obj.score());
return writer.getRow();
}
}
Никакой рефлексии, никакого instanceof, никакого virtual dispatch в hot path. JIT-компилятор видит прямые вызовы и инлайнит их. Результат — производительность сопоставима с ручным кодом.
Почему typed-операции дороже
Рассмотрим два варианта фильтрации:
// Вариант A: DataFrame (untyped)
df.filter($"score" > 1.0)
// Вариант B: Dataset[User] (typed)
ds.filter(user => user.score > 1.0)
В варианте A Catalyst генерирует код который напрямую читает UnsafeRow.getDouble(2) и сравнивает с 1.0. Нет создания объектов, нет GC.
В варианте B на каждую строку происходит:
- Вызов
objDeserializer— создание нового объектаUserна JVM heap - Вызов лямбды
user => user.score > 1.0на этом объекте - Если лямбда возвращает
true, строка остаётся вUnsafeRowформате — объектUserбольше не нужен и станет мусором
Для миллиарда строк это миллиард созданных и немедленно выброшенных User объектов. GC pressure колоссальный.
Это фундаментальная причина, по которой Dataset[T].filter(f: T => Boolean) медленнее DataFrame.filter(Column). Typed лямбды прекрасны для типобезопасности в compile time, но каждая строка требует полной материализации JVM-объекта. В продакшн аналитических workloads предпочитайте Column-based operations там, где это возможно.
Измерение разницы:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = SparkSession.builder().appName("encoder-benchmark").getOrCreate()
import spark.implicits._
case class User(id: Long, name: String, score: Double)
val N = 10_000_000
val data = (1 to N).map(i => User(i.toLong, s"user_$i", i.toDouble / N))
val ds = spark.createDataset(data)
ds.cache()
ds.count() // прогрев кэша
// Typed filter: десериализация каждой строки
val t1 = System.nanoTime()
val r1 = ds.filter(u => u.score > 0.5).count()
val typed_ms = (System.nanoTime() - t1) / 1e6
// DataFrame filter: нет десериализации
val df = ds.toDF()
val t2 = System.nanoTime()
val r2 = df.filter($"score" > 0.5).count()
val df_ms = (System.nanoTime() - t2) / 1e6
println(s"Typed filter: ${typed_ms}ms")
println(s"DataFrame filter: ${df_ms}ms")
println(s"Ratio: ${typed_ms / df_ms}x")
// Типичный результат: typed в 2-5x медленнее при коротких объектах
// При сложных вложенных case class-ах разрыв может достигать 10x
Implicit Encoders: как Spark находит Encoder[T]
В Spark 4.0 import spark.implicits._ импортирует implicit значения Encoder[T] для стандартных типов. Для пользовательских case class Scala компилятор находит implicit через Encoders.product[T] (через shapeless-подобный механизм Scala).
// Явное создание Encoder
val userEncoder: Encoder[User] = Encoders.product[User]
// Implicit через import
import spark.implicits._
val ds: Dataset[User] = spark.createDataset(data) // Encoder[User] implicit
Для Java-объектов: Encoders.bean(classOf[UserBean]) использует JavaBeans reflection. Для примитивов: Encoders.LONG, Encoders.STRING и т.д.
В Spark 4.0 появилось улучшение: Dataset API теперь использует implicit Encoder-ы более широко через SparkSession.implicits, и ряд ранее явных вызовов теперь работает без import spark.implicits._ в некоторых контекстах.
Encoder для сложных типов
Case class с Option, вложенными классами и коллекциями:
case class Address(city: String, zip: String)
case class User(id: Long, name: Option[String], address: Address, tags: Seq[String])
// ExpressionEncoder строит рекурсивное дерево:
// - name: Option[String] -> nullable StringType (null если None)
// - address: Address -> StructType("city" -> StringType, "zip" -> StringType)
// - tags: Seq[String] -> ArrayType(StringType)
val encoder = Encoders.product[User]
println(encoder.schema)
// root
// |-- id: long (nullable = false)
// |-- name: string (nullable = true)
// |-- address: struct (nullable = true)
// | |-- city: string (nullable = true)
// | |-- zip: string (nullable = true)
// |-- tags: array (nullable = true)
// | |-- element: string (containsNull = true)
Вложенный Address сериализуется в вложенный UnsafeRow (как поле variable-length region родительской строки). Seq[String] сериализуется в UnsafeArrayData.
Как Encoder обрабатывает null: AssertNotNull и nullable/non-nullable
Один из самых коварных аспектов ExpressionEncoder — обработка null. В Scala case class поля типа Long или Double по умолчанию non-nullable — они не могут быть null по контракту типа. Но данные из Parquet, CSV или внешних источников могут содержать null в любой колонке.
Когда objDeserializer встречает null в non-nullable поле, генерируется AssertNotNull выражение, которое бросает NullPointerException:
// Схема из файла: id: Long nullable=true
// case class User(id: Long, ...) — Long не может быть null в Scala
val ds = spark.read.parquet("data.parquet").as[User]
ds.show() // Может упасть с NullPointerException если id содержит null
Сгенерированный код десериализатора:
// Фрагмент сгенерированного objDeserializer:
long field0 = row.getLong(0);
if (row.isNullAt(0)) {
throw new NullPointerException("Null value appeared in non-nullable field: id (LongType)");
}
// Если поле Optional в case class (Option[Long]) — null допустим
Long field1 = row.isNullAt(1) ? null : row.getLong(1);
Правильный паттерн для nullable данных:
// Вариант 1: Option[T] для nullable полей
case class User(id: Long, email: Option[String], score: Double)
// email = null -> None, email = "x" -> Some("x")
// Вариант 2: явная проверка до as[T]
spark.read.parquet("data.parquet")
.filter($"id".isNotNull) // фильтр до as[User]
.as[User]
// Вариант 3: заполнить null перед типизацией
spark.read.parquet("data.parquet")
.na.fill(Map("id" -> 0L, "score" -> 0.0))
.as[User]
Паттерн df.as[T] без предварительной обработки null — один из самых частых источников неожиданных NullPointerException в Spark. Проверяйте nullable=true/false в df.schema перед as[CaseClass]. Если схема файла допускает null в поле, которое non-nullable в case class — либо фильтруйте, либо используйте Option[T].
Encoder vs Schema Evolution
ExpressionEncoder не поддерживает schema evolution из коробки. Если вы сохранили данные с case class User(id: Long, name: String) и пытаетесь прочитать с case class User(id: Long, name: String, email: String) — новое поле email не будет найдено, и десериализация упадёт или вернёт null в зависимости от конфигурации.
Для schema evolution при чтении используйте mergeSchema опцию в DataFrameReader + DataFrame + явный cast, или используйте форматы с нативной поддержкой evolution (Avro, Protobuf через соответствующие Spark-интеграции).
RowEncoder: мост между Row и InternalRow
RowEncoder — специальный ExpressionEncoder[Row] для Dataset[Row]. Он конвертирует публичный Row в InternalRow. Именно его вызывает .collect() на DataFrame:
// DataFrame.collect() внутри:
// 1. Каждый executor возвращает Array[InternalRow] (UnsafeRow)
// 2. Driver получает сериализованные bytes
// 3. На driver: bytes -> UnsafeRow -> RowEncoder.deserializer -> Row
val rows: Array[Row] = df.collect()
Row — это публичный интерфейс с объектами: row.getLong(0) возвращает Long (boxed), row.getString(1) — String. InternalRow.getLong(0) — это уже unboxed примитив из UnsafeRow. RowEncoder.deserializer проводит распаковку: (Long) Platform.getLong(...) с boxing при конвертации в Row.
Именно поэтому .collect() всегда немного дороже, чем кажется: помимо сетевой передачи, driver материализует каждую строку в Row объект через RowEncoder. Для аналитики на больших результатах лучше использовать .show(n), write.parquet(path), или .toPandas() с Arrow — они работают с бинарным форматом дольше.
При создании DataFrame из коллекции Row:
val schema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
val rows = Seq(Row(1L, "Alice"), Row(2L, "Bob"))
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
// Внутри: RowEncoder.objSerializer конвертирует Row -> InternalRow на каждом executor
PySpark и Encoder: Arrow-мост
В PySpark Encoder работает иначе. Python-объекты не имеют Scala/Java типов, поэтому PySpark использует Arrow IPC format как промежуточное представление между Python и JVM.
# PySpark DataFrame.collect() упрощённо:
# 1. JVM executor: UnsafeRow -> Arrow RecordBatch (через ArrowColumnVector)
# 2. Сериализация Arrow bytes в Python процесс
# 3. Python: pyarrow.RecordBatch -> pandas DataFrame или Python list
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("encoder-demo").getOrCreate()
df = spark.range(5).toDF("id")
# .collect() использует Arrow если spark.sql.execution.arrow.pyspark.enabled=true
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pandas_df = df.toPandas() # эффективная конвертация через Arrow
Это объясняет, почему Pandas UDFs (@pandas_udf) значительно быстрее row-level Python UDFs: они передают целые батчи через Arrow вместо сериализации каждой строки по отдельности.
Попробуй сам
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder \
.appName("encoder-exploration") \
.config("spark.sql.codegen.wholeStage", "true") \
.getOrCreate()
# 1. Создать Dataset и посмотреть сгенерированный код
data = [(1, "Alice", 3.14), (2, "Bob", 2.71), (3, "Charlie", 1.41)]
df = spark.createDataFrame(data, ["id", "name", "score"])
# 2. Типизированный фильтр vs Column фильтр — explain показывает разницу
print("=== Column filter (no deserialization) ===")
df.filter(col("score") > 2.0).explain("codegen")
# 3. Посмотреть Encoder schema
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType
schema = StructType([
StructField("id", LongType(), False),
StructField("name", StringType(), True),
StructField("score", DoubleType(), False),
])
df2 = spark.createDataFrame(data, schema)
print("Schema:", df2.schema)
# 4. Проверить, что toDF() не добавляет сериализацию
print("=== Physical plan: filter on DataFrame ===")
df2.filter("score > 2.0").explain("extended")
# В Physical Plan не будет DeserializeToObject — данные остаются в InternalRow
Чтобы увидеть момент, когда появляется десериализация, в Scala напишите:
case class User(id: Long, name: String, score: Double)
val ds = spark.createDataset(data)(Encoders.product[User])
// Этот план БУДЕТ содержать DeserializeToObject и SerializeFromObject
ds.filter(u => u.score > 2.0).explain("extended")
Найдите в выводе узлы DeserializeToObject и SerializeFromObject — это и есть точки конвертации между InternalRow и T.