Learning Platform
Глоссарий Troubleshooting
Урок 12.04 · 33 мин
Продвинутый
SparkPlanStrategydoExecuteColumnarRuleCodegenSupportPhysical Operator

Кастомные физические операторы: SparkPlan и Strategy

После того как Catalyst построил оптимизированный логический план, SparkPlanner преобразует его в физический план — дерево SparkPlan. Каждый узел физического плана — это конкретный алгоритм: SortMergeJoinExec, HashAggregateExec, FileScanExec. Именно эти объекты производят RDD и в конечном счёте — данные.

Расширяемость на физическом уровне позволяет делать то, что невозможно через правила оптимизатора: реализовывать нестандартные алгоритмы выполнения, интегрировать нативные вычислительные движки (SIMD, GPU, FPGA), обходить row-by-row execution и переходить на columnar processing. Именно этим пользуются Apache Arrow Datafusion через Ballista, NVIDIA RAPIDS Accelerator, Intel OAP.

SparkPlan: базовый класс физических операторов

Каждый физический оператор наследует SparkPlan:

// org.apache.spark.sql.execution.SparkPlan
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {

  // Схема выходных данных (наследуется от QueryPlan)
  def output: Seq[Attribute]

  // Дочерние операторы
  def children: Seq[SparkPlan]

  // Основной метод: производит RDD[InternalRow]
  // Вызывается Spark при execute()
  protected def doExecute(): RDD[InternalRow]

  // Метод для columnar execution (если supportsColumnar = true)
  protected def doExecuteColumnar(): RDD[ColumnarBatch] =
    throw new UnsupportedOperationException()

  // Поддерживает ли оператор columnar execution?
  def supportsColumnar: Boolean = false

  // Метрики для Spark UI (должен переопределить для мониторинга)
  override def metrics: Map[String, SQLMetric] = Map.empty

  // Финальный execute() -- вызывает doExecute() или doExecuteColumnar()
  // + оборачивает метрики, обрабатывает codegen
  final def execute(): RDD[InternalRow] = ...
}

Три типа физических операторов:

ТипОписаниеБазовый класс
LeafНет дочерних операторов (scan)LeafExecNode
UnaryОдин дочерний операторUnaryExecNode
BinaryДва дочерних оператора (join)BinaryExecNode

Пример 1: кастомный scan-оператор

Напишем TopKScanExec — оператор, который читает данные из внешнего сервиса уже отсортированными и возвращает только первые K строк. Это позволяет реализовать pushdown для ORDER BY ... LIMIT N:

package com.example.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, SortOrder}
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.LongType

case class TopKScanExec(
  apiUrl: String,
  apiKey: String,
  outputSchema: Seq[Attribute],
  sortOrders: Seq[SortOrder],
  limit: Int
) extends LeafExecNode {

  override def output: Seq[Attribute] = outputSchema

  // Метрики для Spark UI
  override val metrics: Map[String, SQLMetric] = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
    "scanTime"      -> SQLMetrics.createTimingMetric(sparkContext, "scan time"),
    "numBytesRead"  -> SQLMetrics.createSizeMetric(sparkContext, "bytes read")
  )

  override protected def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    val scanTime = longMetric("scanTime")
    val numBytesRead = longMetric("numBytesRead")

    // SparkContext.makeRDD создаёт RDD из локальных данных
    // Для scan-операторов нам нужен RDD с кастомным compute()
    sparkContext.makeRDD(Seq(0), numSlices = 1).mapPartitions { _ =>
      val startTime = System.nanoTime()
      val client = new ApiClient(apiUrl, apiKey)

      try {
        // Выполняем запрос с сортировкой и лимитом на сервере
        val sortSpec = sortOrders.map { order =>
          val colName = order.child.asInstanceOf[AttributeReference].name
          val dir = if (order.isAscending) "ASC" else "DESC"
          s"$colName $dir"
        }.mkString(",")

        val records = client.fetchTopK(sortSpec, limit)
        numBytesRead += records.estimatedBytes

        val rows = records.iterator.map { record =>
          numOutputRows += 1
          buildInternalRow(record)
        }

        rows
      } finally {
        scanTime += (System.nanoTime() - startTime) / 1000000
        client.close()
      }
    }
  }

  // Описание для explain()
  override def simpleString(maxFields: Int): String =
    s"TopKScan [${output.map(_.name).mkString(",")}] " +
    s"sortBy=[${sortOrders.mkString(",")}] limit=$limit"

  // codegen не поддерживаем в этом примере
  override def doCanonicalize(): TopKScanExec = copy(apiKey = "")
}

