Learning Platform
Глоссарий Troubleshooting
Урок 08.03 · 30 мин
Продвинутый
KryoSerializerJavaSerializerClosureCleanerClosureNotSerializableExceptionspark.serializer

Kryo и Java-сериализация

Tungsten и ExpressionEncoder решают проблему сериализации данных внутри движка — между операторами, при shuffle. Но у Spark есть второй, принципиально иной контекст сериализации: задачи и замыкания. Когда вы пишете rdd.map(x => x + myOffset), Spark должен передать эту лямбду вместе со всем её контекстом (переменной myOffset) на каждый executor. Именно здесь живёт spark.serializer, KryoSerializer, ClosureCleaner и самая популярная ошибка Spark — Task not serializable.

Два контекста сериализации

Прежде чем погружаться в детали, важно разграничить два совершенно разных использования сериализации в Spark:

Контекст 1: Shuffle-сериализация данных — перемещение InternalRow / UnsafeRow между partition-ами и executor-ами при groupBy, join, repartition. Для этого Spark использует внутренний бинарный формат Tungsten, минуя spark.serializer. Этот контекст мы разобрали в предыдущих уроках.

Контекст 2: Task-сериализация — отправка задач (task-ов) с driver-а на executor-ы. Каждый task содержит замыкание (closure) — лямбду с захваченными переменными. Также сериализация используется для: broadcast-переменных (драйвер отправляет большой объект всем executor-ам), accumulator-ов, и иногда для shuffle-данных в RDD API. Вот здесь работает spark.serializer.

# spark.serializer влияет ТОЛЬКО на этот контекст:
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

JavaSerializer: медленно, но надёжно

По умолчанию Spark использует org.apache.spark.serializer.JavaSerializer. Это обёртка над стандартной Java-сериализацией: ObjectOutputStream / ObjectInputStream.

Java-сериализация работает через reflection в runtime: для каждого объекта она записывает имя класса, версию (serialVersionUID), все поля рекурсивно. Формат включает значительный overhead:

  • Имя класса: минимум 10-15 байт для каждого нового типа в потоке
  • Header для каждого объекта: 4 байта magic + class descriptor
  • Backreference table: отслеживание уже сериализованных объектов для циклических ссылок
  • Поля: ключи (имена полей) + значения

Для task-сериализации это приемлемо — задачи отправляются один раз при старте. Но для RDD-операций, где объекты сериализуются миллиарды раз (например, при использовании spark.rdd.compress), Java-сериализация становится узким местом.

KryoSerializer: быстро, но требует аккуратности

org.apache.spark.serializer.KryoSerializer использует библиотеку Kryo. Kryo работает по-другому: он присваивает каждому зарегистрированному классу числовой ID и использует этот ID вместо имени класса. Нет reflection в hot path — только прямая запись полей по заранее определённому layout.

Сравнение производительности:

ОперацияJavaSerializerKryoSerializer
Сериализация POJO 5 полей~1.5 мкс~0.15 мкс
Размер для 1M объектов180+ байт/объект20-50 байт/объект
Cold-start overheadнет (reflection)регистрация классов

Включение Kryo:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("kryo-demo") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrationRequired", "false") \
    .getOrCreate()
TIP

spark.kryo.registrationRequired=false (дефолт) — Kryo использует имя класса как fallback если класс не зарегистрирован. Это менее эффективно, но позволяет не регистрировать все классы вручную. Для максимальной производительности регистрируйте все классы явно и устанавливайте registrationRequired=true.

Регистрация классов в Kryo

При регистрации Kryo присваивает классу числовой ID (начиная с 10 — первые 10 зарезервированы для базовых типов). При сериализации вместо имени "com.example.UserEvent" (22 байта) записывается число 11 (1-2 байта).

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Порядок регистрации важен: ID присваивается по порядку
    kryo.register(classOf[UserEvent])
    kryo.register(classOf[SessionData])
    kryo.register(classOf[Array[UserEvent]])
    // Массивы примитивов Kryo умеет эффективно сериализовать out-of-the-box
    kryo.register(classOf[Array[Long]])
    kryo.register(classOf[Array[Double]])
  }
}
# В SparkConf:
spark.conf.set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Важно: если вы меняете порядок регистрации между версиями приложения, ID-ы сдвигаются, и сохранённые checkpoint-ы или shuffle-файлы становятся нечитаемыми. Для production рекомендуется фиксировать ID явно:

kryo.register(classOf[UserEvent], 11)
kryo.register(classOf[SessionData], 12)
// Явные ID: не зависят от порядка в коде

