Spark Connect и Arrow
Проблема: PySpark = тяжёлый клиент
До Spark 3.4 PySpark был встроенным в кластер. Каждый PySpark-скрипт запускал полноценную JVM-сессию на машине пользователя (driver). Это создавало серьёзные проблемы:
JVM на каждом клиенте, 4-8 GB RAM minimum
Проблемы тяжёлого клиента:
- JVM на каждом клиенте: 4-8 GB RAM только для driver
- Версионная привязка: клиентский PySpark должен точно соответствовать версии кластера
- Py4J bottleneck: каждый вызов DataFrame API идёт через socket между Python и JVM
- Нестабильность: OOM driver’а убивает весь кластер
- IDE интеграция: JVM-зависимости усложняют установку в Jupyter/VS Code
Spark Connect: client-server через gRPC + Arrow Flight
Spark Connect (появился в Spark 3.4, станет основным режимом в Spark 4.0) полностью разделяет клиент и сервер:
Без JVM, ~50 MB, ~100 MB RAM — gRPC + Arrow Flight
Как это работает
1. Клиент строит query plan (не данные!):
# На клиенте -- Python-only, без JVM
from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://cluster:15002").getOrCreate()
# DataFrame API вызовы НЕ выполняются сразу --
# они строят unresolved logical plan
df = (spark.read.parquet("s3://data/events/")
.filter("age > 30")
.groupBy("dept").avg("salary"))
2. План отправляется как protobuf (несколько KB):
~500 байт — передаётся план, не данные
3. Сервер оптимизирует и выполняет:
- Catalyst optimizer обрабатывает план
- Physical planner выбирает стратегии (BroadcastJoin, SortMerge)
- Executors обрабатывают данные на кластере
4. Результаты возвращаются как Arrow RecordBatch stream:
# Результат передаётся как Arrow stream
result = df.collect()
# Или toPandas() -- через Arrow, zero-copy
pandas_df = df.toPandas()
# Arrow RecordBatch → pandas DataFrame, без pickle
Protobuf для планов, Arrow для данных
Spark Connect использует два протокола: protobuf для передачи query plan (маленький, структурированный) и Arrow Flight для передачи результатов (большие, табличные). Это оптимально: план — это дерево операций (КБ), результат — это данные (МБ-ГБ). Каждый протокол делает то, для чего он лучше всего подходит.
Подключение к удалённому Spark-кластеру
from pyspark.sql import SparkSession
# Подключение через Spark Connect
# sc:// -- протокол Spark Connect (gRPC)
spark = SparkSession.builder \
.remote("sc://spark-cluster.company.com:15002") \
.getOrCreate()
# Далее -- обычный PySpark API!
df = spark.read.parquet("s3://data/warehouse/events/")
result = (df
.filter(df.event_type == "purchase")
.groupBy("user_id")
.agg({"amount": "sum", "event_id": "count"})
.orderBy("sum(amount)", ascending=False)
.limit(100))
# Результат через Arrow Flight
result.show()
# toPandas() через Arrow (быстрый!)
pandas_df = result.toPandas()
spark.sql.execution.arrow.pyspark.enabled
Эта конфигурация контролирует, используется ли Arrow для toPandas() и createDataFrame():
# Включение Arrow optimization (рекомендуется всегда!)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# toPandas() с Arrow: 10-100x быстрее
df_large = spark.read.parquet("s3://data/1gb_table/")
pandas_df = df_large.toPandas()
# Без Arrow: pickle сериализация, ~30 сек для 1 GB
# С Arrow: RecordBatch streaming, ~0.5 сек для 1 GB
# createDataFrame() с Arrow: 3-10x быстрее
import pandas as pd
local_df = pd.DataFrame({"id": range(1_000_000), "value": range(1_000_000)})
spark_df = spark.createDataFrame(local_df)
# Без Arrow: row-by-row через Py4J, ~10 сек
# С Arrow: batch transfer, ~1 сек
Spark 4.0: Connect как основной режим
В Spark 4.0 Spark Connect становится рекомендуемым способом работы с PySpark:
| Аспект | Classic PySpark | Spark Connect (4.0) |
|---|---|---|
| JVM на клиенте | Да (driver) | Нет |
| RAM клиента | 4-8 GB | ~100 MB |
| Установка | pip install pyspark (500+ MB) | pip install pyspark[connect] (~50 MB) |
| Версионная совместимость | Точная match | Обратная совместимость |
| Протокол | Py4J (socket) | gRPC + Arrow Flight |
| Driver stability | OOM убивает кластер | Клиент изолирован |
| IDE support | Тяжёлый (JVM deps) | Лёгкий (pure Python) |
Полный deep-dive по Spark Connect
В этом уроке мы рассмотрели Spark Connect с точки зрения Arrow — как протокол обмена данными. В будущем модуле Advanced Topics мы детально разберём внутреннюю архитектуру Spark Connect: маршрутизацию планов, сессионное управление, обработку ошибок, plugin API и миграцию с классического PySpark.
Архитектура данных в Spark Connect
Полный путь данных от клиента до результата:
1. Python клиент
df.filter("age > 30").groupBy("dept").avg("salary")
↓ (lazy evaluation -- строится plan tree)
2. Unresolved Logical Plan (protobuf)
↓ gRPC transfer (~500 bytes)
3. Spark Connect Server
↓ Catalyst: analyze → optimize → physical plan
4. Executors (parallel processing)
↓ Parquet → Arrow RecordBatch
5. Arrow Flight stream → Client
↓ RecordBatch → pandas DataFrame (zero-copy)
6. Python клиент получает результат
pandas_df = result.toPandas()
Обратите внимание: данные никогда не проходят через клиент до финального результата. Промежуточные трансформации выполняются полностью на кластере. Клиент получает только финальный результат через Arrow Flight.
Что дальше?
В следующем уроке мы перейдём к практике и разберём PyArrow интеграцию с Spark: как включить Arrow-оптимизации, benchmark toPandas() с Arrow vs без, и как Pandas UDF используют Arrow под капотом.