Learning Platform
Глоссарий Troubleshooting
Урок 11.04 · 12 мин
Продвинутый
Spark ConnectClient-ServerArrow Flightpyspark.sql.connectLightweight Client

Spark Connect и Arrow

Проблема: PySpark = тяжёлый клиент

До Spark 3.4 PySpark был встроенным в кластер. Каждый PySpark-скрипт запускал полноценную JVM-сессию на машине пользователя (driver). Это создавало серьёзные проблемы:

Традиционный PySpark

JVM на каждом клиенте, 4-8 GB RAM minimum

Машина разработчика
Python process+ JVM process (driver)+ Spark Core + Catalyst+ Py4J bridge (Python ↔ JVM)JVM на ноутбуке! 4-8 GB RAM
RPC (JVM → JVM)
Spark Cluster(executors)

Проблемы тяжёлого клиента:

  • JVM на каждом клиенте: 4-8 GB RAM только для driver
  • Версионная привязка: клиентский PySpark должен точно соответствовать версии кластера
  • Py4J bottleneck: каждый вызов DataFrame API идёт через socket между Python и JVM
  • Нестабильность: OOM driver’а убивает весь кластер
  • IDE интеграция: JVM-зависимости усложняют установку в Jupyter/VS Code

Spark Connect: client-server через gRPC + Arrow Flight

Spark Connect (появился в Spark 3.4, станет основным режимом в Spark 4.0) полностью разделяет клиент и сервер:

Spark Connect

Без JVM, ~50 MB, ~100 MB RAM — gRPC + Arrow Flight

Машина разработчика
Python process ONLYpip install pyspark[connect](лёгкий gRPC клиент)Без JVM! ~100 MB RAM
gRPC + Arrow Flight
Spark Connect Server
gRPC endpointUnresolved Plan → Catalyst → Execute → Arrow RecordBatch
Executors(data processing)

Как это работает

1. Клиент строит query plan (не данные!):

# На клиенте -- Python-only, без JVM
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://cluster:15002").getOrCreate()

# DataFrame API вызовы НЕ выполняются сразу --
# они строят unresolved logical plan
df = (spark.read.parquet("s3://data/events/")
    .filter("age > 30")
    .groupBy("dept").avg("salary"))

2. План отправляется как protobuf (несколько KB):

Protobuf Query Plan

~500 байт — передаётся план, не данные

UnresolvedPlan {
root: Aggregate {
groupBy: ["dept"]
agg: [avg("salary")]
child: Filter {
condition: "age > 30"
child: Read {
source: "s3://data/events/"
}
}
}
}
~500 байт protobuf (не данные!)

3. Сервер оптимизирует и выполняет:

  • Catalyst optimizer обрабатывает план
  • Physical planner выбирает стратегии (BroadcastJoin, SortMerge)
  • Executors обрабатывают данные на кластере

4. Результаты возвращаются как Arrow RecordBatch stream:

# Результат передаётся как Arrow stream
result = df.collect()
# Или toPandas() -- через Arrow, zero-copy

pandas_df = df.toPandas()
# Arrow RecordBatch → pandas DataFrame, без pickle
TIP

Protobuf для планов, Arrow для данных

Spark Connect использует два протокола: protobuf для передачи query plan (маленький, структурированный) и Arrow Flight для передачи результатов (большие, табличные). Это оптимально: план — это дерево операций (КБ), результат — это данные (МБ-ГБ). Каждый протокол делает то, для чего он лучше всего подходит.

Подключение к удалённому Spark-кластеру

from pyspark.sql import SparkSession

# Подключение через Spark Connect
# sc:// -- протокол Spark Connect (gRPC)
spark = SparkSession.builder \
    .remote("sc://spark-cluster.company.com:15002") \
    .getOrCreate()

# Далее -- обычный PySpark API!
df = spark.read.parquet("s3://data/warehouse/events/")

result = (df
    .filter(df.event_type == "purchase")
    .groupBy("user_id")
    .agg({"amount": "sum", "event_id": "count"})
    .orderBy("sum(amount)", ascending=False)
    .limit(100))

# Результат через Arrow Flight
result.show()

# toPandas() через Arrow (быстрый!)
pandas_df = result.toPandas()

spark.sql.execution.arrow.pyspark.enabled

