Apache Comet: нативное ускорение Spark
Apache Spark остаётся стандартом для распределённой аналитики, но архитектурные ограничения JVM — overhead от garbage collection, row-based внутреннее представление в Whole-Stage CodeGen, ограниченная векторизация — снижают производительность на аналитических нагрузках. Apache Comet решает эту проблему: он заменяет JVM-исполнение физического плана на нативный код, построенный поверх DataFusion.
Проблема: узкие места JVM в Spark
Spark Whole-Stage CodeGen (WSCG) генерирует Java bytecode для цепочки операторов. Это быстрее итераторной модели Volcano, но:
- Garbage Collection — аналитические запросы создают миллионы промежуточных объектов. GC-паузы растут нелинейно с объёмом данных
- Row-based промежуточное представление — даже при чтении Parquet (columnar) данные конвертируются в
InternalRowдля обработки WSCG - Ограниченная SIMD-векторизация — JIT-компилятор JVM не гарантирует автовекторизацию hot loops
- Off-heap vs On-heap — управление памятью через Unsafe API добавляет complexity и не устраняет GC для метаданных
Что такое Comet
Apache Comet — плагин для Spark, который перехватывает физический план выполнения и заменяет JVM-операторы на нативные, использующие DataFusion в качестве execution engine. Ключевое свойство: zero code changes — приложения Spark продолжают работать без модификации.
Справа DataFusion работает с данными в формате Arrow (columnar) от начала до конца — нет конвертации columnar-to-row-to-columnar. Это главный источник ускорения.
Архитектура CometPlugin
Comet интегрируется в Spark через стандартный механизм SparkPlugin:
class CometPlugin extends SparkPlugin {
override def driverPlugin(): DriverPlugin = new CometDriverPlugin
override def executorPlugin(): ExecutorPlugin = new CometExecutorPlugin
}
При загрузке плагина (spark.plugins=org.apache.comet.CometPlugin) происходит следующее:
CometDriverPlugin.init()регистрируетCometSparkSessionExtensions- Extensions добавляют два правила в Spark-планировщик:
- CometScanRule — заменяет Parquet scan на нативный (CometScanExec)
- CometExecRule — заменяет операторы физического плана (Filter, Project, Aggregate, Join, Sort, …) на Comet-обёртки
- Каждый CometExec-оператор сериализует свой подплан в protobuf и передаёт через JNI в Rust-код
- Rust-сторона десериализует protobuf и строит
DataFusion ExecutionPlan - Исполнение происходит нативно, результат возвращается как Arrow
RecordBatch
Регистрация расширений
CometSparkSessionExtensions использует Spark Extension API для внедрения кастомных правил:
class CometSparkSessionExtensions
extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
// Правило замены scan-операторов
extensions.injectColumnarRule(_ => CometScanRule)
// Правило замены execution-операторов
extensions.injectColumnarRule(_ => CometExecRule)
}
}
injectColumnarRule — точка расширения Spark для замены физических операторов. Comet использует именно этот механизм, а не более низкоуровневый injectOptimizerRule, потому что работает на уровне физического плана, а не логического.
WSCG vs нативное исполнение: почему разница существенна
Для понимания выигрыша Comet полезно сравнить, как данные проходят через pipeline в стандартном Spark и в Comet:
Стандартный Spark pipeline
Parquet (columnar on disk)
→ Decode to InternalRow (row-based in JVM heap)
→ WSCG operator chain (row-by-row, JIT-compiled bytecode)
→ Shuffle: serialize UnsafeRow → network → deserialize UnsafeRow
→ WSCG operator chain (продолжение)
→ Output (InternalRow → client)
Ключевая проблема: данные конвертируются из columnar в row на входе и обрабатываются row-by-row. WSCG генерирует tight loops (без virtual dispatch), но JVM JIT не может гарантировать SIMD-векторизацию этих loops.
Comet pipeline
Parquet (columnar on disk)
→ Native decode to Arrow RecordBatch (columnar in off-heap memory)
→ DataFusion operators (vectorized, SIMD-aware)
→ Shuffle: Arrow IPC columnar → network → Arrow IPC columnar
→ DataFusion operators (продолжение)
→ Output (Arrow RecordBatch → JVM → client)
Данные остаются columnar на всех этапах. DataFusion использует Arrow compute kernels, которые компилируются с авто-векторизацией (-C target-cpu=native в Rust).
Производительность
Бенчмарки TPC-H (SF100, один executor, 8 ядер):
| Метрика | Значение |
|---|---|
| Средний прирост | 2.2x по сравнению со стандартным Spark |
| Совместимость | 97% тестов Spark SQL проходят |
| Формат данных | Arrow columnar (нет row-to-columnar конвертации) |
Прирост неравномерен по запросам:
- Scan + Filter + Project (Q1, Q6) — максимальный прирост (3-4x): нативный Parquet-ридер + SIMD фильтрация
- Complex Join + Aggregation (Q9, Q21) — умеренный прирост (1.5-2x): join-алгоритмы DataFusion сопоставимы с Spark
- Запросы с fallback — нет прироста: весь stage выполняется в JVM
Темп развития
Comet выпускает релизы примерно раз в 6 недель. За 2025 год вышло 8 релизов (0.6.0 — 0.13.0), каждый из которых расширял покрытие операторов и выражений. Актуальный релиз — 0.14.0 (2026-03-18, +189 PR от 21 контрибьютора). Это важно для production — с каждым релизом процент запросов с fallback снижается.
97% совместимость означает, что 3% запросов используют операторы или выражения, которые Comet ещё не поддерживает. Для них срабатывает fallback на стандартный Spark — без ошибок, но без ускорения. Механизм fallback рассматривается в следующем уроке.
Поддерживаемые версии
Comet 0.14.0 (актуальная версия, релиз 2026-03-18) поддерживает:
- Spark 3.5.x — основная production-ветка
- Spark 4.0.x — поддержка ANSI mode (default в Spark 4.0)
- Java 11, 17, 21 — все LTS-версии
- Scala 2.12, 2.13
Подключение
# Spark 3.5
spark-shell --packages org.apache.datafusion:comet-spark-spark3.5_2.12:0.14.0 \
--conf spark.plugins=org.apache.comet.CometPlugin \
--conf spark.comet.exec.enabled=true \
--conf spark.comet.convert.parquet.enabled=true
Параметр spark.comet.exec.enabled=true включает замену операторов (CometExecRule). Параметр spark.comet.convert.parquet.enabled=true включает нативный scan (CometScanRule). Оба нужны для полного ускорения — без exec.enabled Comet только читает данные нативно, но обрабатывает в JVM.
Comet в экосистеме DataFusion
Comet — один из нескольких проектов, использующих DataFusion как встроенный execution engine (паттерн «библиотека», рассмотренный в модуле 05 — Паттерны расширяемости). В отличие от InfluxDB или GreptimeDB, Comet не создаёт новую СУБД — он ускоряет существующую экосистему Spark.
Связь с другими модулями курса:
- Модуль 02 (Crate-архитектура) — Comet зависит от crate
datafusion-physical-planиdatafusion-common, детали в уроке о crate-зависимостях - Модуль 06 (Оптимизация) — Comet строит
ExecutionPlanиз тех же примитивов (FilterExec, ProjectionExec), что разобраны в уроке о физических правилах
Итоги
- Comet — Spark-плагин, заменяющий JVM codegen на нативное исполнение через DataFusion
-
CometPluginрегистрирует два правила:CometScanRule(scan) иCometExecRule(операторы) - Данные остаются в Arrow columnar формате — нет row-to-columnar конвертации
- Сериализация плана через protobuf + JNI bridge между JVM и Rust
- TPC-H SF100: 2.2x ускорение, 97% совместимость с тестами Spark SQL
- Поддержка Spark 3.5.x и 4.0.x, zero code changes для существующих приложений