Learning Platform
Глоссарий Troubleshooting
Урок 03.07 · 14 мин
Продвинутый
UDFPython UDFPandas UDFScalar UDFPerformanceArrow

UDF: пользовательские функции

Зачем нужны UDF?

Spark предоставляет сотни встроенных функций (col, when, regexp_extract, date_add, array_contains…). Но иногда бизнес-логика не вписывается в стандартный набор:

  • Кастомная валидация ИНН/ОГРН по контрольным суммам
  • Транслитерация имён по специфическим правилам
  • Вычисление бизнес-метрики по проприетарной формуле
  • Парсинг нестандартного формата данных

В таких случаях вы создаёте UDF (User-Defined Function) — собственную функцию, которую Spark применяет к каждой строке или группе строк.

Python UDF: простая регистрация

Самый простой способ — обернуть Python-функцию в UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Способ 1: декоратор
@udf(returnType=StringType())
def normalize_name(name):
    if name is None:
        return None
    return name.strip().title()

# Применение
df.withColumn("clean_name", normalize_name(col("name")))
# Способ 2: регистрация функции
def validate_inn(inn):
    """Проверка контрольной суммы ИНН"""
    if inn is None or len(inn) not in (10, 12):
        return False
    # ... логика валидации
    return True

validate_inn_udf = udf(validate_inn, BooleanType())

df.withColumn("is_valid_inn", validate_inn_udf(col("inn")))

Для использования в SQL-запросах:

spark.udf.register("normalize_name", normalize_name)
spark.sql("SELECT normalize_name(name) FROM employees")
WARNING

Return type обязателен! Spark не может вывести тип возвращаемого значения из Python-функции. Если не указать returnType, Spark по умолчанию использует StringType(), что может привести к неожиданным результатам при числовых вычислениях.

Проблема производительности Python UDF

Python UDF — это чёрный ящик для Catalyst optimizer. Spark не может:

  • Оптимизировать код внутри UDF
  • Применить predicate pushdown через UDF
  • Выполнить UDF на JVM (нужен Python-процесс)

Механика выполнения Python UDF:

Для каждой строки:
1. JVM сериализует строку (Row -> bytes)
2. Bytes передаются в Python worker через socket
3. Python десериализует данные
4. Python выполняет вашу функцию
5. Python сериализует результат
6. Bytes передаются обратно в JVM
7. JVM десериализует результат

Этот процесс сериализации/десериализации (ser/de) на каждую строку создаёт огромный overhead. На миллионах строк Python UDF может быть в 10-100x медленнее встроенных функций.

Pandas UDF (Vectorized UDF): решение проблемы

Pandas UDF обрабатывают данные пакетами (батчами), а не по одной строке. Данные передаются как Apache Arrow columnar format, что радикально сокращает overhead сериализации:

import pandas as pd

from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def normalize_salary(salary: pd.Series) -> pd.Series:
    """Нормализация зарплаты (z-score) -- пакетная обработка"""
    return (salary - salary.mean()) / salary.std()

df.withColumn("salary_normalized", normalize_salary(col("salary")))

Ключевое отличие: вместо обработки одной строки за раз, Pandas UDF получает pd.Series (целый столбец или батч строк) и возвращает pd.Series. Данные передаются через Apache Arrow — бинарный columnar format без per-row сериализации.

Типы Pandas UDF:

ТипВходВыходПрименение
Scalarpd.Seriespd.SeriesПоэлементные операции (map)
Grouped Mappd.DataFramepd.DataFrameОперации над группами (groupBy().applyInPandas())
Grouped Aggregatepd.SeriesScalarКастомные агрегации
# Grouped Map: кастомная обработка групп
def fill_missing_by_group(df: pd.DataFrame) -> pd.DataFrame:
    """Заполняем пропуски средним по группе"""
    df["salary"] = df["salary"].fillna(df["salary"].mean())
    return df

