Learning Platform
Глоссарий Troubleshooting
Урок 11.06 · 16 мин
Продвинутый
Spark ConnectgRPCProtocol Bufferspyspark-clientClient-Server Architecture

Spark Connect: клиент-серверная архитектура

Проблема традиционного режима

В традиционном режиме PySpark-приложение запускает driver process прямо на клиентской машине. Driver — это полноценный JVM-процесс, который:

  • Содержит SparkContext и управляет всей координацией задач
  • Держит сетевое соединение с каждым executor в кластере
  • Потребляет сотни мегабайт памяти (JVM + метаданные планов)
  • При crash клиентского процесса весь Spark-job погибает
Традиционный режим (thick client)

Crash клиента = потеря всего job

Client Machine
Python Process+ JVM Driver+ SparkContext(~300MB pyspark)
RPC
Cluster
Executor 1
Executor 2
Executor 3

Это создаёт tight coupling: клиент и кластер связаны напрямую. Версия PySpark на клиенте должна точно совпадать с версией на кластере. Обновление Spark требует обновления всех клиентских машин.

Архитектура Spark Connect

Spark3.5 Spark Connect, представленный в Spark 3.4 и доведённый до production-ready состояния в 3.5, решает эту проблему через клиент-серверную декомпозицию.

Идея простая: вынести driver из клиента на сервер. Клиент отправляет только описание плана (unresolved logical plan), а сервер выполняет всё остальное:

Spark Connect: Traditional vs Connect Architecture

Client Application

PySpark Driver (~300MB)

SparkSession runs here

Direct JVM process

Direct connection

Spark Cluster

Executors

Tasks execute here

Client = Driver (coupled)
Client size~300 MB
Failure isolationCoupled
Multi-clientN/A
ClientServer / Cluster

gRPC и Protocol Buffers

Клиент и сервер общаются через gRPC — высокопроизводительный RPC-фреймворк от Google. Каждая DataFrame-операция на клиенте транслируется в Protocol Buffers сообщение:

# Клиентский код выглядит ИДЕНТИЧНО обычному PySpark
from pyspark.sql import SparkSession

# Подключение через Spark Connect
spark = SparkSession.builder \
    .remote("sc://spark-server:15002") \
    .getOrCreate()

# Обычные DataFrame операции
df = spark.read.parquet("/data/events")
result = df.filter(df.age > 25).groupBy("city").count()
result.show()

Под капотом каждый вызов (filter, groupBy, count) не выполняется локально — он сериализуется в Protocol Buffers и отправляется на сервер. Сервер разворачивает план, оптимизирует через Catalyst и запускает на кластере.

Spark Connect: gRPC-протокол клиент ↔ сервер
Client
Server
AnalyzePlan (proto)AnalyzePlanResponseExecutePlan (proto)Arrow RecordBatchArrow RecordBatchResultComplete

Данные возвращаются клиенту в формате Apache Arrow (cross-reference M11: Arrow и нативная интеграция) — column-oriented, zero-copy десериализация.

Строка подключения

Spark Connect использует URI-схему sc://:

# Базовое подключение
spark.remote = "sc://hostname:15002"

# С параметрами
spark.remote = "sc://hostname:15002/;use_ssl=true;token=<auth-token>"

# Через SparkSession builder
spark = SparkSession.builder \
    .remote("sc://spark-server:15002") \
    .getOrCreate()

Порт 15002 — стандартный порт Spark Connect gRPC server.

pyspark-client: лёгкий клиент

Полный пакет pyspark весит ~300MB и тянет JVM-зависимости. Для работы через Spark Connect нужна только клиентская часть:

# Полный PySpark -- ~300MB, требует Java
pip install pyspark

# Только Spark Connect клиент -- ~5MB, НЕ требует Java
pip install pyspark[connect]

Пакет pyspark[connect] содержит только:

  • gRPC-клиент для Protocol Buffers
  • DataFrame API (трансляция операций в proto-сообщения)
  • Arrow-десериализацию для получения результатов

Никакого JVM, никакого JAVA_HOME, никакого SPARK_HOME. Это радикально упрощает deployment на клиентских машинах и в CI/CD.

Преимущества Spark Connect

