Трансляция плана: от Spark к DataFusion
В предыдущем уроке мы увидели архитектуру Comet на высоком уровне: CometPlugin регистрирует правила, которые заменяют Spark-операторы на нативные. Теперь разберём, как именно физический план Spark превращается в DataFusion ExecutionPlan — от обхода дерева до десериализации на Rust-стороне.
CometExecRule: обход снизу вверх
CometExecRule — центральное правило трансляции. Оно получает физический план Spark (дерево SparkPlan) и обходит его снизу вверх (bottom-up):
class CometExecRule extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
plan.transformUp {
case p: ProjectExec => tryConvert(p, CometProjectExec(_))
case f: FilterExec => tryConvert(f, CometFilterExec(_))
case a: HashAggregateExec => tryConvert(a, CometHashAggregateExec(_))
case j: SortMergeJoinExec => tryConvert(j, CometSortMergeJoinExec(_))
case s: SortExec => tryConvert(s, CometSortExec(_))
// ... десятки операторов
}
}
}
Порядок обхода критичен — bottom-up гарантирует, что к моменту замены родительского оператора все его потомки уже заменены (или помечены как incompatible).
Какие операторы поддерживаются
Comet 0.14.0 заменяет основные физические операторы Spark:
| Категория | Операторы |
|---|---|
| Scan | FileSourceScanExec (Parquet), BatchScanExec |
| Projection / Filter | ProjectExec, FilterExec |
| Aggregation | HashAggregateExec (Partial, Final, Complete) |
| Join | SortMergeJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec |
| Sort | SortExec |
| Exchange | ShuffleExchangeExec (native columnar shuffle) |
| Window | WindowExec |
| Expand | ExpandExec (для CUBE, ROLLUP) |
Каждый Spark-оператор имеет Comet-обёртку (CometProjectExec, CometFilterExec, …), которая сериализует подплан и делегирует исполнение Rust-стороне.
Protobuf-сериализация
Между JVM и Rust данные и планы передаются через protobuf. Файл expr.proto определяет иерархию сообщений:
Структура protobuf-сообщений
// expr.proto (упрощённо)
message Operator {
oneof op_type {
Scan scan = 1;
Filter filter = 2;
Projection projection = 3;
Aggregate aggregate = 4;
Sort sort = 5;
SortMergeJoin sort_merge_join = 6;
HashJoin hash_join = 7;
ShuffleWriter shuffle_writer = 8;
Expand expand = 9;
Window window = 10;
// ...
}
repeated Operator children = 100;
}
message Expr {
oneof expr_type {
Literal literal = 1;
ColumnRef column_ref = 2;
BinaryExpr binary_expr = 3;
Cast cast = 4;
ScalarFunc scalar_func = 5;
AggregateFunc aggregate_func = 6;
// ...
}
}
Каждый Spark-оператор сериализует себя и все дочерние выражения в единый protobuf-граф. Размер сообщения — от килобайт для простых запросов до десятков килобайт для complex join chains.
JNI-мост
Коммуникация JVM-to-Rust происходит через Java Native Interface:
// CometNativeExec — Scala-сторона
object Native {
@native def createPlan(
serializedPlan: Array[Byte], // protobuf bytes
metrics: CometMetricNode,
taskMemoryManager: Long
): Long // указатель на нативный ExecutionPlan
}
// native.rs — Rust-сторона (через jni crate)
#[no_mangle]
pub extern "system" fn Java_org_apache_comet_Native_createPlan(
env: JNIEnv,
_class: JClass,
serialized_plan: jbyteArray,
// ...
) -> jlong {
let bytes = env.convert_byte_array(serialized_plan).unwrap();
let spark_plan = Operator::decode(&bytes[..]).unwrap();
let exec_plan = PhysicalPlanner::new().create_plan(&spark_plan);
Box::into_raw(Box::new(exec_plan)) as jlong
}
JNI-вызов происходит один раз на query stage — дальнейшее исполнение идёт полностью на Rust-стороне без обратных вызовов в JVM.
JNI-overhead минимален: передаётся один protobuf blob (обычно несколько КБ) на старте query stage. В процессе исполнения DataFusion работает с данными нативно — JNI не вызывается до возврата результата.
PhysicalPlanner (planner.rs)
На Rust-стороне PhysicalPlanner десериализует protobuf и строит дерево DataFusion ExecutionPlan:
// planner.rs (упрощённо)
pub struct PhysicalPlanner;
impl PhysicalPlanner {
pub fn create_plan(
&self,
spark_plan: &Operator,
) -> Result<Arc<dyn ExecutionPlan>> {
match &spark_plan.op_type {
OpType::Scan(scan) => self.create_scan(scan),
OpType::Filter(filter) => {
let input = self.create_plan(&spark_plan.children[0])?;
let predicate = self.create_expr(&filter.condition)?;
Ok(Arc::new(FilterExec::try_new(predicate, input)?))
}
OpType::Projection(proj) => {
let input = self.create_plan(&spark_plan.children[0])?;
let exprs = proj.exprs.iter()
.map(|e| self.create_expr(e))
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
}
// ... другие операторы
}
}
}
Каждый protobuf Operator отображается на стандартный DataFusion ExecutionPlan. Comet не создаёт кастомные execution nodes — он использует те же FilterExec, ProjectionExec, HashJoinExec, что входят в DataFusion.
Фреймворк сериализации выражений
Comet использует trait-based систему для расширяемой сериализации:
CometExpressionSerde
/// Trait для сериализации Spark-выражений в DataFusion PhysicalExpr
pub trait CometExpressionSerde: Send + Sync {
/// Имя Spark-выражения (для диагностики)
fn spark_expression_name(&self) -> &str;
/// Конвертация protobuf Expr → DataFusion PhysicalExpr
fn to_datafusion(
&self,
expr: &Expr,
input_schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>>;
}
CometOperatorSerde
/// Trait для сериализации Spark-операторов в DataFusion ExecutionPlan
pub trait CometOperatorSerde: Send + Sync {
fn spark_operator_name(&self) -> &str;
fn to_datafusion(
&self,
op: &Operator,
inputs: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>;
}
Эти trait позволяют добавлять поддержку новых выражений и операторов без изменения ядра PhysicalPlanner. Каждая реализация регистрируется в SerdeFactory — аналогично registry-паттерну в DataFusion (рассмотренному в модуле 05 — Паттерны расширяемости).
Если вы хотите добавить поддержку нового Spark-выражения в Comet, нужно реализовать CometExpressionSerde на Rust-стороне и добавить protobuf-сообщение в expr.proto. На Scala-стороне — добавить ветку в CometExecRule для сериализации этого выражения.
Механизм Fallback
Fallback в Comet работает по принципу all-or-nothing на уровне query stage:
Почему не частичная замена?
Частичная замена потребовала бы конвертации данных между нативным и JVM-исполнением на каждой границе:
CometFilterExec (native, Arrow columnar)
↓ ColumnarToRow (дорого!)
SparkProjectExec (JVM, InternalRow)
↓ RowToColumnar (дорого!)
CometScanExec (native, Arrow columnar)
Конвертация ColumnarToRow / RowToColumnar на каждой границе съедает выигрыш от нативного исполнения. Поэтому Comet проверяет весь query stage целиком: если хотя бы один оператор или выражение не поддерживается — весь stage выполняется в Spark.
All-or-nothing fallback означает, что одно неподдерживаемое выражение (например, специфичный UDF) может заблокировать нативное ускорение для всего stage, даже если 99% операторов поддерживаются. Параметр spark.comet.expression.allowIncompatible=true ослабляет проверку, но может дать некорректные результаты для граничных случаев.
Диагностика fallback
Comet предоставляет расширенный формат EXPLAIN для диагностики:
-- Включить расширенную диагностику
SET spark.comet.explain.verbose.enabled=true;
EXPLAIN EXTENDED SELECT ...
Вывод показывает каждый оператор с пометкой [COMET] (заменён) или причиной fallback:
== Physical Plan ==
*(1) CometHashAggregate [COMET] -- нативный
+- CometExchange hashpartitioning [COMET] -- нативный shuffle
+- CometHashAggregate (partial) [COMET] -- нативный
+- CometFilter [COMET]
+- CometScan parquet [COMET]
-- или при fallback:
== Physical Plan ==
*(1) HashAggregate -- JVM fallback
reason: unsupported expression: MyCustomUDF
Пример: жизненный цикл трансляции
Рассмотрим полный путь простого запроса:
SELECT dept, SUM(salary)
FROM employees
WHERE age > 30
GROUP BY dept
- Spark Catalyst строит физический план:
HashAggregate → Filter → FileSourceScan - CometScanRule заменяет
FileSourceScanнаCometScanExec - CometExecRule (bottom-up):
CometScanExec— уже заменёнFilterExec(age > 30)→ проверяет: выражение>поддерживается, типIntподдерживается →CometFilterExecHashAggregateExec(SUM)→ проверяет:SUMподдерживается,GROUP BYcolumn поддерживается →CometHashAggregateExec
- CometNativeExec сериализует всё дерево в protobuf
- JNI передаёт
byte[]в Rust - PhysicalPlanner десериализует и строит:
AggregateExec → FilterExec → DataSourceExec - DataFusion исполняет план нативно, возвращает
RecordBatch
Итоги
-
CometExecRuleобходит Spark physical plan снизу вверх, заменяя поддерживаемые операторы - Protobuf (
expr.proto) — единый формат сериализации операторов, выражений и типов данных между JVM и Rust - JNI-мост передаёт protobuf один раз на query stage — дальнейшее исполнение полностью нативное
-
PhysicalPlanner(planner.rs) строит стандартные DataFusionExecutionPlanиз protobuf -
CometExpressionSerde/CometOperatorSerde— расширяемые trait для добавления поддержки новых выражений - Fallback — all-or-nothing на уровне query stage: частичная замена невыгодна из-за ColumnarToRow overhead