Learning Platform
Глоссарий Troubleshooting
Урок 08.02 · 18 мин
Продвинутый
CometExecRuleprotobuf serializationJNI bridgePhysicalPlannerCometExpressionSerdeCometOperatorSerdefallback

Трансляция плана: от Spark к DataFusion

В предыдущем уроке мы увидели архитектуру Comet на высоком уровне: CometPlugin регистрирует правила, которые заменяют Spark-операторы на нативные. Теперь разберём, как именно физический план Spark превращается в DataFusion ExecutionPlan — от обхода дерева до десериализации на Rust-стороне.

CometExecRule: обход снизу вверх

CometExecRule — центральное правило трансляции. Оно получает физический план Spark (дерево SparkPlan) и обходит его снизу вверх (bottom-up):

class CometExecRule extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = {
    plan.transformUp {
      case p: ProjectExec => tryConvert(p, CometProjectExec(_))
      case f: FilterExec  => tryConvert(f, CometFilterExec(_))
      case a: HashAggregateExec => tryConvert(a, CometHashAggregateExec(_))
      case j: SortMergeJoinExec => tryConvert(j, CometSortMergeJoinExec(_))
      case s: SortExec => tryConvert(s, CometSortExec(_))
      // ... десятки операторов
    }
  }
}

Порядок обхода критичен — bottom-up гарантирует, что к моменту замены родительского оператора все его потомки уже заменены (или помечены как incompatible).

Какие операторы поддерживаются

Comet 0.14.0 заменяет основные физические операторы Spark:

КатегорияОператоры
ScanFileSourceScanExec (Parquet), BatchScanExec
Projection / FilterProjectExec, FilterExec
AggregationHashAggregateExec (Partial, Final, Complete)
JoinSortMergeJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec
SortSortExec
ExchangeShuffleExchangeExec (native columnar shuffle)
WindowWindowExec
ExpandExpandExec (для CUBE, ROLLUP)

Каждый Spark-оператор имеет Comet-обёртку (CometProjectExec, CometFilterExec, …), которая сериализует подплан и делегирует исполнение Rust-стороне.

Protobuf-сериализация

Между JVM и Rust данные и планы передаются через protobuf. Файл expr.proto определяет иерархию сообщений:

Иерархия protobuf-сообщений
Spark PhysicalPlan (JVM)Исходный физический план Spark — дерево SparkPlan-операторов в JVM
CometExecRule
CometExec (Scala)Scala-обёртка, сериализующая поддеревья Spark-плана в protobuf для передачи в Rust
serialize to protobuf
OperatorProtobuf-сообщения для физических операторов: Filter, Project, Aggregate, Join, Sort
ExprProtobuf-сообщения для выражений: литералы, ссылки на колонки, бинарные операции, Cast
DataTypeProtobuf-представление типов данных Arrow: примитивные, вложенные (Struct, List, Map)
JNI (byte[])
Rust: PhysicalPlanner (planner.rs)Rust-сторона десериализует protobuf и рекурсивно строит дерево DataFusion ExecutionPlan
deserialize + build
DataFusion ExecutionPlanГотовый нативный план — стандартные DataFusion-узлы (FilterExec, ProjectionExec и др.)

Структура protobuf-сообщений

// expr.proto (упрощённо)
message Operator {
  oneof op_type {
    Scan scan = 1;
    Filter filter = 2;
    Projection projection = 3;
    Aggregate aggregate = 4;
    Sort sort = 5;
    SortMergeJoin sort_merge_join = 6;
    HashJoin hash_join = 7;
    ShuffleWriter shuffle_writer = 8;
    Expand expand = 9;
    Window window = 10;
    // ...
  }
  repeated Operator children = 100;
}

message Expr {
  oneof expr_type {
    Literal literal = 1;
    ColumnRef column_ref = 2;
    BinaryExpr binary_expr = 3;
    Cast cast = 4;
    ScalarFunc scalar_func = 5;
    AggregateFunc aggregate_func = 6;
    // ...
  }
}

Каждый Spark-оператор сериализует себя и все дочерние выражения в единый protobuf-граф. Размер сообщения — от килобайт для простых запросов до десятков килобайт для complex join chains.

JNI-мост

Коммуникация JVM-to-Rust происходит через Java Native Interface:

// CometNativeExec — Scala-сторона
object Native {
  @native def createPlan(
    serializedPlan: Array[Byte],  // protobuf bytes
    metrics: CometMetricNode,
    taskMemoryManager: Long
  ): Long  // указатель на нативный ExecutionPlan
}
// native.rs — Rust-сторона (через jni crate)
#[no_mangle]
pub extern "system" fn Java_org_apache_comet_Native_createPlan(
    env: JNIEnv,
    _class: JClass,
    serialized_plan: jbyteArray,
    // ...
) -> jlong {
    let bytes = env.convert_byte_array(serialized_plan).unwrap();
    let spark_plan = Operator::decode(&bytes[..]).unwrap();
    let exec_plan = PhysicalPlanner::new().create_plan(&spark_plan);
    Box::into_raw(Box::new(exec_plan)) as jlong
}

JNI-вызов происходит один раз на query stage — дальнейшее исполнение идёт полностью на Rust-стороне без обратных вызовов в JVM.

NOTE

JNI-overhead минимален: передаётся один protobuf blob (обычно несколько КБ) на старте query stage. В процессе исполнения DataFusion работает с данными нативно — JNI не вызывается до возврата результата.

PhysicalPlanner (planner.rs)

На Rust-стороне PhysicalPlanner десериализует protobuf и строит дерево DataFusion ExecutionPlan:

// planner.rs (упрощённо)
pub struct PhysicalPlanner;

impl PhysicalPlanner {
    pub fn create_plan(
        &self,
        spark_plan: &Operator,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        match &spark_plan.op_type {
            OpType::Scan(scan) => self.create_scan(scan),
            OpType::Filter(filter) => {
                let input = self.create_plan(&spark_plan.children[0])?;
                let predicate = self.create_expr(&filter.condition)?;
                Ok(Arc::new(FilterExec::try_new(predicate, input)?))
            }
            OpType::Projection(proj) => {
                let input = self.create_plan(&spark_plan.children[0])?;
                let exprs = proj.exprs.iter()
                    .map(|e| self.create_expr(e))
                    .collect::<Result<Vec<_>>>()?;
                Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
            }
            // ... другие операторы
        }
    }
}

Каждый protobuf Operator отображается на стандартный DataFusion ExecutionPlan. Comet не создаёт кастомные execution nodes — он использует те же FilterExec, ProjectionExec, HashJoinExec, что входят в DataFusion.

Фреймворк сериализации выражений

Comet использует trait-based систему для расширяемой сериализации:

CometExpressionSerde

/// Trait для сериализации Spark-выражений в DataFusion PhysicalExpr
pub trait CometExpressionSerde: Send + Sync {
    /// Имя Spark-выражения (для диагностики)
    fn spark_expression_name(&self) -> &str;

    /// Конвертация protobuf Expr → DataFusion PhysicalExpr
    fn to_datafusion(
        &self,
        expr: &Expr,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>>;
}

CometOperatorSerde

/// Trait для сериализации Spark-операторов в DataFusion ExecutionPlan
pub trait CometOperatorSerde: Send + Sync {
    fn spark_operator_name(&self) -> &str;

    fn to_datafusion(
        &self,
        op: &Operator,
        inputs: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>>;
}

Эти trait позволяют добавлять поддержку новых выражений и операторов без изменения ядра PhysicalPlanner. Каждая реализация регистрируется в SerdeFactory — аналогично registry-паттерну в DataFusion (рассмотренному в модуле 05 — Паттерны расширяемости).

TIP

Если вы хотите добавить поддержку нового Spark-выражения в Comet, нужно реализовать CometExpressionSerde на Rust-стороне и добавить protobuf-сообщение в expr.proto. На Scala-стороне — добавить ветку в CometExecRule для сериализации этого выражения.

Механизм Fallback

Fallback в Comet работает по принципу all-or-nothing на уровне query stage:

All-or-nothing fallback
Все операторы поддерживаютсяВесь query stage целиком заменён на нативное исполнение — максимальный прирост
Один оператор не поддерживаетсяОдин неподдерживаемый оператор или выражение — весь stage откатывается на JVM

Почему не частичная замена?

Частичная замена потребовала бы конвертации данных между нативным и JVM-исполнением на каждой границе:

CometFilterExec (native, Arrow columnar)
  ↓ ColumnarToRow (дорого!)
SparkProjectExec (JVM, InternalRow)
  ↓ RowToColumnar (дорого!)
CometScanExec (native, Arrow columnar)

Конвертация ColumnarToRow / RowToColumnar на каждой границе съедает выигрыш от нативного исполнения. Поэтому Comet проверяет весь query stage целиком: если хотя бы один оператор или выражение не поддерживается — весь stage выполняется в Spark.

WARNING

All-or-nothing fallback означает, что одно неподдерживаемое выражение (например, специфичный UDF) может заблокировать нативное ускорение для всего stage, даже если 99% операторов поддерживаются. Параметр spark.comet.expression.allowIncompatible=true ослабляет проверку, но может дать некорректные результаты для граничных случаев.

Диагностика fallback

Comet предоставляет расширенный формат EXPLAIN для диагностики:

-- Включить расширенную диагностику
SET spark.comet.explain.verbose.enabled=true;

EXPLAIN EXTENDED SELECT ...

Вывод показывает каждый оператор с пометкой [COMET] (заменён) или причиной fallback:

== Physical Plan ==
*(1) CometHashAggregate [COMET]  -- нативный
+- CometExchange hashpartitioning [COMET]  -- нативный shuffle
   +- CometHashAggregate (partial) [COMET]  -- нативный
      +- CometFilter [COMET]
         +- CometScan parquet [COMET]

-- или при fallback:
== Physical Plan ==
*(1) HashAggregate  -- JVM fallback
   reason: unsupported expression: MyCustomUDF

Пример: жизненный цикл трансляции

Рассмотрим полный путь простого запроса:

SELECT dept, SUM(salary)
FROM employees
WHERE age > 30
GROUP BY dept
  1. Spark Catalyst строит физический план: HashAggregate → Filter → FileSourceScan
  2. CometScanRule заменяет FileSourceScan на CometScanExec
  3. CometExecRule (bottom-up):
    • CometScanExec — уже заменён
    • FilterExec(age > 30) → проверяет: выражение > поддерживается, тип Int поддерживается → CometFilterExec
    • HashAggregateExec(SUM) → проверяет: SUM поддерживается, GROUP BY column поддерживается → CometHashAggregateExec
  4. CometNativeExec сериализует всё дерево в protobuf
  5. JNI передаёт byte[] в Rust
  6. PhysicalPlanner десериализует и строит: AggregateExec → FilterExec → DataSourceExec
  7. DataFusion исполняет план нативно, возвращает RecordBatch

Итоги

  • CometExecRule обходит Spark physical plan снизу вверх, заменяя поддерживаемые операторы
  • Protobuf (expr.proto) — единый формат сериализации операторов, выражений и типов данных между JVM и Rust
  • JNI-мост передаёт protobuf один раз на query stage — дальнейшее исполнение полностью нативное
  • PhysicalPlanner (planner.rs) строит стандартные DataFusion ExecutionPlan из protobuf
  • CometExpressionSerde / CometOperatorSerde — расширяемые trait для добавления поддержки новых выражений
  • Fallback — all-or-nothing на уровне query stage: частичная замена невыгодна из-за ColumnarToRow overhead

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. CometExecRule обходит физический план Spark снизу вверх (bottom-up). Почему выбран именно этот порядок?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5