Learning Platform
Troubleshooting
Глоссарий

Глоссарий — Apache Spark

Справочник ключевых терминов курса Apache Spark.

8 категорий · 52 терминов

Архитектура и ядро

SparkSession

Spark Session
Термин

Унифицированная точка входа в Spark (с версии 2.0). Объединяет SparkContext, SQLContext и HiveContext в единый API. Через SparkSession регистрируются таблицы, выполняются SQL-запросы и настраиваются параметры приложения.

Пример:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

df = spark.sql("SELECT 1 AS id")
df.show()
Подробнее в уроках:

Driver

Driver Program
Термин

Процесс, который координирует выполнение Spark-приложения. Driver запускает SparkContext, создаёт DAG из трансформаций, разбивает его на stage-и и распределяет task-и по executor-ам. Содержит метаданные о всех partition-ах и текущем состоянии выполнения.

Пример:
# Driver — это процесс, где выполняется ваш main()
# Код внутри spark.sql() / df.filter() описывает DAG
# Реальная работа происходит на executor-ах

spark = SparkSession.builder.master("yarn").getOrCreate()

# Это описание DAG (на driver-е)
result = spark.read.parquet("/data") \
    .filter("age > 30") \
    .groupBy("city").count()

# Это запуск выполнения (driver отправляет task-и)
result.show()
Подробнее в уроках:

Executor

Executor
Термин

JVM-процесс на worker-ноде, выполняющий task-и и хранящий данные в памяти или на диске. Каждое Spark-приложение получает собственные executor-ы, изолированные друг от друга. Executor управляет пулом потоков для параллельного выполнения task-ов и блочным менеджером для кеширования.

Пример:
# Настройка executor-ов
spark = SparkSession.builder \
    .config("spark.executor.instances", "10") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .getOrCreate()

# Каждый executor получит 8 GB RAM и 4 ядра
# 10 executor-ов = 40 параллельных task-ов
Подробнее в уроках:

DAG (Directed Acyclic Graph)

Directed Acyclic Graph
Термин

Граф зависимостей между RDD-партициями, описывающий полный план вычислений. DAGScheduler анализирует граф и разбивает его на stage-и по shuffle-границам. Каждый вызов action (collect, count, write) порождает отдельный job в DAG.

Пример:
# DAG строится из цепочки трансформаций
df = spark.read.parquet("/events")

# Stage 1: scan + filter (narrow)
filtered = df.filter("event_type = 'purchase'")

# Shuffle boundary → Stage 2
grouped = filtered.groupBy("user_id").agg(
    F.sum("amount").alias("total")
)

# Action запускает DAG
grouped.write.parquet("/output")
Подробнее в уроках:

Stage

Stage
Термин

Набор task-ов, которые могут выполняться параллельно без shuffle. Stage-и разделяются shuffle-границами — каждая wide-трансформация (groupBy, join, repartition) создаёт новый stage. DAGScheduler создаёт stage-и и определяет порядок их выполнения.

Пример:
-- Просмотр stage-ей через EXPLAIN
EXPLAIN EXTENDED
SELECT dept, AVG(salary)
FROM employees
WHERE active = true
GROUP BY dept;

-- Stage 0: Scan + Filter (narrow transformations)
-- Shuffle (Exchange hashpartitioning(dept))
-- Stage 1: Aggregate (после shuffle)
Подробнее в уроках:

Task

Task
Термин

Минимальная единица выполнения в Spark — обработка одной partition на одном ядре executor-а. Количество task-ов в stage равно количеству partition-ов. Task получает код для выполнения от driver-а и оперирует только своей локальной partition-ой данных.

Пример:
# Число task-ов = число partition-ов
df = spark.read.parquet("/data")  # 200 файлов → 200 partition → 200 task-ов

# Контроль параллелизма через repartition
df_repart = df.repartition(50)  # 50 task-ов в следующем stage

# Просмотр числа partition-ов
print(f"Partitions: {df.rdd.getNumPartitions()}")
Подробнее в уроках:

Partition

Partition
Термин

Логическая единица параллелизма в Spark. Данные распределяются по partition-ам, и каждая partition обрабатывается отдельным task-ом на одном ядре. Число partition-ов определяет степень параллелизма и влияет на утилизацию ресурсов кластера.

Пример:
from pyspark.sql import functions as F

df = spark.read.parquet("/data")
print(f"Partitions: {df.rdd.getNumPartitions()}")

# Увеличить partition-ы (shuffle)
df_more = df.repartition(200)

# Уменьшить partition-ы (без shuffle)
df_less = df.coalesce(10)

# Partition по колонке для оптимизации join
df_part = df.repartition(100, "user_id")
Подробнее в уроках:

Catalyst и Tungsten

Catalyst Optimizer

Catalyst Optimizer
Термин

Rule-based и cost-based оптимизатор в Spark SQL. Трансформирует логический план запроса в оптимальный физический план через 4 фазы: Analysis → Logical Optimization → Physical Planning → Code Generation. Использует pattern-matching для применения правил оптимизации.

Пример:
-- Просмотр всех фаз оптимизации
EXPLAIN EXTENDED
SELECT u.name, COUNT(o.id)
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.active = true
GROUP BY u.name;

