Встроенные функции vs UDF: когда что использовать
Почему встроенные функции быстрее
Встроенные функции Spark (upper, when, date_add, regexp_extract и ещё 300+) выполняются фундаментально быстрее любого UDF. Причин три:
1. Catalyst Optimization
Catalyst optimizer видит внутрь встроенных функций. Он может:
- Объединять несколько операций в одну (fold constants)
- Переставлять порядок выполнения для минимизации данных
- Применять predicate pushdown через встроенные функции
UDF для Catalyst — чёрный ящик. Оптимизатор не может заглянуть внутрь Python/Scala-функции и не знает, какие столбцы она реально использует.
2. Tungsten Code Generation (Whole-Stage CodeGen)
Для встроенных функций Spark генерирует оптимизированный JVM-байткод, который работает напрямую с бинарными данными в памяти (UnsafeRow). Это называется Whole-Stage Code Generation — вся stage компилируется в один Java-метод, без виртуальных вызовов.
UDF не участвует в code generation. Каждый вызов UDF — это отдельный вызов функции с boxing/unboxing аргументов.
3. Нет сериализации (для Python UDF)
Встроенные функции выполняются на JVM. Python UDF требует передачу данных из JVM в Python-процесс и обратно — через socket с per-row сериализацией. Это самый дорогой компонент.
Встроенная функция: JVM -> [Tungsten bytecode] -> результат
нет сериализации, нет Python
Python UDF: JVM -> serialize -> socket -> Python -> execute
-> serialize -> socket -> JVM -> deserialize
PER EACH ROW!
Когда UDF неизбежны
Несмотря на overhead, UDF остаются необходимы в нескольких сценариях:
- Кастомная бизнес-логика, не выражаемая комбинацией встроенных функций — проприетарные алгоритмы, специфические валидации (ИНН, ОГРН)
- ML-инференс — применение обученной модели к каждой строке/батчу (scikit-learn, PyTorch)
- Интеграция с внешними библиотеками — вызов Python/Java библиотек, недоступных в Spark API
- Нестандартный парсинг — разбор проприетарных форматов данных
Правило большого пальца: 90% задач решаются встроенными функциями. Прежде чем писать UDF, потратьте 10 минут на поиск подходящей встроенной функции. Spark предоставляет 300+ функций — вероятно, нужная уже существует.
Дерево решений: какой подход выбрать
При необходимости трансформации данных следуйте этому алгоритму:
Переписываем UDF на встроенные функции
Пример 1: Парсинг строк
# ПЛОХО: Python UDF для извлечения домена из email
@udf(returnType=StringType())
def extract_domain(email):
if email and "@" in email:
return email.split("@")[1].lower()
return None
df.withColumn("domain", extract_domain(col("email")))
# ХОРОШО: встроенные функции
from pyspark.sql.functions import split, lower, element_at
df.withColumn(
"domain",
lower(element_at(split(col("email"), "@"), 2))
)
Пример 2: Условная логика
# ПЛОХО: Python UDF для категоризации
@udf(returnType=StringType())
def categorize_salary(salary):
if salary is None:
return "unknown"
elif salary < 50000:
return "junior"
elif salary < 120000:
return "middle"
else:
return "senior"
df.withColumn("level", categorize_salary(col("salary")))
# ХОРОШО: when/otherwise
from pyspark.sql.functions import when, lit
df.withColumn(
"level",
when(col("salary").isNull(), lit("unknown"))
.when(col("salary") < 50000, lit("junior"))
.when(col("salary") < 120000, lit("middle"))
.otherwise(lit("senior"))
)
Версия с when/otherwise участвует в code generation, оптимизируется Catalyst, и выполняется на JVM без сериализации.
Пример 3: Работа с датами
# ПЛОХО: Python UDF для вычисления возраста
@udf(returnType=IntegerType())
def calculate_age(birth_date):
if birth_date is None:
return None
from datetime import date
today = date.today()
return today.year - birth_date.year - (
(today.month, today.day) < (birth_date.month, birth_date.day)
)
# ХОРОШО: встроенные функции
from pyspark.sql.functions import datediff, current_date, floor
df.withColumn(
"age",
floor(datediff(current_date(), col("birth_date")) / 365.25)
)
Анти-паттерн: оборачиваем простую логику в UDF
Самая частая ошибка — создание UDF для операций, которые напрямую поддерживаются встроенными функциями:
# Анти-паттерны (НЕ ДЕЛАЙТЕ ТАК!)
@udf(returnType=StringType())
def to_upper(s): return s.upper() if s else None
# Правильно: upper(col("name"))
@udf(returnType=DoubleType())
def add_tax(price): return price * 1.2 if price else None
# Правильно: col("price") * 1.2
@udf(returnType=BooleanType())
def is_null(v): return v is None
# Правильно: col("value").isNull()
Каждый такой UDF добавляет 100x overhead по сравнению со встроенной альтернативой. На датасете из 1 миллиарда строк это разница между 5 секундами и 8 минутами.
Что дальше?
В следующем уроке мы детально разберём анатомию overhead Python UDF — почему он в 100 раз медленнее встроенных функций, как работает мост JVM-Python, и какие ресурсы потребляет каждый UDF-вызов.