PyArrow и Spark: практическая интеграция
spark.sql.execution.arrow.pyspark.enabled
Это самая важная конфигурация для Python-разработчиков, работающих со Spark. Одна строка, которая ускоряет toPandas() и createDataFrame() в 10-100x:
# Включение Arrow optimization
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Или через SparkSession builder
spark = SparkSession.builder \
.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
.getOrCreate()
Что меняется при включении Arrow
Без Arrow (pickle-based, по умолчанию в Spark < 4.0):
toPandas() без Arrow:
Executor 1 → pickle serialize → socket → Driver JVM
Executor 2 → pickle serialize → socket → Driver JVM
Executor 3 → pickle serialize → socket → Driver JVM
↓
Driver JVM → Py4J socket → Python → pickle deserialize → pandas
(row-by-row!)
С Arrow (arrow.pyspark.enabled=true):
toPandas() с Arrow:
Executor 1 → Arrow RecordBatch ─┐
Executor 2 → Arrow RecordBatch ─┤→ Driver → Arrow Table → pandas
Executor 3 → Arrow RecordBatch ─┘ (columnar, zero-copy)
Benchmark: toPandas() с Arrow vs без
import time
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.sql.execution.arrow.pyspark.enabled", "false") \
.getOrCreate()
# Создаём тестовый DataFrame: 5 миллионов строк
df = spark.range(5_000_000).selectExpr(
"id",
"CAST(id * 0.1 AS DOUBLE) AS value",
"CONCAT('user_', CAST(id AS STRING)) AS name",
"id % 100 AS category"
)
df.cache()
df.count() # Форсируем кэширование
# Benchmark без Arrow (pickle)
start = time.time()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
pdf_pickle = df.toPandas()
time_pickle = time.time() - start
print(f"Pickle: {time_pickle:.2f} сек")
# ~12-15 сек для 5M строк
# Benchmark с Arrow
start = time.time()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pdf_arrow = df.toPandas()
time_arrow = time.time() - start
print(f"Arrow: {time_arrow:.2f} сек")
# ~0.3-0.5 сек для 5M строк
print(f"Ускорение: {time_pickle / time_arrow:.0f}x")
# Обычно 25-40x для таких данных
Результаты benchmark (типичные)
| Размер DataFrame | Pickle (сек) | Arrow (сек) | Ускорение |
|---|---|---|---|
| 100K строк | 0.8 | 0.05 | 16x |
| 1M строк | 4.2 | 0.12 | 35x |
| 5M строк | 14.5 | 0.4 | 36x |
| 10M строк | 31.0 | 0.8 | 39x |
| 50M строк | OOM (driver) | 3.9 | — |
Anti-pattern: не включать Arrow optimization
# ПЛОХО: по умолчанию используется pickle
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false") # default in Spark < 4.0
df.toPandas() # 10-100x медленнее, чем нужно
# ХОРОШО: всегда включайте Arrow
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df.toPandas() # Arrow-based, columnar, fast
# В Spark 4.0+: Arrow включён по умолчанию
# Но лучше указывать явно для переносимостиЕсли вы используете toPandas() без arrow.pyspark.enabled=true на датасетах > 1M строк, вы тратите CPU и RAM впустую. Pickle сериализует каждую строку отдельно — это O(N) socket-вызовов вместо O(N/batch_size).
createDataFrame() с Arrow
Arrow ускоряет не только чтение из Spark, но и запись pandas DataFrame в Spark:
import pandas as pd
# Создаём pandas DataFrame
pdf = pd.DataFrame({
'id': range(1_000_000),
'value': [float(i) * 0.01 for i in range(1_000_000)],
'category': [f'cat_{i % 50}' for i in range(1_000_000)]
})
# Без Arrow: row-by-row через Py4J socket
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
sdf_slow = spark.createDataFrame(pdf) # ~8 сек
# С Arrow: batch transfer через Arrow IPC
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
sdf_fast = spark.createDataFrame(pdf) # ~0.8 сек
# 10x ускорение
Конвертация типов
Arrow автоматически маппит pandas/NumPy типы в Spark типы:
| pandas / NumPy | Arrow | Spark |
|---|---|---|
| int64 | int64 | LongType |
| float64 | float64 | DoubleType |
| object (str) | utf8 | StringType |
| bool | bool | BooleanType |
| datetime64[ns] | timestamp[ns] | TimestampType |
| Categorical | dictionary | StringType |
PyArrow 22.0.0 и Pyodide
PyArrow 22.0.0 уже включён в Pyodide 0.29.3, который используется в наших интерактивных упражнениях. Это означает, что базовые операции с Arrow (создание таблиц, конвертация в pandas, работа с буферами) доступны прямо в браузере. Однако Arrow Flight и Spark Connect требуют серверной инфраструктуры.
Pandas UDF под капотом: Arrow batch serialization
В модуле UDF Performance мы видели, что Pandas UDF в 3-100x быстрее обычных Python UDF. Теперь мы понимаем почему — Arrow:
1M socket-вызовов vs 100 Arrow transfers
Настройка размера batch
# Размер Arrow batch для Pandas UDF (по умолчанию 10,000 строк)
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")
# Для больших данных с простыми UDF -- увеличьте batch
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "50000")
# Больше batch = меньше overhead на transfer, больше memory
# Для сложных UDF (ML inference) -- уменьшите batch
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1000")
# Меньше batch = меньше memory, чаще checkpoint
Arrow-optimized UDF vs Legacy Pickle UDF
Сравнение двух подходов на конкретном примере — нормализация зарплат:
from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd
# Legacy Python UDF (pickle serialization)
@udf(returnType=DoubleType())
def normalize_salary_legacy(salary, mean_salary, std_salary):
"""Pickle: 1 вызов на строку."""
if salary is None:
return None
return (salary - mean_salary) / std_salary
# Arrow-based Pandas UDF (vectorized)
@pandas_udf(DoubleType())
def normalize_salary_arrow(
salary: pd.Series,
mean_val: pd.Series,
std_val: pd.Series
) -> pd.Series:
"""Arrow: 1 вызов на batch (10K строк)."""
return (salary - mean_val) / std_val
# NumPy vectorized operation -- SIMD на весь batch!
Performance comparison (10M строк)
| Метрика | Legacy (pickle) | Pandas UDF (Arrow) |
|---|---|---|
| Время выполнения | 45 сек | 1.2 сек |
| Serialization calls | 10,000,000 | 1,000 |
| Memory overhead | High (pickle buffers) | Low (Arrow batches) |
| SIMD vectorization | Нет (scalar) | Да (NumPy/pandas) |
| Ускорение | baseline | 37x |
Практические рекомендации
Когда использовать Arrow optimization
# ВСЕГДА включайте для:
# 1. toPandas() на любых данных > 10K строк
# 2. createDataFrame() из pandas
# 3. Pandas UDF (Arrow используется автоматически)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Единственное исключение: нестандартные типы данных
# Arrow не поддерживает UserDefinedType (UDT)
# В этом случае Spark автоматически fallback на pickle
Checklist для Python + Spark проектов
spark.sql.execution.arrow.pyspark.enabled=true— всегда- Используйте Pandas UDF вместо обычных Python UDF — всегда
toPandas()только на агрегированных/отфильтрованных данных — не тяните 1 TB в pandasmaxRecordsPerBatch— настройте под ваш use case (default 10K обычно OK)- Проверьте PyArrow version:
import pyarrow; print(pyarrow.__version__)
Итог модуля
Apache Arrow — это клей между Spark, pandas, и внешними системами:
| Где Arrow | Что делает | Результат |
|---|---|---|
| toPandas() | Заменяет pickle на RecordBatch | 10-100x ускорение |
| createDataFrame() | Batch transfer вместо row-by-row | 3-10x ускорение |
| Pandas UDF | Arrow batch serialization | 3-100x vs Python UDF |
| Spark Connect | gRPC + Arrow Flight transport | Нет JVM на клиенте |
| Arrow Flight | Заменяет JDBC/ODBC | 10-100x для bulk transfer |
| Cross-language | Единый memory layout | Zero-copy между Python/Java/C++ |
Arrow — это не просто “ещё один формат”. Это инфраструктурный слой, который объясняет, почему Pandas UDF быстрые, почему Spark Connect работает без JVM, и почему современные аналитические системы могут обмениваться данными без сериализации.
Модуль Apache Arrow завершён. Вы теперь понимаете колоночный формат данных, zero-copy transfer, Flight protocol, Spark Connect архитектуру, и практическую интеграцию PyArrow с Spark.