-- Покажет: Parsed → Analyzed → Optimized → Physical Plan
Подробнее в уроках:

Logical Plan

Logical Plan
Термин

Декларативное дерево операций, описывающее «что» нужно сделать без указания «как». Содержит узлы: Scan, Filter, Project, Aggregate, Join, Sort. Catalyst применяет правила оптимизации (predicate pushdown, column pruning, constant folding) к логическому плану перед генерацией физического.

Пример:
# Просмотр логического плана в PySpark
df = spark.read.parquet("/sales") \
    .filter("amount > 100") \
    .groupBy("region").sum("amount")

# Нераcпарсенный план
print(df._jdf.queryExecution().logical())

# Оптимизированный план
print(df._jdf.queryExecution().optimizedPlan())
Подробнее в уроках:

Physical Plan

Physical Plan
Термин

Исполняемое дерево операторов, определяющее конкретный способ выполнения запроса. Physical Plan выбирает стратегии join (BroadcastHashJoin, SortMergeJoin), режимы сканирования и способы агрегации на основе статистик и cost-based оптимизации.

Пример:
-- Просмотр физического плана с метриками
EXPLAIN COST
SELECT *
FROM orders o
JOIN products p ON o.product_id = p.id
WHERE p.category = 'electronics';

-- Результат покажет выбранную стратегию join:
-- BroadcastHashJoin (если products < broadcast threshold)
-- или SortMergeJoin (если обе таблицы большие)
Подробнее в уроках:

Tungsten

Project Tungsten
Термин

Проект оптимизации производительности Spark, фокусирующийся на CPU и памяти. Включает: off-heap memory management (обход GC), UnsafeRow (компактная бинарная сериализация строк), cache-aware вычисления и whole-stage code generation. Устраняет основные JVM overhead-ы.

Пример:
// Tungsten использует UnsafeRow для компактного хранения
// Вместо Java-объектов — бинарный формат

// Настройка off-heap памяти Tungsten
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "4g")

// Tungsten автоматически активен в DataFrame/SQL API
// RDD API не использует Tungsten-оптимизации
Подробнее в уроках:

Whole-Stage CodeGen

Whole-Stage Code Generation
Термин

Техника генерации Java-байткода из цепочки операторов Catalyst-плана. Вместо итерации по виртуальным вызовам операторов, генерируется один метод, обрабатывающий строку от scan до финального оператора. Устраняет overhead виртуальных вызовов и улучшает CPU cache locality.

Пример:
-- Звёздочка (*) в EXPLAIN означает CodeGen
EXPLAIN
SELECT name, age
FROM users
WHERE age > 30;

-- *(1) Filter (age > 30)        ← CodeGen
--   *(1) ColumnarToRow           ← CodeGen (одна цепочка)
--     Scan parquet [name, age]

-- Отключение для отладки:
-- SET spark.sql.codegen.wholeStage = false;
Подробнее в уроках:

UnsafeRow

Unsafe Row
Термин

Компактный бинарный формат представления строки данных в Tungsten. Хранит данные в непрерывном блоке off-heap памяти без Java-объектов, что устраняет GC-давление и позволяет прямое сравнение байтов. Каждая строка содержит null-bitmap, fixed-length данные и variable-length область.

Пример:
// UnsafeRow — внутренний формат, но можно наблюдать
// через метрики в Spark UI:
//   - "peak memory" в sort/aggregate операторах
//   - "spill size" при нехватке памяти

// Размер UnsafeRow зависит от схемы:
// INT: 8 байт (aligned), STRING: 8 байт offset + длина
// NULL bitmap: ceil(numFields / 64) * 8 байт

// Пример: строка (id INT, name STRING, age INT)
// = 8 (bitmap) + 8 + 8 + 8 = 32 байт fixed
// + variable length для name
Подробнее в уроках:

DataFrame и Spark SQL

DataFrame

DataFrame
Термин

Распределённая коллекция данных с именованными колонками, аналогичная таблице в реляционной БД. Основной API для работы с данными в Spark. DataFrame оптимизируется через Catalyst и выполняется через Tungsten, что делает его значительно быстрее RDD API.

Пример:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Создание DataFrame из файла
df = spark.read.parquet("/data/users")

# Операции с DataFrame
result = df.select("name", "age") \
    .filter(F.col("age") > 25) \
    .withColumn("age_group", 
        F.when(F.col("age") < 30, "young").otherwise("senior"))
Подробнее в уроках:

Dataset

Dataset
Термин

Типизированный DataFrame, доступный только в Scala и Java. Dataset[T] обеспечивает compile-time проверку типов и позволяет использовать лямбда-функции с сохранением Catalyst-оптимизаций. В PySpark DataFrame эквивалентен Dataset[Row].

Пример:
// Scala: типизированный Dataset
case class User(name: String, age: Int)

val users: Dataset[User] = spark.read
  .parquet("/data/users")
  .as[User]  // compile-time type safety

// Типизированные операции
val adults = users
  .filter(_.age >= 18)
  .map(u => u.copy(name = u.name.toUpperCase))

