Learning Platform
Глоссарий Troubleshooting
Урок 11.05 · 12 мин
Средний
PyArrowtoPandascreateDataFrameArrow Optimizationspark.sql.execution.arrow.pyspark.enabled

PyArrow и Spark: практическая интеграция

spark.sql.execution.arrow.pyspark.enabled

Это самая важная конфигурация для Python-разработчиков, работающих со Spark. Одна строка, которая ускоряет toPandas() и createDataFrame() в 10-100x:

# Включение Arrow optimization
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Или через SparkSession builder
spark = SparkSession.builder \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

Что меняется при включении Arrow

Без Arrow (pickle-based, по умолчанию в Spark < 4.0):

toPandas() без Arrow:
Executor 1 → pickle serialize → socket → Driver JVM
Executor 2 → pickle serialize → socket → Driver JVM
Executor 3 → pickle serialize → socket → Driver JVM

Driver JVM → Py4J socket → Python → pickle deserialize → pandas
                                                          (row-by-row!)

С Arrow (arrow.pyspark.enabled=true):

toPandas() с Arrow:
Executor 1 → Arrow RecordBatch ─┐
Executor 2 → Arrow RecordBatch ─┤→ Driver → Arrow Table → pandas
Executor 3 → Arrow RecordBatch ─┘       (columnar, zero-copy)

Benchmark: toPandas() с Arrow vs без

import time
import pandas as pd

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
    .getOrCreate()

# Создаём тестовый DataFrame: 5 миллионов строк
df = spark.range(5_000_000).selectExpr(
    "id",
    "CAST(id * 0.1 AS DOUBLE) AS value",
    "CONCAT('user_', CAST(id AS STRING)) AS name",
    "id % 100 AS category"
)
df.cache()
df.count()  # Форсируем кэширование

# Benchmark без Arrow (pickle)
start = time.time()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
pdf_pickle = df.toPandas()
time_pickle = time.time() - start
print(f"Pickle: {time_pickle:.2f} сек")
# ~12-15 сек для 5M строк

# Benchmark с Arrow
start = time.time()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pdf_arrow = df.toPandas()
time_arrow = time.time() - start
print(f"Arrow:  {time_arrow:.2f} сек")
# ~0.3-0.5 сек для 5M строк

print(f"Ускорение: {time_pickle / time_arrow:.0f}x")
# Обычно 25-40x для таких данных

Результаты benchmark (типичные)

Размер DataFramePickle (сек)Arrow (сек)Ускорение
100K строк0.80.0516x
1M строк4.20.1235x
5M строк14.50.436x
10M строк31.00.839x
50M строкOOM (driver)3.9
WARNING

Anti-pattern: не включать Arrow optimization

# ПЛОХО: по умолчанию используется pickle
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")  # default in Spark < 4.0
df.toPandas()  # 10-100x медленнее, чем нужно

# ХОРОШО: всегда включайте Arrow
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df.toPandas()  # Arrow-based, columnar, fast

# В Spark 4.0+: Arrow включён по умолчанию
# Но лучше указывать явно для переносимости

Если вы используете toPandas() без arrow.pyspark.enabled=true на датасетах > 1M строк, вы тратите CPU и RAM впустую. Pickle сериализует каждую строку отдельно — это O(N) socket-вызовов вместо O(N/batch_size).

createDataFrame() с Arrow

Arrow ускоряет не только чтение из Spark, но и запись pandas DataFrame в Spark:

import pandas as pd

# Создаём pandas DataFrame
pdf = pd.DataFrame({
    'id': range(1_000_000),
    'value': [float(i) * 0.01 for i in range(1_000_000)],
    'category': [f'cat_{i % 50}' for i in range(1_000_000)]
})

# Без Arrow: row-by-row через Py4J socket
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
sdf_slow = spark.createDataFrame(pdf)  # ~8 сек

# С Arrow: batch transfer через Arrow IPC
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
sdf_fast = spark.createDataFrame(pdf)  # ~0.8 сек
# 10x ускорение

Конвертация типов

Arrow автоматически маппит pandas/NumPy типы в Spark типы:

pandas / NumPyArrowSpark
int64int64LongType
float64float64DoubleType
object (str)utf8StringType
boolboolBooleanType
datetime64[ns]timestamp[ns]TimestampType
CategoricaldictionaryStringType
TIP

PyArrow 22.0.0 и Pyodide

PyArrow 22.0.0 уже включён в Pyodide 0.29.3, который используется в наших интерактивных упражнениях. Это означает, что базовые операции с Arrow (создание таблиц, конвертация в pandas, работа с буферами) доступны прямо в браузере. Однако Arrow Flight и Spark Connect требуют серверной инфраструктуры.

Pandas UDF под капотом: Arrow batch serialization

В модуле UDF Performance мы видели, что Pandas UDF в 3-100x быстрее обычных Python UDF. Теперь мы понимаем почему — Arrow:

Python UDF vs Pandas UDF

1M socket-вызовов vs 100 Arrow transfers

Python UDF (pickle, per-row)
JVM ExecutorRow 1…1M
pickle
pickle
Python Workerfunc(x) × 1M
1M socket-вызовов, 1M pickle сериализаций
Pandas UDF (Arrow, per-batch)
JVM ExecutorBatch 1…100(10K rows)
Arrow IPC
Arrow IPC
Python Workerfunc(series)Vectorized NumPy
100 Arrow transfers (вместо 1M pickle transfers)