result = employees.groupBy("department").applyInPandas(
    fill_missing_by_group,
    schema=employees.schema
)
Spark4.0

Spark 4.0 улучшает производительность Arrow-backed Pandas UDF за счёт оптимизированного батчинга и уменьшенного memory overhead при передаче данных между JVM и Python.

Проверка знанийKnowledge check
Почему Pandas UDF значительно быстрее обычного Python UDF?
ОтветAnswer
Python UDF обрабатывает данные по одной строке, каждый раз сериализуя/десериализуя через socket между JVM и Python. Pandas UDF работает пакетами (батчами), передавая данные в Apache Arrow columnar format -- бинарный формат без per-row overhead. Вместо миллионов сериализаций (по одной на строку) происходит несколько десятков (по одной на батч). Дополнительно, pandas/NumPy операции над pd.Series используют vectorized C-код, что быстрее Python-циклов.

Scala UDF: максимальная производительность

Для критически важных по производительности UDF можно использовать Scala:

// Scala UDF -- выполняется напрямую на JVM
val normalizeNameUDF = udf((name: String) => {
  if (name == null) null
  else name.trim.split("\\s+").map(_.capitalize).mkString(" ")
})

df.withColumn("clean_name", normalizeNameUDF($"name"))

Scala UDF не требует сериализации между JVM и Python — функция выполняется нативно на JVM. Это самый быстрый тип UDF, но требует знания Scala и перекомпиляции при изменениях.

Иерархия производительности

Всегда стремитесь использовать вариант ближе к началу списка:

Производительность (от лучшей к худшей):

1. Встроенные функции (col, when, regexp_extract...)  ← ЛУЧШИЙ ВЫБОР
   - Выполняются на JVM, Catalyst может оптимизировать
   - 100% рекомендуется для стандартных операций

2. Scala UDF
   - Выполняется на JVM, нет сериализации
   - Когда встроенных недостаточно и есть Scala-expertise

3. Pandas UDF (vectorized)
   - Батчевая обработка через Arrow
   - Когда нужен Python + NumPy/pandas для вычислений

4. Python UDF                                          ← ХУДШИЙ ВЫБОР
   - Per-row сериализация через socket
   - Только когда нет альтернативы

Анти-паттерн: UDF вместо встроенных функций

# ПЛОХО: Python UDF для стандартной операции
@udf(returnType=StringType())
def upper_case(s):
    return s.upper() if s else None

df.withColumn("name_upper", upper_case(col("name")))

# ХОРОШО: встроенная функция
from pyspark.sql.functions import upper
df.withColumn("name_upper", upper(col("name")))

Встроенная функция upper() выполняется на JVM без сериализации и поддерживает code generation (Tungsten). Python UDF для той же операции будет в 10-50x медленнее на больших датасетах.

Правило: прежде чем писать UDF, проверьте документацию встроенных функций. Spark предоставляет 300+ функций для строк, дат, массивов, map, математики, условий.

Проверка знанийKnowledge check
В каком порядке следует рассматривать варианты при необходимости кастомной трансформации данных в Spark?
ОтветAnswer
1) Встроенные функции (upper, date_add, regexp_extract) -- всегда первый выбор, максимальная производительность и оптимизация Catalyst. 2) Pandas UDF -- если нужен Python и операция сложнее встроенных, батчевая обработка через Arrow. 3) Scala UDF -- если нужна максимальная скорость UDF и есть Scala-expertise. 4) Python UDF -- последний вариант, только когда альтернатив нет. Ключевой вопрос: 'Есть ли встроенная функция для этой задачи?'

Почему UDF теряют оптимизации Tungsten — разбор на уровне исходников в senior-курсе:

Spark Internals: фреймворк Encoder

Что дальше?

В следующем уроке мы изучим pandas API on Spark — как масштабировать существующий pandas-код на кластер Spark без переписывания, и когда это имеет смысл, а когда лучше использовать нативный PySpark.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 3. Расположите типы UDF в порядке убывания производительности:

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8