// В PySpark нет Dataset[T], только DataFrame = Dataset[Row]
Подробнее в уроках:

Transformation

Transformation
Термин

Операция, создающая новый DataFrame из существующего без запуска вычислений (lazy). Бывает narrow (без shuffle: filter, select, map) и wide (с shuffle: groupBy, join, repartition). Трансформации записываются в DAG и выполняются только при вызове action.

Пример:
df = spark.read.parquet("/events")

# Narrow transformations (без shuffle)
filtered = df.filter("event_type = 'click'")
projected = filtered.select("user_id", "timestamp")

# Wide transformations (вызывают shuffle)
grouped = projected.groupBy("user_id").count()
joined = df.join(other_df, "id")

# Ничего не выполнено! Всё lazy.
# Выполнение начнётся только при action:
grouped.show()  # ← action запускает DAG
Подробнее в уроках:

Action

Action
Термин

Операция, которая запускает выполнение DAG и возвращает результат driver-у или записывает данные. Примеры: collect(), count(), show(), write(). Без вызова action ни одна трансформация не выполняется (lazy evaluation). Каждый action создаёт новый job.

Пример:
df = spark.read.parquet("/data")
filtered = df.filter("age > 30")  # Трансформация — ничего не выполняется

# Actions — запускают выполнение:
filtered.count()           # → число строк (Long)
filtered.show(5)           # → вывод 5 строк в консоль
filtered.collect()         # → все строки в driver (осторожно с OOM!)
filtered.first()           # → первая строка
filtered.write.parquet("/output")  # → запись на диск
Подробнее в уроках:

Lazy Evaluation

Lazy Evaluation
Термин

Стратегия отложенного выполнения: трансформации не выполняются сразу, а записываются в DAG. Выполнение начинается только при вызове action. Это позволяет Catalyst оптимизировать весь план целиком — убирать ненужные колонки, проталкивать фильтры ближе к источнику данных.

Пример:
# Все трансформации — lazy (описание плана)
df = spark.read.parquet("/huge_table")  # Не читает данные!
filtered = df.filter("date = '2024-01-01'")  # Не фильтрует!
result = filtered.select("id", "amount")  # Не проецирует!

# Catalyst оптимизирует: pushdown фильтра + column pruning
# В итоге читаются только 2 колонки за 1 день

result.show()  # ← Только здесь начинается чтение и обработка
Подробнее в уроках:

Window Functions

Window Functions
Термин

Функции, выполняющие вычисления над набором строк (окном), связанных с текущей строкой. В отличие от groupBy, window functions не схлопывают строки — каждая строка сохраняется с добавленным результатом. Поддерживают ranking (row_number, rank), аналитику (lag, lead) и агрегации (sum, avg) по окнам.

Пример:
from pyspark.sql import Window
from pyspark.sql import functions as F

# Окно по отделу, сортировка по зарплате
w = Window.partitionBy("dept").orderBy(F.desc("salary"))

df.withColumn("rank", F.row_number().over(w)) \
  .withColumn("dept_avg", F.avg("salary").over(
      Window.partitionBy("dept")
  )) \
  .withColumn("prev_salary", F.lag("salary", 1).over(w))
Подробнее в уроках:

Spark SQL Catalog

Spark SQL Catalog
Термин

Реестр метаданных, хранящий информацию о базах данных, таблицах, представлениях и функциях. Catalog поддерживает как временные (session-scoped) объекты, так и постоянные (Hive Metastore). С Spark 3.x DataSourceV2 позволяет подключать внешние каталоги (Iceberg, Delta).

Пример:
# Работа с каталогом
spark.sql("CREATE DATABASE IF NOT EXISTS analytics")
spark.sql("USE analytics")

# Регистрация таблицы
df.write.saveAsTable("events")

# Просмотр метаданных
spark.catalog.listDatabases()
spark.catalog.listTables("analytics")
spark.catalog.listColumns("events")

# Временное представление (session-scoped)
df.createOrReplaceTempView("temp_events")
Подробнее в уроках:

Performance и оптимизация

AQE (Adaptive Query Execution)

Adaptive Query Execution
Термин

Механизм динамической оптимизации в Spark 3.x, который адаптирует план выполнения на основе runtime-статистик после каждого shuffle. Включает: автоматическое объединение маленьких partition-ов (coalesce), обработку data skew в join-ах и dynamic partition pruning.

Пример:
# Включение AQE (по умолчанию с Spark 3.2)
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Автоматическое объединение мелких partition-ов
spark.conf.set(
    "spark.sql.adaptive.coalescePartitions.enabled", "true"
)
spark.conf.set(
    "spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m"
)

# Обработка skew в join
spark.conf.set(
    "spark.sql.adaptive.skewJoin.enabled", "true"
)
Подробнее в уроках:

Broadcast Join

Broadcast Join
Термин

Стратегия join, при которой маленькая таблица (dimension) целиком рассылается всем executor-ам, избегая shuffle большой таблицы (fact). Значительно быстрее SortMergeJoin для случаев, когда одна сторона помещается в память. Порог контролируется параметром autoBroadcastJoinThreshold.

Пример:
from pyspark.sql import functions as F

