Learning Platform
Глоссарий Troubleshooting
Урок 05.04 · 10 мин
Средний
Scala UDFJVMType SafetyCatalyst Integration

Scala UDF: производительность JVM

Scala UDF: золотая середина

Scala UDF занимает уникальную позицию в иерархии производительности. В отличие от Python UDF, Scala UDF выполняется нативно на JVM — без сериализации, без socket-передачи, без отдельного процесса. Но в отличие от встроенных функций, Scala UDF не участвует в Catalyst optimization и Whole-Stage Code Generation.

Встроенная функция: JVM CodeGen → ~5ns/row    (1x)
Scala UDF:          JVM вызов   → ~10ns/row   (~2x)
Pandas UDF:         Arrow batch → ~250ns/row  (~5x)
Python UDF:         Per-row ser → ~5μs/row    (~100x)

Scala UDF — это ~2x overhead по сравнению со встроенными функциями. Для большинства задач это приемлемая цена за кастомную логику.

Почему Scala UDF медленнее встроенных

Хотя Scala UDF выполняется на JVM, два фактора делают его медленнее:

1. Нет Whole-Stage Code Generation

Встроенные функции компилируются в единый оптимизированный Java-метод (Whole-Stage CodeGen). Spark генерирует байткод, который работает напрямую с UnsafeRow — бинарным представлением данных в Tungsten.

Scala UDF — это обычный вызов функции. Данные нужно извлечь из UnsafeRow, конвертировать в Scala-типы, вызвать функцию, конвертировать обратно:

CodeGen (встроенная):
  unsafeRow.getUTF8String(0).toUpperCase()  // прямой доступ

Scala UDF:
  val input = row.get(0, StringType)         // извлечение
  val scalaStr = input.toString              // конвертация
  val result = userFunction(scalaStr)        // вызов UDF
  val output = UTF8String.fromString(result) // конвертация обратно

2. Нет Catalyst optimization

Catalyst не может заглянуть внутрь Scala UDF (как и внутрь Python UDF). Predicate pushdown, constant folding и другие оптимизации не применяются через UDF-boundary.

Синтаксис Scala UDF

Регистрация и использование

import org.apache.spark.sql.functions.udf

// Лямбда-стиль (до 10 аргументов)
val normalizeNameUDF = udf((name: String) => {
  if (name == null) null
  else name.trim.split("\\s+").map(_.capitalize).mkString(" ")
})

// Применение к DataFrame
df.withColumn("clean_name", normalizeNameUDF($"name"))
// Типизированный UDF с явным указанием типов
import org.apache.spark.sql.expressions.UserDefinedFunction

val calculateBMI: UserDefinedFunction = udf(
  (weight: Double, height: Double) => {
    if (height <= 0) null.asInstanceOf[Double]
    else weight / (height * height)
  }
)

df.withColumn("bmi", calculateBMI($"weight_kg", $"height_m"))

Регистрация для SQL

// Для использования в spark.sql()
spark.udf.register("normalize_name", (name: String) => {
  if (name == null) null
  else name.trim.split("\\s+").map(_.capitalize).mkString(" ")
})

spark.sql("SELECT normalize_name(name) FROM employees")

Работа со сложными типами

// UDF, возвращающий массив
val splitAndClean = udf((text: String) => {
  if (text == null) Array.empty[String]
  else text.split(",").map(_.trim.toLowerCase).filter(_.nonEmpty)
})

// UDF, принимающий Map
val lookupValue = udf((mapping: Map[String, String], key: String) => {
  mapping.getOrElse(key, "unknown")
})
TIP

Scala UDF vs Python в PySpark-проектах: Если ваша команда пишет на PySpark, Scala UDF можно упаковать в JAR и подключить через spark.jars. Это оправдано для горячих путей (hot paths), где UDF вызывается миллиарды раз и разница в 50x между Python UDF и Scala UDF критична. Для редких вычислений Pandas UDF достаточно.

