Learning Platform
Глоссарий Troubleshooting
Урок 08.01 · 15 мин
Продвинутый
Apache CometCometPluginCometSparkSessionExtensionsCometScanRuleCometExecRuleSpark acceleration

Apache Comet: нативное ускорение Spark

Apache Spark остаётся стандартом для распределённой аналитики, но архитектурные ограничения JVM — overhead от garbage collection, row-based внутреннее представление в Whole-Stage CodeGen, ограниченная векторизация — снижают производительность на аналитических нагрузках. Apache Comet решает эту проблему: он заменяет JVM-исполнение физического плана на нативный код, построенный поверх DataFusion.

Проблема: узкие места JVM в Spark

Spark Whole-Stage CodeGen (WSCG) генерирует Java bytecode для цепочки операторов. Это быстрее итераторной модели Volcano, но:

  • Garbage Collection — аналитические запросы создают миллионы промежуточных объектов. GC-паузы растут нелинейно с объёмом данных
  • Row-based промежуточное представление — даже при чтении Parquet (columnar) данные конвертируются в InternalRow для обработки WSCG
  • Ограниченная SIMD-векторизация — JIT-компилятор JVM не гарантирует автовекторизацию hot loops
  • Off-heap vs On-heap — управление памятью через Unsafe API добавляет complexity и не устраняет GC для метаданных

Что такое Comet

Apache Comet — плагин для Spark, который перехватывает физический план выполнения и заменяет JVM-операторы на нативные, использующие DataFusion в качестве execution engine. Ключевое свойство: zero code changes — приложения Spark продолжают работать без модификации.

Spark-запрос: без Comet и с Comet
Без Comet (стандартный Spark)Стандартный конвейер Spark: columnar→row конвертация, JVM codegen, GC-паузы
С CometComet-конвейер: нативное columnar исполнение через DataFusion, zero copy

Справа DataFusion работает с данными в формате Arrow (columnar) от начала до конца — нет конвертации columnar-to-row-to-columnar. Это главный источник ускорения.

Архитектура CometPlugin

Comet интегрируется в Spark через стандартный механизм SparkPlugin:

class CometPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = new CometDriverPlugin
  override def executorPlugin(): ExecutorPlugin = new CometExecutorPlugin
}

При загрузке плагина (spark.plugins=org.apache.comet.CometPlugin) происходит следующее:

  1. CometDriverPlugin.init() регистрирует CometSparkSessionExtensions
  2. Extensions добавляют два правила в Spark-планировщик:
    • CometScanRule — заменяет Parquet scan на нативный (CometScanExec)
    • CometExecRule — заменяет операторы физического плана (Filter, Project, Aggregate, Join, Sort, …) на Comet-обёртки
  3. Каждый CometExec-оператор сериализует свой подплан в protobuf и передаёт через JNI в Rust-код
  4. Rust-сторона десериализует protobuf и строит DataFusion ExecutionPlan
  5. Исполнение происходит нативно, результат возвращается как Arrow RecordBatch
Архитектура CometPlugin
SparkSession (spark.plugins = CometPlugin)Точка входа: плагин загружается через spark.plugins и инициализирует расширения
CometSparkSessionExtensionsРегистрирует CometScanRule и CometExecRule через Spark Extension API
CometScanRuleЗаменяет Parquet scan на нативный CometScanExec — чтение без row-конвертации
CometExecRuleЗаменяет физические операторы (Filter, Project, Agg, Join) на Comet-обёртки
protobuf + JNI
Rust: PhysicalPlanner (planner.rs)Rust-сторона десериализует protobuf и строит нативный план исполнения
DataFusion ExecutionPlanФинальный план исполняется нативно с Arrow compute kernels и SIMD

Регистрация расширений

CometSparkSessionExtensions использует Spark Extension API для внедрения кастомных правил:

class CometSparkSessionExtensions
    extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Правило замены scan-операторов
    extensions.injectColumnarRule(_ => CometScanRule)
    // Правило замены execution-операторов
    extensions.injectColumnarRule(_ => CometExecRule)
  }
}

injectColumnarRule — точка расширения Spark для замены физических операторов. Comet использует именно этот механизм, а не более низкоуровневый injectOptimizerRule, потому что работает на уровне физического плана, а не логического.

WSCG vs нативное исполнение: почему разница существенна

Для понимания выигрыша Comet полезно сравнить, как данные проходят через pipeline в стандартном Spark и в Comet:

Стандартный Spark pipeline

Parquet (columnar on disk)
  → Decode to InternalRow (row-based in JVM heap)
  → WSCG operator chain (row-by-row, JIT-compiled bytecode)
  → Shuffle: serialize UnsafeRow → network → deserialize UnsafeRow
  → WSCG operator chain (продолжение)
  → Output (InternalRow → client)

Ключевая проблема: данные конвертируются из columnar в row на входе и обрабатываются row-by-row. WSCG генерирует tight loops (без virtual dispatch), но JVM JIT не может гарантировать SIMD-векторизацию этих loops.

Comet pipeline

Parquet (columnar on disk)
  → Native decode to Arrow RecordBatch (columnar in off-heap memory)
  → DataFusion operators (vectorized, SIMD-aware)
  → Shuffle: Arrow IPC columnar → network → Arrow IPC columnar
  → DataFusion operators (продолжение)
  → Output (Arrow RecordBatch → JVM → client)

Данные остаются columnar на всех этапах. DataFusion использует Arrow compute kernels, которые компилируются с авто-векторизацией (-C target-cpu=native в Rust).

Производительность

Бенчмарки TPC-H (SF100, один executor, 8 ядер):

МетрикаЗначение
Средний прирост2.2x по сравнению со стандартным Spark
Совместимость97% тестов Spark SQL проходят
Формат данныхArrow columnar (нет row-to-columnar конвертации)

Прирост неравномерен по запросам:

  • Scan + Filter + Project (Q1, Q6) — максимальный прирост (3-4x): нативный Parquet-ридер + SIMD фильтрация
  • Complex Join + Aggregation (Q9, Q21) — умеренный прирост (1.5-2x): join-алгоритмы DataFusion сопоставимы с Spark
  • Запросы с fallback — нет прироста: весь stage выполняется в JVM

Темп развития

Comet выпускает релизы примерно раз в 6 недель. За 2025 год вышло 8 релизов (0.6.0 — 0.13.0), каждый из которых расширял покрытие операторов и выражений. Актуальный релиз — 0.14.0 (2026-03-18, +189 PR от 21 контрибьютора). Это важно для production — с каждым релизом процент запросов с fallback снижается.

NOTE

97% совместимость означает, что 3% запросов используют операторы или выражения, которые Comet ещё не поддерживает. Для них срабатывает fallback на стандартный Spark — без ошибок, но без ускорения. Механизм fallback рассматривается в следующем уроке.

Поддерживаемые версии

Comet 0.14.0 (актуальная версия, релиз 2026-03-18) поддерживает:

  • Spark 3.5.x — основная production-ветка
  • Spark 4.0.x — поддержка ANSI mode (default в Spark 4.0)
  • Java 11, 17, 21 — все LTS-версии
  • Scala 2.12, 2.13

Подключение

# Spark 3.5
spark-shell --packages org.apache.datafusion:comet-spark-spark3.5_2.12:0.14.0 \
  --conf spark.plugins=org.apache.comet.CometPlugin \
  --conf spark.comet.exec.enabled=true \
  --conf spark.comet.convert.parquet.enabled=true
TIP

Параметр spark.comet.exec.enabled=true включает замену операторов (CometExecRule). Параметр spark.comet.convert.parquet.enabled=true включает нативный scan (CometScanRule). Оба нужны для полного ускорения — без exec.enabled Comet только читает данные нативно, но обрабатывает в JVM.

Comet в экосистеме DataFusion

Comet — один из нескольких проектов, использующих DataFusion как встроенный execution engine (паттерн «библиотека», рассмотренный в модуле 05 — Паттерны расширяемости). В отличие от InfluxDB или GreptimeDB, Comet не создаёт новую СУБД — он ускоряет существующую экосистему Spark.

Связь с другими модулями курса:

Итоги

  • Comet — Spark-плагин, заменяющий JVM codegen на нативное исполнение через DataFusion
  • CometPlugin регистрирует два правила: CometScanRule (scan) и CometExecRule (операторы)
  • Данные остаются в Arrow columnar формате — нет row-to-columnar конвертации
  • Сериализация плана через protobuf + JNI bridge между JVM и Rust
  • TPC-H SF100: 2.2x ускорение, 97% совместимость с тестами Spark SQL
  • Поддержка Spark 3.5.x и 4.0.x, zero code changes для существующих приложений
Spark + Comet: взгляд из JVM-мира Native execution в Spark: фундамент

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какую проблему JVM решает Apache Comet при выполнении аналитических запросов в Spark?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5