# Автоматический broadcast (таблица < 10 MB по умолчанию)
result = big_df.join(small_df, "id")

# Явный broadcast hint
result = big_df.join(
    F.broadcast(small_df), "id"
)

# Настройка порога
spark.conf.set(
    "spark.sql.autoBroadcastJoinThreshold", "50m"
)  # broadcast таблицы до 50 MB

# Отключение broadcast
spark.conf.set(
    "spark.sql.autoBroadcastJoinThreshold", "-1"
)
Подробнее в уроках:

Bucketing

Bucketing
Термин

Техника предварительного распределения данных по bucket-ам (корзинам) на основе хеш-функции от ключевых колонок. При join двух таблиц, bucketed по одному ключу, Spark пропускает shuffle — данные уже распределены корректно. Эффективно для повторяющихся join-ов по одним и тем же ключам.

Пример:
# Запись с bucketing
df.write \
    .bucketBy(32, "user_id") \
    .sortBy("user_id") \
    .saveAsTable("bucketed_orders")

# Join без shuffle (bucket-to-bucket)
orders = spark.table("bucketed_orders")  # 32 buckets by user_id
users = spark.table("bucketed_users")    # 32 buckets by user_id

result = orders.join(users, "user_id")  # No shuffle!
Подробнее в уроках:

Data Skew

Data Skew
Термин

Неравномерное распределение данных по partition-ам, при котором один или несколько task-ов обрабатывают значительно больше данных, чем остальные. Data skew — основная причина «зависших» stage-ей и OOM-ошибок. Диагностируется в Spark UI по разнице между median и max task duration.

Пример:
# Диагностика skew
df.groupBy("key").count().orderBy(
    F.desc("count")
).show(10)  # Покажет самые частые ключи

# Решение 1: AQE skew join (Spark 3.x)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Решение 2: Salting (ручное)
from pyspark.sql.functions import rand, concat, lit
df_salted = df.withColumn(
    "salted_key", concat("key", lit("_"), (rand() * 10).cast("int"))
)
Подробнее в уроках:

Dynamic Partition Pruning (DPP)

Dynamic Partition Pruning
Термин

Оптимизация, при которой фильтр из dimension-таблицы join-а применяется к fact-таблице на этапе scan. Если dimension-таблица после фильтрации содержит 3 значения ключа, то из fact-таблицы будут прочитаны только partition-ы с этими ключами. Значительно снижает объём I/O.

Пример:
-- DPP автоматически применяется в star-schema join-ах
SELECT f.*, d.category_name
FROM fact_sales f
JOIN dim_category d ON f.category_id = d.id
WHERE d.department = 'Electronics';

-- Без DPP: читает ВСЕ partition-ы fact_sales
-- С DPP: читает только partition-ы где category_id IN (
--   SELECT id FROM dim_category WHERE department = 'Electronics'
-- )

-- Конфигурация
-- spark.sql.optimizer.dynamicPartitionPruning.enabled = true
Подробнее в уроках:

Shuffle

Shuffle
Термин

Перераспределение данных между executor-ами по сети. Самая дорогая операция в Spark: данные сериализуются, записываются на диск, передаются по сети и десериализуются. Возникает при groupBy(), join(), repartition(), distinct(). Оптимизация shuffle — ключ к производительности.

Пример:
# Операции, вызывающие shuffle
df.groupBy("key").count()     # shuffle по key
df.join(other, "id")          # shuffle обеих таблиц
df.repartition(200)           # полный shuffle
df.distinct()                 # shuffle для дедупликации

# Операции БЕЗ shuffle
df.filter("age > 30")         # narrow
df.select("name", "age")      # narrow
df.coalesce(10)               # без shuffle (уменьшение)

# Настройка числа shuffle-partition-ов
spark.conf.set("spark.sql.shuffle.partitions", "200")
Подробнее в уроках:

Caching

Caching / Persistence
Термин

Механизм сохранения промежуточных результатов в памяти или на диске для повторного использования. cache() сохраняет DataFrame в MEMORY_AND_DISK. persist() позволяет выбрать уровень хранения. Кешированные данные хранятся на executor-ах и доступны без повторного вычисления.

Пример:
from pyspark import StorageLevel

# Кеширование в память (MEMORY_AND_DISK)
df_cached = df.filter("active = true").cache()
df_cached.count()  # Материализует кеш

# Разные уровни persistence
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.DISK_ONLY)
df.persist(StorageLevel.MEMORY_AND_DISK_SER)  # Сжатый

# Освобождение кеша
df_cached.unpersist()

# Проверка в Spark UI → Storage tab
Подробнее в уроках:

Форматы и хранение

Parquet

Apache Parquet
Термин

Колоночный формат файлов, де-факто стандарт для аналитических workload-ов. Поддерживает predicate pushdown (чтение только нужных row-group-ов), column pruning (чтение только нужных колонок), встроенную компрессию (Snappy, Zstd) и статистики (min/max) для data skipping.

Пример:
# Запись в Parquet с партиционированием
df.write \
    .partitionBy("year", "month") \
    .option("compression", "zstd") \
    .mode("overwrite") \
    .parquet("/data/events")

