Kryo и Java-сериализация
Tungsten и ExpressionEncoder решают проблему сериализации данных внутри движка — между операторами, при shuffle. Но у Spark есть второй, принципиально иной контекст сериализации: задачи и замыкания. Когда вы пишете rdd.map(x => x + myOffset), Spark должен передать эту лямбду вместе со всем её контекстом (переменной myOffset) на каждый executor. Именно здесь живёт spark.serializer, KryoSerializer, ClosureCleaner и самая популярная ошибка Spark — Task not serializable.
Два контекста сериализации
Прежде чем погружаться в детали, важно разграничить два совершенно разных использования сериализации в Spark:
Контекст 1: Shuffle-сериализация данных — перемещение InternalRow / UnsafeRow между partition-ами и executor-ами при groupBy, join, repartition. Для этого Spark использует внутренний бинарный формат Tungsten, минуя spark.serializer. Этот контекст мы разобрали в предыдущих уроках.
Контекст 2: Task-сериализация — отправка задач (task-ов) с driver-а на executor-ы. Каждый task содержит замыкание (closure) — лямбду с захваченными переменными. Также сериализация используется для: broadcast-переменных (драйвер отправляет большой объект всем executor-ам), accumulator-ов, и иногда для shuffle-данных в RDD API. Вот здесь работает spark.serializer.
# spark.serializer влияет ТОЛЬКО на этот контекст:
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
JavaSerializer: медленно, но надёжно
По умолчанию Spark использует org.apache.spark.serializer.JavaSerializer. Это обёртка над стандартной Java-сериализацией: ObjectOutputStream / ObjectInputStream.
Java-сериализация работает через reflection в runtime: для каждого объекта она записывает имя класса, версию (serialVersionUID), все поля рекурсивно. Формат включает значительный overhead:
- Имя класса: минимум 10-15 байт для каждого нового типа в потоке
- Header для каждого объекта: 4 байта magic + class descriptor
- Backreference table: отслеживание уже сериализованных объектов для циклических ссылок
- Поля: ключи (имена полей) + значения
Для task-сериализации это приемлемо — задачи отправляются один раз при старте. Но для RDD-операций, где объекты сериализуются миллиарды раз (например, при использовании spark.rdd.compress), Java-сериализация становится узким местом.
KryoSerializer: быстро, но требует аккуратности
org.apache.spark.serializer.KryoSerializer использует библиотеку Kryo. Kryo работает по-другому: он присваивает каждому зарегистрированному классу числовой ID и использует этот ID вместо имени класса. Нет reflection в hot path — только прямая запись полей по заранее определённому layout.
Сравнение производительности:
| Операция | JavaSerializer | KryoSerializer |
|---|---|---|
| Сериализация POJO 5 полей | ~1.5 мкс | ~0.15 мкс |
| Размер для 1M объектов | 180+ байт/объект | 20-50 байт/объект |
| Cold-start overhead | нет (reflection) | регистрация классов |
Включение Kryo:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("kryo-demo") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.kryo.registrationRequired", "false") \
.getOrCreate()
spark.kryo.registrationRequired=false (дефолт) — Kryo использует имя класса как fallback если класс не зарегистрирован. Это менее эффективно, но позволяет не регистрировать все классы вручную. Для максимальной производительности регистрируйте все классы явно и устанавливайте registrationRequired=true.
Регистрация классов в Kryo
При регистрации Kryo присваивает классу числовой ID (начиная с 10 — первые 10 зарезервированы для базовых типов). При сериализации вместо имени "com.example.UserEvent" (22 байта) записывается число 11 (1-2 байта).
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
class MyKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
// Порядок регистрации важен: ID присваивается по порядку
kryo.register(classOf[UserEvent])
kryo.register(classOf[SessionData])
kryo.register(classOf[Array[UserEvent]])
// Массивы примитивов Kryo умеет эффективно сериализовать out-of-the-box
kryo.register(classOf[Array[Long]])
kryo.register(classOf[Array[Double]])
}
}
# В SparkConf:
spark.conf.set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Важно: если вы меняете порядок регистрации между версиями приложения, ID-ы сдвигаются, и сохранённые checkpoint-ы или shuffle-файлы становятся нечитаемыми. Для production рекомендуется фиксировать ID явно:
kryo.register(classOf[UserEvent], 11)
kryo.register(classOf[SessionData], 12)
// Явные ID: не зависят от порядка в коде
Сериализация замыканий: ClosureCleaner
Теперь о самой коварной части. Когда вы пишете:
val multiplier = 5
val rdd = sc.parallelize(1 to 1000)
val result = rdd.map(x => x * multiplier)
Лямбда x => x * multiplier — это анонимный класс Scala. Компилятор создаёт примерно такой класс:
// Что компилятор Scala генерирует для лямбды:
class AnonFunction1$1 implements Function1<Integer, Integer>, Serializable {
private final int multiplier; // захваченная переменная
AnonFunction1$1(int multiplier) {
this.multiplier = multiplier;
}
public Integer apply(Integer x) {
return x * multiplier;
}
}
Spark должен сериализовать этот анонимный класс и отправить его на каждый executor. Для этого класс должен реализовывать java.io.Serializable (что Scala лямбды и делают).
Проблема возникает, когда лямбда захватывает не сериализуемый объект:
class DataProcessor(val connection: DBConnection) {
// DBConnection не реализует Serializable!
def process(rdd: RDD[Int]): RDD[Int] = {
rdd.map(x => x + this.connection.fetchOffset())
// ^^ лямбда захватывает 'this' (DataProcessor), который содержит DBConnection
}
}
Это приводит к org.apache.spark.SparkException: Task not serializable с NotSerializableException: DBConnection в стеке.
Как ClosureCleaner спасает
org.apache.spark.util.ClosureCleaner — компонент, который Spark запускает на каждой лямбде перед сериализацией. Он анализирует байткод замыкания через ASM (библиотека для анализа Java bytecode) и пытается минимизировать захваченный контекст.
Алгоритм ClosureCleaner:
- Получить байткод анонимного класса через
getClass.getName+ classloader - Через ASM просканировать все
GETFIELDинструкции — найти, какие поля$outer(внешний объект) реально используются - Если
$outerиспользуется только для чтения конкретного поля — попытаться заменить ссылку на$outerпрямой ссылкой на значение поля - Если
$outerсодержит несериализуемые поля, которые не используются в лямбде — обнулить их через reflection
// ClosureCleaner может спасти этот случай:
class DataProcessor(val threshold: Int, val connection: DBConnection) {
def process(rdd: RDD[Int]): RDD[Int] = {
rdd.map(x => x + threshold)
// Лямбда использует только threshold (Int, сериализуемый)
// ClosureCleaner обнулит connection в захваченном $outer
}
}
Но не этот:
class DataProcessor(val connection: DBConnection) {
def process(rdd: RDD[Int]): RDD[Int] = {
rdd.map(x => x + connection.fetchOffset())
// Лямбда РЕАЛЬНО использует connection — ClosureCleaner не может помочь
}
}
Типичные паттерны NotSerializableException
Паттерн 1: Захват this через ссылку на метод
class EventProcessor(val config: Config) {
// Config не Serializable!
def transform(x: Int): Int = x * config.factor // метод класса
def process(rdd: RDD[Int]): RDD[Int] = {
rdd.map(transform) // эквивалентно: rdd.map(this.transform)
// Spark захватывает 'this', а через него — весь EventProcessor
}
}
// Правильно:
def process(rdd: RDD[Int]): RDD[Int] = {
val factor = config.factor // вытащить нужное значение в локальную переменную
rdd.map(x => x * factor) // лямбда захватывает только Int
}
Паттерн 2: Logger в enclosing class
class DataPipeline {
private val logger = LogManager.getLogger(getClass) // Logger не Serializable!
def process(rdd: RDD[String]): RDD[String] = {
rdd.map { line =>
logger.debug(s"Processing: $line") // захват logger
line.toUpperCase
}
}
}
// Правильно: создать logger на executor-е
def process(rdd: RDD[String]): RDD[String] = {
rdd.map { line =>
val log = LogManager.getLogger("executor") // создаётся на executor
log.debug(s"Processing: $line")
line.toUpperCase
}
}
Паттерн 3: Nested class без static модификатора
// В Java: nested non-static class содержит implicit ref на outer instance
public class Outer {
class Inner implements Serializable {
int process(int x) { return x; }
// Inner несёт ссылку на Outer даже если не использует её!
}
}
// Правильно: статический nested класс
public class Outer {
static class Inner implements Serializable {
int process(int x) { return x; }
}
}
Паттерн 4: Broadcast как антидот для больших объектов
Если вам нужно передать большой несериализуемый (или просто большой) объект на executors, используйте broadcast:
// Плохо: большой словарь сериализуется отдельно для КАЖДОГО task
val dict = loadLargeDict() // Map[String, Int], 100MB
rdd.map(x => dict.getOrElse(x, 0))
// Хорошо: словарь отправляется один раз, executor кэширует копию
val dictBC = sc.broadcast(loadLargeDict())
rdd.map(x => dictBC.value.getOrElse(x, 0))
Broadcast использует BitTorrent-подобный протокол: driver отправляет файл одному executor, тот передаёт другим, минимизируя нагрузку на driver.
spark.kryo.classesToRegister: краткий способ регистрации
Для простых случаев можно не писать KryoRegistrator:
spark.conf.set("spark.kryo.classesToRegister",
"com.example.UserEvent,com.example.SessionData")
Это удобно для небольшого числа классов, но не позволяет контролировать порядок/ID.
Буферы Kryo: tune для больших объектов
Kryo использует буфер при сериализации. Если объект больше буфера — исключение:
com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 65536, required: 131072
Конфигурация:
# Размер начального буфера (по умолчанию 2MB в Spark 4.0)
spark.conf.set("spark.kryoserializer.buffer", "64m")
# Максимальный буфер (по умолчанию 64MB)
spark.conf.set("spark.kryoserializer.buffer.max", "256m")
Увеличение buffer.max — это memory overhead на каждый thread каждого executor. При 50 task-slots на executor и buffer.max=256m вы добавляете 50 * 256MB = 12.5GB потенциального overhead. На практике буферы растут лениво, но будьте осторожны с большими значениями.
Отладка: как найти несериализуемый объект
Когда вы видите Task not serializable, стек часто показывает только верхний класс. Чтобы найти точную причину:
import org.apache.spark.util.ClosureCleaner
import java.io.{ObjectOutputStream, ByteArrayOutputStream}
// Метод 1: явная сериализация перед отправкой task
def testSerializable(obj: AnyRef): Unit = {
val baos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(baos)
try {
oos.writeObject(obj)
println(s"OK: ${obj.getClass.getName} is serializable, size=${baos.size()} bytes")
} catch {
case e: java.io.NotSerializableException =>
println(s"FAIL: ${e.getMessage} is not serializable")
e.printStackTrace()
}
}
// Применяем к подозрительной лямбде:
val closure = (x: Int) => x * config.factor
testSerializable(closure)
# PySpark: проверить через pickle (PySpark сериализует замыкания через cloudpickle)
import cloudpickle
import pickle
def my_func(x):
return x * external_factor
try:
data = cloudpickle.dumps(my_func)
print(f"OK: serializable, size={len(data)} bytes")
# Проверить десериализацию
recovered = pickle.loads(data)
print(f"Round-trip OK: {recovered(5)}")
except Exception as e:
print(f"FAIL: {e}")
PySpark и cloudpickle: свой мир сериализации
В PySpark замыкания сериализуются через cloudpickle — расширение стандартного pickle, которое умеет сериализовывать lambda-функции, closures, и даже динамически определённые классы.
# PySpark аналог ClosureCleaner: cloudpickle автоматически захватывает контекст
external_db = connect_to_db() # не сериализуемо через pickle!
def process(row):
return row['value'] + external_db.fetch_offset() # захват external_db
# Это упадёт с PicklingError:
rdd.map(process).collect()
Паттерн решения в PySpark:
# Правильно: инициализировать ресурс на executor, не захватывать из driver
import os
def process_with_db(row):
# Создаём соединение на executor при первом вызове
if not hasattr(process_with_db, '_db'):
process_with_db._db = connect_to_db()
return row['value'] + process_with_db._db.fetch_offset()
rdd.map(process_with_db).collect()
# Или через mapPartitions (одно соединение на partition, а не на строку):
def process_partition(rows):
db = connect_to_db()
try:
for row in rows:
yield row['value'] + db.fetch_offset()
finally:
db.close()
rdd.mapPartitions(process_partition).collect()
mapPartitions — один из наиболее эффективных паттернов для resource-heavy операций: connection pool, ML-модели, внешние клиенты. Один экземпляр на partition вместо одного на строку.
Benchmark: Java vs Kryo для RDD-операций
import org.apache.spark.SparkConf
// Измерение сериализации broadcast-переменной
val N = 1_000_000
case class Event(id: Long, name: String, value: Double)
val events = (1 to N).map(i => Event(i, s"event_$i", i.toDouble))
// Java serializer
val confJava = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
val scJava = new SparkContext(confJava)
val t1 = System.nanoTime()
val bc1 = scJava.broadcast(events)
println(s"Java broadcast: ${(System.nanoTime()-t1)/1e6}ms")
scJava.stop()
// Kryo serializer
val confKryo = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val scKryo = new SparkContext(confKryo)
val t2 = System.nanoTime()
val bc2 = scKryo.broadcast(events)
println(s"Kryo broadcast: ${(System.nanoTime()-t2)/1e6}ms")
// Типичный результат: Kryo в 3-5x быстрее Java для POJO
scKryo.stop()
Попробуй сам
from pyspark.sql import SparkSession
import pickle
import cloudpickle
spark = SparkSession.builder \
.appName("closure-debug") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.getOrCreate()
sc = spark.sparkContext
# Упражнение 1: безопасное замыкание
threshold = 100
rdd = sc.parallelize(range(200))
result = rdd.filter(lambda x: x > threshold).count()
print(f"Filtered: {result}") # 99
# Упражнение 2: проверить сериализацию через cloudpickle
class Config:
def __init__(self, offset):
self.offset = offset
config = Config(10)
safe_lambda = lambda x: x + config.offset
pickled = cloudpickle.dumps(safe_lambda)
print(f"Lambda size: {len(pickled)} bytes")
recovered = pickle.loads(pickled)
print(f"Result after round-trip: {recovered(5)}") # 15
# Упражнение 3: mapPartitions vs map для expensive resources
import time
def expensive_init_map(x):
# Имитация дорогой инициализации на КАЖДУЮ строку
time.sleep(0.001) # 1ms overhead
return x * 2
def expensive_init_mappartitions(rows):
# Инициализация ОДИН РАЗ на partition
time.sleep(0.001) # 1ms overhead
for row in rows:
yield row * 2
data = sc.parallelize(range(100), 4) # 4 партиции
t1 = time.time()
data.map(expensive_init_map).count()
print(f"map: {time.time()-t1:.3f}s") # ~100ms
t2 = time.time()
data.mapPartitions(expensive_init_mappartitions).count()
print(f"mapPartitions: {time.time()-t2:.3f}s") # ~4ms (4 партиции)