Strategy: преобразование логического плана в физический

Strategy — это механизм, через который SparkPlanner преобразует логические узлы в физические. Каждая стратегия реализует метод apply:

abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] {
  def apply(plan: LogicalPlan): Seq[SparkPlan]
}

Стратегия возвращает Seq[SparkPlan] — один или несколько физических планов для данного логического узла. Если стратегия не знает, как обработать узел, она возвращает Nil, и следующая стратегия в цепочке получает шанс.

Напишем стратегию, которая перехватывает Sort + Limit над нашим DataSourceV2Relation и заменяет их на TopKScanExec:

package com.example.execution

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Strategy

class HttpApiTopKStrategy(spark: SparkSession) extends Strategy {

  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

    // GlobalLimit(n, Sort(orders, global=true, DataSourceV2ScanRelation(table: HttpApiTable)))
    // Это паттерн для SELECT * FROM t ORDER BY col LIMIT n
    case GlobalLimit(
          limitExpr,
          Sort(sortOrders, global,
            relation @ DataSourceV2ScanRelation(table: HttpApiTable, scan, output, _, _)
          )
        ) =>
      val limit = limitExpr.eval().asInstanceOf[Int]

      if (isAllSortColumnsSupported(sortOrders, table)) {
        // Заменяем весь паттерн на один TopKScanExec
        Seq(TopKScanExec(
          apiUrl = table.options.get("baseUrl"),
          apiKey = table.options.get("apiKey"),
          outputSchema = output,
          sortOrders = sortOrders,
          limit = limit
        ))
      } else {
        Nil  // Передаём стандартным стратегиям
      }

    case _ => Nil
  }

  private def isAllSortColumnsSupported(
    orders: Seq[SortOrder],
    table: HttpApiTable
  ): Boolean = {
    val indexedCols = Set("id", "amount", "ts")  // API поддерживает сортировку по этим полям
    orders.forall { order =>
      order.child match {
        case attr: AttributeReference => indexedCols.contains(attr.name.toLowerCase)
        case _ => false
      }
    }
  }
}

Регистрация:

val spark = SparkSession.builder()
  .withExtensions(ext =>
    ext.injectPlannerStrategy(session => new HttpApiTopKStrategy(session))
  )
  .getOrCreate()

// Теперь этот запрос пойдёт через TopKScanExec
spark.read.format("com.example.connector.HttpApiDataSource")
     .option("baseUrl", "https://api.example.com")
     .load()
     .orderBy("amount".desc)
     .limit(10)
     .explain("formatted")

// Физический план:
// TopKScan [id, name, amount] sortBy=[amount DESC] limit=10
// (вместо стандартного TakeOrderedAndProjectExec + BatchScanExec)

Columnar операторы: ColumnarRule и SupportsColumnar

Row-by-row execution — это Node виртуальных вызовов next()/get() на каждой строке. При 1 миллиарде строк это 1 миллиард виртуальных вызовов. Columnar execution работает с батчами (по умолчанию 4096 строк в батче) — один вызов обрабатывает 4096 строк, а внутри батча работает SIMD или векторизованный код.

Для поддержки columnar execution оператор переопределяет supportsColumnar и doExecuteColumnar:

package com.example.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Кастомный фильтр-оператор с columnar execution
// Фильтрует батчи целиком через векторизованное сравнение
case class FastColumnarFilterExec(
  predicate: ColumnPredicate,  // Наш кастомный predicate для columnar evaluation
  child: SparkPlan
) extends UnaryExecNode {

  override def output: Seq[Attribute] = child.output

  // Объявляем поддержку columnar -- ключевой флаг
  override def supportsColumnar: Boolean = child.supportsColumnar

  override val metrics: Map[String, SQLMetric] = Map(
    "numInputRows"  -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
    "numInputBatches"  -> SQLMetrics.createMetric(sparkContext, "number of input batches")
  )

  // Row-based fallback (если child не поддерживает columnar)
  override protected def doExecute(): RDD[InternalRow] = {
    child.execute().mapPartitions { iter =>
      val numInputRows = longMetric("numInputRows")
      val numOutputRows = longMetric("numOutputRows")

      iter.filter { row =>
        numInputRows += 1
        val pass = predicate.evalRow(row)
        if (pass) numOutputRows += 1
        pass
      }
    }
  }

  // Columnar execution -- работаем с батчами
  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
    child.executeColumnar().mapPartitions { batchIter =>
      val numInputRows = longMetric("numInputRows")
      val numOutputRows = longMetric("numOutputRows")
      val numInputBatches = longMetric("numInputBatches")

      batchIter.flatMap { batch =>
        numInputBatches += 1
        numInputRows += batch.numRows()

        // Создаём boolean selection vector для всего батча
        // Это ключевое отличие от row-by-row: один вызов на весь батч
        val selectionVector = predicate.evalBatch(batch)

        val filteredBatch = applySelectionVector(batch, selectionVector)
        numOutputRows += filteredBatch.numRows()

        if (filteredBatch.numRows() == 0) {
          batch.close()
          Iterator.empty
        } else {
          batch.close()
          Iterator.single(filteredBatch)
        }
      }
    }
  }

  // Применяем selection vector к батчу
  private def applySelectionVector(
    batch: ColumnarBatch,
    selection: Array[Boolean]
  ): ColumnarBatch = {
    val numSelected = selection.count(identity)
    val outputVectors = (0 until batch.numCols()).map { colIdx =>
      val col = batch.column(colIdx)
      val newCol = new OnHeapColumnVector(numSelected, col.dataType())
      var outIdx = 0
      for (i <- 0 until batch.numRows()) {
        if (selection(i)) {
          newCol.putRow(outIdx, col, i)
          outIdx += 1
        }
      }
      newCol.asInstanceOf[ColumnVector]
    }.toArray
    new ColumnarBatch(outputVectors, numSelected)
  }
}

ColumnarRule: вставка конверсий между row и columnar

Не все операторы в плане поддерживают columnar. Spark должен вставить операторы конверсии ColumnarToRowExec и RowToColumnarExec на границах. Это делается через ColumnarRule:

package com.example.execution

import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

// ColumnarRule вставляется через ext.injectColumnar(...)
class HttpApiColumnarRule extends ColumnarRule {

  // Вызывается перед стандартной обработкой columnar transitions
  override def preColumnarTransitions: Rule[SparkPlan] = new Rule[SparkPlan] {
    def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
      // Заменяем стандартный BatchScanExec нашим columnar-aware вариантом
      case scan: BatchScanExec if scan.scan.isInstanceOf[HttpApiScan] =>
        HttpApiColumnarScanExec(scan.scan.asInstanceOf[HttpApiScan])
    }
  }

  // Вызывается после стандартной обработки columnar transitions
  override def postColumnarTransitions: Rule[SparkPlan] = new Rule[SparkPlan] {
    def apply(plan: SparkPlan): SparkPlan = plan.transformDown {
      // Если над нашим columnar scan-ом стоит ColumnarToRow,
      // и дальше только row-операторы -- можно слить их
      case ColumnarToRowExec(scan: HttpApiColumnarScanExec) =>
        // Альтернатива: оставить как есть, Spark вставил корректный ColumnarToRow
        ColumnarToRowExec(scan)
    }
  }
}

Регистрация:

ext.injectColumnar(session => new HttpApiColumnarRule())
Row vs Columnar execution: как Spark выбирает путь
HashAggregateExec (row)HashAggregateExec -- row-based оператор. supportsColumnar=false. Получает InternalRow по одной. Стандартная реализация агрегации
если child.supportsColumnar=true: вставляется ColumnarToRowExec
ColumnarToRowExecColumnarToRowExec -- конвертер батч -> строки. supportsColumnar=false (он сам row-based). Преобразует ColumnarBatch в поток InternalRow. Это точка перехода режимов
ColumnarBatch (4096 строк за вызов)
FastColumnarFilterExec (columnar)FastColumnarFilterExec -- наш оператор. supportsColumnar=true. doExecuteColumnar() обрабатывает батчи целиком через векторизованный predicate. До 10x быстрее row-by-row
ColumnarBatch или (если child row-based): RowToColumnarExec
ScanExec (row или columnar)BatchScanExec или FileSourceScanExec -- может быть как row (legacy V1) так и columnar (Parquet vectorized, ORC vectorized, DSv2 с createColumnarReader). Определяет нижнюю границу columnar-зоны

CodegenSupport: генерация Java-кода для оператора

Codegen (Whole-Stage Code Generation) — это оптимизация, при которой несколько операторов объединяются в один синтезированный Java-метод. Это устраняет виртуальные вызовы и позволяет JIT полностью инлайнить логику.

Для участия в codegen оператор должен реализовывать CodegenSupport:

package com.example.execution

import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan}

// Кастомный hash-оператор с кодогенерацией
case class CustomHashFilterExec(
  hashExpression: Expression,  // Выражение, которое мы хэшируем
  child: SparkPlan
) extends UnaryExecNode with CodegenSupport {

  override def output: Seq[Attribute] = child.output

  // Получаем дочерний план в контексте codegen
  override def inputRDDs(): Seq[RDD[InternalRow]] = child.asInstanceOf[CodegenSupport].inputRDDs()

  // Генерируем код для обработки одной строки
  // ctx: CodegenContext -- контекст генерации (переменные, методы)
  // input: Seq[ExprCode] -- код для вычисления входных выражений
  override def doProduce(ctx: CodegenContext): String = {
    child.asInstanceOf[CodegenSupport].produce(ctx, this)
  }

  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    // Генерируем код вычисления хэша
    val hashEval = hashExpression.genCode(ctx)

    // Генерируем код фильтрации: пропускаем строки где hash % 2 != 0
    s"""
       |${hashEval.code}
       |if ((${hashEval.value} & 1) == 0) {  // Только чётные хэши
       |  ${consume(ctx, input)}  // Передаём строку следующему оператору
       |}
     """.stripMargin
  }

  // Fallback для случая без codegen
  override protected def doExecute(): RDD[InternalRow] = {
    child.execute().filter { row =>
      val hash = hashExpression.eval(row).asInstanceOf[Int]
      (hash & 1) == 0
    }
  }
}

CodegenContext предоставляет методы для безопасной генерации кода:

// Добавить переменную состояния (живёт между строками)
val counterVar = ctx.addMutableState("long", "counter", v => s"$v = 0;")

// Добавить метод (для больших фрагментов кода)
val methodName = ctx.addNewFunction("processRow",
  s"""
     |private void processRow(InternalRow row) {
     |  // ...
     |}
  """.stripMargin)

// Вычислить выражение
val exprCode: ExprCode = expr.genCode(ctx)
// exprCode.code -- код для вычисления выражения
// exprCode.value -- Java-переменная с результатом
// exprCode.isNull -- Java-переменная с флагом null

Посмотреть сгенерированный код можно через:

spark.conf.set("spark.sql.codegen.wholeStage", "true")
spark.conf.set("spark.sql.codegen.comments", "true")  # добавит комментарии из Spark
df = spark.sql("SELECT id, amount * 1.1 FROM events WHERE amount > 100")
# В логах (DEBUG) найти строку "Generated code:"
# Или через Java debugging: прикрепить debugger и breakpoint в CodegenContext.compile()

Метрики физического оператора

Метрики — критически важная часть production-оператора. Без них отладка производительности невозможна:

case class MyOperatorExec(child: SparkPlan) extends UnaryExecNode {

  override val metrics: Map[String, SQLMetric] = Map(
    // Счётчик строк -- самый базовый
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),

    // Время выполнения в нс
    "processingTime" -> SQLMetrics.createTimingMetric(sparkContext, "processing time"),

    // Размер данных -- автоматически форматируется как KB/MB/GB в UI
    "numBytesOutput" -> SQLMetrics.createSizeMetric(sparkContext, "bytes output"),

    // Среднее время (нс) -- для latency-метрик
    "avgRowSize" -> SQLMetrics.createAverageMetric(sparkContext, "average row size")
  )

  override protected def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    val processingTime = longMetric("processingTime")

    child.execute().mapPartitions { iter =>
      val startTime = System.nanoTime()
      iter.map { row =>
        numOutputRows += 1
        transform(row)
      } ++ Iterator.tabulate(0)(_ => {
        // Этот код не выполнится, но SQLMetric накапливается в accumulators
        processingTime += (System.nanoTime() - startTime) / 1000000
        null.asInstanceOf[InternalRow]
      })
    }
  }
}
NOTE

SQLMetric реализован через Spark Accumulators. Обновления метрик (longMetric(“x”) += n) автоматически агрегируются из всех executor-задач и отображаются в Spark UI на вкладке “SQL” в разделе “Details for Query”. Если вы не видите своих метрик в UI — убедитесь что добавили их в metrics Map с правильными именами.

Попробуй сам

Исследуем существующие физические операторы через Spark UI и explain:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, count

spark = SparkSession.builder \
    .appName("physical-operators-demo") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.codegen.comments", "true") \
    .getOrCreate()

# Создаём данные для демонстрации разных физических операторов
import random
data = [(i, f"product_{i % 50}", random.choice(["A","B","C","D"]),
         float(random.randint(1, 1000))) for i in range(100000)]

df = spark.createDataFrame(data, ["id", "product_name", "category", "amount"])

# --- Агрегация: смотрим на HashAggregateExec ---
agg_query = df.groupBy("category").agg(
    count("*").alias("cnt"),
    spark_sum("amount").alias("total")
)
print("=== HashAggregate Plan ===")
agg_query.explain("formatted")
# Найдём в плане:
# HashAggregate(keys=[category#3], functions=[count(1), sum(amount#4)])
# +- Exchange hashpartitioning(category#3, 200)
#    +- HashAggregate(keys=[category#3], functions=[partial_count(1), partial_sum(amount#4)])
# Обратите внимание: два HashAggregate -- partial (на каждом partition) и final (после shuffle)

# --- JOIN: SortMergeJoin vs BroadcastHashJoin ---
small_df = spark.createDataFrame(
    [("A", "Category A"), ("B", "Category B"), ("C", "Category C"), ("D", "Category D")],
    ["category_code", "category_name"]
)
small_df.cache()  # Кэшируем маленькую таблицу

join_query = df.join(small_df, df.category == small_df.category_code)
print("=== BroadcastHashJoin Plan (маленькая таблица) ===")
join_query.explain("formatted")
# С маленькой таблицей Spark выберет BroadcastHashJoin:
# BroadcastHashJoin [category#3], [category_code#10], Inner, BuildRight
# :- Scan ...
# +- BroadcastExchange HashedRelationBroadcastMode([category_code#10]), ...

# --- Window функция: WindowExec ---
from pyspark.sql.window import Window
w = Window.partitionBy("category").orderBy(col("amount").desc())
window_query = df.withColumn("rank", col("amount").cast("long"))

# --- Посмотрим на codegen ---
spark.conf.set("spark.sql.codegen.logging.maxLines", "1000")
simple_query = df.filter(col("amount") > 500).select("id", "amount")
simple_query.explain("codegen")
# Вывод покажет сгенерированный Java-код для Filter + Project
# Ищем: "/* 001 */ public Object generate(Object[] references)"
# Вся логика filter + projection скомпилирована в один Java-метод

# --- Измерим реальную производительность ---
import time

# Row-by-row (принудительно отключаем codegen)
spark.conf.set("spark.sql.codegen.wholeStage", "false")
start = time.time()
df.filter(col("amount") > 500).select("id", "amount").count()
no_codegen_time = time.time() - start

# С codegen
spark.conf.set("spark.sql.codegen.wholeStage", "true")
start = time.time()
df.filter(col("amount") > 500).select("id", "amount").count()
codegen_time = time.time() - start

print(f"Without codegen: {no_codegen_time:.3f}s")
print(f"With codegen:    {codegen_time:.3f}s")
print(f"Speedup: {no_codegen_time / codegen_time:.1f}x")
# На современном JVM ожидаем 1.5-3x ускорение от codegen на чистых вычислениях

Для исследования columnar execution с Parquet:

# Parquet использует vectorized reader -- это columnar execution
import os
df.write.parquet("/tmp/spark-demo-parquet")
df2 = spark.read.parquet("/tmp/spark-demo-parquet")

df2.filter(col("amount") > 500).explain("formatted")
# В плане увидим:
# *(1) Filter (amount#4 > 500.0)
# +- *(1) ColumnarToRow   <-- переход columnar -> row перед Filter
#    +- FileScan parquet   <-- читает vectorized ColumnarBatch (4096 строк за раз)

# Включим статистику vectorized reading
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
df2.filter(col("amount") > 500).select("id", "amount").show(5)
# В Spark UI -> SQL tab -> Query details:
# "vectorized batch size" = 4096 (по умолчанию spark.sql.parquet.columnarReaderBatchSize)
TIP

При реализации кастомного физического оператора всегда начинайте с row-based doExecute(), убедитесь что семантика корректна, а потом добавляйте doExecuteColumnar() как оптимизацию. Тестировать columnar-версию отдельно труднее, поэтому сначала проверяйте корректность на строках, а скорость — на батчах. Также убедитесь что close() на ColumnarBatch вызывается в обоих ветках (успех и исключение) — утечка off-heap памяти от ColumnarBatch крайне трудно диагностируется в production.

Spark Performance Tuning: откуда берётся скорость
Проверка знанийKnowledge check
Вы реализовали кастомный физический оператор MyAggExec с поддержкой columnar execution (supportsColumnar=true, doExecuteColumnar() возвращает RDD[ColumnarBatch]). В production вы замечаете, что ваш оператор иногда работает медленнее стандартного HashAggregateExec, хотя на тестах был быстрее. При анализе Spark UI вы видите, что перед вашим оператором Spark вставил RowToColumnarExec, а после него -- ColumnarToRowExec. Что происходит и как это исправить?
ОтветAnswer
Проблема: дочерний оператор (child) не поддерживает columnar execution (child.supportsColumnar=false), поэтому Spark вставил RowToColumnarExec для конверсии строк в батчи перед вашим оператором. Аналогично, если родительский оператор не поддерживает columnar -- вставляется ColumnarToRowExec после. Эти конверсии дорогие: RowToColumnar копирует каждую строку в новый ColumnarBatch, выполняя allocation + copy для каждой строки. Если размер данных велик, overhead конверсий превышает выигрыш от columnar processing. Решения: 1) Переопределить supportsColumnar так чтобы он возвращал true ТОЛЬКО если child.supportsColumnar=true -- тогда Spark выберет row-based путь (doExecute) когда child row-based, и columnar путь (doExecuteColumnar) когда child columnar. Это правильный подход: `override def supportsColumnar: Boolean = child.supportsColumnar`. 2) Если конверсия неизбежна (вы хотите columnar всегда), убедитесь что ваш оператор даёт достаточное ускорение (хотя бы 5x) чтобы окупить оба RowToColumnar + ColumnarToRow. 3) Рассмотреть ColumnarRule, который вставляет RowToColumnarExec с более эффективной реализацией (например, Arrow-based конверсия вместо стандартной OnHeap конверсии). Диагностика: в explain("formatted") строки RowToColumnarExec и ColumnarToRowExec явно видны в плане.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Strategy.apply() вызвана с узлом GlobalLimit(10, Sort([amount DESC], DataSourceV2ScanRelation(HttpApiTable))). Ваша стратегия вернула Nil. Что произойдёт?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5