# Чтение с predicate pushdown
df = spark.read.parquet("/data/events") \
    .filter("year = 2024 AND month = 1")  # Читает только нужные partition-ы

# Просмотр схемы без чтения данных
spark.read.parquet("/data/events").printSchema()
Подробнее в уроках:

Delta Lake

Delta Lake
Термин

Open-source lakehouse формат от Databricks, построенный поверх Parquet. Обеспечивает ACID-транзакции, time travel (запросы к предыдущим версиям данных), schema enforcement/evolution, Z-ordering для оптимизации чтения и MERGE (upsert) операции. Транзакционный лог хранится в _delta_log/.

Пример:
# Запись в Delta
df.write.format("delta").save("/data/events_delta")

# Time travel — чтение предыдущей версии
spark.read.format("delta") \
    .option("versionAsOf", 5) \
    .load("/data/events_delta")

# MERGE (upsert)
from delta.tables import DeltaTable
delta_t = DeltaTable.forPath(spark, "/data/events_delta")
delta_t.alias("t").merge(
    updates.alias("u"), "t.id = u.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
Подробнее в уроках:

Apache Iceberg

Apache Iceberg
Термин

Lakehouse формат с hidden partitioning, schema evolution и time travel. Iceberg использует catalog-based управление таблицами и хранит метаданные в иерархии: catalog → metadata files → manifest lists → manifest files → data files. Поддерживает partition evolution без перезаписи данных.

Пример:
-- Создание Iceberg-таблицы
CREATE TABLE catalog.db.events (
    id BIGINT,
    event_type STRING,
    ts TIMESTAMP
) USING iceberg
PARTITIONED BY (days(ts));

-- Hidden partitioning — не нужно указывать partition-колонку
SELECT * FROM catalog.db.events
WHERE ts > '2024-01-01';  -- Автоматический partition pruning

-- Time travel
SELECT * FROM catalog.db.events
  VERSION AS OF 1234567890;
Подробнее в уроках:

Z-ordering

Z-ordering / Data Skipping
Термин

Техника упорядочивания данных внутри файлов по нескольким колонкам одновременно с помощью Z-кривой (space-filling curve). Улучшает data skipping — запросы с фильтрами по Z-ordered колонкам пропускают файлы, не содержащие релевантных значений. Особенно эффективна для высококардинальных колонок.

Пример:
-- Z-ordering в Delta Lake
OPTIMIZE events
ZORDER BY (user_id, event_date);

-- После Z-ordering запросы по user_id и/или event_date
-- пропускают файлы, где min/max не пересекаются с фильтром
SELECT * FROM events
WHERE user_id = 'abc' AND event_date = '2024-01-01';
-- Читает ~1% файлов вместо 100%
Подробнее в уроках:

Compaction

Compaction / Optimize
Термин

Процесс объединения мелких файлов в крупные для улучшения производительности чтения. Мелкие файлы создают overhead на открытие/закрытие файлов и метаданные. В Delta Lake выполняется через OPTIMIZE, в Iceberg — через rewriteDataFiles(). Рекомендуемый размер файла: 128 MB — 1 GB.

Пример:
-- Compaction в Delta Lake
OPTIMIZE '/data/events_delta';

-- С Z-ordering
OPTIMIZE '/data/events_delta'
ZORDER BY (user_id);

-- Удаление старых версий после compaction
VACUUM '/data/events_delta' RETAIN 168 HOURS;

-- В Iceberg (через Spark procedure)
CALL catalog.system.rewrite_data_files(
    table => 'db.events',
    options => map('target-file-size-bytes', '134217728')
);
Подробнее в уроках:

Small Files Problem

Small Files Problem
Термин

Проблема производительности при наличии множества мелких файлов (< 1-10 MB). Каждый файл создаёт отдельный task в Spark, overhead на метаданные в NameNode (HDFS) и замедляет listing в object storage (S3). Возникает при частых записях, streaming и неправильном партиционировании.

Пример:
# Диагностика
import os
path = "/data/events/"
files = spark.read.parquet(path).inputFiles()
print(f"Файлов: {len(files)}")
print(f"Partitions: {spark.read.parquet(path).rdd.getNumPartitions()}")

# Решение 1: coalesce перед записью
df.coalesce(10).write.parquet("/output")

# Решение 2: repartition по колонке
df.repartition("date").write.partitionBy("date").parquet("/output")

# Решение 3: OPTIMIZE (Delta Lake)
# OPTIMIZE '/data/events_delta'
Подробнее в уроках:

Lakehouse

Lakehouse Architecture
Термин

Архитектурный паттерн, объединяющий надёжность data warehouse (ACID, схема) с масштабируемостью data lake (дешёвое хранилище, open formats). Реализуется через табличные форматы (Delta Lake, Iceberg, Hudi) поверх объектного хранилища (S3, GCS, ADLS). Устраняет необходимость в отдельном ETL между lake и warehouse.

Пример:
-- Типичная Lakehouse архитектура в Spark

-- Bronze layer: сырые данные
raw_df.write.format("delta").save("/lakehouse/bronze/events")

-- Silver layer: очищенные данные
cleaned_df.write.format("delta").save("/lakehouse/silver/events")

-- Gold layer: агрегаты для аналитики
agg_df.write.format("delta").save("/lakehouse/gold/daily_metrics")

-- Все слои используют ACID, time travel, schema enforcement
Подробнее в уроках:

Structured Streaming

Structured Streaming

Structured Streaming
Термин

Потоковый API Spark, построенный на DataFrame/Dataset. Трактует поток данных как бесконечную таблицу, к которой применяются те же операции, что и к batch-данным. Поддерживает exactly-once семантику через checkpointing и three output modes: Append, Update, Complete.

Пример:
# Чтение потока из Kafka
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "events") \
    .load()