Когда Scala UDF оправдан

СценарийРекомендацияПочему
Простая трансформацияВстроенная функцияCode generation + Catalyst
Сложная бизнес-логика без Python-зависимостейScala UDF~2x overhead, нет сериализации
ML-инференс (scikit-learn, PyTorch)Pandas UDFНужен Python-экосистема
Интеграция с Java-библиотекойScala UDFПрямой доступ к JVM
Кастомная валидация (ИНН, ОГРН)Scala UDFType safety + JVM performance
Операция с внешним Python APIPandas UDF (Iterator)Stateful инициализация

Ограничения Scala UDF

  1. Нет Catalyst optimization — как и Python UDF, Scala UDF является чёрным ящиком для оптимизатора
  2. Требует Scala knowledge — не все data engineering команды владеют Scala
  3. Перекомпиляция — изменения в UDF требуют пересборки JAR и перезапуска Spark-приложения
  4. Нет code generation — отсутствие Whole-Stage CodeGen означает ~2x overhead по сравнению со встроенными функциями
  5. Null safety — Scala UDF должен явно обрабатывать null-значения (NullPointerException)

Паттерн: Scala UDF из JAR в PySpark

Для PySpark-проектов, где нужна JVM-производительность:

// 1. Создайте Scala UDF в отдельном проекте (build.sbt)
package com.company.spark.udfs

import org.apache.spark.sql.api.java.UDF1

class ValidateINN extends UDF1[String, Boolean] {
  override def call(inn: String): Boolean = {
    if (inn == null || (inn.length != 10 && inn.length != 12)) false
    else {
      // ... логика валидации контрольных сумм
      true
    }
  }
}
# 2. В PySpark: подключите JAR и зарегистрируйте
spark = SparkSession.builder \
    .config("spark.jars", "/path/to/company-udfs.jar") \
    .getOrCreate()

spark.udf.registerJavaFunction(
    "validate_inn",
    "com.company.spark.udfs.ValidateINN",
    BooleanType()
)

# Используйте в SQL
df = spark.sql("SELECT *, validate_inn(inn) as is_valid FROM partners")

Этот подход даёт JVM-производительность Scala UDF с удобством разработки PySpark.

Проверка знанийKnowledge check
Почему Scala UDF всё ещё в ~2x медленнее встроенных функций, хотя выполняется на JVM?
ОтветAnswer
Две причины: (1) Scala UDF не участвует в Whole-Stage Code Generation -- встроенные функции компилируются в оптимизированный байткод, работающий напрямую с UnsafeRow (бинарный Tungsten формат), а Scala UDF требует извлечения данных из UnsafeRow, конвертации в Scala-типы, вызова функции и конвертации обратно. (2) Catalyst optimizer не может оптимизировать через UDF-boundary -- нет predicate pushdown, constant folding или перестановки операций через UDF.
Проверка знанийKnowledge check
В каком случае Scala UDF предпочтительнее Pandas UDF для PySpark-проекта?
ОтветAnswer
Scala UDF предпочтительнее, когда: (1) UDF вызывается на миллиардах строк и разница 50x (Scala ~2x vs Pandas ~5-20x от built-in) критична для SLA; (2) логика не требует Python-библиотек (pandas, scikit-learn, numpy); (3) нужна интеграция с Java-библиотекой. Scala UDF упаковывается в JAR и подключается через spark.jars, регистрируется через registerJavaFunction. Для ML-инференса или операций с Python-экосистемой Pandas UDF остаётся лучшим выбором.

Что дальше?

В следующем уроке мы сведём всё вместе в benchmark-сравнении с CSS-визуализацией: встроенные функции, Scala UDF, Pandas UDF и Python UDF на одном графике, плюс матрица принятия решений для выбора подхода.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Почему Scala UDF в ~2x медленнее встроенных функций, но без 100x overhead Python UDF?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5