АспектТрадиционный режимSpark Connect
Клиентский размер~300MB + JVM~5MB, без JVM
Crash isolationCrash клиента = потеря jobCrash клиента не влияет на server
ВерсииClient и cluster должны совпадатьНезависимые версии (протокол стабилен)
Multi-tenancyОдин driver на приложениеМножество клиентов к одному серверу
ЯзыкиPython, Scala, Java, RЛюбой язык с gRPC (Go, Rust, …)
Ресурсы клиентаJVM heap + metastore connectionsМинимальные (gRPC stub)

Resource Isolation

В традиционном режиме crash одного пользовательского приложения может повлиять на стабильность shared cluster. С Spark Connect:

  • Каждый клиент получает изолированную SparkSession на сервере
  • Server управляет lifecycle сессий: timeout, cleanup, resource limits
  • Один клиент не может уронить другого — сервер изолирует fault domains

Spark4.0 В Spark 4.0 Spark Connect стал рекомендуемым режимом работы. Улучшения включают расширенную поддержку streaming, UDF через Arrow и полную совместимость DataFrame API.

TIP

Spark Connect не заменяет Spark — он заменяет способ подключения к Spark. Все возможности (SQL, DataFrame, MLlib, Streaming) доступны через Connect. Единственное ограничение: low-level RDD API недоступен через Connect, но он и не рекомендуется для новых проектов.

Запуск Spark Connect Server

На стороне кластера Spark Connect server запускается как дополнительный endpoint:

# Запуск Spark Connect server
./sbin/start-connect-server.sh \
  --master spark://master:7077 \
  --conf spark.connect.grpc.binding.port=15002 \
  --packages org.apache.spark:spark-connect_2.12:4.0.0

Server принимает gRPC-подключения и маршрутизирует их к SparkSession на кластере. Один server обслуживает множество клиентов.

WARNING

Production deployment: в production Spark Connect server разворачивается за load balancer (nginx, envoy) с TLS termination. Аутентификация реализуется через token-based gRPC interceptors. Мониторинг — через стандартные gRPC metrics (Prometheus exporter).

Проверка знанийKnowledge check
Чем Spark Connect отличается от традиционного режима PySpark? Назовите минимум 3 ключевых отличия.
ОтветAnswer
Ключевые отличия: (1) Клиент НЕ содержит JVM/driver -- только лёгкий gRPC-клиент (~5MB vs ~300MB); (2) Crash isolation -- crash клиента не уничтожает Spark job, сервер продолжает работу; (3) Независимость версий -- клиент и сервер могут быть разных версий Spark (протокол стабилен); (4) Multi-tenancy -- множество клиентов подключаются к одному серверу с изолированными SparkSession; (5) Language independence -- любой язык с gRPC-клиентом может работать со Spark.
Проверка знанийKnowledge check
Какой протокол используется для передачи данных между клиентом и Spark Connect сервером? Почему именно он?
ОтветAnswer
Для отправки запросов используется gRPC с Protocol Buffers -- высокопроизводительная бинарная сериализация, идеальная для передачи структурированных планов (Logical Plan). Для возврата результатов используется Apache Arrow -- column-oriented формат с zero-copy десериализацией, что минимизирует overhead при передаче табличных данных. Комбинация gRPC (control plane) + Arrow (data plane) обеспечивает оптимальную производительность.
Проверка знанийKnowledge check
Почему pyspark[connect] не требует Java на клиентской машине?
ОтветAnswer
В традиционном PySpark клиент содержит embedded JVM (driver process), который напрямую координирует executor на кластере. pyspark[connect] содержит только gRPC-клиент, который отправляет описание плана (Protocol Buffers) на удалённый Spark Connect server. Вся JVM-логика (Catalyst optimizer, Tungsten execution, SparkContext) выполняется на сервере. Клиенту нужен только Python + grpcio + pyarrow для десериализации результатов.

Python Data Source API (Spark 4.0)

Spark4.0

Помимо thin-client паттерна, Spark 4.0 принёс ещё одну важную Python-first возможность — Python Data Source API. Раньше для собственного коннектора (REST API, custom binary format, проприетарная система) приходилось писать Scala-код, реализующий traits DataSourceV2/Table/ScanBuilder — что предполагало JVM-экспертизу и доступ к Spark internals. С 4.0 тот же интерфейс есть в Python.

Базовая реализация custom data source

Custom source — это subclass DataSource плюс пара классов для reader/writer. Никакого JVM-кода:

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