# Обработка (тот же API, что и batch)
parsed = stream_df.selectExpr(
    "CAST(value AS STRING)",
    "timestamp"
)

# Запись результата
query = parsed.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/cp/events") \
    .start("/data/events")
Подробнее в уроках:

Watermark

Watermark
Термин

Механизм отслеживания прогресса event time в потоковой обработке. Watermark определяет порог — данные с event time старше watermark-а считаются «поздними» и могут быть отброшены. Это позволяет Spark очищать state store от устаревших состояний и ограничивать потребление памяти.

Пример:
from pyspark.sql import functions as F

# Watermark: данные опаздывают максимум на 10 минут
stream_df = stream_df \
    .withWatermark("event_time", "10 minutes")

# Агрегация с watermark
result = stream_df \
    .groupBy(
        F.window("event_time", "5 minutes"),
        "user_id"
    ).count()

# Без watermark state растёт бесконечно!
# С watermark: окна старше 10 мин очищаются автоматически
Подробнее в уроках:

Trigger

Trigger
Термин

Определяет, когда запускать следующий micro-batch в Structured Streaming. Варианты: processingTime (фиксированный интервал), once (однократный запуск для backfill), availableNow (обработать всё доступное и остановиться), continuous (экспериментальный low-latency режим с ~1 мс задержкой).

Пример:
from pyspark.sql.streaming import Trigger

# Micro-batch каждые 30 секунд
query = df.writeStream \
    .trigger(processingTime="30 seconds") \
    .start()

# Однократный запуск (для backfill)
query = df.writeStream \
    .trigger(once=True) \
    .start()

# Обработать всё доступное и остановиться
query = df.writeStream \
    .trigger(availableNow=True) \
    .start()

# Continuous (экспериментальный, ~1 мс latency)
query = df.writeStream \
    .trigger(continuous="1 second") \
    .start()
Подробнее в уроках:

Output Mode

Output Mode
Термин

Определяет, какие строки записываются в sink после каждого micro-batch. Append — только новые строки (для фильтров, map). Update — только изменившиеся строки (для агрегаций с watermark). Complete — полный результат целиком (для агрегаций без watermark). Выбор режима зависит от типа запроса.

Пример:
# Append: только новые строки (подходит для ETL)
df.writeStream.outputMode("append").start()

# Update: только изменённые агрегаты
df.groupBy("category").count() \
    .writeStream.outputMode("update").start()

# Complete: весь результат каждый раз
df.groupBy("category").count() \
    .writeStream.outputMode("complete").start()

# Append + aggregation требует watermark:
df.withWatermark("ts", "10 min") \
    .groupBy(F.window("ts", "5 min")).count() \
    .writeStream.outputMode("append").start()
Подробнее в уроках:

Checkpoint

Checkpoint
Термин

Механизм сохранения прогресса streaming query на надёжное хранилище (HDFS, S3). Checkpoint хранит offset-ы прочитанных данных, состояние state store и метаданные запроса. Обеспечивает exactly-once семантику при перезапуске — query продолжает с последнего записанного offset-а.

Пример:
# Обязательно для production streaming
query = df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3://bucket/checkpoints/my_query") \
    .outputMode("append") \
    .start("/data/output")

# Checkpoint содержит:
# - offsets/    — какие данные прочитаны
# - commits/    — какие batch-и завершены
# - state/      — state store snapshots
# - metadata    — query metadata

# ВАЖНО: один checkpointLocation = один query
# Нельзя переиспользовать между разными запросами
Подробнее в уроках:

State Store

State Store
Термин

Key-value хранилище для stateful-операций в Structured Streaming: агрегаций, дедупликации, session windows, stream-to-stream join. По умолчанию — HDFS-backed (HDFSBackedStateStoreProvider). Для production рекомендуется RocksDB backend, который хранит state на диске executor-а с LRU-кешем в памяти.

Пример:
# Включение RocksDB state store (рекомендуется)
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming."
    "state.RocksDBStateStoreProvider"
)

# Stateful операции используют state store:
# - groupBy().count() (running aggregation)
# - dropDuplicatesWithinWatermark()
# - stream-to-stream join
# - flatMapGroupsWithState (custom state)

# Мониторинг state store:
# Spark UI → Structured Streaming → State Store metrics
Подробнее в уроках:

Мониторинг и наблюдаемость

Spark UI

Spark Web UI
Термин

Встроенный веб-интерфейс для мониторинга Spark-приложений. Доступен на порту 4040 во время работы приложения. Показывает job-ы, stage-и, task-и, SQL-планы, метрики памяти и storage. Основной инструмент для диагностики performance-проблем: data skew, spill, shuffle overhead.

