Learning Platform
Глоссарий Troubleshooting
Урок 05.02 · 14 мин
Продвинутый
Python UDFSerializationJVM-Python BridgePy4JArrow

Python UDF: анатомия overhead

Масштаб проблемы

Python UDF в PySpark может быть в ~100x медленнее эквивалентной встроенной функции. Это не опечатка — два порядка величины. На датасете из 1 миллиарда строк простой upper() через встроенную функцию занимает ~5 секунд, а через Python UDF — ~8 минут.

Чтобы принимать осознанные решения о том, когда UDF оправдан, нужно понимать механику этого overhead.

Анатомия вызова Python UDF

Когда Spark выполняет Python UDF, для каждой строки происходит следующая последовательность:

Анатомия вызова Python UDF (per row)
JVM (Executor)
1. Извлечь Row из UnsafeRow (десериализация бинарного формата Tungsten)
2. Serialize Row → bytes (pickle/cloudpickle)
3. Отправить bytes через socket →
8. Deserialize bytes → JVM obj
9. Записать результат в UnsafeRow
socket
Python Worker
1.JVMИзвлечь Row из UnsafeRow (десериализация бинарного формата Tungsten)
2.JVMSerialize Row → bytes (pickle/cloudpickle)
3.JVMОтправить bytes через socket →
4.PythonDeserialize bytes → Python obj
5.PythonExecute Python function
6.PythonSerialize result → bytes
7.Python← Отправить bytes через socket
8.JVMDeserialize bytes → JVM obj
9.JVMЗаписать результат в UnsafeRow

9 шагов на каждую строку. При 1 миллиарде строк — 9 миллиардов операций.

9 шагов на каждую строку. При 1 миллиарде строк это 9 миллиардов операций, большинство из которых — overhead сериализации.

Компоненты overhead

1. Запуск Python Worker (~200-500ms на старте)

Для каждого executor Spark запускает отдельный Python worker process. Это полноценный Python-интерпретатор:

  • Fork процесса из pyspark.daemon
  • Импорт модулей (cloudpickle, pyspark.worker)
  • Инициализация socket-соединения с JVM

На кластере с 100 executors это 100 Python-процессов, каждый потребляющий ~50-100MB RAM.

2. Сериализация данных (~60% overhead)

Самый дорогой компонент. Spark использует cloudpickle для сериализации каждой строки:

# Внутри Spark (упрощённо):
for row in partition:
    # JVM side: serialize Row
    pickled = cloudpickle.dumps(row)     # ~1-5 μs per row

    # Transfer through socket
    socket.send(pickled)                  # ~0.5-1 μs per row

    # Python side: deserialize
    python_row = cloudpickle.loads(data)  # ~1-5 μs per row

    # Execute UDF
    result = user_function(python_row)    # ~0.1-1 μs per row

    # Serialize result back
    pickled_result = cloudpickle.dumps(result)
    socket.send(pickled_result)

Обратите внимание: сама функция (execute) занимает меньше всего времени. 80%+ времени уходит на сериализацию/десериализацию.

3. Socket transfer (~15% overhead)

Данные передаются между JVM и Python через Unix domain socket (на одной машине) или TCP socket. Каждая передача — системный вызов с context switch.

4. Python GIL (~10% overhead для CPU-bound)

Python Global Interpreter Lock не позволяет параллельному выполнению Python-кода в одном процессе. Если UDF CPU-bound (математические вычисления, строковые операции), GIL сериализует выполнение даже на многоядерном процессоре.

Это не проблема для I/O-bound UDF или если Spark запускает отдельный Python worker на каждый executor core, но увеличивает memory footprint.

5. Memory overhead (~15% overhead)

Каждый Python worker поддерживает собственное Python-окружение:

Executor JVM (4GB) с Python UDF
Spark execution memory
~2.4GB
Python worker 1
~100MB
Python worker 2
~100MB
Buffer memory
~200MB

Итого: 4GB + 200MB Python overhead на executor

На кластере с 50 executors, каждый с 2 Python workers, это 10GB дополнительной RAM только на Python-процессы.

WARNING