class JsonLineDataSource(DataSource):
    """Custom reader для JSON Lines с произвольной фильтрацией."""

    @classmethod
    def name(cls) -> str:
        return "jsonline"

    def schema(self) -> StructType:
        # Можно вернуть schema из options или определить динамически
        return StructType([
            StructField("user_id", IntegerType(), False),
            StructField("event", StringType(), True),
        ])

    def reader(self, schema: StructType) -> "JsonLineReader":
        return JsonLineReader(self.options, schema)


class JsonLinePartition(InputPartition):
    def __init__(self, file_path: str):
        self.file_path = file_path


class JsonLineReader(DataSourceReader):
    def __init__(self, options, schema):
        self.path = options["path"]
        self.schema = schema

    def partitions(self):
        # Один partition per файл
        import glob
        return [JsonLinePartition(p) for p in glob.glob(self.path)]

    def read(self, partition: JsonLinePartition):
        import json
        with open(partition.file_path) as f:
            for line in f:
                row = json.loads(line)
                yield (row.get("user_id"), row.get("event"))


# Регистрация и использование
spark.dataSource.register(JsonLineDataSource)

df = spark.read.format("jsonline").option("path", "/data/logs/*.jsonl").load()
df.show()

Trade-offs vs JVM-source vs UDF

ПодходProduction SuitabilityOverheadWhere logic runsКогда выбирать
Built-in source (parquet, jdbc)Production-readyМинимальныйJVMСтандартные форматы
Scala/Java DataSourceV2Production-readyМинимальныйJVMВысокопроизводительные custom source
Python Data Source APIProduction-ready (4.0+)Малый (Arrow-based transfer)Python workerCustom source без JVM-экспертизы
Pandas UDF + spark.range drivingWorkaroundСреднийPython workerLegacy-проекты на 3.x
collect() -> spark.createDataFrameAnti-patternОгромныйDriverТолько prototyping

Ключевая выгода Python Data Source API — нативная интеграция в Spark plan. В отличие от UDF, которые требуют JVM <-> Python serialization on per-row basis, Python Data Source общается с executor через Arrow batch interface (как Pandas UDF), и Catalyst видит Source как обычный Scan-узел — predicate pushdown, partition pruning, column pruning работают штатно (если reader реализует соответствующие capabilities).

Streaming-aware sources

Python Data Source API поддерживает и streaming reads через subclass DataSourceStreamReader:

from pyspark.sql.datasource import DataSourceStreamReader

class RestApiStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        return {"timestamp": 0}

    def latestOffset(self):
        # Текущий offset со стороны источника
        return {"timestamp": int(time.time())}

    def read(self, start_offset, end_offset):
        # Fetch новые записи между offsets
        response = requests.get(
            f"https://api.example.com/events",
            params={"from": start_offset["timestamp"],
                    "to": end_offset["timestamp"]}
        )
        for event in response.json():
            yield (event["id"], event["payload"])

После регистрации source доступен через стандартный spark.readStream.format("custom_api").load() — структурированный стриминг видит его как любой другой source, с offset management и checkpoint-консистентностью.

Когда применять Python Data Source

  • REST API connectors — Slack, GitHub, Stripe, internal HTTP services, где нет готового JDBC.
  • Proprietary binary formats — internal company-specific формат, для которого писать Scala-парсер избыточно.
  • HuggingFace datasets — streaming-чтение из HF Hub, который is Python-first.
  • Synthetic data generators — тестовые datasets, генерируемые из distributions/seed.
  • In-house storage systems — system-X с Python-SDK, но без Spark-коннектора.
NOTE

Python Data Source API не заменяет JVM-source для горячих путей: Arrow-roundtrip всё равно стоит дороже native parquet-read. Но для коннекторов, где I/O bound (network), Python implementation в 90% случаев имеет тот же effective throughput, что и JVM-эквивалент — bottleneck вне Spark, в самом источнике.

# Production-pattern: registration в init script кластера
# /databricks/init/register_sources.py
def init_spark_sources(spark):
    from my_company.spark_sources import (
        SnowflakeStreamSource,
        S3EventsSource,
        ProtoBinarySource,
    )
    for source_cls in [SnowflakeStreamSource, S3EventsSource, ProtoBinarySource]:
        spark.dataSource.register(source_cls)

Что дальше?

В следующем уроке мы разберём GraphX и GraphFrames — как Spark обрабатывает графовые структуры данных и какие алгоритмы доступны из коробки.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 6. Какую проблему решает Spark Connect по сравнению с традиционным режимом PySpark?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6