SparkSession и SparkContext: Точки входа
Каждое Spark-приложение начинается с создания точки входа — объекта, который инициализирует подключение к кластеру и предоставляет API для работы с данными.
Эволюция точек входа
До Spark 2.0 существовало несколько отдельных точек входа, что создавало путаницу:
| Версия Spark | Точка входа | Назначение |
|---|---|---|
| 1.x | SparkContext | Низкоуровневый RDD API |
| 1.x | SQLContext | SQL-запросы и DataFrame |
| 1.x | HiveContext | Hive-интеграция, UDF |
| 2.0+ | SparkSession | Unified: всё в одном |
Начиная со Spark 2.0, SparkSession объединил все предыдущие точки входа. Вам больше не нужно создавать отдельные контексты — SparkSession инкапсулирует SparkContext, SQLContext и HiveContext внутри себя.
Создание SparkSession
Стандартный паттерн создания SparkSession — builder pattern:
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("MyAnalyticsApp")
.master("yarn")
.config("spark.executor.memory", "8g")
.config("spark.executor.cores", "4")
.config("spark.sql.shuffle.partitions", "200")
.getOrCreate()
)
Ключевые конфигурационные параметры
| Параметр | Описание | Значение по умолчанию |
|---|---|---|
spark.master | URL cluster manager (local, yarn, k8s://) | local[*] |
spark.app.name | Имя приложения (видно в Spark UI) | — |
spark.executor.memory | Память каждого executor | 1g |
spark.executor.cores | Количество cores на executor | 1 |
spark.driver.memory | Память driver-процесса | 1g |
spark.sql.shuffle.partitions | Число партиций после shuffle | 200 |
getOrCreate() vs builder.create()
Метод getOrCreate() реализует singleton pattern: если SparkSession с тем же appName уже существует в текущем JVM-процессе, он возвращает существующую сессию вместо создания новой.
# Первый вызов -- создаёт новую сессию
spark1 = SparkSession.builder.appName("App").getOrCreate()
# Второй вызов -- возвращает ТУ ЖЕ сессию
spark2 = SparkSession.builder.appName("App").getOrCreate()
assert spark1 is spark2 # True!
Это важно в нескольких сценариях:
- Jupyter notebooks: ячейки могут выполняться повторно,
getOrCreate()предотвращает создание дублирующих сессий - Модульный код: разные модули могут независимо получить ссылку на одну и ту же сессию
- Тестирование: тест-фреймворк может создать сессию один раз для всех тестов
Если вам нужна новая сессия с другой конфигурацией (например, другой appName), используйте SparkSession.builder.config(...).create() или newSession() для создания изолированного SQL-контекста.
SparkContext внутри SparkSession
SparkSession.sparkContext предоставляет доступ к низкоуровневому SparkContext:
# SparkContext доступен через SparkSession
sc = spark.sparkContext
# Используйте SparkContext для:
# - Broadcast-переменных
broadcast_var = sc.broadcast({"key": "value"})
# - Accumulators
counter = sc.accumulator(0)
# - Управления конфигурацией
print(sc.getConf().getAll())
# - Информации о кластере
print(f"App ID: {sc.applicationId}")
print(f"Master: {sc.master}")
В 99% случаев вы работаете через SparkSession. SparkContext нужен только для broadcast-переменных, accumulators и низкоуровневого RDD API.
Конфигурация: приоритет параметров
Spark читает конфигурацию из нескольких источников с чётким приоритетом (от высшего к низшему):
- Программная конфигурация —
SparkSession.builder.config("key", "value") - Флаги spark-submit —
--conf spark.executor.memory=8g - spark-defaults.conf — файл на узле, откуда запускается приложение
- Defaults — встроенные значения по умолчанию
# spark-submit с конфигурацией
spark-submit \
--master yarn \
--deploy-mode cluster \
--conf spark.executor.memory=16g \
--conf spark.executor.cores=4 \
--conf spark.dynamicAllocation.enabled=true \
my_app.py
Если один и тот же параметр задан в коде и в spark-submit, код имеет приоритет. Это часто становится проблемой: вы меняете --conf при запуске, но приложение игнорирует изменения, потому что значение захардкожено в коде.
Избегайте хардкода критических параметров (memory, cores, partitions) в коде приложения. Используйте spark-submit --conf или spark-defaults.conf, чтобы ops-команда могла менять конфигурацию без перекомпиляции.
Удаление устаревших точек входа Spark4.0
В Spark 4.0 отдельные SQLContext и HiveContext окончательно удалены как самостоятельные точки входа. Весь код должен использовать SparkSession. Если вы работаете с legacy-кодом, миграция проста:
# Было (Spark 1.x)
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext("local", "OldApp")
sqlContext = SQLContext(sc)
df = sqlContext.read.json("/data/")
# Стало (Spark 2.0+)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NewApp").getOrCreate()
df = spark.read.json("/data/")
Что дальше?
Теперь вы знаете, как создаётся точка входа в Spark. В следующем уроке мы разберём DAG Scheduler — компонент, который превращает ваши трансформации в оптимизированный план выполнения, разбитый на stages.