Python UDF memory leak: Python workers остаются в памяти до конца stage. Если UDF создаёт крупные объекты (списки, словари) внутри функции, они не освобождаются между вызовами в пределах одного батча. Используйте spark.python.worker.reuse = true (по умолчанию) для переиспользования workers, но следите за memory footprint.

Визуализация: Встроенная vs Python UDF

Встроенная функция vs Python UDF
Встроенная функция upper()
JVM Executor
Tungsten CodeGen:
for (row in partition) {
  result = row.getString(0)
             .toUpperCase()
  output.write(result)
}
~5ns per row (нативный JVM код)
Python UDF upper()
JVM Executor
Python Worker
JVMserialize(row)~1μs
→ socket transfer →
Pydeserialize(bytes)~1μs
Pyresult = s.upper()~0.1μs
Pyserialize(result)~1μs
← socket transfer ←
JVMdeserialize(bytes)~1μs
~5μs per row (1000x медленнее!)

5 наносекунд vs 5 микросекунд = 1000x разница для простейшей операции. На более сложных UDF разница снижается до ~100x, потому что доля полезной работы (Python execution) растёт.

Пример: замеры производительности

# Тестовый датасет: 100 миллионов строк
df = spark.range(100_000_000).withColumn("name", lit("john doe"))

# Вариант 1: встроенная функция
from pyspark.sql.functions import upper
df.withColumn("upper_name", upper(col("name"))).write.mode("overwrite").parquet("/tmp/builtin")
# Время: ~8 секунд

# Вариант 2: Python UDF
@udf(returnType=StringType())
def py_upper(s):
    return s.upper() if s else None

df.withColumn("upper_name", py_upper(col("name"))).write.mode("overwrite").parquet("/tmp/udf")
# Время: ~12 минут (90x медленнее)

Анти-паттерн: Python UDF для row-level валидации

# ПЛОХО: Python UDF для простой проверки
@udf(returnType=BooleanType())
def is_valid_email(email):
    if email is None:
        return False
    return "@" in email and "." in email.split("@")[-1]

df.filter(is_valid_email(col("email")))
# ХОРОШО: when/otherwise + встроенные функции
from pyspark.sql.functions import col, instr, split, element_at

df.filter(
    col("email").isNotNull()
    & (instr(col("email"), "@") > 0)
    & (instr(element_at(split(col("email"), "@"), 2), ".") > 0)
)

Версия с встроенными функциями не только быстрее (code generation + predicate pushdown), но и позволяет Spark pushdown фильтр в источник данных (Parquet, JDBC).

Проверка знанийKnowledge check
Какой компонент Python UDF overhead потребляет больше всего времени и почему?
ОтветAnswer
Сериализация/десериализация данных (cloudpickle) потребляет ~60% всего overhead. Для каждой строки данные сериализуются из JVM Row в bytes (cloudpickle.dumps), передаются через socket, десериализуются в Python-объект (cloudpickle.loads), а после выполнения функции -- обратная сериализация результата. Сама Python-функция обычно занимает менее 10% общего времени. Именно поэтому Pandas UDF (Arrow-based) с батчевой передачей данных дают 5-20x ускорение -- они резко сокращают количество операций сериализации.
Проверка знанийKnowledge check
Как Python GIL влияет на производительность CPU-bound UDF в Spark?
ОтветAnswer
Python GIL (Global Interpreter Lock) не позволяет нескольким потокам одновременно выполнять Python-код. Если UDF выполняет CPU-bound вычисления (математика, строковые операции), GIL сериализует выполнение в рамках одного Python worker. Spark обходит это, запуская отдельный Python worker process на каждый executor core, но это увеличивает memory footprint: каждый worker -- это полноценный Python-процесс с ~50-100MB RAM overhead.

Как Arrow убирает накладные расходы JVM-Python bridge — на уровне исходников — в курсе Apache Spark Internals:

Spark Internals: PyArrow и Spark

Что дальше?

В следующем уроке мы изучим Pandas UDF (Arrow-based) — подход, который сокращает overhead сериализации в 5-20 раз за счёт батчевой передачи данных через Apache Arrow columnar format.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 6. Каков основной overhead Python UDF по сравнению со встроенными функциями?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5