Справочник ключевых терминов курса Apache Spark Internals.
Фундаментальная абстракция Spark — неизменяемая распределённая коллекция объектов, описываемая пятью свойствами: список partition-ов, функция вычисления каждого partition-а, список зависимостей от других RDD, опциональный Partitioner (только для key-value RDD) и опциональный список preferred locations для каждого partition-а. RDD материализуется только при вызове action.
// Пять свойств RDD в исходниках Spark:
// abstract class RDD[T] {
// def getPartitions: Array[Partition]
// def compute(split: Partition, ctx: TaskContext): Iterator[T]
// def getDependencies: Seq[Dependency[_]]
// val partitioner: Option[Partitioner]
// def getPreferredLocations(split: Partition): Seq[String]
// }
val rdd = sc.textFile("hdfs:///data/*.log") // HadoopRDD
val mapped = rdd.map(line => line.split(",")) // MapPartitionsRDD
val filtered = mapped.filter(_.length > 3) // MapPartitionsRDD
// Граф зависимостей создан, данных ещё нетДва типа stage в Spark. ShuffleMapStage — промежуточный stage, task-и которого записывают shuffle-выходы для следующего stage. Завершается регистрацией MapStatus в MapOutputTracker. ResultStage — финальный stage, task-и которого возвращают результат напрямую driver-у (через ResultHandler) или записывают в sink. Каждый ResultStage завершает один Job. DAGScheduler создаёт ShuffleMapStage для каждой ShuffleDependency.
// Визуализация в Spark UI:
// Job X:
// Stage 0 (ShuffleMapStage): 200 tasks → shuffle write
// Stage 1 (ShuffleMapStage): 200 tasks → shuffle write
// Stage 2 (ResultStage): 100 tasks → write to parquet
// В логах DAGScheduler:
// INFO DAGScheduler: Submitting ShuffleMapStage 0 (200 tasks)
// INFO DAGScheduler: ShuffleMapStage 0 finished in 12.3 s
// INFO DAGScheduler: Submitting ResultStage 2 (100 tasks)
// Ключевое: если ShuffleMapStage упал частично
// → пересчитываются только потерянные map outputs
// → ResultStage перезапускается полностьюНаправленный ациклический граф зависимостей между RDD, запоминающий полную цепочку трансформаций от источника данных до результата. Lineage — механизм отказоустойчивости: при потере partition-а Spark восстанавливает его, повторно выполняя соответствующую ветку lineage, а не сохраняя данные на диск. toDebugString() возвращает человекочитаемое представление lineage.
val rdd = sc.textFile("/data")
.map(_.split(","))
.filter(_(2).toInt > 100)
.map(arr => (arr(0), arr(1).toLong))
.reduceByKey(_ + _)
// Печать lineage
println(rdd.toDebugString)
// (8) ShuffledRDD[5]
// +-(8) MapPartitionsRDD[4]
// | MapPartitionsRDD[3]
// | MapPartitionsRDD[2]
// | MapPartitionsRDD[1]
// | /data HadoopRDD[0]Тип зависимости RDD, при котором каждый partition дочернего RDD зависит от одного (или фиксированного числа) partition-ов родительского RDD. Не требует shuffle. Примеры: map, filter, flatMap, union. Узкие зависимости позволяют конвейеризировать вычисления в одном task-е и локально восстанавливать потерянные partition-ы.
// Narrow: каждый выходной partition ← один входной
val rdd2 = rdd1.map(x => x * 2) // MapPartitionsRDD: narrow
val rdd3 = rdd2.filter(_ > 5) // MapPartitionsRDD: narrow
val rdd4 = rdd1.union(rdd2) // UnionRDD: narrow
// Конвейеризация: map + filter выполняются в одном task
// без записи промежуточных данных на диск
// Восстановление: потеря partition i в rdd3
// → пересчитать только partition i rdd1 → rdd2 → rdd3Тип зависимости RDD, при котором каждый partition дочернего RDD зависит от множества partition-ов родительского. Требует shuffle — полного перераспределения данных между executor-ами через запись на диск и передачу по сети. DAGScheduler разрезает граф на stage-и по границам широких зависимостей. Примеры: groupByKey, reduceByKey, join с разными partitioner-ами.
// Wide: каждый выходной partition ← много входных
val counts = rdd.groupByKey() // ShuffledRDD: wide
val joined = rdd1.join(rdd2) // CoGroupedRDD: wide
// DAGScheduler создаёт stage-границу здесь:
// Stage 0: [upstream ops] → shuffle write
// Stage 1: shuffle read → [downstream ops]
// Восстановление wide dependency дорогостоящее:
// потеря partition требует повторного shuffle из Stage 0Объект, определяющий, в какой partition попадёт элемент при shuffle. Два встроенных: HashPartitioner (ключ → partition по key.hashCode % numPartitions) и RangePartitioner (сортирует ключи по диапазонам, эффективен для sortByKey). Custom Partitioner позволяет реализовать domain-специфичную логику распределения, избегая data skew.
import org.apache.spark.HashPartitioner
import org.apache.spark.RangePartitioner
// HashPartitioner — по умолчанию для groupByKey/reduceByKey
val partitioned = rdd.partitionBy(new HashPartitioner(100))
// RangePartitioner — для sortByKey
val rangePart = new RangePartitioner(100, rdd) // сэмплирует данные!
val sorted = rdd.sortByKey() // использует RangePartitioner
// Custom Partitioner
class DomainPartitioner(n: Int) extends Partitioner {
def numPartitions: Int = n
def getPartition(key: Any): Int =
Math.abs(key.hashCode) % n // или своя логика
}Компонент Spark, преобразующий RDD-граф в физический план выполнения. При вызове action DAGScheduler обходит граф зависимостей в обратном направлении, выявляет ShuffleDependency-границы и разрезает граф на stage-и. Для каждого stage создаёт ShuffleMapStage (результат shuffle write) или ResultStage (финальная стадия). Направляет готовые stage-и в TaskScheduler.
// DAGScheduler не вызывается напрямую, но его поведение видно через:
// 1. Spark UI: вкладка Jobs → stages breakdown
// 2. Логи: [DAGScheduler] Submitting ShuffleMapStage 0
// 3. eventLog: SparkListenerStageSubmitted
// Ключевые методы (Scala internals):
// DAGScheduler.handleJobSubmitted() → создаёт finalStage
// DAGScheduler.getOrCreateShuffleMapStage() → для wide deps
// DAGScheduler.submitStage() → проверяет готовность родителейКомпонент Spark, принимающий TaskSet от DAGScheduler и распределяющий task-и по executor-ам с учётом локальности данных. TaskSchedulerImpl реализует delay scheduling — ждёт освобождения executor-а с нужной локальностью определённое время перед переходом к менее локальному. Взаимодействует с cluster manager через SchedulerBackend.
// Конфигурация задержки для delay scheduling:
spark.conf.set("spark.locality.wait", "3s") // общее ожидание
spark.conf.set("spark.locality.wait.node", "3s") // NODE_LOCAL
spark.conf.set("spark.locality.wait.rack", "3s") // RACK_LOCAL
// PROCESS_LOCAL → NODE_LOCAL → RACK_LOCAL → ANY
// Метрики в Spark UI → Stages → Task Metrics:
// Locality Level: PROCESS_LOCAL/NODE_LOCAL/RACK_LOCAL/ANY
// Наличие только ANY-задач → проблема с локальностьюВнутренний компонент TaskScheduler, управляющий одним TaskSet (набором task-ов одного stage). Отвечает за: отслеживание статуса каждого task-а, повторный запуск упавших task-ов (до spark.task.maxFailures попыток), обнаружение и запуск speculative task-ов для straggler-ов, вычисление уровня локальности для каждого pending task-а.
// Настройки TaskSetManager:
spark.conf.set("spark.task.maxFailures", "4") // макс. попыток
spark.conf.set(
"spark.speculation", "true"
) // включить speculative execution
spark.conf.set(
"spark.speculation.multiplier", "1.5"
) // задача-straggler: >1.5x медианы
spark.conf.set(
"spark.speculation.quantile", "0.75"
) // ждать 75% завершённых задач
// В логах:
// TaskSetManager: Starting task 5.0 in stage 2.0 (TID 15)
// TaskSetManager: Lost task 5.0 in stage 2.0 — retrying (1/4)Принцип выполнения task-а как можно ближе к данным, которые он обрабатывает. Уровни приоритета (от лучшего к худшему): PROCESS_LOCAL (данные в памяти того же JVM), NODE_LOCAL (данные на том же узле), NO_PREF (нет предпочтений), RACK_LOCAL (тот же rack), ANY (любой узел). Delay scheduling задерживает отправку task-а ради лучшего уровня локальности.
// Проверка локальности в Spark UI → Stage → Tasks:
// PROCESS_LOCAL: кешированный RDD/DataFrame в памяти executor
// NODE_LOCAL: блок HDFS на том же узле, executor не кеширует
// RACK_LOCAL: блок HDFS на другом узле того же rack
// ANY: данные на другом rack или object storage (S3/GCS)
// Для S3/GCS всегда ANY — нет данных на executor-нодах
// Оптимизация: увеличить cache, уменьшить locality.wait для S3
spark.conf.set("spark.locality.wait", "0s") // для object storageМеханизм запуска дублирующих копий медленных (straggler) task-ов на других executor-ах. TaskSetManager определяет straggler, если его время выполнения превышает spark.speculation.multiplier × медианное время завершённых task-ов. Первый завершившийся результат используется, дубликат отменяется. Помогает при неравномерной нагрузке на узлы, но увеличивает потребление ресурсов.
// Включение speculative execution
spark.conf.set("spark.speculation", "true")
spark.conf.set("spark.speculation.interval", "100ms") // частота проверки
spark.conf.set("spark.speculation.multiplier", "1.5") // порог straggler
spark.conf.set("spark.speculation.quantile", "0.75") // % завершённых задач
// В Spark UI → Stage → Tasks:
// task с (speculative) в поле Index — это дубликат
// Успех первого завершённого убивает второй
// ОСТОРОЖНО: non-idempotent задачи могут давать двойные записи!Механизм автоматического масштабирования числа executor-ов в зависимости от нагрузки. При наличии pending task-ов executor-ы добавляются (экспоненциально: 1→2→4→8). Простаивающие executor-ы удаляются после spark.dynamicAllocation.executorIdleTimeout. Требует External Shuffle Service или shuffle tracking (Kubernetes) для сохранения shuffle-данных после удаления executor-а.
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.minExecutors", "2")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "100")
spark.conf.set("spark.dynamicAllocation.initialExecutors", "5")
spark.conf.set("spark.dynamicAllocation.executorIdleTimeout", "60s")
// Для Kubernetes (без External Shuffle Service):
spark.conf.set(
"spark.dynamicAllocation.shuffleTracking.enabled", "true"
)
// Spark отслеживает, какой executor держит нужные shuffle-блокиПлагинный интерфейс Spark для управления shuffle-операциями. Производится регистрация через spark.shuffle.manager. Стандартная реализация — SortShuffleManager (с Spark 1.6). Предоставляет ShuffleWriter (для map-стороны) и ShuffleReader (для reduce-стороны). Позволяет заменить встроенный shuffle внешним сервисом (Celeborn, Uniffle).
// По умолчанию:
// spark.shuffle.manager = sort (SortShuffleManager)
// Замена на Celeborn:
// --conf spark.shuffle.manager=
// org.apache.celeborn.client.spark.SparkShuffleManager
// Внутренние методы ShuffleManager:
// registerShuffle() → создаёт ShuffleHandle при начале shuffle
// getWriter() → ShuffleWriter для map-задачи
// getReader() → ShuffleReader для reduce-задачи
// unregisterShuffle() → очистка после завершения заданияСтандартный ShuffleManager в Spark. Реализует три пути записи: BypassMergeSortShuffleWriter (нет combiner, <200 partition-ов — один файл на partition без сортировки), UnsafeShuffleWriter (Tungsten-сериализация, сортировка по 64-bit encoded partition+key, merge), SortShuffleWriter (универсальный — ExternalSorter, поддерживает combiner и spill). Каждый map task создаёт один data-файл и один index-файл.
// Три пути SortShuffleManager:
// 1. BypassMergeSort: нет агрегации И partition < 200
// → spark.shuffle.sort.bypassMergeThreshold = 200
// 2. UnsafeShuffle: serializedSorting поддерживается кодеком
// И нет агрегации И partition ≤ 16777216 (2^24)
// 3. SortShuffle: все остальные случаи (включая combiner)
// Результат всегда: один .data + один .index файл
// .index: массив кумулятивных смещений (8 байт × numPartitions+1)
// .data: конкатенация partition-блоков в порядке partition IDDistributed registry, хранящий информацию о расположении shuffle-выходов (map outputs). Driver-сторона — MapOutputTrackerMaster — принимает регистрации от завершённых map task-ов. Executor-сторона — MapOutputTrackerWorker — запрашивает у Driver информацию о расположении нужных блоков перед чтением shuffle. Кеширует ответы локально для уменьшения RPC-нагрузки на Driver.
// Поток данных MapOutputTracker:
// 1. Map task завершён → executor отправляет
// MapStatus(blockManagerId, sizes[]) → Driver
// 2. Driver сохраняет в MapOutputTrackerMaster
// 3. Reduce task стартует → запрашивает у Driver:
// GetMapOutputStatuses(shuffleId)
// 4. Driver отвечает сериализованным MapStatus[]
// 5. Executor распаковывает → знает, где читать каждый блок
// Мониторинг: spark.reducer.maxSizeInFlight контролирует
// параллельное чтение shuffle-блоковОптимизация shuffle в Spark 3.2+, при которой map task-и активно (push) отправляют свои данные на merge-сервисы External Shuffle Service сразу после записи, не дожидаясь reduce-стадии. Merge-сервис объединяет блоки от разных map task-ов для одного reduce partition-а в один merged file. Снижает число random read при чтении reduce-стороны.
// Включение push-based shuffle
spark.conf.set("spark.shuffle.push.enabled", "true")
spark.conf.set(
"spark.shuffle.push.minShuffleSizeToWait", "500m"
) // минимальный размер для активации push
spark.conf.set(
"spark.shuffle.push.maxBlockSizeToPush", "1m"
) // max размер блока для push
// Требует External Shuffle Service на каждом узле
// Метрики: remote merged fetches vs. remote fetches
// Spark UI → Stage → Task Metrics → Shuffle ReadОтдельный процесс на каждом worker-узле кластера, хранящий shuffle-данные вне executor JVM. Позволяет удалять executor-ы при dynamic allocation без потери их shuffle-выходов. Реализован как долгоживущий сервис (spark-external-shuffle-service), запускаемый вместе с NodeManager (YARN) или DaemonSet (Kubernetes). Обязателен для dynamic allocation в YARN-кластерах.
# YARN: ESS встроен в NodeManager
spark.conf.set("spark.shuffle.service.enabled", "true")
spark.conf.set("spark.shuffle.service.port", "7337")
# Kubernetes: требует sidecar или DaemonSet
spark.conf.set("spark.shuffle.service.enabled", "true")
spark.conf.set(
"spark.kubernetes.shuffle.namespace", "spark-shuffle"
)
# Без ESS на K8s — shuffle tracking:
spark.conf.set(
"spark.dynamicAllocation.shuffleTracking.enabled", "true"
)Компонент SortShuffleWriter, реализующий sort-based shuffle с поддержкой spill-на-диск. Буферизует записи в PartitionedAppendOnlyMap (если есть combiner) или PartitionedPairBuffer (без combiner) до достижения memory threshold, затем делает частичный spill во временный файл. По завершении все spill-файлы merge-сортируются в один итоговый shuffle-файл. Поддерживает Aggregator для partial aggregation до shuffle.
// ExternalSorter активируется в SortShuffleWriter:
// 1. insertAll() — добавляет записи с combiner (reduceByKey) или без
// 2. При нехватке памяти → maybeSpillCollection() → spill на диск
// 3. writePartitionedMapOutput() → merge-сортировка + единый .data файл
// Управление памятью spill:
spark.conf.set(
"spark.shuffle.spill.compress", "true"
) // сжатие spill-файлов
spark.conf.set(
"spark.shuffle.sort.bypassMergeThreshold", "200"
) // избежать sorter для < 200 partitionМенеджер памяти Spark (с версии 1.6), управляющий единым пулом памяти executor-а. Делит доступную память (spark.executor.memory × spark.memory.fraction, default 0.6) на два региона: execution memory (shuffle, sort, join, aggregation) и storage memory (кеш RDD/DataFrame). Регионы динамически делят одну границу — execution может вытеснять storage при нехватке, storage возвращается только при освобождении execution.
// Расчёт регионов памяти:
// executor.memory = 8g
// memory.fraction = 0.6 → 4.8g для UMM
// memory.storageFraction = 0.5 → 2.4g для storage (не вытесняется)
// Оставшиеся 40% (3.2g) — reserved для user code, overhead
spark.conf.set("spark.memory.fraction", "0.6")
spark.conf.set("spark.memory.storageFraction", "0.5")
// Мониторинг в Spark UI → Executors:
// Storage Memory: X.X GiB / Y.Y GiB
// Включить детали: /metrics/json endpointРаспределённая система управления блоками данных в Spark. Каждый executor (и driver) имеет BlockManager, который хранит данные в памяти (MemoryStore) или на диске (DiskStore) и отвечает на запросы от других executor-ов. BlockManagerMaster на driver-е хранит метаданные о расположении всех блоков. BlockId идентифицирует блок: RDDBlockId, ShuffleBlockId, BroadcastBlockId и др.
// Типы BlockId:
// RDDBlockId(rddId, partitionId) → кеш RDD
// ShuffleBlockId(shuffleId, mapId, reduceId) → shuffle данные
// BroadcastBlockId(broadcastId) → broadcast переменная
// TempShuffleBlockId(uuid) → временный shuffle блок
// Чтение блока в порядке приоритета:
// 1. Локальная память (MemoryStore)
// 2. Локальный диск (DiskStore)
// 3. Remote BlockManager (по сети через BlockTransferService)
// 4. Пересчёт через lineage (для RDD-блоков)Компонент BlockManager, хранящий блоки данных в JVM heap или off-heap памяти. Поддерживает два формата: десериализованные Java-объекты (быстрый доступ, больший размер) и сериализованные байтовые буферы (компактнее, медленнее). При нехватке памяти вытесняет блоки по LRU-политике — сначала в DiskStore (если StorageLevel предусматривает disk), затем удаляет.
// StorageLevel контролирует поведение MemoryStore:
df.persist(StorageLevel.MEMORY_ONLY) // только heap, без disk
df.persist(StorageLevel.MEMORY_ONLY_SER) // heap + сериализация
df.persist(StorageLevel.MEMORY_AND_DISK) // heap → spill на диск
df.persist(StorageLevel.OFF_HEAP) // off-heap (Tungsten)
// Мониторинг вытеснений:
// Spark UI → Storage → RDD Partitions:
// Cached In Memory: X, Cached On Disk: Y, Size in Memory: Z
// Если много Cached On Disk → memory pressure, увеличь executor.memoryКомпонент BlockManager, записывающий блоки данных на локальный диск executor-а при нехватке памяти или использовании StorageLevel с disk-составляющей. Блоки хранятся в директории spark.local.dir в виде файлов, именованных по BlockId. DiskStore использует DiskBlockManager для маппинга BlockId → путь к файлу. Поддерживает шифрование блоков через spark.io.encryption.enabled.
// Настройка DiskStore:
spark.conf.set(
"spark.local.dir", "/fast-ssd/spark-temp,/hdd/spark-temp"
) // несколько директорий для striping
// StorageLevel с disk fallback:
df.persist(StorageLevel.MEMORY_AND_DISK) // → DiskStore при вытеснении
df.persist(StorageLevel.DISK_ONLY) // → только DiskStore
// Блоки на диске шифруются если:
spark.conf.set("spark.io.encryption.enabled", "true")
spark.conf.set("spark.io.encryption.keySizeBits", "128")
// Mониторинг: Spark UI → Executors → Disk UsedРеализация broadcast-переменных в Spark по BitTorrent-протоколу. Driver разбивает broadcast-значение на блоки (spark.broadcast.blockSize, default 4m) и хранит их в своём BlockManager. Executor-ы скачивают блоки с driver-а (или от других executor-ов, уже получивших блоки) параллельно. Это снижает нагрузку на driver при большом числе executor-ов по сравнению с наивной рассылкой от одного источника.
// Создание broadcast переменной
val lookup = spark.sparkContext.broadcast(
Map("US" -> "United States", "DE" -> "Germany")
) // → TorrentBroadcast: нарезается на блоки, хранится в driver BlockManager
// Использование на executor-ах
val result = rdd.map(row => lookup.value.getOrElse(row.country, "Unknown"))
// lookup.value → BlockManager читает локальный кеш или
// тянет блоки у соседних executor-ов P2P
// Освобождение
lookup.unpersist() // удалить с executor-ов, driver сохраняет
lookup.destroy() // удалить полностьюДекларативное дерево операций в Catalyst, описывающее «что» вычислить, без физической реализации. Проходит три стадии: UnresolvedLogicalPlan (выражения могут ссылаться на несуществующие колонки), AnalyzedLogicalPlan (все ссылки разрешены через Catalog), OptimizedLogicalPlan (применены rule-based оптимизации: predicate pushdown, column pruning, constant folding, join reordering). Каждый узел — TreeNode с методом transform для pattern matching.
// Просмотр всех стадий logical plan:
val df = spark.read.parquet("/sales")
.filter("amount > 100")
.groupBy("region").sum("amount")
val qe = df.queryExecution
println(qe.logical) // unresolved: UnresolvedAttribute('amount')
println(qe.analyzed) // resolved: amount#5 LongType
println(qe.optimizedPlan) // optimized: predicate pushdown применён
// PySpark:
# df._jdf.queryExecution().logical()
# df._jdf.queryExecution().analyzed()
# df._jdf.queryExecution().optimizedPlan()Исполняемое дерево SparkPlan-операторов, определяющее конкретный способ выполнения запроса. Physical Planning преобразует OptimizedLogicalPlan в SparkPlan через набор Strategy-объектов (JoinSelection, BasicOperators, Aggregation и др.). Каждый PhysicalOperator реализует execute() → RDD[InternalRow]. CostBasedJoinReorder выбирает оптимальный порядок join-ов при наличии column statistics.
// Просмотр физического плана:
df.explain() // краткий
df.explain(True) // расширенный с codegen stage
df.explain('cost') // с оценками стоимости
// Scala:
println(df.queryExecution.sparkPlan) // до codegen
println(df.queryExecution.executedPlan) // после codegen
// Ключевые физические операторы:
// BroadcastHashJoin vs SortMergeJoin vs ShuffledHashJoin
// HashAggregate (с partial + final стадиями)
// SortExec (с spill support)
// Exchange (shuffle boundary)Четырёхфазный pipeline оптимизации запросов: (1) Analysis — разрешение ссылок на таблицы/колонки через Catalog; (2) Logical Optimization — применение rule-based правил (предикат pushdown, column pruning, constant folding, join reordering); (3) Physical Planning — выбор физических стратегий (BroadcastHashJoin vs SortMergeJoin); (4) Code Generation — генерация Java bytecode через Janino. Правила применяются итерационно до фиксированной точки.
// Просмотр всех фаз через DataFrame API:
import org.apache.spark.sql.execution.debug._
df.debugCodegen() // распечатать сгенерированный код
// Через queryExecution:
val qe = df.queryExecution
println(qe.logical) // unresolved logical plan
println(qe.analyzed) // resolved logical plan
println(qe.optimizedPlan) // optimized logical plan
println(qe.sparkPlan) // physical plan (before codegen)
println(qe.executedPlan) // final plan с CodeGenStageПравило трансформации логического плана, реализующее один шаг оптимизации. Наследует Rule[LogicalPlan] и реализует метод apply(plan). Применяется через pattern matching к узлам дерева плана. Примеры встроенных правил: PushDownPredicate, ColumnPruning, ConstantFolding, ReorderJoin, EliminateSorts. SparkSessionExtensions позволяет добавлять custom правила.
// Структура optimizer rule (Scala):
class MyRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan =
plan transform {
// Pattern matching на узлах плана
case Filter(Always True, child) => child
case Project(cols, child)
if cols == child.output => child
}
}
// Регистрация кастомного правила:
spark.experimental.extraOptimizations =
Seq(new MyRule)
// Или через SparkSessionExtensions:
.withExtensions(_.injectOptimizerRule(_ => new MyRule))Техника генерации единого Java-метода для обработки строки данных через весь pipeline физических операторов. Вместо виртуальных вызовов между операторами Volcano-модели (один вызов next() на строку на оператор) генерируется один tight loop с inlined-логикой всех операторов. Устраняет overhead виртуальных вызовов, улучшает CPU instruction cache locality и позволяет JIT-компилятору сильнее оптимизировать код.
// Звёздочка (*) в EXPLAIN = CodeGenStage boundary:
// *(1) HashAggregate(keys=[dept], functions=[sum(salary)])
// +- Exchange hashpartitioning(dept, 200)
// +- *(1) HashAggregate(keys=[dept], ...)
// +- *(1) Filter (active = true)
// +- *(1) ColumnarToRow
// +- Scan parquet [...]
// Все *(1)-операторы скомпилированы в один метод!
// Exchange — граница, начинается новый CodeGenStage *(2)
// Отладка сгенерированного кода:
df.queryExecution.debug.codegen()Компактный бинарный формат строки данных Tungsten, хранящий данные в непрерывном массиве байт (on-heap или off-heap). Структура: null bitmap (ceil(numFields/64)×8 байт), fixed-length значения (8 байт каждое — Int хранится как long), variable-length данные (строки, массивы) с offset+length в fixed-части. Поддерживает O(1) доступ к полю без десериализации и binary comparison без декодирования.
// Структура UnsafeRow для (id: INT, name: STRING, age: INT):
// Offset 0: null bitmap (8 байт, 3 поля → 1 блок)
// Offset 8: id value (8 байт, long-aligned)
// Offset 16: name offset+length (8 байт: [offset:32bit][len:32bit])
// Offset 24: age value (8 байт)
// Offset 32: name bytes (variable-length, выровнено до 8 байт)
// Использование в Spark:
// Все операции shuffle, sort, join работают с UnsafeRow
// Не нужна десериализация для сравнения ключей:
// UnsafeRowUtils.compare(row1, row2, keyFields)
// Размер можно смотреть в Spark UI: shuffle write bytes / num recordsМеханизм сериализации/десериализации между JVM-объектами и внутренним InternalRow / UnsafeRow форматом Spark. Encoders генерируются через Catalyst кодогенерацию для конкретного типа T: ExpressionEncoder[T] содержит сериализатор (JVM → InternalRow) и десериализатор (InternalRow → JVM). Используется в Dataset[T] API для типобезопасных операций.
// Scala: неявные encoder-ы
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
case class User(id: Long, name: String, age: Int)
// Автоматически через implicit:
val ds = spark.createDataset(Seq(User(1, "Alice", 30)))
// Явно:
val enc = Encoders.product[User]
val exprEnc = enc.asInstanceOf[ExpressionEncoder[User]]
// Encoder кодогенерируется один раз и переиспользуется
// PySpark: PickleSerializer или ArrowSerializer (pandas UDF)
import pyspark.sql.functions as FОптимизация AQE в Spark 3.3+: после вычисления одной стороны join-а Spark строит Bloom filter или In-filter и инжектирует его в scan-оператор другой стороны. Это позволяет отфильтровать ненужные строки до shuffle или даже на уровне storage. Работает для inner join, semi join, right outer join (для левой стороны). Bloom filter хранится в памяти driver-а.
// Включение runtime фильтров:
spark.conf.set(
"spark.sql.optimizer.runtimeFilter.enabled", "true"
)
spark.conf.set(
"spark.sql.optimizer.runtimeFilter.creationSideThreshold",
"10m" // строить filter только для сторон < 10MB
)
// В EXPLAIN:
// Filter (isnotnull(id) AND might_contain(bf, xxhash64(id)))
// might_contain → bloom filter probe
// Bloom filter размер:
spark.conf.set(
"spark.sql.optimizer.runtimeFilter.number.bloomFilter.bits",
"8" // bits per item
)Фреймворк динамической re-оптимизации плана выполнения в Spark 3.x на основе runtime-статистик, собранных после каждого shuffle. После завершения shuffle-стадии AQE получает точные размеры partition-ов и перепланирует оставшуюся часть DAG: объединяет мелкие partition-ы (coalesce), реагирует на data skew (skew join), заменяет SortMergeJoin на BroadcastHashJoin если данные оказались меньше порога.
// Включение AQE (по умолчанию с Spark 3.2):
spark.conf.set("spark.sql.adaptive.enabled", "true")
// Три основных оптимизации AQE:
// 1. Coalesce shuffle partitions
spark.conf.set(
"spark.sql.adaptive.coalescePartitions.enabled", "true"
)
spark.conf.set(
"spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m"
)
// 2. Skew join detection & splitting
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
// 3. Runtime SortMergeJoin → BroadcastHashJoin
// Автоматически если размер < autoBroadcastJoinThresholdОптимизация AQE: после завершения shuffle-стадии AQE собирает точные размеры всех partition-ов и объединяет мелкие смежные partition-ы в один, стремясь к targetSize (advisoryPartitionSizeInBytes). Это устраняет проблему пустых/крошечных partition-ов при фиксированном spark.sql.shuffle.partitions=200 и снижает overhead планировщика.
spark.conf.set(
"spark.sql.adaptive.coalescePartitions.enabled", "true"
)
spark.conf.set(
"spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m"
)
spark.conf.set(
"spark.sql.adaptive.coalescePartitions.minPartitionSize", "1m"
)
// minPartitionSize: нижняя граница — не объединять до мельче этого
// Результат: вместо 200 пустых partition-ов после join маленькой таблицы
// AQE может создать 3-5 реальных partition-ов
// Наблюдать в Spark UI → Stage Details → "Custom Metrics: numCoalescedPartitions"Оптимизация AQE для обработки data skew в join-операциях. AQE определяет skewed partition как тот, чей размер превышает max(skewedPartitionFactor × median, skewedPartitionThresholdInBytes). Skewed partition дробится на несколько подзадач, соответствующая часть другой стороны join дублируется для каждой подзадачи. Прозрачно для пользователя.
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set(
"spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5"
) // размер > 5x медианы = skew
spark.conf.set(
"spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes",
"256m"
) // И > 256mb
// Наблюдение в Spark UI → SQL → DAG:
// SortMergeJoin (skew) → появляется при активации
// "number of skewed partitions" в Custom Metrics
// Ограничение: работает только с SortMergeJoin!
// BroadcastHashJoin skew не лечится AQE automaticalyОптимизация Spark 3.x: фильтр, примененный к dimension-стороне join-а, динамически транслируется в subquery и применяется к сканированию fact-таблицы на уровне partition-ов. DPP активируется для star-schema паттернов: broadcast join dimension → dynamically filter fact. Реализуется через вставку DynamicPruningExpression в план сканирования fact-таблицы.
-- DPP автоматически применяется:
SELECT f.sale_amount, d.region_name
FROM fact_sales f
JOIN dim_region d ON f.region_id = d.id
WHERE d.country = 'DE';
-- Spark вставляет в план fact-таблицы:
-- Filter region_id IN (
-- DynamicPruning: SELECT id FROM dim_region WHERE country='DE'
-- )
-- → читает только partition-ы с немецкими region_id
-- Конфиги:
spark.conf.set(
"spark.sql.optimizer.dynamicPartitionPruning.enabled", "true"
)Основной режим выполнения Structured Streaming: MicroBatchExecution периодически опрашивает source на наличие новых данных (до latestOffset), запускает IncrementalExecution для обработки нового batch-а, коммитит offset и повторяет. Каждый micro-batch — полноценный Spark job с планированием задач. Гарантирует exactly-once через checkpoint + idempotent sink.
// MicroBatchExecution поток:
// 1. source.latestOffset() → получить новые offsets
// 2. source.getBatch(start, end) → DataFrame с новыми данными
// 3. IncrementalExecution.executedPlan → оптимизированный план
// 4. query.runBatch() → запуск как обычный Spark job
// 5. sink.addBatch(batchId, data) → запись результата
// 6. offsetLog.add(batchId, offsets) → checkpoint offsets
// 7. commitLog.add(batchId) → checkpoint commit
// Просмотр прогресса:
query.lastProgress // метрики последнего micro-batch
query.recentProgress // история последних N micro-batchВерсионированное key-value хранилище для stateful-операций Structured Streaming. Каждый stateful оператор (агрегация, дедупликация, stream-stream join) использует отдельный StateStore. Реализации: HDFSBackedStateStoreProvider (default, хранит snapshot + delta-файлы в HDFS/S3) и RocksDBStateStoreProvider (Spark 3.2+, on-disk LSM tree на executor, рекомендуется для больших состояний).
// HDFSBackedStateStore — по умолчанию:
// Хранит в checkpointLocation/state/<operatorId>/<partitionId>/
// Snapshot каждые N commit-ов, дельты между снапшотами
// Нагружает память executor (держит всё состояние в HashMap)
// RocksDB — для больших состояний:
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state."
+ "RocksDBStateStoreProvider"
)
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.compactOnCommit", "false"
)
// Метрики: stateRows, stateMemory, loadedMapCacheHitCountСпециализированный QueryExecution для Structured Streaming, добавляющий stateful операторы в физический план. Каждый micro-batch использует один и тот же оптимизированный план, но с обновлёнными ссылками на текущий batch данных. Вставляет StateStoreRestoreExec и StateStoreSaveExec узлы в план для операций с состоянием, передаёт batchId и watermark в эти операторы.
// IncrementalExecution добавляет в план:
// - StateStoreRestoreExec: читает предыдущее состояние из StateStore
// - HashAggregateExec: обрабатывает строки текущего batch
// - StateStoreSaveExec: сохраняет обновлённое состояние
// - WatermarkEvalExec: применяет eviction по watermark
// Просмотр плана streaming-запроса:
query.explain(extended=True)
// или через:
// spark.streams.active.head.explain()Двухступенчатый механизм exactly-once в Structured Streaming. OffsetSeqLog (WAL) сначала сохраняет offsets нового micro-batch-а — это гарантирует, что при падении мы знаем, какие данные читать. CommitLog сохраняется после успешной записи в sink — он подтверждает, что batch завершён. При перезапуске: если offset есть но нет commit → повтор batch-а (idempotent). Если есть оба → пропуск.
// Структура checkpoint директории:
// checkpointLocation/
// metadata — id запроса и версия
// offsets/ — WAL: offset лог
// 0, 1, 2 ... — JSON с офсетами каждого batch
// commits/ — commit лог
// 0, 1, 2 ... — подтверждения завершённых batch
// state/ — состояние StatefulOperator-ов
// sources/ — source-specific состояние
// Два файла для batch N:
// offsets/N существует → batch запущен
// commits/N существует → batch завершён успешноНовый stateful API Spark 4.0 (preview в 3.5), заменяющий flatMapGroupsWithState. Предоставляет типобезопасные операции со state через StatefulProcessor интерфейс: ValueState, ListState, MapState. Поддерживает TTL для автоматического eviction, таймеры (event-time и processing-time), возможность работать с несколькими type-safe state-переменными в одном операторе.
// Spark 4.0: transformWithState
class SessionAggregator
extends StatefulProcessor[String, Event, SessionResult] {
// Объявление state при инициализации
var sessionCount: ValueState[Long] = _
var sessionStart: ValueState[Long] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
sessionCount = getHandle.getValueState[Long]("count", Encoders.scalaLong)
sessionStart = getHandle.getValueState[Long]("start", Encoders.scalaLong)
}
override def handleInputRows(key: String,
rows: Iterator[Event], timerValues: TimerValues): Iterator[SessionResult] = {
// Обновление state и генерация результатов
???
}
}Реализация StateStore на базе встроенной RocksDB (LSM-tree, on-disk key-value store), доступная с Spark 3.2. Хранит state на локальном SSD executor-а с LRU page cache в памяти. Поддерживает состояния большого размера (гигабайты на partition) без OOM — только активная часть в памяти. Snapshot-ы реплицируются в checkpoint location (HDFS/S3) через background upload.
// Конфигурация RocksDB backend:
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state."
+ "RocksDBStateStoreProvider"
)
// Тюнинг RocksDB:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB", "128"
)
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.writeBufferSizeBytes",
str(2 * 1024 * 1024) // 2MB write buffer
)
// Метрики: rocksdbNumSstFiles, rocksdbBytesWritten, rocksdbBlockCacheHitIn-memory колоночный формат данных, являющийся cross-language стандартом (Java, Python, C++, R). Данные одной колонки хранятся в непрерывном буфере памяти (validity bitmap + value buffer), что обеспечивает SIMD-ускорение и эффективную агрегацию. Zero-copy обмен между JVM (Spark) и Python (pandas/pyarrow) через shared memory — без сериализации.
// Включение Arrow для toPandas() и pandas UDF:
spark.conf.set(
"spark.sql.execution.arrow.pyspark.enabled", "true"
)
# Без Arrow: каждая строка → pickle → Python O(n × row_size)
# С Arrow: RecordBatch → shared memory → zero-copy в Python
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("double")
def fast_udf(s: pd.Series) -> pd.Series:
return s * 2.0 # выполняется над колонкой целиком (SIMD)
# Контроль размера batch:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")Интеграция PyArrow в PySpark для ускорения обмена данными между JVM и Python через Arrow IPC. Используется в: toPandas()/createDataFrame() с Arrow (zero-copy где возможно), pandas_udf (SCALAR, GROUPED_MAP, GROUPED_AGG — колонки обрабатываются как pd.Series/DataFrame), Spark Connect (Arrow Flight для результатов), mapInArrow() для нативной Arrow-обработки без conversion в pandas.
import pyarrow as pa
from pyspark.sql.functions import pandas_udf
import pandas as pd
# mapInArrow: обработка данных как ArrowRecordBatch
def process_arrow(batches):
for batch in batches: # batch: pyarrow.RecordBatch
arr = batch.column('amount').to_pylist()
# нативная Arrow обработка без pandas overhead
result = pa.array([x * 1.1 for x in arr])
yield pa.record_batch(
[result], schema=pa.schema([('amount', pa.float64())])
)
df.mapInArrow(process_arrow, schema="amount double")High-performance RPC-протокол для передачи Arrow RecordBatch-ей по gRPC-соединению. Использует zero-copy сериализацию Arrow IPC-формата напрямую в gRPC stream. Spark Connect использует Arrow Flight для передачи данных между thin client-ом и Spark-кластером. Производительность на порядок выше стандартных REST/JDBC подходов для объёмных данных.
// Arrow Flight в контексте Spark Connect:
// Client (Python/Scala) → gRPC → Spark Connect Server
// Server → выполняет plan → возвращает через Arrow Flight Stream
// Схема взаимодействия:
// 1. client.execute(plan) → gRPC ExecutePlan RPC
// 2. Server: runQuery() → DataFrame
// 3. Server: serialize result → Arrow RecordBatches
// 4. Stream RecordBatches через gRPC → client
// 5. Client: pyarrow.RecordBatch → pandas DataFrame
// Конфигурация Flight endpoint:
spark.conf.set("spark.connect.grpc.binding.port", "15002")Клиент-серверная архитектура Spark (GA в 4.0, preview с 3.4), разделяющая клиентское приложение и Spark-кластер через gRPC + Protocol Buffers. Клиент отправляет логический план как protobuf-сообщение. Сервер десериализует план, выполняет через Catalyst/Tungsten и возвращает результаты через Arrow Flight. Thin client не требует JVM — работает на чистом Python/Go/Rust.
# Подключение Spark Connect клиента:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.remote("sc://spark-server:15002") \
.getOrCreate()
# Тот же DataFrame API:
df = spark.read.parquet("s3://bucket/data")
result = df.filter("amount > 1000").groupBy("region").count()
result.show()
# Преимущества:
# - Client изолирован от cluster classpath конфликтов
# - Несколько клиентов на одном кластере
# - Клиент = любой язык с protobuf-поддержкойAPI для расширения Spark без модификации исходного кода. Позволяет инжектировать: optimizer rules (до/после стандартных), parser rules (расширение SQL синтаксиса), analyzer rules, planner strategies (custom физические операторы), post-hoc resolution rules. Реализуется как функция SparkSessionExtensions => Unit и передаётся при построении SparkSession через withExtensions.
// Регистрация расширений:
val spark = SparkSession.builder()
.withExtensions { extensions =>
// Кастомное optimizer rule:
extensions.injectOptimizerRule(_ => new MyOptimizerRule)
// Кастомная planner strategy:
extensions.injectPlannerStrategy(_ => new MyPlannerStrategy)
// Кастомный SQL parser:
extensions.injectParser { (_, parser) =>
new MyParser(parser) // chain с дефолтным парсером
}
// Кастомный post-hoc analyzer rule:
extensions.injectPostHocResolutionRule(_ => new MyAnalyzerRule)
}
.getOrCreate()Внешний remote shuffle service (бывший RemoteShuffleService от Alibaba), хранящий shuffle-данные на выделенных worker-нодах, а не на executor-ах. Map task-и push-отправляют данные на Celeborn worker-ы через netty. Reduce task-и читают от Celeborn, не зная, какой executor писал данные. Полностью отвязывает shuffle lifecycle от executor lifecycle — идеально для dynamic allocation на Kubernetes.
// Подключение Celeborn:
spark-submit \
--conf spark.shuffle.manager=\
org.apache.celeborn.client.spark.SparkShuffleManager \
--conf spark.celeborn.master.endpoints=celeborn-master:9097 \
--conf spark.celeborn.client.spark.push.data.timeout=120s \
--conf spark.celeborn.client.spark.fetch.timeout=120s \
my_app.py
// Преимущества vs ESS:
// - Горизонтально масштабируется (несколько Celeborn workers)
// - Replication для отказоустойчивости
// - Встроенный push-based shuffle
// - Совместим с K8s без DaemonSetСтратегия join в Spark по умолчанию для больших таблиц, когда ни одна не помещается в broadcast threshold. Алгоритм: (1) обе стороны shuffle по join-ключу, (2) обе стороны сортируются по ключу, (3) merge-scan двух отсортированных потоков находит совпадения за O(n+m). Поддерживает все типы join (inner, outer, anti, semi). Требует сортируемых ключей (implements Ordering).
-- Принудительное использование SortMergeJoin:
SELECT /*+ MERGE(orders) */ *
FROM orders o JOIN products p ON o.product_id = p.id;
// В DataFrame API:
import org.apache.spark.sql.functions._
orders.join(products.hint("merge"), Seq("product_id"))
// Отключить broadcast для форсирования SMJ:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
// SMJ можно ускорить Bucketing:
// Если обе таблицы bucketed by join key с одинаковым N buckets
// → shuffle пропускается (pre-sorted + pre-shuffled)Стратегия join, при которой маленькая таблица (dimension) целиком собирается на driver-е, нарезается на TorrentBroadcast-блоки и рассылается всем executor-ам. Каждый executor строит hash table из broadcast-стороны и probe-сканирует свои partition-ы большой таблицы без shuffle. O(n) для построения hash table + O(m) для probe. Порог — spark.sql.autoBroadcastJoinThreshold (default 10m, max 8GB).
// Автоматический broadcast (таблица < threshold):
val result = big.join(small, "id") // → BroadcastHashJoin
// Явный hint:
val result = big.join(broadcast(small), "id")
// SQL hint:
// SELECT /*+ BROADCAST(d) */ f.*, d.name
// FROM fact f JOIN dim d ON f.dim_id = d.id
// Настройка:
spark.conf.set(
"spark.sql.autoBroadcastJoinThreshold", "50m"
)
// AQE может динамически upgrade SMJ → BHJ
// если после shuffle reduce-сторона < thresholdЖизненный цикл executor-а: регистрация в Driver через CoarseGrainedExecutorBackend → получение LaunchTask сообщений → запуск TaskRunner в thread pool → завершение/убийство task-а → DeregisterExecutor при остановке. Executor heartbeat (spark.executor.heartbeatInterval, default 10s) сигнализирует driver-у о доступности — если heartbeat не приходит в течение spark.network.timeout, driver считает executor мёртвым.
// Конфигурация таймаутов executor:
spark.conf.set("spark.executor.heartbeatInterval", "10s")
spark.conf.set("spark.network.timeout", "120s")
// network.timeout должен быть >> heartbeatInterval
// Executor thread pool:
spark.conf.set("spark.executor.cores", "4") // 4 потока
// ThreadPoolExecutor с N = spark.executor.cores потоками
// Каждый поток = один TaskRunner = один task
// Graceful shutdown:
// Executor завершает текущие tasks, затем останавливается
// Принудительный kill: KillTask message от DriverСброс промежуточных shuffle-данных на диск при нехватке execution memory. ExternalSorter и ExternalAppendOnlyMap используют spill для обработки данных, не вмещающихся в память. Данные частично сортируются и записываются во временные spill-файлы, которые затем merge-сортируются при чтении. Метрики в Spark UI: Shuffle Spill (Memory) и Shuffle Spill (Disk).
// Метрики spill в Spark UI → Stage → Task Metrics:
// Shuffle Spill (Memory): объём данных до компрессии
// Shuffle Spill (Disk): объём данных после компрессии на диске
// Spill (Memory) >> Spill (Disk) → высокая сжимаемость
// Предотвращение spill:
spark.conf.set("spark.executor.memory", "16g") // больше памяти
spark.conf.set("spark.memory.fraction", "0.7") // больше для execution
// Если spill неизбежен — ускорить:
spark.conf.set(
"spark.shuffle.spill.compress", "true"
)
spark.conf.set("spark.io.compression.codec", "lz4") // быстрая компрессияЗамена JVM-based Tungsten-движка нативным кодом (C++/Rust) для ускорения CPU-bound операций. Apache DataFusion Comet (Rust + DataFusion) и Apache Gluten (C++ + Velox) реализуют физические операторы Spark на нативном уровне, подключаясь через SparkSessionExtensions. NVIDIA RAPIDS заменяет CPU-операции GPU-ускоренными ядрами.
// Включение Apache DataFusion Comet:
spark.conf.set(
"spark.plugins",
"org.apache.spark.CometPlugin"
)
spark.conf.set("spark.comet.enabled", "true")
spark.conf.set("spark.comet.exec.all.enabled", "true")
// Включение Gluten (Velox backend):
spark.conf.set(
"spark.plugins",
"io.glutenproject.GlutenPlugin"
)
spark.conf.set(
"spark.gluten.sql.columnar.backend.lib", "velox"
)
// Наблюдение в EXPLAIN: NativeFilter, NativeScan вместо стандартныхАльтернативный сериализатор для Spark (вместо Java-сериализации по умолчанию), значительно более компактный и быстрый. Kryo сериализует объекты в бинарный формат без метаданных класса. Требует регистрации классов для максимальной эффективности. Используется для сериализации RDD-трансформаций (closures), broadcast-переменных и объектов shuffle в RDD API.
// Включение Kryo:
spark.conf.set(
"spark.serializer",
"org.apache.spark.serializer.KryoSerializer"
)
// Регистрация классов для максимальной компактности:
spark.conf.set(
"spark.kryo.classesToRegister",
"com.example.User,com.example.Event"
)
// Или через KryoRegistrator:
class MyRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
kryo.register(classOf[User])
kryo.register(classOf[Event])
}
}
spark.conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)Память за пределами JVM heap, управляемая напрямую через sun.misc.Unsafe. В Spark используется Tungsten для хранения UnsafeRow объектов и shuffle-буферов без GC-давления. Включается через spark.memory.offHeap.enabled. Также используется для RDD-кеша при StorageLevel.OFF_HEAP. Позволяет работать с данными гигабайтного масштаба без риска Full GC пауз.
// Включение off-heap Tungsten:
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "8g")
// off-heap не входит в spark.executor.memory!
// Реальное потребление памяти executor-контейнера:
// = executor.memory + memoryOverhead + offHeap.size
// Кеш в off-heap:
df.persist(StorageLevel.OFF_HEAP)
// ВНИМАНИЕ: off-heap утечки не видны в GC-метриках
// Диагностика: native memory tracking
// -XX:NativeMemoryTracking=summary в extraJavaOptionsИнтерфейс для перехвата событий жизненного цикла Spark-приложения через event bus (LiveListenerBus). Слушатели получают события: SparkListenerJobStart/End, SparkListenerStageCompleted, SparkListenerTaskEnd (с TaskMetrics), SparkListenerExecutorAdded/Removed. Используется для custom monitoring, alerting, cost tracking. Асинхронен — события обрабатываются в отдельном потоке чтобы не блокировать планировщик.
// Кастомный SparkListener (Scala):
import org.apache.spark.scheduler._
class CostTrackingListener extends SparkListener {
override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
val m = event.taskMetrics
val shuffleWriteBytes = m.shuffleWriteMetrics.bytesWritten
val gcTime = m.jvmGCTime
// отправить в метрики систему (Prometheus, Datadog)
MetricsClient.record("spark.task.shuffle_write", shuffleWriteBytes)
MetricsClient.record("spark.task.gc_time_ms", gcTime)
}
}
spark.sparkContext.addSparkListener(new CostTrackingListener())Ещё один Remote Shuffle Service (от Tencent), конкурент Celeborn. Поддерживает Hadoop YARN, Kubernetes, Standalone. Архитектура: Coordinator (метаданные), ShuffleServer (хранение данных), Client (в executor). Поддерживает несколько storage tier-ов (memory → local SSD → HDFS/S3). Отличается от Celeborn более гибкой системой квот и поддержкой multiple backends.
// Подключение Uniffle к Spark:
spark-submit \
--conf spark.shuffle.manager=\
org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient \
--conf rss.coordinator.quorum=uniffle-coordinator:19999 \
--conf rss.storage.type=MEMORY_LOCALFILE_HDFS \
my_app.py
// Ключевые отличия от Celeborn:
// - Coordinator отдельный процесс (vs Celeborn master)
// - Поддержка HDFS как storage tier напрямую
// - Более гибкая система quota (по user/queue)Плагин для Spark, заменяющий JVM физические операторы нативными Velox-операторами (C++ движок от Meta). Подключается как SparkPlugin — перехватывает SparkPlan, конвертирует поддерживаемые операторы в Velox execution tree. Данные передаются между JVM и native через Arrow-совместимые columnar буферы (ColumnarBatch). Поддерживает Parquet/ORC native scan, hash join, aggregate, window functions.
// Подключение Gluten (Velox backend):
spark-submit \
--conf spark.plugins=io.glutenproject.GlutenPlugin \
--conf spark.gluten.sql.columnar.backend.lib=velox \
--conf spark.gluten.sql.columnar.forceShuffledHashJoin=true \
--conf spark.sql.shuffle.partitions=200 \
my_app.py
// Какие операторы нативны — видно в EXPLAIN:
// VeloxHashAggregateExec вместо HashAggregateExec
// VeloxBroadcastHashJoinExec вместо BroadcastHashJoinExec
// Метрики: veloxToNativeTime, veloxFromNativeTime — overhead конвертацииПередача данных между JVM и native/Python окружением без копирования байтов. Реализуется через разделяемую (shared) память: JVM выделяет Arrow-буфер, передаёт указатель Python-процессу через IPC. Python читает данные напрямую из того же адресного пространства (или mapped file) без дополнительного memcpy. В Spark Connect — Arrow Flight Stream, где RecordBatch передаются через gRPC с minimal copy.
// Zero-copy toPandas() с Arrow:
spark.conf.set(
"spark.sql.execution.arrow.pyspark.enabled", "true"
)
# Без Arrow: Row → pickle(bytes) → Python deserialize → object
# С Arrow: JVM allocates ArrowRecordBatch
# → off-heap buffer
# → Python pyarrow.RecordBatch wraps same memory
# → pandas.DataFrame.from_pandas (column views)
# Измерение разницы:
import time
start = time.time()
pdf = df.toPandas() # с Arrow: ~10-50x быстрее
print(f'toPandas: {time.time()-start:.2f}s')Расширенный API для написания кастомных источников данных и сinks в Spark (с Spark 3.x как стабильный). Разделяет concerns: TableCatalog (метаданные), Table (схема + capabilities), ScanBuilder (построение плана чтения), WriteBuilder (план записи). Поддерживает pushdown фильтров, projection pruning, partition pruning и Statistics provider для Catalyst. Основа для Iceberg, Delta Lake, Hudi коннекторов.
// Структура DataSource V2 коннектора:
// 1. TableCatalog — регистрирует таблицы в Catalog
// 2. Table — возвращает Schema + Set[TableCapability]
// 3. SupportsRead → newScanBuilder()
// 4. ScanBuilder → build() → Scan
// 5. Scan → toBatch() → Batch
// 6. Batch → planInputPartitions() → InputPartition[]
// 7. PartitionReaderFactory → createReader(partition)
// Capabilities: BATCH_READ, BATCH_WRITE,
// OVERWRITE_BY_FILTER, TRUNCATE, V1_BATCH_WRITE
// Регистрация через SparkSessionExtensions или
// spark.sql.catalog.<name>=<CatalogImpl>Плагин для Spark, заменяющий CPU-исполнение физических операторов на GPU-ускоренные ядра через cuDF библиотеку. Подключается через SparkPlugin API и перехватывает физические операторы Catalyst (scan, filter, project, hash join, sort, aggregate). Данные хранятся в GPU-памяти в Arrow-совместимом формате, что минимизирует PCIe-передачи. Ускорение 10-100x для vectorized операций.
spark-submit \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.rapids.sql.enabled=true \
--conf spark.rapids.memory.gpu.pooling.enabled=true \
--conf spark.rapids.memory.gpu.pool.size=16G \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.task.resource.gpu.amount=0.25 \
my_app.py
# Мониторинг GPU utilization:
# nvidia-smi dmon -s u # GPU utilization per second
# Spark UI → SQL → "GPU" метрики в операторах