Scala UDF: производительность JVM
Scala UDF: золотая середина
Scala UDF занимает уникальную позицию в иерархии производительности. В отличие от Python UDF, Scala UDF выполняется нативно на JVM — без сериализации, без socket-передачи, без отдельного процесса. Но в отличие от встроенных функций, Scala UDF не участвует в Catalyst optimization и Whole-Stage Code Generation.
Встроенная функция: JVM CodeGen → ~5ns/row (1x)
Scala UDF: JVM вызов → ~10ns/row (~2x)
Pandas UDF: Arrow batch → ~250ns/row (~5x)
Python UDF: Per-row ser → ~5μs/row (~100x)
Scala UDF — это ~2x overhead по сравнению со встроенными функциями. Для большинства задач это приемлемая цена за кастомную логику.
Почему Scala UDF медленнее встроенных
Хотя Scala UDF выполняется на JVM, два фактора делают его медленнее:
1. Нет Whole-Stage Code Generation
Встроенные функции компилируются в единый оптимизированный Java-метод (Whole-Stage CodeGen). Spark генерирует байткод, который работает напрямую с UnsafeRow — бинарным представлением данных в Tungsten.
Scala UDF — это обычный вызов функции. Данные нужно извлечь из UnsafeRow, конвертировать в Scala-типы, вызвать функцию, конвертировать обратно:
CodeGen (встроенная):
unsafeRow.getUTF8String(0).toUpperCase() // прямой доступ
Scala UDF:
val input = row.get(0, StringType) // извлечение
val scalaStr = input.toString // конвертация
val result = userFunction(scalaStr) // вызов UDF
val output = UTF8String.fromString(result) // конвертация обратно
2. Нет Catalyst optimization
Catalyst не может заглянуть внутрь Scala UDF (как и внутрь Python UDF). Predicate pushdown, constant folding и другие оптимизации не применяются через UDF-boundary.
Синтаксис Scala UDF
Регистрация и использование
import org.apache.spark.sql.functions.udf
// Лямбда-стиль (до 10 аргументов)
val normalizeNameUDF = udf((name: String) => {
if (name == null) null
else name.trim.split("\\s+").map(_.capitalize).mkString(" ")
})
// Применение к DataFrame
df.withColumn("clean_name", normalizeNameUDF($"name"))
// Типизированный UDF с явным указанием типов
import org.apache.spark.sql.expressions.UserDefinedFunction
val calculateBMI: UserDefinedFunction = udf(
(weight: Double, height: Double) => {
if (height <= 0) null.asInstanceOf[Double]
else weight / (height * height)
}
)
df.withColumn("bmi", calculateBMI($"weight_kg", $"height_m"))
Регистрация для SQL
// Для использования в spark.sql()
spark.udf.register("normalize_name", (name: String) => {
if (name == null) null
else name.trim.split("\\s+").map(_.capitalize).mkString(" ")
})
spark.sql("SELECT normalize_name(name) FROM employees")
Работа со сложными типами
// UDF, возвращающий массив
val splitAndClean = udf((text: String) => {
if (text == null) Array.empty[String]
else text.split(",").map(_.trim.toLowerCase).filter(_.nonEmpty)
})
// UDF, принимающий Map
val lookupValue = udf((mapping: Map[String, String], key: String) => {
mapping.getOrElse(key, "unknown")
})
Scala UDF vs Python в PySpark-проектах: Если ваша команда пишет на PySpark, Scala UDF можно упаковать в JAR и подключить через spark.jars. Это оправдано для горячих путей (hot paths), где UDF вызывается миллиарды раз и разница в 50x между Python UDF и Scala UDF критична. Для редких вычислений Pandas UDF достаточно.
Когда Scala UDF оправдан
| Сценарий | Рекомендация | Почему |
|---|---|---|
| Простая трансформация | Встроенная функция | Code generation + Catalyst |
| Сложная бизнес-логика без Python-зависимостей | Scala UDF | ~2x overhead, нет сериализации |
| ML-инференс (scikit-learn, PyTorch) | Pandas UDF | Нужен Python-экосистема |
| Интеграция с Java-библиотекой | Scala UDF | Прямой доступ к JVM |
| Кастомная валидация (ИНН, ОГРН) | Scala UDF | Type safety + JVM performance |
| Операция с внешним Python API | Pandas UDF (Iterator) | Stateful инициализация |
Ограничения Scala UDF
- Нет Catalyst optimization — как и Python UDF, Scala UDF является чёрным ящиком для оптимизатора
- Требует Scala knowledge — не все data engineering команды владеют Scala
- Перекомпиляция — изменения в UDF требуют пересборки JAR и перезапуска Spark-приложения
- Нет code generation — отсутствие Whole-Stage CodeGen означает ~2x overhead по сравнению со встроенными функциями
- Null safety — Scala UDF должен явно обрабатывать null-значения (NullPointerException)
Паттерн: Scala UDF из JAR в PySpark
Для PySpark-проектов, где нужна JVM-производительность:
// 1. Создайте Scala UDF в отдельном проекте (build.sbt)
package com.company.spark.udfs
import org.apache.spark.sql.api.java.UDF1
class ValidateINN extends UDF1[String, Boolean] {
override def call(inn: String): Boolean = {
if (inn == null || (inn.length != 10 && inn.length != 12)) false
else {
// ... логика валидации контрольных сумм
true
}
}
}
# 2. В PySpark: подключите JAR и зарегистрируйте
spark = SparkSession.builder \
.config("spark.jars", "/path/to/company-udfs.jar") \
.getOrCreate()
spark.udf.registerJavaFunction(
"validate_inn",
"com.company.spark.udfs.ValidateINN",
BooleanType()
)
# Используйте в SQL
df = spark.sql("SELECT *, validate_inn(inn) as is_valid FROM partners")
Этот подход даёт JVM-производительность Scala UDF с удобством разработки PySpark.
Что дальше?
В следующем уроке мы сведём всё вместе в benchmark-сравнении с CSS-визуализацией: встроенные функции, Scala UDF, Pandas UDF и Python UDF на одном графике, плюс матрица принятия решений для выбора подхода.