Кастомные физические операторы: SparkPlan и Strategy
После того как Catalyst построил оптимизированный логический план, SparkPlanner преобразует его в физический план — дерево SparkPlan. Каждый узел физического плана — это конкретный алгоритм: SortMergeJoinExec, HashAggregateExec, FileScanExec. Именно эти объекты производят RDD и в конечном счёте — данные.
Расширяемость на физическом уровне позволяет делать то, что невозможно через правила оптимизатора: реализовывать нестандартные алгоритмы выполнения, интегрировать нативные вычислительные движки (SIMD, GPU, FPGA), обходить row-by-row execution и переходить на columnar processing. Именно этим пользуются Apache Arrow Datafusion через Ballista, NVIDIA RAPIDS Accelerator, Intel OAP.
SparkPlan: базовый класс физических операторов
Каждый физический оператор наследует SparkPlan:
// org.apache.spark.sql.execution.SparkPlan
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
// Схема выходных данных (наследуется от QueryPlan)
def output: Seq[Attribute]
// Дочерние операторы
def children: Seq[SparkPlan]
// Основной метод: производит RDD[InternalRow]
// Вызывается Spark при execute()
protected def doExecute(): RDD[InternalRow]
// Метод для columnar execution (если supportsColumnar = true)
protected def doExecuteColumnar(): RDD[ColumnarBatch] =
throw new UnsupportedOperationException()
// Поддерживает ли оператор columnar execution?
def supportsColumnar: Boolean = false
// Метрики для Spark UI (должен переопределить для мониторинга)
override def metrics: Map[String, SQLMetric] = Map.empty
// Финальный execute() -- вызывает doExecute() или doExecuteColumnar()
// + оборачивает метрики, обрабатывает codegen
final def execute(): RDD[InternalRow] = ...
}
Три типа физических операторов:
| Тип | Описание | Базовый класс |
|---|---|---|
| Leaf | Нет дочерних операторов (scan) | LeafExecNode |
| Unary | Один дочерний оператор | UnaryExecNode |
| Binary | Два дочерних оператора (join) | BinaryExecNode |
Пример 1: кастомный scan-оператор
Напишем TopKScanExec — оператор, который читает данные из внешнего сервиса уже отсортированными и возвращает только первые K строк. Это позволяет реализовать pushdown для ORDER BY ... LIMIT N:
package com.example.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, SortOrder}
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.LongType
case class TopKScanExec(
apiUrl: String,
apiKey: String,
outputSchema: Seq[Attribute],
sortOrders: Seq[SortOrder],
limit: Int
) extends LeafExecNode {
override def output: Seq[Attribute] = outputSchema
// Метрики для Spark UI
override val metrics: Map[String, SQLMetric] = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"),
"numBytesRead" -> SQLMetrics.createSizeMetric(sparkContext, "bytes read")
)
override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
val scanTime = longMetric("scanTime")
val numBytesRead = longMetric("numBytesRead")
// SparkContext.makeRDD создаёт RDD из локальных данных
// Для scan-операторов нам нужен RDD с кастомным compute()
sparkContext.makeRDD(Seq(0), numSlices = 1).mapPartitions { _ =>
val startTime = System.nanoTime()
val client = new ApiClient(apiUrl, apiKey)
try {
// Выполняем запрос с сортировкой и лимитом на сервере
val sortSpec = sortOrders.map { order =>
val colName = order.child.asInstanceOf[AttributeReference].name
val dir = if (order.isAscending) "ASC" else "DESC"
s"$colName $dir"
}.mkString(",")
val records = client.fetchTopK(sortSpec, limit)
numBytesRead += records.estimatedBytes
val rows = records.iterator.map { record =>
numOutputRows += 1
buildInternalRow(record)
}
rows
} finally {
scanTime += (System.nanoTime() - startTime) / 1000000
client.close()
}
}
}
// Описание для explain()
override def simpleString(maxFields: Int): String =
s"TopKScan [${output.map(_.name).mkString(",")}] " +
s"sortBy=[${sortOrders.mkString(",")}] limit=$limit"
// codegen не поддерживаем в этом примере
override def doCanonicalize(): TopKScanExec = copy(apiKey = "")
}
Strategy: преобразование логического плана в физический
Strategy — это механизм, через который SparkPlanner преобразует логические узлы в физические. Каждая стратегия реализует метод apply:
abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] {
def apply(plan: LogicalPlan): Seq[SparkPlan]
}
Стратегия возвращает Seq[SparkPlan] — один или несколько физических планов для данного логического узла. Если стратегия не знает, как обработать узел, она возвращает Nil, и следующая стратегия в цепочке получает шанс.
Напишем стратегию, которая перехватывает Sort + Limit над нашим DataSourceV2Relation и заменяет их на TopKScanExec:
package com.example.execution
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Strategy
class HttpApiTopKStrategy(spark: SparkSession) extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// GlobalLimit(n, Sort(orders, global=true, DataSourceV2ScanRelation(table: HttpApiTable)))
// Это паттерн для SELECT * FROM t ORDER BY col LIMIT n
case GlobalLimit(
limitExpr,
Sort(sortOrders, global,
relation @ DataSourceV2ScanRelation(table: HttpApiTable, scan, output, _, _)
)
) =>
val limit = limitExpr.eval().asInstanceOf[Int]
if (isAllSortColumnsSupported(sortOrders, table)) {
// Заменяем весь паттерн на один TopKScanExec
Seq(TopKScanExec(
apiUrl = table.options.get("baseUrl"),
apiKey = table.options.get("apiKey"),
outputSchema = output,
sortOrders = sortOrders,
limit = limit
))
} else {
Nil // Передаём стандартным стратегиям
}
case _ => Nil
}
private def isAllSortColumnsSupported(
orders: Seq[SortOrder],
table: HttpApiTable
): Boolean = {
val indexedCols = Set("id", "amount", "ts") // API поддерживает сортировку по этим полям
orders.forall { order =>
order.child match {
case attr: AttributeReference => indexedCols.contains(attr.name.toLowerCase)
case _ => false
}
}
}
}
Регистрация:
val spark = SparkSession.builder()
.withExtensions(ext =>
ext.injectPlannerStrategy(session => new HttpApiTopKStrategy(session))
)
.getOrCreate()
// Теперь этот запрос пойдёт через TopKScanExec
spark.read.format("com.example.connector.HttpApiDataSource")
.option("baseUrl", "https://api.example.com")
.load()
.orderBy("amount".desc)
.limit(10)
.explain("formatted")
// Физический план:
// TopKScan [id, name, amount] sortBy=[amount DESC] limit=10
// (вместо стандартного TakeOrderedAndProjectExec + BatchScanExec)
Columnar операторы: ColumnarRule и SupportsColumnar
Row-by-row execution — это Node виртуальных вызовов next()/get() на каждой строке. При 1 миллиарде строк это 1 миллиард виртуальных вызовов. Columnar execution работает с батчами (по умолчанию 4096 строк в батче) — один вызов обрабатывает 4096 строк, а внутри батча работает SIMD или векторизованный код.
Для поддержки columnar execution оператор переопределяет supportsColumnar и doExecuteColumnar:
package com.example.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
// Кастомный фильтр-оператор с columnar execution
// Фильтрует батчи целиком через векторизованное сравнение
case class FastColumnarFilterExec(
predicate: ColumnPredicate, // Наш кастомный predicate для columnar evaluation
child: SparkPlan
) extends UnaryExecNode {
override def output: Seq[Attribute] = child.output
// Объявляем поддержку columnar -- ключевой флаг
override def supportsColumnar: Boolean = child.supportsColumnar
override val metrics: Map[String, SQLMetric] = Map(
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches")
)
// Row-based fallback (если child не поддерживает columnar)
override protected def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitions { iter =>
val numInputRows = longMetric("numInputRows")
val numOutputRows = longMetric("numOutputRows")
iter.filter { row =>
numInputRows += 1
val pass = predicate.evalRow(row)
if (pass) numOutputRows += 1
pass
}
}
}
// Columnar execution -- работаем с батчами
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
child.executeColumnar().mapPartitions { batchIter =>
val numInputRows = longMetric("numInputRows")
val numOutputRows = longMetric("numOutputRows")
val numInputBatches = longMetric("numInputBatches")
batchIter.flatMap { batch =>
numInputBatches += 1
numInputRows += batch.numRows()
// Создаём boolean selection vector для всего батча
// Это ключевое отличие от row-by-row: один вызов на весь батч
val selectionVector = predicate.evalBatch(batch)
val filteredBatch = applySelectionVector(batch, selectionVector)
numOutputRows += filteredBatch.numRows()
if (filteredBatch.numRows() == 0) {
batch.close()
Iterator.empty
} else {
batch.close()
Iterator.single(filteredBatch)
}
}
}
}
// Применяем selection vector к батчу
private def applySelectionVector(
batch: ColumnarBatch,
selection: Array[Boolean]
): ColumnarBatch = {
val numSelected = selection.count(identity)
val outputVectors = (0 until batch.numCols()).map { colIdx =>
val col = batch.column(colIdx)
val newCol = new OnHeapColumnVector(numSelected, col.dataType())
var outIdx = 0
for (i <- 0 until batch.numRows()) {
if (selection(i)) {
newCol.putRow(outIdx, col, i)
outIdx += 1
}
}
newCol.asInstanceOf[ColumnVector]
}.toArray
new ColumnarBatch(outputVectors, numSelected)
}
}
ColumnarRule: вставка конверсий между row и columnar
Не все операторы в плане поддерживают columnar. Spark должен вставить операторы конверсии ColumnarToRowExec и RowToColumnarExec на границах. Это делается через ColumnarRule:
package com.example.execution
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
// ColumnarRule вставляется через ext.injectColumnar(...)
class HttpApiColumnarRule extends ColumnarRule {
// Вызывается перед стандартной обработкой columnar transitions
override def preColumnarTransitions: Rule[SparkPlan] = new Rule[SparkPlan] {
def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
// Заменяем стандартный BatchScanExec нашим columnar-aware вариантом
case scan: BatchScanExec if scan.scan.isInstanceOf[HttpApiScan] =>
HttpApiColumnarScanExec(scan.scan.asInstanceOf[HttpApiScan])
}
}
// Вызывается после стандартной обработки columnar transitions
override def postColumnarTransitions: Rule[SparkPlan] = new Rule[SparkPlan] {
def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
// Если над нашим columnar scan-ом стоит ColumnarToRow,
// и дальше только row-операторы -- можно слить их
case ColumnarToRowExec(scan: HttpApiColumnarScanExec) =>
// Альтернатива: оставить как есть, Spark вставил корректный ColumnarToRow
ColumnarToRowExec(scan)
}
}
}
Регистрация:
ext.injectColumnar(session => new HttpApiColumnarRule())
CodegenSupport: генерация Java-кода для оператора
Codegen (Whole-Stage Code Generation) — это оптимизация, при которой несколько операторов объединяются в один синтезированный Java-метод. Это устраняет виртуальные вызовы и позволяет JIT полностью инлайнить логику.
Для участия в codegen оператор должен реализовывать CodegenSupport:
package com.example.execution
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan}
// Кастомный hash-оператор с кодогенерацией
case class CustomHashFilterExec(
hashExpression: Expression, // Выражение, которое мы хэшируем
child: SparkPlan
) extends UnaryExecNode with CodegenSupport {
override def output: Seq[Attribute] = child.output
// Получаем дочерний план в контексте codegen
override def inputRDDs(): Seq[RDD[InternalRow]] = child.asInstanceOf[CodegenSupport].inputRDDs()
// Генерируем код для обработки одной строки
// ctx: CodegenContext -- контекст генерации (переменные, методы)
// input: Seq[ExprCode] -- код для вычисления входных выражений
override def doProduce(ctx: CodegenContext): String = {
child.asInstanceOf[CodegenSupport].produce(ctx, this)
}
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
// Генерируем код вычисления хэша
val hashEval = hashExpression.genCode(ctx)
// Генерируем код фильтрации: пропускаем строки где hash % 2 != 0
s"""
|${hashEval.code}
|if ((${hashEval.value} & 1) == 0) { // Только чётные хэши
| ${consume(ctx, input)} // Передаём строку следующему оператору
|}
""".stripMargin
}
// Fallback для случая без codegen
override protected def doExecute(): RDD[InternalRow] = {
child.execute().filter { row =>
val hash = hashExpression.eval(row).asInstanceOf[Int]
(hash & 1) == 0
}
}
}
CodegenContext предоставляет методы для безопасной генерации кода:
// Добавить переменную состояния (живёт между строками)
val counterVar = ctx.addMutableState("long", "counter", v => s"$v = 0;")
// Добавить метод (для больших фрагментов кода)
val methodName = ctx.addNewFunction("processRow",
s"""
|private void processRow(InternalRow row) {
| // ...
|}
""".stripMargin)
// Вычислить выражение
val exprCode: ExprCode = expr.genCode(ctx)
// exprCode.code -- код для вычисления выражения
// exprCode.value -- Java-переменная с результатом
// exprCode.isNull -- Java-переменная с флагом null
Посмотреть сгенерированный код можно через:
spark.conf.set("spark.sql.codegen.wholeStage", "true")
spark.conf.set("spark.sql.codegen.comments", "true") # добавит комментарии из Spark
df = spark.sql("SELECT id, amount * 1.1 FROM events WHERE amount > 100")
# В логах (DEBUG) найти строку "Generated code:"
# Или через Java debugging: прикрепить debugger и breakpoint в CodegenContext.compile()
Метрики физического оператора
Метрики — критически важная часть production-оператора. Без них отладка производительности невозможна:
case class MyOperatorExec(child: SparkPlan) extends UnaryExecNode {
override val metrics: Map[String, SQLMetric] = Map(
// Счётчик строк -- самый базовый
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
// Время выполнения в нс
"processingTime" -> SQLMetrics.createTimingMetric(sparkContext, "processing time"),
// Размер данных -- автоматически форматируется как KB/MB/GB в UI
"numBytesOutput" -> SQLMetrics.createSizeMetric(sparkContext, "bytes output"),
// Среднее время (нс) -- для latency-метрик
"avgRowSize" -> SQLMetrics.createAverageMetric(sparkContext, "average row size")
)
override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
val processingTime = longMetric("processingTime")
child.execute().mapPartitions { iter =>
val startTime = System.nanoTime()
iter.map { row =>
numOutputRows += 1
transform(row)
} ++ Iterator.tabulate(0)(_ => {
// Этот код не выполнится, но SQLMetric накапливается в accumulators
processingTime += (System.nanoTime() - startTime) / 1000000
null.asInstanceOf[InternalRow]
})
}
}
}
SQLMetric реализован через Spark Accumulators. Обновления метрик (longMetric(“x”) += n) автоматически агрегируются из всех executor-задач и отображаются в Spark UI на вкладке “SQL” в разделе “Details for Query”. Если вы не видите своих метрик в UI — убедитесь что добавили их в metrics Map с правильными именами.
Попробуй сам
Исследуем существующие физические операторы через Spark UI и explain:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, count
spark = SparkSession.builder \
.appName("physical-operators-demo") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.codegen.comments", "true") \
.getOrCreate()
# Создаём данные для демонстрации разных физических операторов
import random
data = [(i, f"product_{i % 50}", random.choice(["A","B","C","D"]),
float(random.randint(1, 1000))) for i in range(100000)]
df = spark.createDataFrame(data, ["id", "product_name", "category", "amount"])
# --- Агрегация: смотрим на HashAggregateExec ---
agg_query = df.groupBy("category").agg(
count("*").alias("cnt"),
spark_sum("amount").alias("total")
)
print("=== HashAggregate Plan ===")
agg_query.explain("formatted")
# Найдём в плане:
# HashAggregate(keys=[category#3], functions=[count(1), sum(amount#4)])
# +- Exchange hashpartitioning(category#3, 200)
# +- HashAggregate(keys=[category#3], functions=[partial_count(1), partial_sum(amount#4)])
# Обратите внимание: два HashAggregate -- partial (на каждом partition) и final (после shuffle)
# --- JOIN: SortMergeJoin vs BroadcastHashJoin ---
small_df = spark.createDataFrame(
[("A", "Category A"), ("B", "Category B"), ("C", "Category C"), ("D", "Category D")],
["category_code", "category_name"]
)
small_df.cache() # Кэшируем маленькую таблицу
join_query = df.join(small_df, df.category == small_df.category_code)
print("=== BroadcastHashJoin Plan (маленькая таблица) ===")
join_query.explain("formatted")
# С маленькой таблицей Spark выберет BroadcastHashJoin:
# BroadcastHashJoin [category#3], [category_code#10], Inner, BuildRight
# :- Scan ...
# +- BroadcastExchange HashedRelationBroadcastMode([category_code#10]), ...
# --- Window функция: WindowExec ---
from pyspark.sql.window import Window
w = Window.partitionBy("category").orderBy(col("amount").desc())
window_query = df.withColumn("rank", col("amount").cast("long"))
# --- Посмотрим на codegen ---
spark.conf.set("spark.sql.codegen.logging.maxLines", "1000")
simple_query = df.filter(col("amount") > 500).select("id", "amount")
simple_query.explain("codegen")
# Вывод покажет сгенерированный Java-код для Filter + Project
# Ищем: "/* 001 */ public Object generate(Object[] references)"
# Вся логика filter + projection скомпилирована в один Java-метод
# --- Измерим реальную производительность ---
import time
# Row-by-row (принудительно отключаем codegen)
spark.conf.set("spark.sql.codegen.wholeStage", "false")
start = time.time()
df.filter(col("amount") > 500).select("id", "amount").count()
no_codegen_time = time.time() - start
# С codegen
spark.conf.set("spark.sql.codegen.wholeStage", "true")
start = time.time()
df.filter(col("amount") > 500).select("id", "amount").count()
codegen_time = time.time() - start
print(f"Without codegen: {no_codegen_time:.3f}s")
print(f"With codegen: {codegen_time:.3f}s")
print(f"Speedup: {no_codegen_time / codegen_time:.1f}x")
# На современном JVM ожидаем 1.5-3x ускорение от codegen на чистых вычислениях
Для исследования columnar execution с Parquet:
# Parquet использует vectorized reader -- это columnar execution
import os
df.write.parquet("/tmp/spark-demo-parquet")
df2 = spark.read.parquet("/tmp/spark-demo-parquet")
df2.filter(col("amount") > 500).explain("formatted")
# В плане увидим:
# *(1) Filter (amount#4 > 500.0)
# +- *(1) ColumnarToRow <-- переход columnar -> row перед Filter
# +- FileScan parquet <-- читает vectorized ColumnarBatch (4096 строк за раз)
# Включим статистику vectorized reading
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
df2.filter(col("amount") > 500).select("id", "amount").show(5)
# В Spark UI -> SQL tab -> Query details:
# "vectorized batch size" = 4096 (по умолчанию spark.sql.parquet.columnarReaderBatchSize)
При реализации кастомного физического оператора всегда начинайте с row-based doExecute(), убедитесь что семантика корректна, а потом добавляйте doExecuteColumnar() как оптимизацию. Тестировать columnar-версию отдельно труднее, поэтому сначала проверяйте корректность на строках, а скорость — на батчах. Также убедитесь что close() на ColumnarBatch вызывается в обоих ветках (успех и исключение) — утечка off-heap памяти от ColumnarBatch крайне трудно диагностируется в production.