Spark Connect: клиент-серверная архитектура
Проблема традиционного режима
В традиционном режиме PySpark-приложение запускает driver process прямо на клиентской машине. Driver — это полноценный JVM-процесс, который:
- Содержит SparkContext и управляет всей координацией задач
- Держит сетевое соединение с каждым executor в кластере
- Потребляет сотни мегабайт памяти (JVM + метаданные планов)
- При crash клиентского процесса весь Spark-job погибает
Crash клиента = потеря всего job
Это создаёт tight coupling: клиент и кластер связаны напрямую. Версия PySpark на клиенте должна точно совпадать с версией на кластере. Обновление Spark требует обновления всех клиентских машин.
Архитектура Spark Connect
Spark3.5 Spark Connect, представленный в Spark 3.4 и доведённый до production-ready состояния в 3.5, решает эту проблему через клиент-серверную декомпозицию.
Идея простая: вынести driver из клиента на сервер. Клиент отправляет только описание плана (unresolved logical plan), а сервер выполняет всё остальное:
Client Application
PySpark Driver (~300MB)
SparkSession runs here
Direct JVM process
Spark Cluster
Executors
Tasks execute here
gRPC и Protocol Buffers
Клиент и сервер общаются через gRPC — высокопроизводительный RPC-фреймворк от Google. Каждая DataFrame-операция на клиенте транслируется в Protocol Buffers сообщение:
# Клиентский код выглядит ИДЕНТИЧНО обычному PySpark
from pyspark.sql import SparkSession
# Подключение через Spark Connect
spark = SparkSession.builder \
.remote("sc://spark-server:15002") \
.getOrCreate()
# Обычные DataFrame операции
df = spark.read.parquet("/data/events")
result = df.filter(df.age > 25).groupBy("city").count()
result.show()
Под капотом каждый вызов (filter, groupBy, count) не выполняется локально — он сериализуется в Protocol Buffers и отправляется на сервер. Сервер разворачивает план, оптимизирует через Catalyst и запускает на кластере.
Данные возвращаются клиенту в формате Apache Arrow (cross-reference M11: Arrow и нативная интеграция) — column-oriented, zero-copy десериализация.
Строка подключения
Spark Connect использует URI-схему sc://:
# Базовое подключение
spark.remote = "sc://hostname:15002"
# С параметрами
spark.remote = "sc://hostname:15002/;use_ssl=true;token=<auth-token>"
# Через SparkSession builder
spark = SparkSession.builder \
.remote("sc://spark-server:15002") \
.getOrCreate()
Порт 15002 — стандартный порт Spark Connect gRPC server.
pyspark-client: лёгкий клиент
Полный пакет pyspark весит ~300MB и тянет JVM-зависимости. Для работы через Spark Connect нужна только клиентская часть:
# Полный PySpark -- ~300MB, требует Java
pip install pyspark
# Только Spark Connect клиент -- ~5MB, НЕ требует Java
pip install pyspark[connect]
Пакет pyspark[connect] содержит только:
- gRPC-клиент для Protocol Buffers
- DataFrame API (трансляция операций в proto-сообщения)
- Arrow-десериализацию для получения результатов
Никакого JVM, никакого JAVA_HOME, никакого SPARK_HOME. Это радикально упрощает deployment на клиентских машинах и в CI/CD.
Преимущества Spark Connect
| Аспект | Традиционный режим | Spark Connect |
|---|---|---|
| Клиентский размер | ~300MB + JVM | ~5MB, без JVM |
| Crash isolation | Crash клиента = потеря job | Crash клиента не влияет на server |
| Версии | Client и cluster должны совпадать | Независимые версии (протокол стабилен) |
| Multi-tenancy | Один driver на приложение | Множество клиентов к одному серверу |
| Языки | Python, Scala, Java, R | Любой язык с gRPC (Go, Rust, …) |
| Ресурсы клиента | JVM heap + metastore connections | Минимальные (gRPC stub) |
Resource Isolation
В традиционном режиме crash одного пользовательского приложения может повлиять на стабильность shared cluster. С Spark Connect:
- Каждый клиент получает изолированную SparkSession на сервере
- Server управляет lifecycle сессий: timeout, cleanup, resource limits
- Один клиент не может уронить другого — сервер изолирует fault domains
Spark4.0 В Spark 4.0 Spark Connect стал рекомендуемым режимом работы. Улучшения включают расширенную поддержку streaming, UDF через Arrow и полную совместимость DataFrame API.
Spark Connect не заменяет Spark — он заменяет способ подключения к Spark. Все возможности (SQL, DataFrame, MLlib, Streaming) доступны через Connect. Единственное ограничение: low-level RDD API недоступен через Connect, но он и не рекомендуется для новых проектов.
Запуск Spark Connect Server
На стороне кластера Spark Connect server запускается как дополнительный endpoint:
# Запуск Spark Connect server
./sbin/start-connect-server.sh \
--master spark://master:7077 \
--conf spark.connect.grpc.binding.port=15002 \
--packages org.apache.spark:spark-connect_2.12:4.0.0
Server принимает gRPC-подключения и маршрутизирует их к SparkSession на кластере. Один server обслуживает множество клиентов.
Production deployment: в production Spark Connect server разворачивается за load balancer (nginx, envoy) с TLS termination. Аутентификация реализуется через token-based gRPC interceptors. Мониторинг — через стандартные gRPC metrics (Prometheus exporter).
Python Data Source API (Spark 4.0)
Spark4.0Помимо thin-client паттерна, Spark 4.0 принёс ещё одну важную Python-first возможность — Python Data Source API. Раньше для собственного коннектора (REST API, custom binary format, проприетарная система) приходилось писать Scala-код, реализующий traits DataSourceV2/Table/ScanBuilder — что предполагало JVM-экспертизу и доступ к Spark internals. С 4.0 тот же интерфейс есть в Python.
Базовая реализация custom data source
Custom source — это subclass DataSource плюс пара классов для reader/writer. Никакого JVM-кода:
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
class JsonLineDataSource(DataSource):
"""Custom reader для JSON Lines с произвольной фильтрацией."""
@classmethod
def name(cls) -> str:
return "jsonline"
def schema(self) -> StructType:
# Можно вернуть schema из options или определить динамически
return StructType([
StructField("user_id", IntegerType(), False),
StructField("event", StringType(), True),
])
def reader(self, schema: StructType) -> "JsonLineReader":
return JsonLineReader(self.options, schema)
class JsonLinePartition(InputPartition):
def __init__(self, file_path: str):
self.file_path = file_path
class JsonLineReader(DataSourceReader):
def __init__(self, options, schema):
self.path = options["path"]
self.schema = schema
def partitions(self):
# Один partition per файл
import glob
return [JsonLinePartition(p) for p in glob.glob(self.path)]
def read(self, partition: JsonLinePartition):
import json
with open(partition.file_path) as f:
for line in f:
row = json.loads(line)
yield (row.get("user_id"), row.get("event"))
# Регистрация и использование
spark.dataSource.register(JsonLineDataSource)
df = spark.read.format("jsonline").option("path", "/data/logs/*.jsonl").load()
df.show()
Trade-offs vs JVM-source vs UDF
| Подход | Production Suitability | Overhead | Where logic runs | Когда выбирать |
|---|---|---|---|---|
| Built-in source (parquet, jdbc) | Production-ready | Минимальный | JVM | Стандартные форматы |
| Scala/Java DataSourceV2 | Production-ready | Минимальный | JVM | Высокопроизводительные custom source |
| Python Data Source API | Production-ready (4.0+) | Малый (Arrow-based transfer) | Python worker | Custom source без JVM-экспертизы |
| Pandas UDF + spark.range driving | Workaround | Средний | Python worker | Legacy-проекты на 3.x |
| collect() -> spark.createDataFrame | Anti-pattern | Огромный | Driver | Только prototyping |
Ключевая выгода Python Data Source API — нативная интеграция в Spark plan. В отличие от UDF, которые требуют JVM <-> Python serialization on per-row basis, Python Data Source общается с executor через Arrow batch interface (как Pandas UDF), и Catalyst видит Source как обычный Scan-узел — predicate pushdown, partition pruning, column pruning работают штатно (если reader реализует соответствующие capabilities).
Streaming-aware sources
Python Data Source API поддерживает и streaming reads через subclass DataSourceStreamReader:
from pyspark.sql.datasource import DataSourceStreamReader
class RestApiStreamReader(DataSourceStreamReader):
def initialOffset(self):
return {"timestamp": 0}
def latestOffset(self):
# Текущий offset со стороны источника
return {"timestamp": int(time.time())}
def read(self, start_offset, end_offset):
# Fetch новые записи между offsets
response = requests.get(
f"https://api.example.com/events",
params={"from": start_offset["timestamp"],
"to": end_offset["timestamp"]}
)
for event in response.json():
yield (event["id"], event["payload"])
После регистрации source доступен через стандартный spark.readStream.format("custom_api").load() — структурированный стриминг видит его как любой другой source, с offset management и checkpoint-консистентностью.
Когда применять Python Data Source
- REST API connectors — Slack, GitHub, Stripe, internal HTTP services, где нет готового JDBC.
- Proprietary binary formats — internal company-specific формат, для которого писать Scala-парсер избыточно.
- HuggingFace datasets — streaming-чтение из HF Hub, который is Python-first.
- Synthetic data generators — тестовые datasets, генерируемые из distributions/seed.
- In-house storage systems — system-X с Python-SDK, но без Spark-коннектора.
Python Data Source API не заменяет JVM-source для горячих путей: Arrow-roundtrip всё равно стоит дороже native parquet-read. Но для коннекторов, где I/O bound (network), Python implementation в 90% случаев имеет тот же effective throughput, что и JVM-эквивалент — bottleneck вне Spark, в самом источнике.
# Production-pattern: registration в init script кластера
# /databricks/init/register_sources.py
def init_spark_sources(spark):
from my_company.spark_sources import (
SnowflakeStreamSource,
S3EventsSource,
ProtoBinarySource,
)
for source_cls in [SnowflakeStreamSource, S3EventsSource, ProtoBinarySource]:
spark.dataSource.register(source_cls)
Что дальше?
В следующем уроке мы разберём GraphX и GraphFrames — как Spark обрабатывает графовые структуры данных и какие алгоритмы доступны из коробки.