Python UDF: анатомия overhead
Масштаб проблемы
Python UDF в PySpark может быть в ~100x медленнее эквивалентной встроенной функции. Это не опечатка — два порядка величины. На датасете из 1 миллиарда строк простой upper() через встроенную функцию занимает ~5 секунд, а через Python UDF — ~8 минут.
Чтобы принимать осознанные решения о том, когда UDF оправдан, нужно понимать механику этого overhead.
Анатомия вызова Python UDF
Когда Spark выполняет Python UDF, для каждой строки происходит следующая последовательность:
9 шагов на каждую строку. При 1 миллиарде строк — 9 миллиардов операций.
9 шагов на каждую строку. При 1 миллиарде строк это 9 миллиардов операций, большинство из которых — overhead сериализации.
Компоненты overhead
1. Запуск Python Worker (~200-500ms на старте)
Для каждого executor Spark запускает отдельный Python worker process. Это полноценный Python-интерпретатор:
- Fork процесса из
pyspark.daemon - Импорт модулей (
cloudpickle,pyspark.worker) - Инициализация socket-соединения с JVM
На кластере с 100 executors это 100 Python-процессов, каждый потребляющий ~50-100MB RAM.
2. Сериализация данных (~60% overhead)
Самый дорогой компонент. Spark использует cloudpickle для сериализации каждой строки:
# Внутри Spark (упрощённо):
for row in partition:
# JVM side: serialize Row
pickled = cloudpickle.dumps(row) # ~1-5 μs per row
# Transfer through socket
socket.send(pickled) # ~0.5-1 μs per row
# Python side: deserialize
python_row = cloudpickle.loads(data) # ~1-5 μs per row
# Execute UDF
result = user_function(python_row) # ~0.1-1 μs per row
# Serialize result back
pickled_result = cloudpickle.dumps(result)
socket.send(pickled_result)
Обратите внимание: сама функция (execute) занимает меньше всего времени. 80%+ времени уходит на сериализацию/десериализацию.
3. Socket transfer (~15% overhead)
Данные передаются между JVM и Python через Unix domain socket (на одной машине) или TCP socket. Каждая передача — системный вызов с context switch.
4. Python GIL (~10% overhead для CPU-bound)
Python Global Interpreter Lock не позволяет параллельному выполнению Python-кода в одном процессе. Если UDF CPU-bound (математические вычисления, строковые операции), GIL сериализует выполнение даже на многоядерном процессоре.
Это не проблема для I/O-bound UDF или если Spark запускает отдельный Python worker на каждый executor core, но увеличивает memory footprint.
5. Memory overhead (~15% overhead)
Каждый Python worker поддерживает собственное Python-окружение:
Итого: 4GB + 200MB Python overhead на executor
На кластере с 50 executors, каждый с 2 Python workers, это 10GB дополнительной RAM только на Python-процессы.
Python UDF memory leak: Python workers остаются в памяти до конца stage. Если UDF создаёт крупные объекты (списки, словари) внутри функции, они не освобождаются между вызовами в пределах одного батча. Используйте spark.python.worker.reuse = true (по умолчанию) для переиспользования workers, но следите за memory footprint.
Визуализация: Встроенная vs Python UDF
Tungsten CodeGen:
for (row in partition) {
result = row.getString(0)
.toUpperCase()
output.write(result)
}5 наносекунд vs 5 микросекунд = 1000x разница для простейшей операции. На более сложных UDF разница снижается до ~100x, потому что доля полезной работы (Python execution) растёт.
Пример: замеры производительности
# Тестовый датасет: 100 миллионов строк
df = spark.range(100_000_000).withColumn("name", lit("john doe"))
# Вариант 1: встроенная функция
from pyspark.sql.functions import upper
df.withColumn("upper_name", upper(col("name"))).write.mode("overwrite").parquet("/tmp/builtin")
# Время: ~8 секунд
# Вариант 2: Python UDF
@udf(returnType=StringType())
def py_upper(s):
return s.upper() if s else None
df.withColumn("upper_name", py_upper(col("name"))).write.mode("overwrite").parquet("/tmp/udf")
# Время: ~12 минут (90x медленнее)
Анти-паттерн: Python UDF для row-level валидации
# ПЛОХО: Python UDF для простой проверки
@udf(returnType=BooleanType())
def is_valid_email(email):
if email is None:
return False
return "@" in email and "." in email.split("@")[-1]
df.filter(is_valid_email(col("email")))
# ХОРОШО: when/otherwise + встроенные функции
from pyspark.sql.functions import col, instr, split, element_at
df.filter(
col("email").isNotNull()
& (instr(col("email"), "@") > 0)
& (instr(element_at(split(col("email"), "@"), 2), ".") > 0)
)
Версия с встроенными функциями не только быстрее (code generation + predicate pushdown), но и позволяет Spark pushdown фильтр в источник данных (Parquet, JDBC).
Как Arrow убирает накладные расходы JVM-Python bridge — на уровне исходников — в курсе Apache Spark Internals:
Spark Internals: PyArrow и SparkЧто дальше?
В следующем уроке мы изучим Pandas UDF (Arrow-based) — подход, который сокращает overhead сериализации в 5-20 раз за счёт батчевой передачи данных через Apache Arrow columnar format.