Настройка размера batch

# Размер Arrow batch для Pandas UDF (по умолчанию 10,000 строк)
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")

# Для больших данных с простыми UDF -- увеличьте batch
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "50000")
# Больше batch = меньше overhead на transfer, больше memory

# Для сложных UDF (ML inference) -- уменьшите batch
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1000")
# Меньше batch = меньше memory, чаще checkpoint

Arrow-optimized UDF vs Legacy Pickle UDF

Сравнение двух подходов на конкретном примере — нормализация зарплат:

from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd

# Legacy Python UDF (pickle serialization)
@udf(returnType=DoubleType())
def normalize_salary_legacy(salary, mean_salary, std_salary):
    """Pickle: 1 вызов на строку."""
    if salary is None:
        return None
    return (salary - mean_salary) / std_salary

# Arrow-based Pandas UDF (vectorized)
@pandas_udf(DoubleType())
def normalize_salary_arrow(
    salary: pd.Series,
    mean_val: pd.Series,
    std_val: pd.Series
) -> pd.Series:
    """Arrow: 1 вызов на batch (10K строк)."""
    return (salary - mean_val) / std_val
    # NumPy vectorized operation -- SIMD на весь batch!

Performance comparison (10M строк)

МетрикаLegacy (pickle)Pandas UDF (Arrow)
Время выполнения45 сек1.2 сек
Serialization calls10,000,0001,000
Memory overheadHigh (pickle buffers)Low (Arrow batches)
SIMD vectorizationНет (scalar)Да (NumPy/pandas)
Ускорениеbaseline37x

Практические рекомендации

Когда использовать Arrow optimization

# ВСЕГДА включайте для:
# 1. toPandas() на любых данных > 10K строк
# 2. createDataFrame() из pandas
# 3. Pandas UDF (Arrow используется автоматически)

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Единственное исключение: нестандартные типы данных
# Arrow не поддерживает UserDefinedType (UDT)
# В этом случае Spark автоматически fallback на pickle

Checklist для Python + Spark проектов

  1. spark.sql.execution.arrow.pyspark.enabled=trueвсегда
  2. Используйте Pandas UDF вместо обычных Python UDF — всегда
  3. toPandas() только на агрегированных/отфильтрованных данных — не тяните 1 TB в pandas
  4. maxRecordsPerBatch — настройте под ваш use case (default 10K обычно OK)
  5. Проверьте PyArrow version: import pyarrow; print(pyarrow.__version__)
Проверка знанийKnowledge check
Что делает конфигурация spark.sql.execution.arrow.pyspark.enabled и почему её стоит всегда включать?
ОтветAnswer
Эта конфигурация переключает toPandas() и createDataFrame() с pickle-based сериализации (по строкам, через Py4J socket) на Arrow-based batch transfer (колоночный формат, RecordBatch). Ускорение составляет 10-100x за счёт: (1) batch transfer вместо per-row (100 переносов вместо 10M), (2) columnar Arrow format вместо текстового pickle, (3) zero-copy десериализация на стороне pandas. Единственное исключение -- UserDefinedType (UDT), для которых Arrow не поддерживает сериализацию.
Проверка знанийKnowledge check
Почему Pandas UDF быстрее обычных Python UDF в 3-100x, и какую роль играет Arrow?
ОтветAnswer
Обычный Python UDF вызывается 1 раз на строку: каждая строка сериализуется через pickle, передаётся через socket в Python worker, обрабатывается скалярной функцией, результат сериализуется обратно. Для 10M строк -- 10M socket-вызовов. Pandas UDF использует Arrow: данные передаются batch'ами по 10K строк в Arrow columnar format (100 transfers вместо 10M). Функция получает pandas.Series и выполняет векторизованные NumPy-операции (SIMD). Ускорение складывается из: меньше transfer overhead (100x), columnar вместо row (no serialize), SIMD vectorization (4-8x на batch).

Итог модуля

Apache Arrow — это клей между Spark, pandas, и внешними системами:

Где ArrowЧто делаетРезультат
toPandas()Заменяет pickle на RecordBatch10-100x ускорение
createDataFrame()Batch transfer вместо row-by-row3-10x ускорение
Pandas UDFArrow batch serialization3-100x vs Python UDF
Spark ConnectgRPC + Arrow Flight transportНет JVM на клиенте
Arrow FlightЗаменяет JDBC/ODBC10-100x для bulk transfer
Cross-languageЕдиный memory layoutZero-copy между Python/Java/C++

Arrow — это не просто “ещё один формат”. Это инфраструктурный слой, который объясняет, почему Pandas UDF быстрые, почему Spark Connect работает без JVM, и почему современные аналитические системы могут обмениваться данными без сериализации.


Модуль Apache Arrow завершён. Вы теперь понимаете колоночный формат данных, zero-copy transfer, Flight protocol, Spark Connect архитектуру, и практическую интеграцию PyArrow с Spark.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Какой один параметр конфигурации ускоряет toPandas() и createDataFrame() в 10-100x?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6