Эта конфигурация контролирует, используется ли Arrow для toPandas() и createDataFrame():

# Включение Arrow optimization (рекомендуется всегда!)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# toPandas() с Arrow: 10-100x быстрее
df_large = spark.read.parquet("s3://data/1gb_table/")
pandas_df = df_large.toPandas()
# Без Arrow: pickle сериализация, ~30 сек для 1 GB
# С Arrow: RecordBatch streaming, ~0.5 сек для 1 GB

# createDataFrame() с Arrow: 3-10x быстрее
import pandas as pd

local_df = pd.DataFrame({"id": range(1_000_000), "value": range(1_000_000)})
spark_df = spark.createDataFrame(local_df)
# Без Arrow: row-by-row через Py4J, ~10 сек
# С Arrow: batch transfer, ~1 сек

Spark 4.0: Connect как основной режим

В Spark 4.0 Spark Connect становится рекомендуемым способом работы с PySpark:

АспектClassic PySparkSpark Connect (4.0)
JVM на клиентеДа (driver)Нет
RAM клиента4-8 GB~100 MB
Установкаpip install pyspark (500+ MB)pip install pyspark[connect] (~50 MB)
Версионная совместимостьТочная matchОбратная совместимость
ПротоколPy4J (socket)gRPC + Arrow Flight
Driver stabilityOOM убивает кластерКлиент изолирован
IDE supportТяжёлый (JVM deps)Лёгкий (pure Python)
NOTE

Полный deep-dive по Spark Connect

В этом уроке мы рассмотрели Spark Connect с точки зрения Arrow — как протокол обмена данными. В будущем модуле Advanced Topics мы детально разберём внутреннюю архитектуру Spark Connect: маршрутизацию планов, сессионное управление, обработку ошибок, plugin API и миграцию с классического PySpark.

Архитектура данных в Spark Connect

Полный путь данных от клиента до результата:

1. Python клиент
   df.filter("age > 30").groupBy("dept").avg("salary")
   ↓ (lazy evaluation -- строится plan tree)

2. Unresolved Logical Plan (protobuf)
   ↓ gRPC transfer (~500 bytes)

3. Spark Connect Server
   ↓ Catalyst: analyze → optimize → physical plan

4. Executors (parallel processing)
   ↓ Parquet → Arrow RecordBatch

5. Arrow Flight stream → Client
   ↓ RecordBatch → pandas DataFrame (zero-copy)

6. Python клиент получает результат
   pandas_df = result.toPandas()

Обратите внимание: данные никогда не проходят через клиент до финального результата. Промежуточные трансформации выполняются полностью на кластере. Клиент получает только финальный результат через Arrow Flight.

Проверка знанийKnowledge check
Почему Spark Connect передаёт query plan как protobuf, а результаты как Arrow RecordBatch?
ОтветAnswer
Query plan -- это структурированное дерево операций размером несколько КБ. Protobuf идеально подходит для таких структур: типизированный, компактный, с обратной совместимостью. Результаты -- это табличные данные размером МБ-ГБ. Arrow RecordBatch оптимизирован для табличных данных: колоночный формат, zero-copy deserialization, streaming по batch'ам. Использование одного формата для обоих случаев было бы неоптимальным.
Проверка знанийKnowledge check
Какие преимущества даёт Spark Connect по сравнению с классическим PySpark для разработчика?
ОтветAnswer
(1) Нет JVM на клиенте -- 100 MB RAM вместо 4-8 GB. (2) Лёгкая установка -- pip install pyspark[connect] (~50 MB) вместо 500+ MB. (3) Обратная совместимость -- клиент 4.0 может работать с кластером 3.5+. (4) Стабильность -- OOM клиента не убивает кластер. (5) Лучшая IDE интеграция -- pure Python, без JVM зависимостей. (6) Быстрый toPandas() через Arrow Flight вместо pickle.

Что дальше?

В следующем уроке мы перейдём к практике и разберём PyArrow интеграцию с Spark: как включить Arrow-оптимизации, benchmark toPandas() с Arrow vs без, и как Pandas UDF используют Arrow под капотом.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. До Spark Connect каждый PySpark-скрипт запускал JVM на машине пользователя. Какие проблемы это создавало?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6