Пример:
# Spark UI доступен автоматически
# http://driver-host:4040

# Ключевые вкладки:
# - Jobs: список job-ов и их stage-ей
# - Stages: task-и, метрики (shuffle read/write, GC time)
# - SQL: планы выполнения SQL-запросов
# - Storage: кешированные DataFrame-ы
# - Environment: конфигурация Spark

# Для YARN: http://yarn-rm:8088 → Application → Tracking URL
Подробнее в уроках:

History Server

Spark History Server
Термин

Сервис для просмотра завершённых Spark-приложений. Читает event log-файлы из указанной директории (HDFS, S3) и предоставляет UI, идентичный Spark UI, для post-mortem анализа. Позволяет исследовать performance-проблемы после завершения job-а.

Пример:
# Настройка event log
spark.conf.set("spark.eventLog.enabled", "true")
spark.conf.set("spark.eventLog.dir", "hdfs:///spark-logs")

# Запуск History Server
# $SPARK_HOME/sbin/start-history-server.sh

# Конфигурация History Server
# spark.history.fs.logDirectory = hdfs:///spark-logs
# spark.history.ui.port = 18080

# Доступ: http://history-server:18080
# Показывает все завершённые приложения с полным UI
Подробнее в уроках:

SparkListener

Spark Listener
Термин

Интерфейс для перехвата событий жизненного цикла Spark-приложения: запуск/завершение job-ов, stage-ей, task-ов, получение метрик executor-ов. Позволяет реализовать кастомный мониторинг, алертинг и logging. Регистрируется через SparkContext.addSparkListener().

Пример:
// Scala: кастомный SparkListener
import org.apache.spark.scheduler._

class MyListener extends SparkListener {
  override def onStageCompleted(
    event: SparkListenerStageCompleted
  ): Unit = {
    val info = event.stageInfo
    val duration = info.completionTime.get - info.submissionTime.get
    println(s"Stage ${info.stageId} completed in ${duration}ms")
    
    // Алерт если stage занял > 5 минут
    if (duration > 300000)
      alertSlack(s"Slow stage: ${info.stageId}")
  }
}

spark.sparkContext.addSparkListener(new MyListener())
Подробнее в уроках:

Prometheus Metrics

Prometheus Integration
Термин

Интеграция Spark с Prometheus для сбора метрик в time-series формате. Spark 3.x поддерживает встроенный PrometheusServlet, экспортирующий метрики executor-ов, JVM, shuffle и streaming. Метрики собираются в pull-модели через HTTP-endpoint и визуализируются в Grafana.

Пример:
# Конфигурация Prometheus sink
spark.conf.set("spark.metrics.conf.*.sink.prometheusServlet.class",
    "org.apache.spark.metrics.sink.PrometheusServlet")
spark.conf.set("spark.metrics.conf.*.sink.prometheusServlet.path",
    "/metrics/prometheus")
spark.conf.set("spark.ui.prometheus.enabled", "true")

# Endpoint: http://driver:4040/metrics/prometheus
# Метрики: executor memory, GC time, shuffle bytes,
#          task duration, streaming batch duration

# prometheus.yml scrape config:
# - job_name: 'spark'
#   metrics_path: '/metrics/prometheus'
#   static_configs:
#     - targets: ['driver:4040']
Подробнее в уроках:

Grafana Dashboard

Grafana Dashboard
Термин

Визуализация метрик Spark-приложений через Grafana, подключённую к Prometheus. Позволяет создавать dashboards с графиками executor memory usage, GC pressure, shuffle read/write throughput, task duration distribution и streaming lag. Обеспечивает real-time наблюдаемость кластера.

Пример:
# Ключевые панели Grafana для Spark:
#
# 1. Executor Overview:
#    - Active executors count
#    - JVM heap usage per executor
#    - GC time percentage
#
# 2. Job Performance:
#    - Task duration p50/p95/p99
#    - Shuffle read/write bytes
#    - Spill to disk bytes
#
# 3. Streaming (если используется):
#    - Batch processing time
#    - Input rows per second
#    - State store memory usage
#
# Пример PromQL: rate(spark_executor_gc_time_total[5m])
Подробнее в уроках:

Production и экосистема

Apache Arrow

Apache Arrow
Термин

Columnar in-memory формат данных для эффективного обмена между Spark (JVM) и Python/pandas. Используется в pandas UDF (vectorized UDF) для передачи данных без сериализации Java-объектов. Arrow Flight протокол используется в Spark Connect для клиент-серверной архитектуры.

Пример:
# Arrow ускоряет toPandas() и pandas UDF
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Без Arrow: Java → Pickle → Python (медленно)
# С Arrow: Java → Arrow IPC → Python (zero-copy)

pdf = df.toPandas()  # В 10-100x быстрее с Arrow

# Pandas UDF (vectorized) использует Arrow
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def normalize(s: pd.Series) -> pd.Series:
    return (s - s.mean()) / s.std()
Подробнее в уроках:

Spark Connect

Spark Connect
Термин