Сериализация замыканий: ClosureCleaner

Теперь о самой коварной части. Когда вы пишете:

val multiplier = 5
val rdd = sc.parallelize(1 to 1000)
val result = rdd.map(x => x * multiplier)

Лямбда x => x * multiplier — это анонимный класс Scala. Компилятор создаёт примерно такой класс:

// Что компилятор Scala генерирует для лямбды:
class AnonFunction1$1 implements Function1<Integer, Integer>, Serializable {
    private final int multiplier;  // захваченная переменная
    
    AnonFunction1$1(int multiplier) {
        this.multiplier = multiplier;
    }
    
    public Integer apply(Integer x) {
        return x * multiplier;
    }
}

Spark должен сериализовать этот анонимный класс и отправить его на каждый executor. Для этого класс должен реализовывать java.io.Serializable (что Scala лямбды и делают).

Проблема возникает, когда лямбда захватывает не сериализуемый объект:

class DataProcessor(val connection: DBConnection) {
    // DBConnection не реализует Serializable!
    def process(rdd: RDD[Int]): RDD[Int] = {
        rdd.map(x => x + this.connection.fetchOffset())
        // ^^ лямбда захватывает 'this' (DataProcessor), который содержит DBConnection
    }
}

Это приводит к org.apache.spark.SparkException: Task not serializable с NotSerializableException: DBConnection в стеке.

Как ClosureCleaner спасает

org.apache.spark.util.ClosureCleaner — компонент, который Spark запускает на каждой лямбде перед сериализацией. Он анализирует байткод замыкания через ASM (библиотека для анализа Java bytecode) и пытается минимизировать захваченный контекст.

Алгоритм ClosureCleaner:

  1. Получить байткод анонимного класса через getClass.getName + classloader
  2. Через ASM просканировать все GETFIELD инструкции — найти, какие поля $outer (внешний объект) реально используются
  3. Если $outer используется только для чтения конкретного поля — попытаться заменить ссылку на $outer прямой ссылкой на значение поля
  4. Если $outer содержит несериализуемые поля, которые не используются в лямбде — обнулить их через reflection
// ClosureCleaner может спасти этот случай:
class DataProcessor(val threshold: Int, val connection: DBConnection) {
    def process(rdd: RDD[Int]): RDD[Int] = {
        rdd.map(x => x + threshold)
        // Лямбда использует только threshold (Int, сериализуемый)
        // ClosureCleaner обнулит connection в захваченном $outer
    }
}

Но не этот:

class DataProcessor(val connection: DBConnection) {
    def process(rdd: RDD[Int]): RDD[Int] = {
        rdd.map(x => x + connection.fetchOffset())
        // Лямбда РЕАЛЬНО использует connection — ClosureCleaner не может помочь
    }
}
ClosureCleaner: анализ и очистка замыканий
Исходная лямбда (с $outer ref)Лямбда: анонимный Scala-класс с полями $outer (ссылка на enclosing class), захваченными переменными. Перед сериализацией Spark вызывает ClosureCleaner.clean(closure).
ClosureCleaner.clean()
$outer не используется -> обнулитьСлучай 1: Лямбда использует только примитивные/сериализуемые поля outer. ClosureCleaner копирует значения в новые поля лямбды, обнуляет $outer. Результат сериализуем.
Сериализуемая лямбда -> executorОчищенная лямбда: содержит только сериализуемые данные. Успешно отправляется на executor.
$outer несериализуем и нуженСлучай 2: Лямбда реально использует несериализуемый объект из $outer. ClosureCleaner не может помочь — бросает NotSerializableException с описанием проблемного класса.
NotSerializableExceptionSparkException: Task not serializable. В stack trace будет NotSerializableException с именем класса, который не реализует Serializable. Нужен рефакторинг кода.

Типичные паттерны NotSerializableException

Паттерн 1: Захват this через ссылку на метод

class EventProcessor(val config: Config) {
    // Config не Serializable!
    
    def transform(x: Int): Int = x * config.factor  // метод класса
    
    def process(rdd: RDD[Int]): RDD[Int] = {
        rdd.map(transform)  // эквивалентно: rdd.map(this.transform)
        // Spark захватывает 'this', а через него — весь EventProcessor
    }
}

// Правильно:
def process(rdd: RDD[Int]): RDD[Int] = {
    val factor = config.factor  // вытащить нужное значение в локальную переменную
    rdd.map(x => x * factor)    // лямбда захватывает только Int
}

Паттерн 2: Logger в enclosing class

class DataPipeline {
    private val logger = LogManager.getLogger(getClass)  // Logger не Serializable!
    
    def process(rdd: RDD[String]): RDD[String] = {
        rdd.map { line =>
            logger.debug(s"Processing: $line")  // захват logger
            line.toUpperCase
        }
    }
}

// Правильно: создать logger на executor-е
def process(rdd: RDD[String]): RDD[String] = {
    rdd.map { line =>
        val log = LogManager.getLogger("executor")  // создаётся на executor
        log.debug(s"Processing: $line")
        line.toUpperCase
    }
}

Паттерн 3: Nested class без static модификатора

// В Java: nested non-static class содержит implicit ref на outer instance
public class Outer {
    class Inner implements Serializable {
        int process(int x) { return x; }
        // Inner несёт ссылку на Outer даже если не использует её!
    }
}

// Правильно: статический nested класс
public class Outer {
    static class Inner implements Serializable {
        int process(int x) { return x; }
    }
}

Паттерн 4: Broadcast как антидот для больших объектов

Если вам нужно передать большой несериализуемый (или просто большой) объект на executors, используйте broadcast:

// Плохо: большой словарь сериализуется отдельно для КАЖДОГО task
val dict = loadLargeDict()  // Map[String, Int], 100MB
rdd.map(x => dict.getOrElse(x, 0))

// Хорошо: словарь отправляется один раз, executor кэширует копию
val dictBC = sc.broadcast(loadLargeDict())
rdd.map(x => dictBC.value.getOrElse(x, 0))

Broadcast использует BitTorrent-подобный протокол: driver отправляет файл одному executor, тот передаёт другим, минимизируя нагрузку на driver.

spark.kryo.classesToRegister: краткий способ регистрации

Для простых случаев можно не писать KryoRegistrator:

spark.conf.set("spark.kryo.classesToRegister", 
               "com.example.UserEvent,com.example.SessionData")

Это удобно для небольшого числа классов, но не позволяет контролировать порядок/ID.

Буферы Kryo: tune для больших объектов

Kryo использует буфер при сериализации. Если объект больше буфера — исключение:

com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 65536, required: 131072

Конфигурация:

# Размер начального буфера (по умолчанию 2MB в Spark 4.0)
spark.conf.set("spark.kryoserializer.buffer", "64m")
# Максимальный буфер (по умолчанию 64MB)
spark.conf.set("spark.kryoserializer.buffer.max", "256m")
WARNING

Увеличение buffer.max — это memory overhead на каждый thread каждого executor. При 50 task-slots на executor и buffer.max=256m вы добавляете 50 * 256MB = 12.5GB потенциального overhead. На практике буферы растут лениво, но будьте осторожны с большими значениями.

Отладка: как найти несериализуемый объект

Когда вы видите Task not serializable, стек часто показывает только верхний класс. Чтобы найти точную причину:

import org.apache.spark.util.ClosureCleaner
import java.io.{ObjectOutputStream, ByteArrayOutputStream}

// Метод 1: явная сериализация перед отправкой task
def testSerializable(obj: AnyRef): Unit = {
    val baos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(baos)
    try {
        oos.writeObject(obj)
        println(s"OK: ${obj.getClass.getName} is serializable, size=${baos.size()} bytes")
    } catch {
        case e: java.io.NotSerializableException =>
            println(s"FAIL: ${e.getMessage} is not serializable")
            e.printStackTrace()
    }
}

// Применяем к подозрительной лямбде:
val closure = (x: Int) => x * config.factor
testSerializable(closure)
# PySpark: проверить через pickle (PySpark сериализует замыкания через cloudpickle)
import cloudpickle
import pickle

def my_func(x):
    return x * external_factor

try:
    data = cloudpickle.dumps(my_func)
    print(f"OK: serializable, size={len(data)} bytes")
    # Проверить десериализацию
    recovered = pickle.loads(data)
    print(f"Round-trip OK: {recovered(5)}")
except Exception as e:
    print(f"FAIL: {e}")

PySpark и cloudpickle: свой мир сериализации

В PySpark замыкания сериализуются через cloudpickle — расширение стандартного pickle, которое умеет сериализовывать lambda-функции, closures, и даже динамически определённые классы.

# PySpark аналог ClosureCleaner: cloudpickle автоматически захватывает контекст
external_db = connect_to_db()  # не сериализуемо через pickle!

def process(row):
    return row['value'] + external_db.fetch_offset()  # захват external_db

# Это упадёт с PicklingError:
rdd.map(process).collect()

Паттерн решения в PySpark:

# Правильно: инициализировать ресурс на executor, не захватывать из driver
import os

