UDF: пользовательские функции
Зачем нужны UDF?
Spark предоставляет сотни встроенных функций (col, when, regexp_extract, date_add, array_contains…). Но иногда бизнес-логика не вписывается в стандартный набор:
- Кастомная валидация ИНН/ОГРН по контрольным суммам
- Транслитерация имён по специфическим правилам
- Вычисление бизнес-метрики по проприетарной формуле
- Парсинг нестандартного формата данных
В таких случаях вы создаёте UDF (User-Defined Function) — собственную функцию, которую Spark применяет к каждой строке или группе строк.
Python UDF: простая регистрация
Самый простой способ — обернуть Python-функцию в UDF:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Способ 1: декоратор
@udf(returnType=StringType())
def normalize_name(name):
if name is None:
return None
return name.strip().title()
# Применение
df.withColumn("clean_name", normalize_name(col("name")))
# Способ 2: регистрация функции
def validate_inn(inn):
"""Проверка контрольной суммы ИНН"""
if inn is None or len(inn) not in (10, 12):
return False
# ... логика валидации
return True
validate_inn_udf = udf(validate_inn, BooleanType())
df.withColumn("is_valid_inn", validate_inn_udf(col("inn")))
Для использования в SQL-запросах:
spark.udf.register("normalize_name", normalize_name)
spark.sql("SELECT normalize_name(name) FROM employees")
Return type обязателен! Spark не может вывести тип возвращаемого значения из Python-функции. Если не указать returnType, Spark по умолчанию использует StringType(), что может привести к неожиданным результатам при числовых вычислениях.
Проблема производительности Python UDF
Python UDF — это чёрный ящик для Catalyst optimizer. Spark не может:
- Оптимизировать код внутри UDF
- Применить predicate pushdown через UDF
- Выполнить UDF на JVM (нужен Python-процесс)
Механика выполнения Python UDF:
Для каждой строки:
1. JVM сериализует строку (Row -> bytes)
2. Bytes передаются в Python worker через socket
3. Python десериализует данные
4. Python выполняет вашу функцию
5. Python сериализует результат
6. Bytes передаются обратно в JVM
7. JVM десериализует результат
Этот процесс сериализации/десериализации (ser/de) на каждую строку создаёт огромный overhead. На миллионах строк Python UDF может быть в 10-100x медленнее встроенных функций.
Pandas UDF (Vectorized UDF): решение проблемы
Pandas UDF обрабатывают данные пакетами (батчами), а не по одной строке. Данные передаются как Apache Arrow columnar format, что радикально сокращает overhead сериализации:
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("double")
def normalize_salary(salary: pd.Series) -> pd.Series:
"""Нормализация зарплаты (z-score) -- пакетная обработка"""
return (salary - salary.mean()) / salary.std()
df.withColumn("salary_normalized", normalize_salary(col("salary")))
Ключевое отличие: вместо обработки одной строки за раз, Pandas UDF получает pd.Series (целый столбец или батч строк) и возвращает pd.Series. Данные передаются через Apache Arrow — бинарный columnar format без per-row сериализации.
Типы Pandas UDF:
| Тип | Вход | Выход | Применение |
|---|---|---|---|
| Scalar | pd.Series | pd.Series | Поэлементные операции (map) |
| Grouped Map | pd.DataFrame | pd.DataFrame | Операции над группами (groupBy().applyInPandas()) |
| Grouped Aggregate | pd.Series | Scalar | Кастомные агрегации |
# Grouped Map: кастомная обработка групп
def fill_missing_by_group(df: pd.DataFrame) -> pd.DataFrame:
"""Заполняем пропуски средним по группе"""
df["salary"] = df["salary"].fillna(df["salary"].mean())
return df
result = employees.groupBy("department").applyInPandas(
fill_missing_by_group,
schema=employees.schema
)
Spark4.0
Spark 4.0 улучшает производительность Arrow-backed Pandas UDF за счёт оптимизированного батчинга и уменьшенного memory overhead при передаче данных между JVM и Python.
Scala UDF: максимальная производительность
Для критически важных по производительности UDF можно использовать Scala:
// Scala UDF -- выполняется напрямую на JVM
val normalizeNameUDF = udf((name: String) => {
if (name == null) null
else name.trim.split("\\s+").map(_.capitalize).mkString(" ")
})
df.withColumn("clean_name", normalizeNameUDF($"name"))
Scala UDF не требует сериализации между JVM и Python — функция выполняется нативно на JVM. Это самый быстрый тип UDF, но требует знания Scala и перекомпиляции при изменениях.
Иерархия производительности
Всегда стремитесь использовать вариант ближе к началу списка:
Производительность (от лучшей к худшей):
1. Встроенные функции (col, when, regexp_extract...) ← ЛУЧШИЙ ВЫБОР
- Выполняются на JVM, Catalyst может оптимизировать
- 100% рекомендуется для стандартных операций
2. Scala UDF
- Выполняется на JVM, нет сериализации
- Когда встроенных недостаточно и есть Scala-expertise
3. Pandas UDF (vectorized)
- Батчевая обработка через Arrow
- Когда нужен Python + NumPy/pandas для вычислений
4. Python UDF ← ХУДШИЙ ВЫБОР
- Per-row сериализация через socket
- Только когда нет альтернативы
Анти-паттерн: UDF вместо встроенных функций
# ПЛОХО: Python UDF для стандартной операции
@udf(returnType=StringType())
def upper_case(s):
return s.upper() if s else None
df.withColumn("name_upper", upper_case(col("name")))
# ХОРОШО: встроенная функция
from pyspark.sql.functions import upper
df.withColumn("name_upper", upper(col("name")))
Встроенная функция upper() выполняется на JVM без сериализации и поддерживает code generation (Tungsten). Python UDF для той же операции будет в 10-50x медленнее на больших датасетах.
Правило: прежде чем писать UDF, проверьте документацию встроенных функций. Spark предоставляет 300+ функций для строк, дат, массивов, map, математики, условий.
Почему UDF теряют оптимизации Tungsten — разбор на уровне исходников в senior-курсе:
Spark Internals: фреймворк EncoderЧто дальше?
В следующем уроке мы изучим pandas API on Spark — как масштабировать существующий pandas-код на кластер Spark без переписывания, и когда это имеет смысл, а когда лучше использовать нативный PySpark.