Справочник ключевых терминов курса Apache Spark.
Унифицированная точка входа в Spark (с версии 2.0). Объединяет SparkContext, SQLContext и HiveContext в единый API. Через SparkSession регистрируются таблицы, выполняются SQL-запросы и настраиваются параметры приложения.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MyApp") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
df = spark.sql("SELECT 1 AS id")
df.show()Процесс, который координирует выполнение Spark-приложения. Driver запускает SparkContext, создаёт DAG из трансформаций, разбивает его на stage-и и распределяет task-и по executor-ам. Содержит метаданные о всех partition-ах и текущем состоянии выполнения.
# Driver — это процесс, где выполняется ваш main()
# Код внутри spark.sql() / df.filter() описывает DAG
# Реальная работа происходит на executor-ах
spark = SparkSession.builder.master("yarn").getOrCreate()
# Это описание DAG (на driver-е)
result = spark.read.parquet("/data") \
.filter("age > 30") \
.groupBy("city").count()
# Это запуск выполнения (driver отправляет task-и)
result.show()JVM-процесс на worker-ноде, выполняющий task-и и хранящий данные в памяти или на диске. Каждое Spark-приложение получает собственные executor-ы, изолированные друг от друга. Executor управляет пулом потоков для параллельного выполнения task-ов и блочным менеджером для кеширования.
# Настройка executor-ов
spark = SparkSession.builder \
.config("spark.executor.instances", "10") \
.config("spark.executor.memory", "8g") \
.config("spark.executor.cores", "4") \
.getOrCreate()
# Каждый executor получит 8 GB RAM и 4 ядра
# 10 executor-ов = 40 параллельных task-овГраф зависимостей между RDD-партициями, описывающий полный план вычислений. DAGScheduler анализирует граф и разбивает его на stage-и по shuffle-границам. Каждый вызов action (collect, count, write) порождает отдельный job в DAG.
# DAG строится из цепочки трансформаций
df = spark.read.parquet("/events")
# Stage 1: scan + filter (narrow)
filtered = df.filter("event_type = 'purchase'")
# Shuffle boundary → Stage 2
grouped = filtered.groupBy("user_id").agg(
F.sum("amount").alias("total")
)
# Action запускает DAG
grouped.write.parquet("/output")Набор task-ов, которые могут выполняться параллельно без shuffle. Stage-и разделяются shuffle-границами — каждая wide-трансформация (groupBy, join, repartition) создаёт новый stage. DAGScheduler создаёт stage-и и определяет порядок их выполнения.
-- Просмотр stage-ей через EXPLAIN
EXPLAIN EXTENDED
SELECT dept, AVG(salary)
FROM employees
WHERE active = true
GROUP BY dept;
-- Stage 0: Scan + Filter (narrow transformations)
-- Shuffle (Exchange hashpartitioning(dept))
-- Stage 1: Aggregate (после shuffle)Минимальная единица выполнения в Spark — обработка одной partition на одном ядре executor-а. Количество task-ов в stage равно количеству partition-ов. Task получает код для выполнения от driver-а и оперирует только своей локальной partition-ой данных.
# Число task-ов = число partition-ов
df = spark.read.parquet("/data") # 200 файлов → 200 partition → 200 task-ов
# Контроль параллелизма через repartition
df_repart = df.repartition(50) # 50 task-ов в следующем stage
# Просмотр числа partition-ов
print(f"Partitions: {df.rdd.getNumPartitions()}")Логическая единица параллелизма в Spark. Данные распределяются по partition-ам, и каждая partition обрабатывается отдельным task-ом на одном ядре. Число partition-ов определяет степень параллелизма и влияет на утилизацию ресурсов кластера.
from pyspark.sql import functions as F
df = spark.read.parquet("/data")
print(f"Partitions: {df.rdd.getNumPartitions()}")
# Увеличить partition-ы (shuffle)
df_more = df.repartition(200)
# Уменьшить partition-ы (без shuffle)
df_less = df.coalesce(10)
# Partition по колонке для оптимизации join
df_part = df.repartition(100, "user_id")Rule-based и cost-based оптимизатор в Spark SQL. Трансформирует логический план запроса в оптимальный физический план через 4 фазы: Analysis → Logical Optimization → Physical Planning → Code Generation. Использует pattern-matching для применения правил оптимизации.
-- Просмотр всех фаз оптимизации
EXPLAIN EXTENDED
SELECT u.name, COUNT(o.id)
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.active = true
GROUP BY u.name;
-- Покажет: Parsed → Analyzed → Optimized → Physical PlanДекларативное дерево операций, описывающее «что» нужно сделать без указания «как». Содержит узлы: Scan, Filter, Project, Aggregate, Join, Sort. Catalyst применяет правила оптимизации (predicate pushdown, column pruning, constant folding) к логическому плану перед генерацией физического.
# Просмотр логического плана в PySpark
df = spark.read.parquet("/sales") \
.filter("amount > 100") \
.groupBy("region").sum("amount")
# Нераcпарсенный план
print(df._jdf.queryExecution().logical())
# Оптимизированный план
print(df._jdf.queryExecution().optimizedPlan())Исполняемое дерево операторов, определяющее конкретный способ выполнения запроса. Physical Plan выбирает стратегии join (BroadcastHashJoin, SortMergeJoin), режимы сканирования и способы агрегации на основе статистик и cost-based оптимизации.
-- Просмотр физического плана с метриками
EXPLAIN COST
SELECT *
FROM orders o
JOIN products p ON o.product_id = p.id
WHERE p.category = 'electronics';
-- Результат покажет выбранную стратегию join:
-- BroadcastHashJoin (если products < broadcast threshold)
-- или SortMergeJoin (если обе таблицы большие)Проект оптимизации производительности Spark, фокусирующийся на CPU и памяти. Включает: off-heap memory management (обход GC), UnsafeRow (компактная бинарная сериализация строк), cache-aware вычисления и whole-stage code generation. Устраняет основные JVM overhead-ы.
// Tungsten использует UnsafeRow для компактного хранения
// Вместо Java-объектов — бинарный формат
// Настройка off-heap памяти Tungsten
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "4g")
// Tungsten автоматически активен в DataFrame/SQL API
// RDD API не использует Tungsten-оптимизацииТехника генерации Java-байткода из цепочки операторов Catalyst-плана. Вместо итерации по виртуальным вызовам операторов, генерируется один метод, обрабатывающий строку от scan до финального оператора. Устраняет overhead виртуальных вызовов и улучшает CPU cache locality.
-- Звёздочка (*) в EXPLAIN означает CodeGen
EXPLAIN
SELECT name, age
FROM users
WHERE age > 30;
-- *(1) Filter (age > 30) ← CodeGen
-- *(1) ColumnarToRow ← CodeGen (одна цепочка)
-- Scan parquet [name, age]
-- Отключение для отладки:
-- SET spark.sql.codegen.wholeStage = false;Компактный бинарный формат представления строки данных в Tungsten. Хранит данные в непрерывном блоке off-heap памяти без Java-объектов, что устраняет GC-давление и позволяет прямое сравнение байтов. Каждая строка содержит null-bitmap, fixed-length данные и variable-length область.
// UnsafeRow — внутренний формат, но можно наблюдать
// через метрики в Spark UI:
// - "peak memory" в sort/aggregate операторах
// - "spill size" при нехватке памяти
// Размер UnsafeRow зависит от схемы:
// INT: 8 байт (aligned), STRING: 8 байт offset + длина
// NULL bitmap: ceil(numFields / 64) * 8 байт
// Пример: строка (id INT, name STRING, age INT)
// = 8 (bitmap) + 8 + 8 + 8 = 32 байт fixed
// + variable length для nameРаспределённая коллекция данных с именованными колонками, аналогичная таблице в реляционной БД. Основной API для работы с данными в Spark. DataFrame оптимизируется через Catalyst и выполняется через Tungsten, что делает его значительно быстрее RDD API.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
# Создание DataFrame из файла
df = spark.read.parquet("/data/users")
# Операции с DataFrame
result = df.select("name", "age") \
.filter(F.col("age") > 25) \
.withColumn("age_group",
F.when(F.col("age") < 30, "young").otherwise("senior"))Типизированный DataFrame, доступный только в Scala и Java. Dataset[T] обеспечивает compile-time проверку типов и позволяет использовать лямбда-функции с сохранением Catalyst-оптимизаций. В PySpark DataFrame эквивалентен Dataset[Row].
// Scala: типизированный Dataset
case class User(name: String, age: Int)
val users: Dataset[User] = spark.read
.parquet("/data/users")
.as[User] // compile-time type safety
// Типизированные операции
val adults = users
.filter(_.age >= 18)
.map(u => u.copy(name = u.name.toUpperCase))
// В PySpark нет Dataset[T], только DataFrame = Dataset[Row]Операция, создающая новый DataFrame из существующего без запуска вычислений (lazy). Бывает narrow (без shuffle: filter, select, map) и wide (с shuffle: groupBy, join, repartition). Трансформации записываются в DAG и выполняются только при вызове action.
df = spark.read.parquet("/events")
# Narrow transformations (без shuffle)
filtered = df.filter("event_type = 'click'")
projected = filtered.select("user_id", "timestamp")
# Wide transformations (вызывают shuffle)
grouped = projected.groupBy("user_id").count()
joined = df.join(other_df, "id")
# Ничего не выполнено! Всё lazy.
# Выполнение начнётся только при action:
grouped.show() # ← action запускает DAGОперация, которая запускает выполнение DAG и возвращает результат driver-у или записывает данные. Примеры: collect(), count(), show(), write(). Без вызова action ни одна трансформация не выполняется (lazy evaluation). Каждый action создаёт новый job.
df = spark.read.parquet("/data")
filtered = df.filter("age > 30") # Трансформация — ничего не выполняется
# Actions — запускают выполнение:
filtered.count() # → число строк (Long)
filtered.show(5) # → вывод 5 строк в консоль
filtered.collect() # → все строки в driver (осторожно с OOM!)
filtered.first() # → первая строка
filtered.write.parquet("/output") # → запись на дискСтратегия отложенного выполнения: трансформации не выполняются сразу, а записываются в DAG. Выполнение начинается только при вызове action. Это позволяет Catalyst оптимизировать весь план целиком — убирать ненужные колонки, проталкивать фильтры ближе к источнику данных.
# Все трансформации — lazy (описание плана)
df = spark.read.parquet("/huge_table") # Не читает данные!
filtered = df.filter("date = '2024-01-01'") # Не фильтрует!
result = filtered.select("id", "amount") # Не проецирует!
# Catalyst оптимизирует: pushdown фильтра + column pruning
# В итоге читаются только 2 колонки за 1 день
result.show() # ← Только здесь начинается чтение и обработкаФункции, выполняющие вычисления над набором строк (окном), связанных с текущей строкой. В отличие от groupBy, window functions не схлопывают строки — каждая строка сохраняется с добавленным результатом. Поддерживают ranking (row_number, rank), аналитику (lag, lead) и агрегации (sum, avg) по окнам.
from pyspark.sql import Window
from pyspark.sql import functions as F
# Окно по отделу, сортировка по зарплате
w = Window.partitionBy("dept").orderBy(F.desc("salary"))
df.withColumn("rank", F.row_number().over(w)) \
.withColumn("dept_avg", F.avg("salary").over(
Window.partitionBy("dept")
)) \
.withColumn("prev_salary", F.lag("salary", 1).over(w))Реестр метаданных, хранящий информацию о базах данных, таблицах, представлениях и функциях. Catalog поддерживает как временные (session-scoped) объекты, так и постоянные (Hive Metastore). С Spark 3.x DataSourceV2 позволяет подключать внешние каталоги (Iceberg, Delta).
# Работа с каталогом
spark.sql("CREATE DATABASE IF NOT EXISTS analytics")
spark.sql("USE analytics")
# Регистрация таблицы
df.write.saveAsTable("events")
# Просмотр метаданных
spark.catalog.listDatabases()
spark.catalog.listTables("analytics")
spark.catalog.listColumns("events")
# Временное представление (session-scoped)
df.createOrReplaceTempView("temp_events")Механизм динамической оптимизации в Spark 3.x, который адаптирует план выполнения на основе runtime-статистик после каждого shuffle. Включает: автоматическое объединение маленьких partition-ов (coalesce), обработку data skew в join-ах и dynamic partition pruning.
# Включение AQE (по умолчанию с Spark 3.2)
spark.conf.set("spark.sql.adaptive.enabled", "true")
# Автоматическое объединение мелких partition-ов
spark.conf.set(
"spark.sql.adaptive.coalescePartitions.enabled", "true"
)
spark.conf.set(
"spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m"
)
# Обработка skew в join
spark.conf.set(
"spark.sql.adaptive.skewJoin.enabled", "true"
)Стратегия join, при которой маленькая таблица (dimension) целиком рассылается всем executor-ам, избегая shuffle большой таблицы (fact). Значительно быстрее SortMergeJoin для случаев, когда одна сторона помещается в память. Порог контролируется параметром autoBroadcastJoinThreshold.
from pyspark.sql import functions as F
# Автоматический broadcast (таблица < 10 MB по умолчанию)
result = big_df.join(small_df, "id")
# Явный broadcast hint
result = big_df.join(
F.broadcast(small_df), "id"
)
# Настройка порога
spark.conf.set(
"spark.sql.autoBroadcastJoinThreshold", "50m"
) # broadcast таблицы до 50 MB
# Отключение broadcast
spark.conf.set(
"spark.sql.autoBroadcastJoinThreshold", "-1"
)Техника предварительного распределения данных по bucket-ам (корзинам) на основе хеш-функции от ключевых колонок. При join двух таблиц, bucketed по одному ключу, Spark пропускает shuffle — данные уже распределены корректно. Эффективно для повторяющихся join-ов по одним и тем же ключам.
# Запись с bucketing
df.write \
.bucketBy(32, "user_id") \
.sortBy("user_id") \
.saveAsTable("bucketed_orders")
# Join без shuffle (bucket-to-bucket)
orders = spark.table("bucketed_orders") # 32 buckets by user_id
users = spark.table("bucketed_users") # 32 buckets by user_id
result = orders.join(users, "user_id") # No shuffle!Неравномерное распределение данных по partition-ам, при котором один или несколько task-ов обрабатывают значительно больше данных, чем остальные. Data skew — основная причина «зависших» stage-ей и OOM-ошибок. Диагностируется в Spark UI по разнице между median и max task duration.
# Диагностика skew
df.groupBy("key").count().orderBy(
F.desc("count")
).show(10) # Покажет самые частые ключи
# Решение 1: AQE skew join (Spark 3.x)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Решение 2: Salting (ручное)
from pyspark.sql.functions import rand, concat, lit
df_salted = df.withColumn(
"salted_key", concat("key", lit("_"), (rand() * 10).cast("int"))
)Оптимизация, при которой фильтр из dimension-таблицы join-а применяется к fact-таблице на этапе scan. Если dimension-таблица после фильтрации содержит 3 значения ключа, то из fact-таблицы будут прочитаны только partition-ы с этими ключами. Значительно снижает объём I/O.
-- DPP автоматически применяется в star-schema join-ах
SELECT f.*, d.category_name
FROM fact_sales f
JOIN dim_category d ON f.category_id = d.id
WHERE d.department = 'Electronics';
-- Без DPP: читает ВСЕ partition-ы fact_sales
-- С DPP: читает только partition-ы где category_id IN (
-- SELECT id FROM dim_category WHERE department = 'Electronics'
-- )
-- Конфигурация
-- spark.sql.optimizer.dynamicPartitionPruning.enabled = trueПерераспределение данных между executor-ами по сети. Самая дорогая операция в Spark: данные сериализуются, записываются на диск, передаются по сети и десериализуются. Возникает при groupBy(), join(), repartition(), distinct(). Оптимизация shuffle — ключ к производительности.
# Операции, вызывающие shuffle
df.groupBy("key").count() # shuffle по key
df.join(other, "id") # shuffle обеих таблиц
df.repartition(200) # полный shuffle
df.distinct() # shuffle для дедупликации
# Операции БЕЗ shuffle
df.filter("age > 30") # narrow
df.select("name", "age") # narrow
df.coalesce(10) # без shuffle (уменьшение)
# Настройка числа shuffle-partition-ов
spark.conf.set("spark.sql.shuffle.partitions", "200")Механизм сохранения промежуточных результатов в памяти или на диске для повторного использования. cache() сохраняет DataFrame в MEMORY_AND_DISK. persist() позволяет выбрать уровень хранения. Кешированные данные хранятся на executor-ах и доступны без повторного вычисления.
from pyspark import StorageLevel
# Кеширование в память (MEMORY_AND_DISK)
df_cached = df.filter("active = true").cache()
df_cached.count() # Материализует кеш
# Разные уровни persistence
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.DISK_ONLY)
df.persist(StorageLevel.MEMORY_AND_DISK_SER) # Сжатый
# Освобождение кеша
df_cached.unpersist()
# Проверка в Spark UI → Storage tabКолоночный формат файлов, де-факто стандарт для аналитических workload-ов. Поддерживает predicate pushdown (чтение только нужных row-group-ов), column pruning (чтение только нужных колонок), встроенную компрессию (Snappy, Zstd) и статистики (min/max) для data skipping.
# Запись в Parquet с партиционированием
df.write \
.partitionBy("year", "month") \
.option("compression", "zstd") \
.mode("overwrite") \
.parquet("/data/events")
# Чтение с predicate pushdown
df = spark.read.parquet("/data/events") \
.filter("year = 2024 AND month = 1") # Читает только нужные partition-ы
# Просмотр схемы без чтения данных
spark.read.parquet("/data/events").printSchema()Open-source lakehouse формат от Databricks, построенный поверх Parquet. Обеспечивает ACID-транзакции, time travel (запросы к предыдущим версиям данных), schema enforcement/evolution, Z-ordering для оптимизации чтения и MERGE (upsert) операции. Транзакционный лог хранится в _delta_log/.
# Запись в Delta
df.write.format("delta").save("/data/events_delta")
# Time travel — чтение предыдущей версии
spark.read.format("delta") \
.option("versionAsOf", 5) \
.load("/data/events_delta")
# MERGE (upsert)
from delta.tables import DeltaTable
delta_t = DeltaTable.forPath(spark, "/data/events_delta")
delta_t.alias("t").merge(
updates.alias("u"), "t.id = u.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()Lakehouse формат с hidden partitioning, schema evolution и time travel. Iceberg использует catalog-based управление таблицами и хранит метаданные в иерархии: catalog → metadata files → manifest lists → manifest files → data files. Поддерживает partition evolution без перезаписи данных.
-- Создание Iceberg-таблицы
CREATE TABLE catalog.db.events (
id BIGINT,
event_type STRING,
ts TIMESTAMP
) USING iceberg
PARTITIONED BY (days(ts));
-- Hidden partitioning — не нужно указывать partition-колонку
SELECT * FROM catalog.db.events
WHERE ts > '2024-01-01'; -- Автоматический partition pruning
-- Time travel
SELECT * FROM catalog.db.events
VERSION AS OF 1234567890;Техника упорядочивания данных внутри файлов по нескольким колонкам одновременно с помощью Z-кривой (space-filling curve). Улучшает data skipping — запросы с фильтрами по Z-ordered колонкам пропускают файлы, не содержащие релевантных значений. Особенно эффективна для высококардинальных колонок.
-- Z-ordering в Delta Lake
OPTIMIZE events
ZORDER BY (user_id, event_date);
-- После Z-ordering запросы по user_id и/или event_date
-- пропускают файлы, где min/max не пересекаются с фильтром
SELECT * FROM events
WHERE user_id = 'abc' AND event_date = '2024-01-01';
-- Читает ~1% файлов вместо 100%Процесс объединения мелких файлов в крупные для улучшения производительности чтения. Мелкие файлы создают overhead на открытие/закрытие файлов и метаданные. В Delta Lake выполняется через OPTIMIZE, в Iceberg — через rewriteDataFiles(). Рекомендуемый размер файла: 128 MB — 1 GB.
-- Compaction в Delta Lake
OPTIMIZE '/data/events_delta';
-- С Z-ordering
OPTIMIZE '/data/events_delta'
ZORDER BY (user_id);
-- Удаление старых версий после compaction
VACUUM '/data/events_delta' RETAIN 168 HOURS;
-- В Iceberg (через Spark procedure)
CALL catalog.system.rewrite_data_files(
table => 'db.events',
options => map('target-file-size-bytes', '134217728')
);Проблема производительности при наличии множества мелких файлов (< 1-10 MB). Каждый файл создаёт отдельный task в Spark, overhead на метаданные в NameNode (HDFS) и замедляет listing в object storage (S3). Возникает при частых записях, streaming и неправильном партиционировании.
# Диагностика
import os
path = "/data/events/"
files = spark.read.parquet(path).inputFiles()
print(f"Файлов: {len(files)}")
print(f"Partitions: {spark.read.parquet(path).rdd.getNumPartitions()}")
# Решение 1: coalesce перед записью
df.coalesce(10).write.parquet("/output")
# Решение 2: repartition по колонке
df.repartition("date").write.partitionBy("date").parquet("/output")
# Решение 3: OPTIMIZE (Delta Lake)
# OPTIMIZE '/data/events_delta'Архитектурный паттерн, объединяющий надёжность data warehouse (ACID, схема) с масштабируемостью data lake (дешёвое хранилище, open formats). Реализуется через табличные форматы (Delta Lake, Iceberg, Hudi) поверх объектного хранилища (S3, GCS, ADLS). Устраняет необходимость в отдельном ETL между lake и warehouse.
-- Типичная Lakehouse архитектура в Spark
-- Bronze layer: сырые данные
raw_df.write.format("delta").save("/lakehouse/bronze/events")
-- Silver layer: очищенные данные
cleaned_df.write.format("delta").save("/lakehouse/silver/events")
-- Gold layer: агрегаты для аналитики
agg_df.write.format("delta").save("/lakehouse/gold/daily_metrics")
-- Все слои используют ACID, time travel, schema enforcementПотоковый API Spark, построенный на DataFrame/Dataset. Трактует поток данных как бесконечную таблицу, к которой применяются те же операции, что и к batch-данным. Поддерживает exactly-once семантику через checkpointing и three output modes: Append, Update, Complete.
# Чтение потока из Kafka
stream_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "events") \
.load()
# Обработка (тот же API, что и batch)
parsed = stream_df.selectExpr(
"CAST(value AS STRING)",
"timestamp"
)
# Запись результата
query = parsed.writeStream \
.outputMode("append") \
.format("delta") \
.option("checkpointLocation", "/cp/events") \
.start("/data/events")Механизм отслеживания прогресса event time в потоковой обработке. Watermark определяет порог — данные с event time старше watermark-а считаются «поздними» и могут быть отброшены. Это позволяет Spark очищать state store от устаревших состояний и ограничивать потребление памяти.
from pyspark.sql import functions as F
# Watermark: данные опаздывают максимум на 10 минут
stream_df = stream_df \
.withWatermark("event_time", "10 minutes")
# Агрегация с watermark
result = stream_df \
.groupBy(
F.window("event_time", "5 minutes"),
"user_id"
).count()
# Без watermark state растёт бесконечно!
# С watermark: окна старше 10 мин очищаются автоматическиОпределяет, когда запускать следующий micro-batch в Structured Streaming. Варианты: processingTime (фиксированный интервал), once (однократный запуск для backfill), availableNow (обработать всё доступное и остановиться), continuous (экспериментальный low-latency режим с ~1 мс задержкой).
from pyspark.sql.streaming import Trigger
# Micro-batch каждые 30 секунд
query = df.writeStream \
.trigger(processingTime="30 seconds") \
.start()
# Однократный запуск (для backfill)
query = df.writeStream \
.trigger(once=True) \
.start()
# Обработать всё доступное и остановиться
query = df.writeStream \
.trigger(availableNow=True) \
.start()
# Continuous (экспериментальный, ~1 мс latency)
query = df.writeStream \
.trigger(continuous="1 second") \
.start()Определяет, какие строки записываются в sink после каждого micro-batch. Append — только новые строки (для фильтров, map). Update — только изменившиеся строки (для агрегаций с watermark). Complete — полный результат целиком (для агрегаций без watermark). Выбор режима зависит от типа запроса.
# Append: только новые строки (подходит для ETL)
df.writeStream.outputMode("append").start()
# Update: только изменённые агрегаты
df.groupBy("category").count() \
.writeStream.outputMode("update").start()
# Complete: весь результат каждый раз
df.groupBy("category").count() \
.writeStream.outputMode("complete").start()
# Append + aggregation требует watermark:
df.withWatermark("ts", "10 min") \
.groupBy(F.window("ts", "5 min")).count() \
.writeStream.outputMode("append").start()Механизм сохранения прогресса streaming query на надёжное хранилище (HDFS, S3). Checkpoint хранит offset-ы прочитанных данных, состояние state store и метаданные запроса. Обеспечивает exactly-once семантику при перезапуске — query продолжает с последнего записанного offset-а.
# Обязательно для production streaming
query = df.writeStream \
.format("delta") \
.option("checkpointLocation", "s3://bucket/checkpoints/my_query") \
.outputMode("append") \
.start("/data/output")
# Checkpoint содержит:
# - offsets/ — какие данные прочитаны
# - commits/ — какие batch-и завершены
# - state/ — state store snapshots
# - metadata — query metadata
# ВАЖНО: один checkpointLocation = один query
# Нельзя переиспользовать между разными запросамиKey-value хранилище для stateful-операций в Structured Streaming: агрегаций, дедупликации, session windows, stream-to-stream join. По умолчанию — HDFS-backed (HDFSBackedStateStoreProvider). Для production рекомендуется RocksDB backend, который хранит state на диске executor-а с LRU-кешем в памяти.
# Включение RocksDB state store (рекомендуется)
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming."
"state.RocksDBStateStoreProvider"
)
# Stateful операции используют state store:
# - groupBy().count() (running aggregation)
# - dropDuplicatesWithinWatermark()
# - stream-to-stream join
# - flatMapGroupsWithState (custom state)
# Мониторинг state store:
# Spark UI → Structured Streaming → State Store metricsВстроенный веб-интерфейс для мониторинга Spark-приложений. Доступен на порту 4040 во время работы приложения. Показывает job-ы, stage-и, task-и, SQL-планы, метрики памяти и storage. Основной инструмент для диагностики performance-проблем: data skew, spill, shuffle overhead.
# Spark UI доступен автоматически
# http://driver-host:4040
# Ключевые вкладки:
# - Jobs: список job-ов и их stage-ей
# - Stages: task-и, метрики (shuffle read/write, GC time)
# - SQL: планы выполнения SQL-запросов
# - Storage: кешированные DataFrame-ы
# - Environment: конфигурация Spark
# Для YARN: http://yarn-rm:8088 → Application → Tracking URLСервис для просмотра завершённых Spark-приложений. Читает event log-файлы из указанной директории (HDFS, S3) и предоставляет UI, идентичный Spark UI, для post-mortem анализа. Позволяет исследовать performance-проблемы после завершения job-а.
# Настройка event log
spark.conf.set("spark.eventLog.enabled", "true")
spark.conf.set("spark.eventLog.dir", "hdfs:///spark-logs")
# Запуск History Server
# $SPARK_HOME/sbin/start-history-server.sh
# Конфигурация History Server
# spark.history.fs.logDirectory = hdfs:///spark-logs
# spark.history.ui.port = 18080
# Доступ: http://history-server:18080
# Показывает все завершённые приложения с полным UIИнтерфейс для перехвата событий жизненного цикла Spark-приложения: запуск/завершение job-ов, stage-ей, task-ов, получение метрик executor-ов. Позволяет реализовать кастомный мониторинг, алертинг и logging. Регистрируется через SparkContext.addSparkListener().
// Scala: кастомный SparkListener
import org.apache.spark.scheduler._
class MyListener extends SparkListener {
override def onStageCompleted(
event: SparkListenerStageCompleted
): Unit = {
val info = event.stageInfo
val duration = info.completionTime.get - info.submissionTime.get
println(s"Stage ${info.stageId} completed in ${duration}ms")
// Алерт если stage занял > 5 минут
if (duration > 300000)
alertSlack(s"Slow stage: ${info.stageId}")
}
}
spark.sparkContext.addSparkListener(new MyListener())Интеграция Spark с Prometheus для сбора метрик в time-series формате. Spark 3.x поддерживает встроенный PrometheusServlet, экспортирующий метрики executor-ов, JVM, shuffle и streaming. Метрики собираются в pull-модели через HTTP-endpoint и визуализируются в Grafana.
# Конфигурация Prometheus sink
spark.conf.set("spark.metrics.conf.*.sink.prometheusServlet.class",
"org.apache.spark.metrics.sink.PrometheusServlet")
spark.conf.set("spark.metrics.conf.*.sink.prometheusServlet.path",
"/metrics/prometheus")
spark.conf.set("spark.ui.prometheus.enabled", "true")
# Endpoint: http://driver:4040/metrics/prometheus
# Метрики: executor memory, GC time, shuffle bytes,
# task duration, streaming batch duration
# prometheus.yml scrape config:
# - job_name: 'spark'
# metrics_path: '/metrics/prometheus'
# static_configs:
# - targets: ['driver:4040']Визуализация метрик Spark-приложений через Grafana, подключённую к Prometheus. Позволяет создавать dashboards с графиками executor memory usage, GC pressure, shuffle read/write throughput, task duration distribution и streaming lag. Обеспечивает real-time наблюдаемость кластера.
# Ключевые панели Grafana для Spark:
#
# 1. Executor Overview:
# - Active executors count
# - JVM heap usage per executor
# - GC time percentage
#
# 2. Job Performance:
# - Task duration p50/p95/p99
# - Shuffle read/write bytes
# - Spill to disk bytes
#
# 3. Streaming (если используется):
# - Batch processing time
# - Input rows per second
# - State store memory usage
#
# Пример PromQL: rate(spark_executor_gc_time_total[5m])Columnar in-memory формат данных для эффективного обмена между Spark (JVM) и Python/pandas. Используется в pandas UDF (vectorized UDF) для передачи данных без сериализации Java-объектов. Arrow Flight протокол используется в Spark Connect для клиент-серверной архитектуры.
# Arrow ускоряет toPandas() и pandas UDF
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Без Arrow: Java → Pickle → Python (медленно)
# С Arrow: Java → Arrow IPC → Python (zero-copy)
pdf = df.toPandas() # В 10-100x быстрее с Arrow
# Pandas UDF (vectorized) использует Arrow
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("double")
def normalize(s: pd.Series) -> pd.Series:
return (s - s.mean()) / s.std()Клиент-серверная архитектура (с Spark 3.4), разделяющая клиентское приложение и Spark-кластер через gRPC. Клиент отправляет логический план, сервер выполняет и возвращает результаты через Arrow Flight. Позволяет тонкий клиент на Python/Scala без полного SparkSession на стороне клиента.
# Запуск Spark Connect Server
# ./sbin/start-connect-server.sh --packages \
# org.apache.spark:spark-connect_2.12:3.5.0
# Подключение клиента (тонкий — без JVM)
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.remote("sc://spark-server:15002") \
.getOrCreate()
# Тот же API, что и локальный Spark
df = spark.sql("SELECT * FROM events")
df.show()Yet Another Resource Negotiator — менеджер ресурсов в Hadoop-экосистеме. Spark на YARN работает в двух режимах: cluster (driver на YARN) и client (driver на submitting machine). YARN распределяет контейнеры с executor-ами по нодам кластера на основе запрошенных ресурсов.
# Запуск Spark на YARN
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 20 \
--executor-memory 8g \
--executor-cores 4 \
--conf spark.yarn.maxAppAttempts=2 \
my_app.py
# Режимы деплоя:
# cluster: driver в YARN контейнере (для production)
# client: driver на машине отправки (для отладки)
# Мониторинг: YARN ResourceManager UI → ApplicationsКонтейнерный оркестратор, поддерживаемый Spark как cluster manager (с Spark 2.3, GA в 3.1). Spark создаёт pod-ы для driver-а и executor-ов в namespace Kubernetes. Преимущества: auto-scaling, resource isolation, CI/CD интеграция, единая инфраструктура для всех workload-ов.
# Запуск Spark на Kubernetes
spark-submit \
--master k8s://https://k8s-apiserver:443 \
--deploy-mode cluster \
--conf spark.kubernetes.container.image=spark:3.5.0 \
--conf spark.kubernetes.namespace=spark-jobs \
--conf spark.executor.instances=10 \
--conf spark.kubernetes.executor.request.cores=2 \
--conf spark.kubernetes.executor.limit.cores=4 \
local:///opt/spark/my_app.py
# Dynamic allocation на K8s
# spark.dynamicAllocation.enabled=true
# spark.dynamicAllocation.shuffleTracking.enabled=truePython-фреймворк для валидации данных и data quality. Определяет «ожидания» (expectations) — декларативные проверки значений, типов, уникальности, ranges и распределений. Интегрируется с Spark через SparkDFDataset для валидации DataFrame-ов прямо в pipeline.
import great_expectations as gx
# Создание контекста
context = gx.get_context()
# Подключение Spark DataFrame как data source
datasource = context.data_sources.add_spark("my_spark")
asset = datasource.add_dataframe_asset("events")
batch = asset.add_batch_definition_whole_dataframe("full")
# Определение expectations
suite = context.suites.add(
gx.ExpectationSuite(name="events_quality")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="amount", min_value=0, max_value=1000000
)
)Библиотека для data quality от Amazon, написанная на Scala для Spark. Вычисляет метрики (completeness, uniqueness, compliance), проверяет constraints и предлагает автоматические проверки через profiling. В отличие от Great Expectations, работает нативно на Spark без Python overhead.
// Scala: проверка качества данных с Deequ
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel}
val result = VerificationSuite()
.onData(df)
.addCheck(
Check(CheckLevel.Error, "data quality")
.isComplete("id") // не NULL
.isUnique("id") // уникальные значения
.isPositive("amount") // > 0
.isContainedIn("status", // допустимые значения
Array("active", "inactive"))
.hasSize(_ > 1000) // минимум строк
)
.run()
// PySpark: через PyDeequ wrapper
// from pydeequ.checks import CheckRemote shuffle service (бывший RemoteShuffleService от Alibaba), хранящий shuffle-данные на выделенных worker-нодах вместо executor-ов. Устраняет зависимость между shuffle-данными и executor lifecycle — executor может быть убит или перезапущен без потери shuffle-результатов. Критичен для Kubernetes-деплоя со dynamic allocation.
# Подключение Celeborn к Spark
spark-submit \
--conf spark.shuffle.manager=org.apache.celeborn.client.spark.SparkShuffleManager \
--conf spark.celeborn.master.endpoints=celeborn-master:9097 \
--conf spark.celeborn.client.spark.push.data.timeout=120s \
my_app.py
# Преимущества:
# - Executor-ы могут быть пересозданы без потери shuffle
# - Лучше для K8s + dynamic allocation
# - Снижает давление на локальные диски executor-ов
# - Поддерживает memory + disk storage на Celeborn workers