def process_with_db(row):
    # Создаём соединение на executor при первом вызове
    if not hasattr(process_with_db, '_db'):
        process_with_db._db = connect_to_db()
    return row['value'] + process_with_db._db.fetch_offset()

rdd.map(process_with_db).collect()

# Или через mapPartitions (одно соединение на partition, а не на строку):
def process_partition(rows):
    db = connect_to_db()
    try:
        for row in rows:
            yield row['value'] + db.fetch_offset()
    finally:
        db.close()

rdd.mapPartitions(process_partition).collect()

mapPartitions — один из наиболее эффективных паттернов для resource-heavy операций: connection pool, ML-модели, внешние клиенты. Один экземпляр на partition вместо одного на строку.

Benchmark: Java vs Kryo для RDD-операций

import org.apache.spark.SparkConf

// Измерение сериализации broadcast-переменной
val N = 1_000_000
case class Event(id: Long, name: String, value: Double)
val events = (1 to N).map(i => Event(i, s"event_$i", i.toDouble))

// Java serializer
val confJava = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
val scJava = new SparkContext(confJava)
val t1 = System.nanoTime()
val bc1 = scJava.broadcast(events)
println(s"Java broadcast: ${(System.nanoTime()-t1)/1e6}ms")
scJava.stop()

// Kryo serializer
val confKryo = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val scKryo = new SparkContext(confKryo)
val t2 = System.nanoTime()
val bc2 = scKryo.broadcast(events)
println(s"Kryo broadcast: ${(System.nanoTime()-t2)/1e6}ms")
// Типичный результат: Kryo в 3-5x быстрее Java для POJO
scKryo.stop()

Попробуй сам

from pyspark.sql import SparkSession
import pickle
import cloudpickle

spark = SparkSession.builder \
    .appName("closure-debug") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()
sc = spark.sparkContext

# Упражнение 1: безопасное замыкание
threshold = 100
rdd = sc.parallelize(range(200))
result = rdd.filter(lambda x: x > threshold).count()
print(f"Filtered: {result}")  # 99

# Упражнение 2: проверить сериализацию через cloudpickle
class Config:
    def __init__(self, offset):
        self.offset = offset

config = Config(10)
safe_lambda = lambda x: x + config.offset

pickled = cloudpickle.dumps(safe_lambda)
print(f"Lambda size: {len(pickled)} bytes")
recovered = pickle.loads(pickled)
print(f"Result after round-trip: {recovered(5)}")  # 15

# Упражнение 3: mapPartitions vs map для expensive resources
import time

def expensive_init_map(x):
    # Имитация дорогой инициализации на КАЖДУЮ строку
    time.sleep(0.001)  # 1ms overhead
    return x * 2

def expensive_init_mappartitions(rows):
    # Инициализация ОДИН РАЗ на partition
    time.sleep(0.001)  # 1ms overhead
    for row in rows:
        yield row * 2

data = sc.parallelize(range(100), 4)  # 4 партиции

t1 = time.time()
data.map(expensive_init_map).count()
print(f"map: {time.time()-t1:.3f}s")  # ~100ms

t2 = time.time()
data.mapPartitions(expensive_init_mappartitions).count()
print(f"mapPartitions: {time.time()-t2:.3f}s")  # ~4ms (4 партиции)
Проверка знанийKnowledge check
Вы переключили spark.serializer на KryoSerializer и установили spark.kryo.registrationRequired=true. Job упал с KryoException: Class is not registered: com.example.AnalyticsEvent. Kryo-регистратор явно регистрирует AnalyticsEvent. В чём проблема?
ОтветAnswer
Скорее всего AnalyticsEvent содержит поля других классов, которые также не зарегистрированы. При registrationRequired=true Kryo требует регистрации ВСЕХ классов в объектном графе — не только верхнего класса, но и типов всех его полей (рекурсивно). Например, если AnalyticsEvent содержит поле List<Tag>, то Tag тоже должен быть зарегистрирован, и так далее. Решение: (1) посмотреть в стеке KryoException — там будет имя незарегистрированного класса; (2) добавить его в KryoRegistrator; (3) повторить до полного исчезновения ошибок. Альтернатива: использовать регистрацию через kryo.register(classOf[AnalyticsEvent]) не в порядке 'только что известные классы', а с явным обходом объектного графа. Инструмент kryo-serializers (GitHub: EsotericSoftware) предоставляет утилиту для автоматического обнаружения всех классов в графе.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. spark.serializer влияет на сериализацию данных при shuffle в DataFrame API (groupBy, join). Верно ли это?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4