Клиент-серверная архитектура (с Spark 3.4), разделяющая клиентское приложение и Spark-кластер через gRPC. Клиент отправляет логический план, сервер выполняет и возвращает результаты через Arrow Flight. Позволяет тонкий клиент на Python/Scala без полного SparkSession на стороне клиента.

Пример:
# Запуск Spark Connect Server
# ./sbin/start-connect-server.sh --packages \
#   org.apache.spark:spark-connect_2.12:3.5.0

# Подключение клиента (тонкий — без JVM)
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .remote("sc://spark-server:15002") \
    .getOrCreate()

# Тот же API, что и локальный Spark
df = spark.sql("SELECT * FROM events")
df.show()
Подробнее в уроках:

YARN

Apache YARN
Термин

Yet Another Resource Negotiator — менеджер ресурсов в Hadoop-экосистеме. Spark на YARN работает в двух режимах: cluster (driver на YARN) и client (driver на submitting machine). YARN распределяет контейнеры с executor-ами по нодам кластера на основе запрошенных ресурсов.

Пример:
# Запуск Spark на YARN
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --num-executors 20 \
    --executor-memory 8g \
    --executor-cores 4 \
    --conf spark.yarn.maxAppAttempts=2 \
    my_app.py

# Режимы деплоя:
# cluster: driver в YARN контейнере (для production)
# client: driver на машине отправки (для отладки)

# Мониторинг: YARN ResourceManager UI → Applications
Подробнее в уроках:

Kubernetes

Kubernetes
Термин

Контейнерный оркестратор, поддерживаемый Spark как cluster manager (с Spark 2.3, GA в 3.1). Spark создаёт pod-ы для driver-а и executor-ов в namespace Kubernetes. Преимущества: auto-scaling, resource isolation, CI/CD интеграция, единая инфраструктура для всех workload-ов.

Пример:
# Запуск Spark на Kubernetes
spark-submit \
    --master k8s://https://k8s-apiserver:443 \
    --deploy-mode cluster \
    --conf spark.kubernetes.container.image=spark:3.5.0 \
    --conf spark.kubernetes.namespace=spark-jobs \
    --conf spark.executor.instances=10 \
    --conf spark.kubernetes.executor.request.cores=2 \
    --conf spark.kubernetes.executor.limit.cores=4 \
    local:///opt/spark/my_app.py

# Dynamic allocation на K8s
# spark.dynamicAllocation.enabled=true
# spark.dynamicAllocation.shuffleTracking.enabled=true
Подробнее в уроках:

Great Expectations

Great Expectations
Термин

Python-фреймворк для валидации данных и data quality. Определяет «ожидания» (expectations) — декларативные проверки значений, типов, уникальности, ranges и распределений. Интегрируется с Spark через SparkDFDataset для валидации DataFrame-ов прямо в pipeline.

Пример:
import great_expectations as gx

# Создание контекста
context = gx.get_context()

# Подключение Spark DataFrame как data source
datasource = context.data_sources.add_spark("my_spark")
asset = datasource.add_dataframe_asset("events")
batch = asset.add_batch_definition_whole_dataframe("full")

# Определение expectations
suite = context.suites.add(
    gx.ExpectationSuite(name="events_quality")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0, max_value=1000000
    )
)
Подробнее в уроках:

Deequ

Amazon Deequ
Термин

Библиотека для data quality от Amazon, написанная на Scala для Spark. Вычисляет метрики (completeness, uniqueness, compliance), проверяет constraints и предлагает автоматические проверки через profiling. В отличие от Great Expectations, работает нативно на Spark без Python overhead.

Пример:
// Scala: проверка качества данных с Deequ
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel}

val result = VerificationSuite()
  .onData(df)
  .addCheck(
    Check(CheckLevel.Error, "data quality")
      .isComplete("id")           // не NULL
      .isUnique("id")             // уникальные значения
      .isPositive("amount")       // > 0
      .isContainedIn("status",    // допустимые значения
        Array("active", "inactive"))
      .hasSize(_ > 1000)          // минимум строк
  )
  .run()

// PySpark: через PyDeequ wrapper
// from pydeequ.checks import Check
Подробнее в уроках:

Celeborn

Apache Celeborn
Термин

Remote shuffle service (бывший RemoteShuffleService от Alibaba), хранящий shuffle-данные на выделенных worker-нодах вместо executor-ов. Устраняет зависимость между shuffle-данными и executor lifecycle — executor может быть убит или перезапущен без потери shuffle-результатов. Критичен для Kubernetes-деплоя со dynamic allocation.

Пример:
# Подключение Celeborn к Spark
spark-submit \
    --conf spark.shuffle.manager=org.apache.celeborn.client.spark.SparkShuffleManager \
    --conf spark.celeborn.master.endpoints=celeborn-master:9097 \
    --conf spark.celeborn.client.spark.push.data.timeout=120s \
    my_app.py

# Преимущества:
# - Executor-ы могут быть пересозданы без потери shuffle
# - Лучше для K8s + dynamic allocation
# - Снижает давление на локальные диски executor-ов
# - Поддерживает memory + disk storage на Celeborn workers
Подробнее в уроках: