Learning Platform
Troubleshooting
Глоссарий

Глоссарий — Apache Spark Internals

Справочник ключевых терминов курса Apache Spark Internals.

9 категорий · 60 терминов

RDD и модель выполнения

RDD (Resilient Distributed Dataset)

Resilient Distributed Dataset
Термин

Фундаментальная абстракция Spark — неизменяемая распределённая коллекция объектов, описываемая пятью свойствами: список partition-ов, функция вычисления каждого partition-а, список зависимостей от других RDD, опциональный Partitioner (только для key-value RDD) и опциональный список preferred locations для каждого partition-а. RDD материализуется только при вызове action.

Пример:
// Пять свойств RDD в исходниках Spark:
// abstract class RDD[T] {
//   def getPartitions: Array[Partition]
//   def compute(split: Partition, ctx: TaskContext): Iterator[T]
//   def getDependencies: Seq[Dependency[_]]
//   val partitioner: Option[Partitioner]
//   def getPreferredLocations(split: Partition): Seq[String]
// }

val rdd = sc.textFile("hdfs:///data/*.log")  // HadoopRDD
val mapped = rdd.map(line => line.split(","))  // MapPartitionsRDD
val filtered = mapped.filter(_.length > 3)     // MapPartitionsRDD
// Граф зависимостей создан, данных ещё нет
Подробнее в уроках:

ShuffleMapStage / ResultStage

ShuffleMapStage / ResultStage
Термин

Два типа stage в Spark. ShuffleMapStage — промежуточный stage, task-и которого записывают shuffle-выходы для следующего stage. Завершается регистрацией MapStatus в MapOutputTracker. ResultStage — финальный stage, task-и которого возвращают результат напрямую driver-у (через ResultHandler) или записывают в sink. Каждый ResultStage завершает один Job. DAGScheduler создаёт ShuffleMapStage для каждой ShuffleDependency.

Пример:
// Визуализация в Spark UI:
// Job X:
//   Stage 0 (ShuffleMapStage): 200 tasks → shuffle write
//   Stage 1 (ShuffleMapStage): 200 tasks → shuffle write
//   Stage 2 (ResultStage):     100 tasks → write to parquet

// В логах DAGScheduler:
// INFO DAGScheduler: Submitting ShuffleMapStage 0 (200 tasks)
// INFO DAGScheduler: ShuffleMapStage 0 finished in 12.3 s
// INFO DAGScheduler: Submitting ResultStage 2 (100 tasks)

// Ключевое: если ShuffleMapStage упал частично
// → пересчитываются только потерянные map outputs
// → ResultStage перезапускается полностью
Подробнее в уроках:

RDD Lineage

RDD Lineage / Dependency Graph
Термин

Направленный ациклический граф зависимостей между RDD, запоминающий полную цепочку трансформаций от источника данных до результата. Lineage — механизм отказоустойчивости: при потере partition-а Spark восстанавливает его, повторно выполняя соответствующую ветку lineage, а не сохраняя данные на диск. toDebugString() возвращает человекочитаемое представление lineage.

Пример:
val rdd = sc.textFile("/data")
  .map(_.split(","))
  .filter(_(2).toInt > 100)
  .map(arr => (arr(0), arr(1).toLong))
  .reduceByKey(_ + _)

// Печать lineage
println(rdd.toDebugString)
// (8) ShuffledRDD[5]
//  +-(8) MapPartitionsRDD[4]
//      |  MapPartitionsRDD[3]
//      |  MapPartitionsRDD[2]
//      |  MapPartitionsRDD[1]
//      |  /data HadoopRDD[0]
Подробнее в уроках:

Narrow Dependency

Narrow Dependency
Термин

Тип зависимости RDD, при котором каждый partition дочернего RDD зависит от одного (или фиксированного числа) partition-ов родительского RDD. Не требует shuffle. Примеры: map, filter, flatMap, union. Узкие зависимости позволяют конвейеризировать вычисления в одном task-е и локально восстанавливать потерянные partition-ы.

Пример:
// Narrow: каждый выходной partition ← один входной
val rdd2 = rdd1.map(x => x * 2)       // MapPartitionsRDD: narrow
val rdd3 = rdd2.filter(_ > 5)          // MapPartitionsRDD: narrow
val rdd4 = rdd1.union(rdd2)            // UnionRDD: narrow

// Конвейеризация: map + filter выполняются в одном task
// без записи промежуточных данных на диск

// Восстановление: потеря partition i в rdd3
// → пересчитать только partition i rdd1 → rdd2 → rdd3
Подробнее в уроках:

Wide Dependency

Wide Dependency / Shuffle Dependency
Термин

Тип зависимости RDD, при котором каждый partition дочернего RDD зависит от множества partition-ов родительского. Требует shuffle — полного перераспределения данных между executor-ами через запись на диск и передачу по сети. DAGScheduler разрезает граф на stage-и по границам широких зависимостей. Примеры: groupByKey, reduceByKey, join с разными partitioner-ами.

Пример:
// Wide: каждый выходной partition ← много входных
val counts = rdd.groupByKey()   // ShuffledRDD: wide
val joined = rdd1.join(rdd2)    // CoGroupedRDD: wide

// DAGScheduler создаёт stage-границу здесь:
// Stage 0: [upstream ops] → shuffle write
// Stage 1: shuffle read → [downstream ops]

// Восстановление wide dependency дорогостоящее:
// потеря partition требует повторного shuffle из Stage 0
Подробнее в уроках:

Partitioner

Partitioner
Термин

Объект, определяющий, в какой partition попадёт элемент при shuffle. Два встроенных: HashPartitioner (ключ → partition по key.hashCode % numPartitions) и RangePartitioner (сортирует ключи по диапазонам, эффективен для sortByKey). Custom Partitioner позволяет реализовать domain-специфичную логику распределения, избегая data skew.

Пример:
import org.apache.spark.HashPartitioner
import org.apache.spark.RangePartitioner

// HashPartitioner — по умолчанию для groupByKey/reduceByKey
val partitioned = rdd.partitionBy(new HashPartitioner(100))

// RangePartitioner — для sortByKey
val rangePart = new RangePartitioner(100, rdd)  // сэмплирует данные!
val sorted = rdd.sortByKey()  // использует RangePartitioner

// Custom Partitioner
class DomainPartitioner(n: Int) extends Partitioner {
  def numPartitions: Int = n
  def getPartition(key: Any): Int =
    Math.abs(key.hashCode) % n  // или своя логика
}
Подробнее в уроках:

Планировщик: DAG и Task

DAGScheduler

DAG Scheduler
Термин

Компонент Spark, преобразующий RDD-граф в физический план выполнения. При вызове action DAGScheduler обходит граф зависимостей в обратном направлении, выявляет ShuffleDependency-границы и разрезает граф на stage-и. Для каждого stage создаёт ShuffleMapStage (результат shuffle write) или ResultStage (финальная стадия). Направляет готовые stage-и в TaskScheduler.

Пример:
// DAGScheduler не вызывается напрямую, но его поведение видно через:

// 1. Spark UI: вкладка Jobs → stages breakdown
// 2. Логи: [DAGScheduler] Submitting ShuffleMapStage 0
// 3. eventLog: SparkListenerStageSubmitted

// Ключевые методы (Scala internals):
// DAGScheduler.handleJobSubmitted() → создаёт finalStage
// DAGScheduler.getOrCreateShuffleMapStage() → для wide deps
// DAGScheduler.submitStage() → проверяет готовность родителей
Подробнее в уроках:

TaskScheduler

Task Scheduler
Термин

Компонент Spark, принимающий TaskSet от DAGScheduler и распределяющий task-и по executor-ам с учётом локальности данных. TaskSchedulerImpl реализует delay scheduling — ждёт освобождения executor-а с нужной локальностью определённое время перед переходом к менее локальному. Взаимодействует с cluster manager через SchedulerBackend.

Пример:
// Конфигурация задержки для delay scheduling:
spark.conf.set("spark.locality.wait", "3s")         // общее ожидание
spark.conf.set("spark.locality.wait.node", "3s")     // NODE_LOCAL
spark.conf.set("spark.locality.wait.rack", "3s")     // RACK_LOCAL
// PROCESS_LOCAL → NODE_LOCAL → RACK_LOCAL → ANY

// Метрики в Spark UI → Stages → Task Metrics:
// Locality Level: PROCESS_LOCAL/NODE_LOCAL/RACK_LOCAL/ANY
// Наличие только ANY-задач → проблема с локальностью
Подробнее в уроках:

TaskSetManager

TaskSetManager
Термин

Внутренний компонент TaskScheduler, управляющий одним TaskSet (набором task-ов одного stage). Отвечает за: отслеживание статуса каждого task-а, повторный запуск упавших task-ов (до spark.task.maxFailures попыток), обнаружение и запуск speculative task-ов для straggler-ов, вычисление уровня локальности для каждого pending task-а.

Пример:
// Настройки TaskSetManager:
spark.conf.set("spark.task.maxFailures", "4")  // макс. попыток
spark.conf.set(
  "spark.speculation", "true"
)  // включить speculative execution
spark.conf.set(
  "spark.speculation.multiplier", "1.5"
)  // задача-straggler: >1.5x медианы
spark.conf.set(
  "spark.speculation.quantile", "0.75"
)  // ждать 75% завершённых задач

// В логах:
// TaskSetManager: Starting task 5.0 in stage 2.0 (TID 15)
// TaskSetManager: Lost task 5.0 in stage 2.0 — retrying (1/4)
Подробнее в уроках:

Task Locality

Task Locality
Термин

Принцип выполнения task-а как можно ближе к данным, которые он обрабатывает. Уровни приоритета (от лучшего к худшему): PROCESS_LOCAL (данные в памяти того же JVM), NODE_LOCAL (данные на том же узле), NO_PREF (нет предпочтений), RACK_LOCAL (тот же rack), ANY (любой узел). Delay scheduling задерживает отправку task-а ради лучшего уровня локальности.

Пример:
// Проверка локальности в Spark UI → Stage → Tasks:
// PROCESS_LOCAL: кешированный RDD/DataFrame в памяти executor
// NODE_LOCAL: блок HDFS на том же узле, executor не кеширует
// RACK_LOCAL: блок HDFS на другом узле того же rack
// ANY: данные на другом rack или object storage (S3/GCS)

// Для S3/GCS всегда ANY — нет данных на executor-нодах
// Оптимизация: увеличить cache, уменьшить locality.wait для S3
spark.conf.set("spark.locality.wait", "0s")  // для object storage
Подробнее в уроках:

Speculative Execution

Speculative Execution
Термин

Механизм запуска дублирующих копий медленных (straggler) task-ов на других executor-ах. TaskSetManager определяет straggler, если его время выполнения превышает spark.speculation.multiplier × медианное время завершённых task-ов. Первый завершившийся результат используется, дубликат отменяется. Помогает при неравномерной нагрузке на узлы, но увеличивает потребление ресурсов.

Пример:
// Включение speculative execution
spark.conf.set("spark.speculation", "true")
spark.conf.set("spark.speculation.interval", "100ms")  // частота проверки
spark.conf.set("spark.speculation.multiplier", "1.5")   // порог straggler
spark.conf.set("spark.speculation.quantile", "0.75")    // % завершённых задач

// В Spark UI → Stage → Tasks:
// task с (speculative) в поле Index — это дубликат
// Успех первого завершённого убивает второй

// ОСТОРОЖНО: non-idempotent задачи могут давать двойные записи!
Подробнее в уроках:

Dynamic Allocation

Dynamic Resource Allocation
Термин

Механизм автоматического масштабирования числа executor-ов в зависимости от нагрузки. При наличии pending task-ов executor-ы добавляются (экспоненциально: 1→2→4→8). Простаивающие executor-ы удаляются после spark.dynamicAllocation.executorIdleTimeout. Требует External Shuffle Service или shuffle tracking (Kubernetes) для сохранения shuffle-данных после удаления executor-а.

Пример:
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.minExecutors", "2")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "100")
spark.conf.set("spark.dynamicAllocation.initialExecutors", "5")
spark.conf.set("spark.dynamicAllocation.executorIdleTimeout", "60s")

// Для Kubernetes (без External Shuffle Service):
spark.conf.set(
  "spark.dynamicAllocation.shuffleTracking.enabled", "true"
)
// Spark отслеживает, какой executor держит нужные shuffle-блоки
Подробнее в уроках:

Shuffle: внутреннее устройство

ShuffleManager

ShuffleManager
Термин

Плагинный интерфейс Spark для управления shuffle-операциями. Производится регистрация через spark.shuffle.manager. Стандартная реализация — SortShuffleManager (с Spark 1.6). Предоставляет ShuffleWriter (для map-стороны) и ShuffleReader (для reduce-стороны). Позволяет заменить встроенный shuffle внешним сервисом (Celeborn, Uniffle).

Пример:
// По умолчанию:
// spark.shuffle.manager = sort (SortShuffleManager)

// Замена на Celeborn:
// --conf spark.shuffle.manager=
//   org.apache.celeborn.client.spark.SparkShuffleManager

// Внутренние методы ShuffleManager:
// registerShuffle() → создаёт ShuffleHandle при начале shuffle
// getWriter()       → ShuffleWriter для map-задачи
// getReader()       → ShuffleReader для reduce-задачи
// unregisterShuffle() → очистка после завершения задания
Подробнее в уроках:

SortShuffleManager

Sort Shuffle Manager
Термин

Стандартный ShuffleManager в Spark. Реализует три пути записи: BypassMergeSortShuffleWriter (нет combiner, <200 partition-ов — один файл на partition без сортировки), UnsafeShuffleWriter (Tungsten-сериализация, сортировка по 64-bit encoded partition+key, merge), SortShuffleWriter (универсальный — ExternalSorter, поддерживает combiner и spill). Каждый map task создаёт один data-файл и один index-файл.

Пример:
// Три пути SortShuffleManager:
// 1. BypassMergeSort: нет агрегации И partition < 200
//    → spark.shuffle.sort.bypassMergeThreshold = 200
// 2. UnsafeShuffle: serializedSorting поддерживается кодеком
//    И нет агрегации И partition ≤ 16777216 (2^24)
// 3. SortShuffle: все остальные случаи (включая combiner)

// Результат всегда: один .data + один .index файл
// .index: массив кумулятивных смещений (8 байт × numPartitions+1)
// .data: конкатенация partition-блоков в порядке partition ID
Подробнее в уроках:

MapOutputTracker

MapOutputTracker
Термин

Distributed registry, хранящий информацию о расположении shuffle-выходов (map outputs). Driver-сторона — MapOutputTrackerMaster — принимает регистрации от завершённых map task-ов. Executor-сторона — MapOutputTrackerWorker — запрашивает у Driver информацию о расположении нужных блоков перед чтением shuffle. Кеширует ответы локально для уменьшения RPC-нагрузки на Driver.

Пример:
// Поток данных MapOutputTracker:
// 1. Map task завершён → executor отправляет
//    MapStatus(blockManagerId, sizes[]) → Driver
// 2. Driver сохраняет в MapOutputTrackerMaster
// 3. Reduce task стартует → запрашивает у Driver:
//    GetMapOutputStatuses(shuffleId)
// 4. Driver отвечает сериализованным MapStatus[]
// 5. Executor распаковывает → знает, где читать каждый блок

// Мониторинг: spark.reducer.maxSizeInFlight контролирует
// параллельное чтение shuffle-блоков
Подробнее в уроках:

Push-based Shuffle (Magnet)

Push-Based Shuffle / Magnet
Термин

Оптимизация shuffle в Spark 3.2+, при которой map task-и активно (push) отправляют свои данные на merge-сервисы External Shuffle Service сразу после записи, не дожидаясь reduce-стадии. Merge-сервис объединяет блоки от разных map task-ов для одного reduce partition-а в один merged file. Снижает число random read при чтении reduce-стороны.

Пример:
// Включение push-based shuffle
spark.conf.set("spark.shuffle.push.enabled", "true")
spark.conf.set(
  "spark.shuffle.push.minShuffleSizeToWait", "500m"
)  // минимальный размер для активации push
spark.conf.set(
  "spark.shuffle.push.maxBlockSizeToPush", "1m"
)  // max размер блока для push

// Требует External Shuffle Service на каждом узле
// Метрики: remote merged fetches vs. remote fetches
// Spark UI → Stage → Task Metrics → Shuffle Read
Подробнее в уроках:

External Shuffle Service (ESS)

External Shuffle Service
Термин

Отдельный процесс на каждом worker-узле кластера, хранящий shuffle-данные вне executor JVM. Позволяет удалять executor-ы при dynamic allocation без потери их shuffle-выходов. Реализован как долгоживущий сервис (spark-external-shuffle-service), запускаемый вместе с NodeManager (YARN) или DaemonSet (Kubernetes). Обязателен для dynamic allocation в YARN-кластерах.

Пример:
# YARN: ESS встроен в NodeManager
spark.conf.set("spark.shuffle.service.enabled", "true")
spark.conf.set("spark.shuffle.service.port", "7337")

# Kubernetes: требует sidecar или DaemonSet
spark.conf.set("spark.shuffle.service.enabled", "true")
spark.conf.set(
  "spark.kubernetes.shuffle.namespace", "spark-shuffle"
)

# Без ESS на K8s — shuffle tracking:
spark.conf.set(
  "spark.dynamicAllocation.shuffleTracking.enabled", "true"
)
Подробнее в уроках:

ExternalSorter

External Sorter
Термин

Компонент SortShuffleWriter, реализующий sort-based shuffle с поддержкой spill-на-диск. Буферизует записи в PartitionedAppendOnlyMap (если есть combiner) или PartitionedPairBuffer (без combiner) до достижения memory threshold, затем делает частичный spill во временный файл. По завершении все spill-файлы merge-сортируются в один итоговый shuffle-файл. Поддерживает Aggregator для partial aggregation до shuffle.

Пример:
// ExternalSorter активируется в SortShuffleWriter:
// 1. insertAll() — добавляет записи с combiner (reduceByKey) или без
// 2. При нехватке памяти → maybeSpillCollection() → spill на диск
// 3. writePartitionedMapOutput() → merge-сортировка + единый .data файл

// Управление памятью spill:
spark.conf.set(
  "spark.shuffle.spill.compress", "true"
)  // сжатие spill-файлов
spark.conf.set(
  "spark.shuffle.sort.bypassMergeThreshold", "200"
)  // избежать sorter для < 200 partition
Подробнее в уроках:

Память и хранение блоков

UnifiedMemoryManager

Unified Memory Manager
Термин

Менеджер памяти Spark (с версии 1.6), управляющий единым пулом памяти executor-а. Делит доступную память (spark.executor.memory × spark.memory.fraction, default 0.6) на два региона: execution memory (shuffle, sort, join, aggregation) и storage memory (кеш RDD/DataFrame). Регионы динамически делят одну границу — execution может вытеснять storage при нехватке, storage возвращается только при освобождении execution.

Пример:
// Расчёт регионов памяти:
// executor.memory = 8g
// memory.fraction = 0.6 → 4.8g для UMM
// memory.storageFraction = 0.5 → 2.4g для storage (не вытесняется)
// Оставшиеся 40% (3.2g) — reserved для user code, overhead

spark.conf.set("spark.memory.fraction", "0.6")
spark.conf.set("spark.memory.storageFraction", "0.5")

// Мониторинг в Spark UI → Executors:
// Storage Memory: X.X GiB / Y.Y GiB
// Включить детали: /metrics/json endpoint
Подробнее в уроках:

BlockManager

Block Manager
Термин

Распределённая система управления блоками данных в Spark. Каждый executor (и driver) имеет BlockManager, который хранит данные в памяти (MemoryStore) или на диске (DiskStore) и отвечает на запросы от других executor-ов. BlockManagerMaster на driver-е хранит метаданные о расположении всех блоков. BlockId идентифицирует блок: RDDBlockId, ShuffleBlockId, BroadcastBlockId и др.

Пример:
// Типы BlockId:
// RDDBlockId(rddId, partitionId)     → кеш RDD
// ShuffleBlockId(shuffleId, mapId, reduceId) → shuffle данные
// BroadcastBlockId(broadcastId)      → broadcast переменная
// TempShuffleBlockId(uuid)           → временный shuffle блок

// Чтение блока в порядке приоритета:
// 1. Локальная память (MemoryStore)
// 2. Локальный диск (DiskStore)
// 3. Remote BlockManager (по сети через BlockTransferService)
// 4. Пересчёт через lineage (для RDD-блоков)
Подробнее в уроках:

MemoryStore

Memory Store
Термин

Компонент BlockManager, хранящий блоки данных в JVM heap или off-heap памяти. Поддерживает два формата: десериализованные Java-объекты (быстрый доступ, больший размер) и сериализованные байтовые буферы (компактнее, медленнее). При нехватке памяти вытесняет блоки по LRU-политике — сначала в DiskStore (если StorageLevel предусматривает disk), затем удаляет.

Пример:
// StorageLevel контролирует поведение MemoryStore:
df.persist(StorageLevel.MEMORY_ONLY)        // только heap, без disk
df.persist(StorageLevel.MEMORY_ONLY_SER)    // heap + сериализация
df.persist(StorageLevel.MEMORY_AND_DISK)    // heap → spill на диск
df.persist(StorageLevel.OFF_HEAP)           // off-heap (Tungsten)

// Мониторинг вытеснений:
// Spark UI → Storage → RDD Partitions:
// Cached In Memory: X, Cached On Disk: Y, Size in Memory: Z
// Если много Cached On Disk → memory pressure, увеличь executor.memory
Подробнее в уроках:

DiskStore

Disk Store
Термин

Компонент BlockManager, записывающий блоки данных на локальный диск executor-а при нехватке памяти или использовании StorageLevel с disk-составляющей. Блоки хранятся в директории spark.local.dir в виде файлов, именованных по BlockId. DiskStore использует DiskBlockManager для маппинга BlockId → путь к файлу. Поддерживает шифрование блоков через spark.io.encryption.enabled.

Пример:
// Настройка DiskStore:
spark.conf.set(
  "spark.local.dir", "/fast-ssd/spark-temp,/hdd/spark-temp"
)  // несколько директорий для striping

// StorageLevel с disk fallback:
df.persist(StorageLevel.MEMORY_AND_DISK)  // → DiskStore при вытеснении
df.persist(StorageLevel.DISK_ONLY)        // → только DiskStore

// Блоки на диске шифруются если:
spark.conf.set("spark.io.encryption.enabled", "true")
spark.conf.set("spark.io.encryption.keySizeBits", "128")
// Mониторинг: Spark UI → Executors → Disk Used
Подробнее в уроках:

TorrentBroadcast

Torrent Broadcast
Термин

Реализация broadcast-переменных в Spark по BitTorrent-протоколу. Driver разбивает broadcast-значение на блоки (spark.broadcast.blockSize, default 4m) и хранит их в своём BlockManager. Executor-ы скачивают блоки с driver-а (или от других executor-ов, уже получивших блоки) параллельно. Это снижает нагрузку на driver при большом числе executor-ов по сравнению с наивной рассылкой от одного источника.

Пример:
// Создание broadcast переменной
val lookup = spark.sparkContext.broadcast(
  Map("US" -> "United States", "DE" -> "Germany")
)  // → TorrentBroadcast: нарезается на блоки, хранится в driver BlockManager

// Использование на executor-ах
val result = rdd.map(row => lookup.value.getOrElse(row.country, "Unknown"))
// lookup.value → BlockManager читает локальный кеш или
//                тянет блоки у соседних executor-ов P2P

// Освобождение
lookup.unpersist()  // удалить с executor-ов, driver сохраняет
lookup.destroy()    // удалить полностью
Подробнее в уроках:

Catalyst Optimizer и Tungsten

Logical Plan

Logical Plan
Термин

Декларативное дерево операций в Catalyst, описывающее «что» вычислить, без физической реализации. Проходит три стадии: UnresolvedLogicalPlan (выражения могут ссылаться на несуществующие колонки), AnalyzedLogicalPlan (все ссылки разрешены через Catalog), OptimizedLogicalPlan (применены rule-based оптимизации: predicate pushdown, column pruning, constant folding, join reordering). Каждый узел — TreeNode с методом transform для pattern matching.

Пример:
// Просмотр всех стадий logical plan:
val df = spark.read.parquet("/sales")
  .filter("amount > 100")
  .groupBy("region").sum("amount")

val qe = df.queryExecution
println(qe.logical)         // unresolved: UnresolvedAttribute('amount')
println(qe.analyzed)        // resolved: amount#5 LongType
println(qe.optimizedPlan)   // optimized: predicate pushdown применён

// PySpark:
# df._jdf.queryExecution().logical()
# df._jdf.queryExecution().analyzed()
# df._jdf.queryExecution().optimizedPlan()
Подробнее в уроках:

Physical Plan

Physical Plan
Термин

Исполняемое дерево SparkPlan-операторов, определяющее конкретный способ выполнения запроса. Physical Planning преобразует OptimizedLogicalPlan в SparkPlan через набор Strategy-объектов (JoinSelection, BasicOperators, Aggregation и др.). Каждый PhysicalOperator реализует execute() → RDD[InternalRow]. CostBasedJoinReorder выбирает оптимальный порядок join-ов при наличии column statistics.

Пример:
// Просмотр физического плана:
df.explain()         // краткий
df.explain(True)     // расширенный с codegen stage
df.explain('cost')   // с оценками стоимости

// Scala:
println(df.queryExecution.sparkPlan)    // до codegen
println(df.queryExecution.executedPlan) // после codegen

// Ключевые физические операторы:
// BroadcastHashJoin vs SortMergeJoin vs ShuffledHashJoin
// HashAggregate (с partial + final стадиями)
// SortExec (с spill support)
// Exchange (shuffle boundary)
Подробнее в уроках:

Catalyst Pipeline

Catalyst Optimization Pipeline
Термин

Четырёхфазный pipeline оптимизации запросов: (1) Analysis — разрешение ссылок на таблицы/колонки через Catalog; (2) Logical Optimization — применение rule-based правил (предикат pushdown, column pruning, constant folding, join reordering); (3) Physical Planning — выбор физических стратегий (BroadcastHashJoin vs SortMergeJoin); (4) Code Generation — генерация Java bytecode через Janino. Правила применяются итерационно до фиксированной точки.

Пример:
// Просмотр всех фаз через DataFrame API:
import org.apache.spark.sql.execution.debug._

df.debugCodegen()  // распечатать сгенерированный код

// Через queryExecution:
val qe = df.queryExecution
println(qe.logical)          // unresolved logical plan
println(qe.analyzed)         // resolved logical plan
println(qe.optimizedPlan)    // optimized logical plan
println(qe.sparkPlan)        // physical plan (before codegen)
println(qe.executedPlan)     // final plan с CodeGenStage
Подробнее в уроках:

Optimizer Rule

Optimizer Rule
Термин

Правило трансформации логического плана, реализующее один шаг оптимизации. Наследует Rule[LogicalPlan] и реализует метод apply(plan). Применяется через pattern matching к узлам дерева плана. Примеры встроенных правил: PushDownPredicate, ColumnPruning, ConstantFolding, ReorderJoin, EliminateSorts. SparkSessionExtensions позволяет добавлять custom правила.

Пример:
// Структура optimizer rule (Scala):
class MyRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan =
    plan transform {
      // Pattern matching на узлах плана
      case Filter(Always True, child) => child
      case Project(cols, child)
        if cols == child.output => child
    }
}

// Регистрация кастомного правила:
spark.experimental.extraOptimizations =
  Seq(new MyRule)

// Или через SparkSessionExtensions:
.withExtensions(_.injectOptimizerRule(_ => new MyRule))
Подробнее в уроках:

Whole-Stage CodeGen (WSCG)

Whole-Stage Code Generation
Термин

Техника генерации единого Java-метода для обработки строки данных через весь pipeline физических операторов. Вместо виртуальных вызовов между операторами Volcano-модели (один вызов next() на строку на оператор) генерируется один tight loop с inlined-логикой всех операторов. Устраняет overhead виртуальных вызовов, улучшает CPU instruction cache locality и позволяет JIT-компилятору сильнее оптимизировать код.

Пример:
// Звёздочка (*) в EXPLAIN = CodeGenStage boundary:
// *(1) HashAggregate(keys=[dept], functions=[sum(salary)])
// +- Exchange hashpartitioning(dept, 200)
//    +- *(1) HashAggregate(keys=[dept], ...)
//       +- *(1) Filter (active = true)
//          +- *(1) ColumnarToRow
//             +- Scan parquet [...]

// Все *(1)-операторы скомпилированы в один метод!
// Exchange — граница, начинается новый CodeGenStage *(2)

// Отладка сгенерированного кода:
df.queryExecution.debug.codegen()
Подробнее в уроках:

UnsafeRow

Unsafe Row
Термин

Компактный бинарный формат строки данных Tungsten, хранящий данные в непрерывном массиве байт (on-heap или off-heap). Структура: null bitmap (ceil(numFields/64)×8 байт), fixed-length значения (8 байт каждое — Int хранится как long), variable-length данные (строки, массивы) с offset+length в fixed-части. Поддерживает O(1) доступ к полю без десериализации и binary comparison без декодирования.

Пример:
// Структура UnsafeRow для (id: INT, name: STRING, age: INT):
// Offset 0:  null bitmap (8 байт, 3 поля → 1 блок)
// Offset 8:  id value (8 байт, long-aligned)
// Offset 16: name offset+length (8 байт: [offset:32bit][len:32bit])
// Offset 24: age value (8 байт)
// Offset 32: name bytes (variable-length, выровнено до 8 байт)

// Использование в Spark:
// Все операции shuffle, sort, join работают с UnsafeRow
// Не нужна десериализация для сравнения ключей:
// UnsafeRowUtils.compare(row1, row2, keyFields)

// Размер можно смотреть в Spark UI: shuffle write bytes / num records
Подробнее в уроках:

Encoder

Encoder
Термин

Механизм сериализации/десериализации между JVM-объектами и внутренним InternalRow / UnsafeRow форматом Spark. Encoders генерируются через Catalyst кодогенерацию для конкретного типа T: ExpressionEncoder[T] содержит сериализатор (JVM → InternalRow) и десериализатор (InternalRow → JVM). Используется в Dataset[T] API для типобезопасных операций.

Пример:
// Scala: неявные encoder-ы
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class User(id: Long, name: String, age: Int)

// Автоматически через implicit:
val ds = spark.createDataset(Seq(User(1, "Alice", 30)))

// Явно:
val enc = Encoders.product[User]
val exprEnc = enc.asInstanceOf[ExpressionEncoder[User]]

// Encoder кодогенерируется один раз и переиспользуется
// PySpark: PickleSerializer или ArrowSerializer (pandas UDF)
import pyspark.sql.functions as F
Подробнее в уроках:

Adaptive Query Execution (AQE)

Runtime Filter Adaptation

Runtime Filter / Bloom Filter Injection
Термин

Оптимизация AQE в Spark 3.3+: после вычисления одной стороны join-а Spark строит Bloom filter или In-filter и инжектирует его в scan-оператор другой стороны. Это позволяет отфильтровать ненужные строки до shuffle или даже на уровне storage. Работает для inner join, semi join, right outer join (для левой стороны). Bloom filter хранится в памяти driver-а.

Пример:
// Включение runtime фильтров:
spark.conf.set(
  "spark.sql.optimizer.runtimeFilter.enabled", "true"
)
spark.conf.set(
  "spark.sql.optimizer.runtimeFilter.creationSideThreshold",
  "10m"  // строить filter только для сторон < 10MB
)

// В EXPLAIN:
// Filter (isnotnull(id) AND might_contain(bf, xxhash64(id)))
// might_contain → bloom filter probe

// Bloom filter размер:
spark.conf.set(
  "spark.sql.optimizer.runtimeFilter.number.bloomFilter.bits",
  "8"  // bits per item
)
Подробнее в уроках:

AQE (Adaptive Query Execution)

Adaptive Query Execution
Термин

Фреймворк динамической re-оптимизации плана выполнения в Spark 3.x на основе runtime-статистик, собранных после каждого shuffle. После завершения shuffle-стадии AQE получает точные размеры partition-ов и перепланирует оставшуюся часть DAG: объединяет мелкие partition-ы (coalesce), реагирует на data skew (skew join), заменяет SortMergeJoin на BroadcastHashJoin если данные оказались меньше порога.

Пример:
// Включение AQE (по умолчанию с Spark 3.2):
spark.conf.set("spark.sql.adaptive.enabled", "true")

// Три основных оптимизации AQE:
// 1. Coalesce shuffle partitions
spark.conf.set(
  "spark.sql.adaptive.coalescePartitions.enabled", "true"
)
spark.conf.set(
  "spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m"
)
// 2. Skew join detection & splitting
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
// 3. Runtime SortMergeJoin → BroadcastHashJoin
// Автоматически если размер < autoBroadcastJoinThreshold
Подробнее в уроках:

AQE Coalesce Partitions

AQE Coalesce Shuffle Partitions
Термин

Оптимизация AQE: после завершения shuffle-стадии AQE собирает точные размеры всех partition-ов и объединяет мелкие смежные partition-ы в один, стремясь к targetSize (advisoryPartitionSizeInBytes). Это устраняет проблему пустых/крошечных partition-ов при фиксированном spark.sql.shuffle.partitions=200 и снижает overhead планировщика.

Пример:
spark.conf.set(
  "spark.sql.adaptive.coalescePartitions.enabled", "true"
)
spark.conf.set(
  "spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m"
)
spark.conf.set(
  "spark.sql.adaptive.coalescePartitions.minPartitionSize", "1m"
)
// minPartitionSize: нижняя граница — не объединять до мельче этого

// Результат: вместо 200 пустых partition-ов после join маленькой таблицы
// AQE может создать 3-5 реальных partition-ов
// Наблюдать в Spark UI → Stage Details → "Custom Metrics: numCoalescedPartitions"
Подробнее в уроках:

AQE Skew Join

AQE Skew Join Optimization
Термин

Оптимизация AQE для обработки data skew в join-операциях. AQE определяет skewed partition как тот, чей размер превышает max(skewedPartitionFactor × median, skewedPartitionThresholdInBytes). Skewed partition дробится на несколько подзадач, соответствующая часть другой стороны join дублируется для каждой подзадачи. Прозрачно для пользователя.

Пример:
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set(
  "spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5"
)  // размер > 5x медианы = skew
spark.conf.set(
  "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes",
  "256m"
)  // И > 256mb

// Наблюдение в Spark UI → SQL → DAG:
// SortMergeJoin (skew) → появляется при активации
// "number of skewed partitions" в Custom Metrics

// Ограничение: работает только с SortMergeJoin!
// BroadcastHashJoin skew не лечится AQE automaticaly
Подробнее в уроках:

Dynamic Partition Pruning (DPP)

Dynamic Partition Pruning
Термин

Оптимизация Spark 3.x: фильтр, примененный к dimension-стороне join-а, динамически транслируется в subquery и применяется к сканированию fact-таблицы на уровне partition-ов. DPP активируется для star-schema паттернов: broadcast join dimension → dynamically filter fact. Реализуется через вставку DynamicPruningExpression в план сканирования fact-таблицы.

Пример:
-- DPP автоматически применяется:
SELECT f.sale_amount, d.region_name
FROM fact_sales f
JOIN dim_region d ON f.region_id = d.id
WHERE d.country = 'DE';

-- Spark вставляет в план fact-таблицы:
-- Filter region_id IN (
--   DynamicPruning: SELECT id FROM dim_region WHERE country='DE'
-- )
-- → читает только partition-ы с немецкими region_id

-- Конфиги:
spark.conf.set(
  "spark.sql.optimizer.dynamicPartitionPruning.enabled", "true"
)
Подробнее в уроках:

Structured Streaming: внутреннее устройство

MicroBatch Execution Engine

Micro-Batch Execution Engine
Термин

Основной режим выполнения Structured Streaming: MicroBatchExecution периодически опрашивает source на наличие новых данных (до latestOffset), запускает IncrementalExecution для обработки нового batch-а, коммитит offset и повторяет. Каждый micro-batch — полноценный Spark job с планированием задач. Гарантирует exactly-once через checkpoint + idempotent sink.

Пример:
// MicroBatchExecution поток:
// 1. source.latestOffset() → получить новые offsets
// 2. source.getBatch(start, end) → DataFrame с новыми данными
// 3. IncrementalExecution.executedPlan → оптимизированный план
// 4. query.runBatch() → запуск как обычный Spark job
// 5. sink.addBatch(batchId, data) → запись результата
// 6. offsetLog.add(batchId, offsets) → checkpoint offsets
// 7. commitLog.add(batchId) → checkpoint commit

// Просмотр прогресса:
query.lastProgress  // метрики последнего micro-batch
query.recentProgress  // история последних N micro-batch
Подробнее в уроках:

StateStore

State Store
Термин

Версионированное key-value хранилище для stateful-операций Structured Streaming. Каждый stateful оператор (агрегация, дедупликация, stream-stream join) использует отдельный StateStore. Реализации: HDFSBackedStateStoreProvider (default, хранит snapshot + delta-файлы в HDFS/S3) и RocksDBStateStoreProvider (Spark 3.2+, on-disk LSM tree на executor, рекомендуется для больших состояний).

Пример:
// HDFSBackedStateStore — по умолчанию:
// Хранит в checkpointLocation/state/<operatorId>/<partitionId>/
// Snapshot каждые N commit-ов, дельты между снапшотами
// Нагружает память executor (держит всё состояние в HashMap)

// RocksDB — для больших состояний:
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state."
  + "RocksDBStateStoreProvider"
)
spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.compactOnCommit", "false"
)
// Метрики: stateRows, stateMemory, loadedMapCacheHitCount
Подробнее в уроках:

IncrementalExecution

Incremental Execution
Термин

Специализированный QueryExecution для Structured Streaming, добавляющий stateful операторы в физический план. Каждый micro-batch использует один и тот же оптимизированный план, но с обновлёнными ссылками на текущий batch данных. Вставляет StateStoreRestoreExec и StateStoreSaveExec узлы в план для операций с состоянием, передаёт batchId и watermark в эти операторы.

Пример:
// IncrementalExecution добавляет в план:
// - StateStoreRestoreExec: читает предыдущее состояние из StateStore
// - HashAggregateExec: обрабатывает строки текущего batch
// - StateStoreSaveExec: сохраняет обновлённое состояние
// - WatermarkEvalExec: применяет eviction по watermark

// Просмотр плана streaming-запроса:
query.explain(extended=True)
// или через:
// spark.streams.active.head.explain()
Подробнее в уроках:

Offset Log и Commit Log

Offset Log / Commit Log
Термин

Двухступенчатый механизм exactly-once в Structured Streaming. OffsetSeqLog (WAL) сначала сохраняет offsets нового micro-batch-а — это гарантирует, что при падении мы знаем, какие данные читать. CommitLog сохраняется после успешной записи в sink — он подтверждает, что batch завершён. При перезапуске: если offset есть но нет commit → повтор batch-а (idempotent). Если есть оба → пропуск.

Пример:
// Структура checkpoint директории:
// checkpointLocation/
//   metadata          — id запроса и версия
//   offsets/          — WAL: offset лог
//     0, 1, 2 ...     — JSON с офсетами каждого batch
//   commits/          — commit лог
//     0, 1, 2 ...     — подтверждения завершённых batch
//   state/            — состояние StatefulOperator-ов
//   sources/          — source-specific состояние

// Два файла для batch N:
// offsets/N  существует → batch запущен
// commits/N  существует → batch завершён успешно
Подробнее в уроках:

transformWithState

transformWithState
Термин

Новый stateful API Spark 4.0 (preview в 3.5), заменяющий flatMapGroupsWithState. Предоставляет типобезопасные операции со state через StatefulProcessor интерфейс: ValueState, ListState, MapState. Поддерживает TTL для автоматического eviction, таймеры (event-time и processing-time), возможность работать с несколькими type-safe state-переменными в одном операторе.

Пример:
// Spark 4.0: transformWithState
class SessionAggregator
    extends StatefulProcessor[String, Event, SessionResult] {

  // Объявление state при инициализации
  var sessionCount: ValueState[Long] = _
  var sessionStart: ValueState[Long] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    sessionCount = getHandle.getValueState[Long]("count", Encoders.scalaLong)
    sessionStart = getHandle.getValueState[Long]("start", Encoders.scalaLong)
  }

  override def handleInputRows(key: String,
    rows: Iterator[Event], timerValues: TimerValues): Iterator[SessionResult] = {
    // Обновление state и генерация результатов
    ???
  }
}
Подробнее в уроках:

RocksDB State Backend

RocksDB State Store Provider
Термин

Реализация StateStore на базе встроенной RocksDB (LSM-tree, on-disk key-value store), доступная с Spark 3.2. Хранит state на локальном SSD executor-а с LRU page cache в памяти. Поддерживает состояния большого размера (гигабайты на partition) без OOM — только активная часть в памяти. Snapshot-ы реплицируются в checkpoint location (HDFS/S3) через background upload.

Пример:
// Конфигурация RocksDB backend:
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state."
  + "RocksDBStateStoreProvider"
)

// Тюнинг RocksDB:
spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB", "128"
)
spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.writeBufferSizeBytes",
  str(2 * 1024 * 1024)  // 2MB write buffer
)
// Метрики: rocksdbNumSstFiles, rocksdbBytesWritten, rocksdbBlockCacheHit
Подробнее в уроках:

Arrow, Spark Connect и расширения

Apache Arrow Columnar Format

Apache Arrow Columnar Format
Термин

In-memory колоночный формат данных, являющийся cross-language стандартом (Java, Python, C++, R). Данные одной колонки хранятся в непрерывном буфере памяти (validity bitmap + value buffer), что обеспечивает SIMD-ускорение и эффективную агрегацию. Zero-copy обмен между JVM (Spark) и Python (pandas/pyarrow) через shared memory — без сериализации.

Пример:
// Включение Arrow для toPandas() и pandas UDF:
spark.conf.set(
  "spark.sql.execution.arrow.pyspark.enabled", "true"
)

# Без Arrow: каждая строка → pickle → Python O(n × row_size)
# С Arrow: RecordBatch → shared memory → zero-copy в Python

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def fast_udf(s: pd.Series) -> pd.Series:
    return s * 2.0  # выполняется над колонкой целиком (SIMD)

# Контроль размера batch:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")
Подробнее в уроках:

PyArrow Integration

PyArrow Spark Integration
Термин

Интеграция PyArrow в PySpark для ускорения обмена данными между JVM и Python через Arrow IPC. Используется в: toPandas()/createDataFrame() с Arrow (zero-copy где возможно), pandas_udf (SCALAR, GROUPED_MAP, GROUPED_AGG — колонки обрабатываются как pd.Series/DataFrame), Spark Connect (Arrow Flight для результатов), mapInArrow() для нативной Arrow-обработки без conversion в pandas.

Пример:
import pyarrow as pa
from pyspark.sql.functions import pandas_udf
import pandas as pd

# mapInArrow: обработка данных как ArrowRecordBatch
def process_arrow(batches):
    for batch in batches:  # batch: pyarrow.RecordBatch
        arr = batch.column('amount').to_pylist()
        # нативная Arrow обработка без pandas overhead
        result = pa.array([x * 1.1 for x in arr])
        yield pa.record_batch(
            [result], schema=pa.schema([('amount', pa.float64())])
        )

df.mapInArrow(process_arrow, schema="amount double")
Подробнее в уроках:

Arrow Flight

Arrow Flight Protocol
Термин

High-performance RPC-протокол для передачи Arrow RecordBatch-ей по gRPC-соединению. Использует zero-copy сериализацию Arrow IPC-формата напрямую в gRPC stream. Spark Connect использует Arrow Flight для передачи данных между thin client-ом и Spark-кластером. Производительность на порядок выше стандартных REST/JDBC подходов для объёмных данных.

Пример:
// Arrow Flight в контексте Spark Connect:
// Client (Python/Scala) → gRPC → Spark Connect Server
// Server → выполняет plan → возвращает через Arrow Flight Stream

// Схема взаимодействия:
// 1. client.execute(plan) → gRPC ExecutePlan RPC
// 2. Server: runQuery() → DataFrame
// 3. Server: serialize result → Arrow RecordBatches
// 4. Stream RecordBatches через gRPC → client
// 5. Client: pyarrow.RecordBatch → pandas DataFrame

// Конфигурация Flight endpoint:
spark.conf.set("spark.connect.grpc.binding.port", "15002")
Подробнее в уроках:

Spark Connect

Spark Connect
Термин

Клиент-серверная архитектура Spark (GA в 4.0, preview с 3.4), разделяющая клиентское приложение и Spark-кластер через gRPC + Protocol Buffers. Клиент отправляет логический план как protobuf-сообщение. Сервер десериализует план, выполняет через Catalyst/Tungsten и возвращает результаты через Arrow Flight. Thin client не требует JVM — работает на чистом Python/Go/Rust.

Пример:
# Подключение Spark Connect клиента:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .remote("sc://spark-server:15002") \
    .getOrCreate()

# Тот же DataFrame API:
df = spark.read.parquet("s3://bucket/data")
result = df.filter("amount > 1000").groupBy("region").count()
result.show()

# Преимущества:
# - Client изолирован от cluster classpath конфликтов
# - Несколько клиентов на одном кластере
# - Клиент = любой язык с protobuf-поддержкой
Подробнее в уроках:

SparkSessionExtensions

SparkSession Extensions
Термин

API для расширения Spark без модификации исходного кода. Позволяет инжектировать: optimizer rules (до/после стандартных), parser rules (расширение SQL синтаксиса), analyzer rules, planner strategies (custom физические операторы), post-hoc resolution rules. Реализуется как функция SparkSessionExtensions => Unit и передаётся при построении SparkSession через withExtensions.

Пример:
// Регистрация расширений:
val spark = SparkSession.builder()
  .withExtensions { extensions =>
    // Кастомное optimizer rule:
    extensions.injectOptimizerRule(_ => new MyOptimizerRule)
    // Кастомная planner strategy:
    extensions.injectPlannerStrategy(_ => new MyPlannerStrategy)
    // Кастомный SQL parser:
    extensions.injectParser { (_, parser) =>
      new MyParser(parser)  // chain с дефолтным парсером
    }
    // Кастомный post-hoc analyzer rule:
    extensions.injectPostHocResolutionRule(_ => new MyAnalyzerRule)
  }
  .getOrCreate()
Подробнее в уроках:

Apache Celeborn

Apache Celeborn (Remote Shuffle Service)
Термин

Внешний remote shuffle service (бывший RemoteShuffleService от Alibaba), хранящий shuffle-данные на выделенных worker-нодах, а не на executor-ах. Map task-и push-отправляют данные на Celeborn worker-ы через netty. Reduce task-и читают от Celeborn, не зная, какой executor писал данные. Полностью отвязывает shuffle lifecycle от executor lifecycle — идеально для dynamic allocation на Kubernetes.

Пример:
// Подключение Celeborn:
spark-submit \
  --conf spark.shuffle.manager=\
    org.apache.celeborn.client.spark.SparkShuffleManager \
  --conf spark.celeborn.master.endpoints=celeborn-master:9097 \
  --conf spark.celeborn.client.spark.push.data.timeout=120s \
  --conf spark.celeborn.client.spark.fetch.timeout=120s \
  my_app.py

// Преимущества vs ESS:
// - Горизонтально масштабируется (несколько Celeborn workers)
// - Replication для отказоустойчивости
// - Встроенный push-based shuffle
// - Совместим с K8s без DaemonSet
Подробнее в уроках:

Продвинутые темы выполнения

SortMergeJoin

Sort Merge Join
Термин

Стратегия join в Spark по умолчанию для больших таблиц, когда ни одна не помещается в broadcast threshold. Алгоритм: (1) обе стороны shuffle по join-ключу, (2) обе стороны сортируются по ключу, (3) merge-scan двух отсортированных потоков находит совпадения за O(n+m). Поддерживает все типы join (inner, outer, anti, semi). Требует сортируемых ключей (implements Ordering).

Пример:
-- Принудительное использование SortMergeJoin:
SELECT /*+ MERGE(orders) */ *
FROM orders o JOIN products p ON o.product_id = p.id;

// В DataFrame API:
import org.apache.spark.sql.functions._
orders.join(products.hint("merge"), Seq("product_id"))

// Отключить broadcast для форсирования SMJ:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// SMJ можно ускорить Bucketing:
// Если обе таблицы bucketed by join key с одинаковым N buckets
// → shuffle пропускается (pre-sorted + pre-shuffled)
Подробнее в уроках:

BroadcastHashJoin

Broadcast Hash Join
Термин

Стратегия join, при которой маленькая таблица (dimension) целиком собирается на driver-е, нарезается на TorrentBroadcast-блоки и рассылается всем executor-ам. Каждый executor строит hash table из broadcast-стороны и probe-сканирует свои partition-ы большой таблицы без shuffle. O(n) для построения hash table + O(m) для probe. Порог — spark.sql.autoBroadcastJoinThreshold (default 10m, max 8GB).

Пример:
// Автоматический broadcast (таблица < threshold):
val result = big.join(small, "id")  // → BroadcastHashJoin

// Явный hint:
val result = big.join(broadcast(small), "id")

// SQL hint:
// SELECT /*+ BROADCAST(d) */ f.*, d.name
// FROM fact f JOIN dim d ON f.dim_id = d.id

// Настройка:
spark.conf.set(
  "spark.sql.autoBroadcastJoinThreshold", "50m"
)
// AQE может динамически upgrade SMJ → BHJ
// если после shuffle reduce-сторона < threshold
Подробнее в уроках:

Executor Lifecycle

Executor Lifecycle
Термин

Жизненный цикл executor-а: регистрация в Driver через CoarseGrainedExecutorBackend → получение LaunchTask сообщений → запуск TaskRunner в thread pool → завершение/убийство task-а → DeregisterExecutor при остановке. Executor heartbeat (spark.executor.heartbeatInterval, default 10s) сигнализирует driver-у о доступности — если heartbeat не приходит в течение spark.network.timeout, driver считает executor мёртвым.

Пример:
// Конфигурация таймаутов executor:
spark.conf.set("spark.executor.heartbeatInterval", "10s")
spark.conf.set("spark.network.timeout", "120s")
// network.timeout должен быть >> heartbeatInterval

// Executor thread pool:
spark.conf.set("spark.executor.cores", "4")  // 4 потока
// ThreadPoolExecutor с N = spark.executor.cores потоками
// Каждый поток = один TaskRunner = один task

// Graceful shutdown:
// Executor завершает текущие tasks, затем останавливается
// Принудительный kill: KillTask message от Driver
Подробнее в уроках:

Shuffle Spill

Shuffle Spill to Disk
Термин

Сброс промежуточных shuffle-данных на диск при нехватке execution memory. ExternalSorter и ExternalAppendOnlyMap используют spill для обработки данных, не вмещающихся в память. Данные частично сортируются и записываются во временные spill-файлы, которые затем merge-сортируются при чтении. Метрики в Spark UI: Shuffle Spill (Memory) и Shuffle Spill (Disk).

Пример:
// Метрики spill в Spark UI → Stage → Task Metrics:
// Shuffle Spill (Memory): объём данных до компрессии
// Shuffle Spill (Disk): объём данных после компрессии на диске
// Spill (Memory) >> Spill (Disk) → высокая сжимаемость

// Предотвращение spill:
spark.conf.set("spark.executor.memory", "16g")  // больше памяти
spark.conf.set("spark.memory.fraction", "0.7")   // больше для execution

// Если spill неизбежен — ускорить:
spark.conf.set(
  "spark.shuffle.spill.compress", "true"
)
spark.conf.set("spark.io.compression.codec", "lz4")  // быстрая компрессия
Подробнее в уроках:

Native Execution Engine

Native Execution Engine (Comet / Gluten / RAPIDS)
Термин

Замена JVM-based Tungsten-движка нативным кодом (C++/Rust) для ускорения CPU-bound операций. Apache DataFusion Comet (Rust + DataFusion) и Apache Gluten (C++ + Velox) реализуют физические операторы Spark на нативном уровне, подключаясь через SparkSessionExtensions. NVIDIA RAPIDS заменяет CPU-операции GPU-ускоренными ядрами.

Пример:
// Включение Apache DataFusion Comet:
spark.conf.set(
  "spark.plugins",
  "org.apache.spark.CometPlugin"
)
spark.conf.set("spark.comet.enabled", "true")
spark.conf.set("spark.comet.exec.all.enabled", "true")

// Включение Gluten (Velox backend):
spark.conf.set(
  "spark.plugins",
  "io.glutenproject.GlutenPlugin"
)
spark.conf.set(
  "spark.gluten.sql.columnar.backend.lib", "velox"
)

// Наблюдение в EXPLAIN: NativeFilter, NativeScan вместо стандартных
Подробнее в уроках:

Kryo Serialization

Kryo Serialization
Термин

Альтернативный сериализатор для Spark (вместо Java-сериализации по умолчанию), значительно более компактный и быстрый. Kryo сериализует объекты в бинарный формат без метаданных класса. Требует регистрации классов для максимальной эффективности. Используется для сериализации RDD-трансформаций (closures), broadcast-переменных и объектов shuffle в RDD API.

Пример:
// Включение Kryo:
spark.conf.set(
  "spark.serializer",
  "org.apache.spark.serializer.KryoSerializer"
)
// Регистрация классов для максимальной компактности:
spark.conf.set(
  "spark.kryo.classesToRegister",
  "com.example.User,com.example.Event"
)
// Или через KryoRegistrator:
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[User])
    kryo.register(classOf[Event])
  }
}
spark.conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
Подробнее в уроках:

Off-Heap Memory

Off-Heap Memory
Термин

Память за пределами JVM heap, управляемая напрямую через sun.misc.Unsafe. В Spark используется Tungsten для хранения UnsafeRow объектов и shuffle-буферов без GC-давления. Включается через spark.memory.offHeap.enabled. Также используется для RDD-кеша при StorageLevel.OFF_HEAP. Позволяет работать с данными гигабайтного масштаба без риска Full GC пауз.

Пример:
// Включение off-heap Tungsten:
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "8g")
// off-heap не входит в spark.executor.memory!
// Реальное потребление памяти executor-контейнера:
// = executor.memory + memoryOverhead + offHeap.size

// Кеш в off-heap:
df.persist(StorageLevel.OFF_HEAP)

// ВНИМАНИЕ: off-heap утечки не видны в GC-метриках
// Диагностика: native memory tracking
// -XX:NativeMemoryTracking=summary в extraJavaOptions
Подробнее в уроках:

SparkListener

Spark Listener
Термин

Интерфейс для перехвата событий жизненного цикла Spark-приложения через event bus (LiveListenerBus). Слушатели получают события: SparkListenerJobStart/End, SparkListenerStageCompleted, SparkListenerTaskEnd (с TaskMetrics), SparkListenerExecutorAdded/Removed. Используется для custom monitoring, alerting, cost tracking. Асинхронен — события обрабатываются в отдельном потоке чтобы не блокировать планировщик.

Пример:
// Кастомный SparkListener (Scala):
import org.apache.spark.scheduler._

class CostTrackingListener extends SparkListener {
  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    val m = event.taskMetrics
    val shuffleWriteBytes = m.shuffleWriteMetrics.bytesWritten
    val gcTime = m.jvmGCTime
    // отправить в метрики систему (Prometheus, Datadog)
    MetricsClient.record("spark.task.shuffle_write", shuffleWriteBytes)
    MetricsClient.record("spark.task.gc_time_ms", gcTime)
  }
}

spark.sparkContext.addSparkListener(new CostTrackingListener())
Подробнее в уроках:

Apache Uniffle

Apache Uniffle (Remote Shuffle Service)
Термин

Ещё один Remote Shuffle Service (от Tencent), конкурент Celeborn. Поддерживает Hadoop YARN, Kubernetes, Standalone. Архитектура: Coordinator (метаданные), ShuffleServer (хранение данных), Client (в executor). Поддерживает несколько storage tier-ов (memory → local SSD → HDFS/S3). Отличается от Celeborn более гибкой системой квот и поддержкой multiple backends.

Пример:
// Подключение Uniffle к Spark:
spark-submit \
  --conf spark.shuffle.manager=\
    org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient \
  --conf rss.coordinator.quorum=uniffle-coordinator:19999 \
  --conf rss.storage.type=MEMORY_LOCALFILE_HDFS \
  my_app.py

// Ключевые отличия от Celeborn:
// - Coordinator отдельный процесс (vs Celeborn master)
// - Поддержка HDFS как storage tier напрямую
// - Более гибкая система quota (по user/queue)
Подробнее в уроках:

Apache Gluten

Apache Gluten (Velox Backend)
Термин

Плагин для Spark, заменяющий JVM физические операторы нативными Velox-операторами (C++ движок от Meta). Подключается как SparkPlugin — перехватывает SparkPlan, конвертирует поддерживаемые операторы в Velox execution tree. Данные передаются между JVM и native через Arrow-совместимые columnar буферы (ColumnarBatch). Поддерживает Parquet/ORC native scan, hash join, aggregate, window functions.

Пример:
// Подключение Gluten (Velox backend):
spark-submit \
  --conf spark.plugins=io.glutenproject.GlutenPlugin \
  --conf spark.gluten.sql.columnar.backend.lib=velox \
  --conf spark.gluten.sql.columnar.forceShuffledHashJoin=true \
  --conf spark.sql.shuffle.partitions=200 \
  my_app.py

// Какие операторы нативны — видно в EXPLAIN:
// VeloxHashAggregateExec вместо HashAggregateExec
// VeloxBroadcastHashJoinExec вместо BroadcastHashJoinExec

// Метрики: veloxToNativeTime, veloxFromNativeTime — overhead конвертации
Подробнее в уроках:

Zero-Copy Transfer

Zero-Copy Data Transfer
Термин

Передача данных между JVM и native/Python окружением без копирования байтов. Реализуется через разделяемую (shared) память: JVM выделяет Arrow-буфер, передаёт указатель Python-процессу через IPC. Python читает данные напрямую из того же адресного пространства (или mapped file) без дополнительного memcpy. В Spark Connect — Arrow Flight Stream, где RecordBatch передаются через gRPC с minimal copy.

Пример:
// Zero-copy toPandas() с Arrow:
spark.conf.set(
  "spark.sql.execution.arrow.pyspark.enabled", "true"
)

# Без Arrow: Row → pickle(bytes) → Python deserialize → object
# С Arrow: JVM allocates ArrowRecordBatch
#          → off-heap buffer
#          → Python pyarrow.RecordBatch wraps same memory
#          → pandas.DataFrame.from_pandas (column views)

# Измерение разницы:
import time
start = time.time()
pdf = df.toPandas()  # с Arrow: ~10-50x быстрее
print(f'toPandas: {time.time()-start:.2f}s')
Подробнее в уроках:

DataSource V2 API

DataSource V2 / Table API
Термин

Расширенный API для написания кастомных источников данных и сinks в Spark (с Spark 3.x как стабильный). Разделяет concerns: TableCatalog (метаданные), Table (схема + capabilities), ScanBuilder (построение плана чтения), WriteBuilder (план записи). Поддерживает pushdown фильтров, projection pruning, partition pruning и Statistics provider для Catalyst. Основа для Iceberg, Delta Lake, Hudi коннекторов.

Пример:
// Структура DataSource V2 коннектора:
// 1. TableCatalog — регистрирует таблицы в Catalog
// 2. Table — возвращает Schema + Set[TableCapability]
// 3. SupportsRead → newScanBuilder()
// 4. ScanBuilder → build() → Scan
// 5. Scan → toBatch() → Batch
// 6. Batch → planInputPartitions() → InputPartition[]
// 7. PartitionReaderFactory → createReader(partition)

// Capabilities: BATCH_READ, BATCH_WRITE,
// OVERWRITE_BY_FILTER, TRUNCATE, V1_BATCH_WRITE

// Регистрация через SparkSessionExtensions или
// spark.sql.catalog.<name>=<CatalogImpl>
Подробнее в уроках:

RAPIDS GPU Acceleration

NVIDIA RAPIDS Accelerator for Apache Spark
Термин

Плагин для Spark, заменяющий CPU-исполнение физических операторов на GPU-ускоренные ядра через cuDF библиотеку. Подключается через SparkPlugin API и перехватывает физические операторы Catalyst (scan, filter, project, hash join, sort, aggregate). Данные хранятся в GPU-памяти в Arrow-совместимом формате, что минимизирует PCIe-передачи. Ускорение 10-100x для vectorized операций.

Пример:
spark-submit \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --conf spark.rapids.sql.enabled=true \
  --conf spark.rapids.memory.gpu.pooling.enabled=true \
  --conf spark.rapids.memory.gpu.pool.size=16G \
  --conf spark.executor.resource.gpu.amount=1 \
  --conf spark.task.resource.gpu.amount=0.25 \
  my_app.py

# Мониторинг GPU utilization:
# nvidia-smi dmon -s u  # GPU utilization per second
# Spark UI → SQL → "GPU" метрики в операторах
Подробнее в уроках: