Learning Platform
Глоссарий Troubleshooting
Урок 14.02 · 20 мин
Продвинутый
DataFusion CometRustSpark PluginNative ExecutionArrow FFIBenchmarks

Apache DataFusion Comet

Что такое Comet

Apache DataFusion Comet — Spark-плагин, написанный на Rust, который заменяет JVM-execution на нативное исполнение через Apache DataFusion. Comet был создан в Apple и передан в Apache Software Foundation в марте 2024 года.

Spark3.4 Comet поддерживает Spark 3.4.3—3.5.8 (полная поддержка) и Spark 4.0.1 (экспериментальная). Java 11/17, Scala 2.12/2.13.

Последний релиз — 0.14.0 (18 марта 2026), 189 PR от 21 контрибьютора. Ключевые улучшения: расширенная нативная Iceberg-интеграция (per-partition plan serialization, vended credentials) и нативная columnar-to-row конвертация по умолчанию (Rust вместо JVM). Comet проходит 97% тестов Spark SQL (24,000+ тестов по состоянию на v0.9.0), что делает его одним из наиболее совместимых нативных ускорителей.

Архитектура

Comet интегрируется через Spark Plugin API (см. урок L01) и перехватывает физический план в два этапа:

CometScanRule: ускорение чтения

Первый этап — замена scan-операторов:

  • FileSourceScanExec (DataSource v1) заменяется на CometScanExec
  • BatchScanExec (DataSource v2) также заменяется на CometScanExec

CometScanExec декодирует Parquet нативно — Rust-код читает Parquet-файлы напрямую в Arrow RecordBatch, минуя Java Parquet reader и JVM-аллокации. Для scan-heavy queries это один из главных источников ускорения.

CometExecRule: ускорение исполнения

Второй этап — замена execution операторов. CometExecRule обходит план снизу вверх (bottom-up traversal) и заменяет поддерживаемые операторы на Comet-эквиваленты. Подряд идущие Comet-операторы объединяются в единый CometNativeExec node.

Pipeline трансляции

Полный путь от Spark-плана к нативному исполнению:

Spark PhysicalPlan
  -> CometScanRule (scan acceleration)
  -> CometExecRule (exec acceleration, bottom-up traversal)
    -> Supported: CometNativeExec (ProtoBuf -> JNI -> DataFusion)
    -> Unsupported: stays on Spark JVM (reason stored on node)
  -> Arrow RecordBatch via Arrow FFI

CometNativeExec — ключевой узел. Он:

  1. Сериализует operator tree в Protocol Buffer формат
  2. Вызывает нативный Rust-код через JNI (Java Native Interface)
  3. Rust-код десериализует ProtoBuf в DataFusion ExecutionPlan
  4. DataFusion выполняет план с векторизацией и SIMD
  5. Результат — Arrow RecordBatch — возвращается в JVM через Arrow FFI (zero-copy transfer)
Проверка знанийKnowledge check
ОтветAnswer

Fallback behavior

Fallback — критическая концепция для production использования Comet. Если оператор, выражение или тип данных не поддерживается нативно, Comet не заменяет его — он остаётся на JVM.

Диагностика fallback

Включите диагностику, чтобы видеть причины fallback:

spark.comet.explainFallback.enabled=true

Пример вывода:

WARN CometExecRule: Comet cannot execute operator SortMergeJoinExec:
  Reason: Sort-merge join not supported, use spark.comet.exec.replaceSortMergeJoin=true
  to replace with hash join

WARN CometExecRule: Comet cannot execute expression CurrentTimestamp:
  Reason: Expression not supported in native execution

Причина fallback хранится на узле Spark-плана — это позволяет инструментам мониторинга автоматически отслеживать fallback rate.

Стратегия Comet: avoid partial replacement

Comet намеренно избегает частичной замены одного оператора, если это приведёт к C2R (ColumnarToRow) конвертации на входе следующего JVM-оператора. Вместо этого Comet оставляет всю цепочку на JVM, если не может заменить её целиком. Это предотвращает ситуацию, когда overhead конвертации нивелирует выигрыш от нативного исполнения (см. L01 KnowledgeCheck о частичной замене).

Проверка знанийKnowledge check
ОтветAnswer

Конфигурация

Полный пример подключения Comet через spark-submit:

spark-submit \
  --jars comet-spark-spark3.5_2.12-0.14.0.jar \
  --conf spark.driver.extraClassPath=comet-spark-spark3.5_2.12-0.14.0.jar \
  --conf spark.executor.extraClassPath=comet-spark-spark3.5_2.12-0.14.0.jar \
  --conf spark.plugins=org.apache.spark.CometPlugin \
  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=16g \
  --conf spark.comet.explainFallback.enabled=true \
  your_spark_app.py

Разберём каждый параметр:

ПараметрНазначение
spark.plugins=org.apache.spark.CometPluginАктивация Comet через Spark Plugin API
spark.shuffle.manager=...CometShuffleManagerЗамена shuffle на Comet-native (Arrow IPC batches)
spark.memory.offHeap.enabled=trueКритично! Нативное исполнение требует off-heap память
spark.memory.offHeap.size=16gРазмер off-heap для нативных операций. Без этого — crash или OOM
spark.comet.explainFallback.enabled=trueДиагностика: какие операторы не поддержаны

Дополнительные параметры

# Замена SortMergeJoin на hash join (Comet не поддерживает SMJ нативно)
spark.comet.exec.replaceSortMergeJoin=true

# Экспериментальный нативный Parquet reader на DataFusion
spark.comet.scan.impl=native_datafusion

CometShuffleManager использует Arrow IPC формат для shuffle-данных вместо Java serialization. Это устраняет сериализационный overhead на shuffle — одном из самых дорогих этапов Spark pipeline.

WARNING

Off-heap memory — обязательная настройка. Включение Comet без spark.memory.offHeap.enabled=true и адекватного offHeap.size приведёт к crash или катастрофической деградации производительности. Нативный код (Rust/DataFusion) аллоцирует память вне JVM heap — без off-heap конфигурации эта память будет ограничена.

Бенчмарки

TPC-H результаты

Официальные бенчмарки Comet 0.11.0 на Spark 3.5.3:

МетрикаЗначение
Общее ускорение2.2—2.4x vs vanilla Spark
HardwareAMD Ryzen 7950X 16-core, 128GB RAM, NVMe SSD
РежимSingle-node
DatasetTPC-H standard scale factor

Scan-heavy queries показывают наибольшее ускорение: нативный Parquet reader в Rust обходит Java Parquet reader на чтении, декодировании и десжатии данных. Queries с complex aggregations и hash joins также значительно ускоряются.

UDF-heavy queries не получают ускорения: пользовательские UDF выполняются на JVM, и Comet не может их оптимизировать. Если ваш workload на 80% состоит из UDF-вычислений, Comet не поможет.

Детальные per-query результаты доступны на официальной странице бенчмарков Comet.

Проверка знанийKnowledge check
ОтветAnswer

Production adoption

Comet используется в production крупными компаниями:

  • Apple — создатели Comet, используют для внутренних data pipelines
  • eBay — контрибьюторы в open-source
  • Kuaishou (Kwai) — китайская видео-платформа, scale data processing
  • Airbnb — data engineering workloads
  • TikTok/ByteDance — контрибьюторы
  • Huawei, Alibaba — enterprise adoption

Native Iceberg scan

Начиная с версии 0.10.0, Comet поддерживает экспериментальный native Iceberg scan через iceberg-rust. Это позволяет читать Iceberg-таблицы нативно, минуя Java Iceberg reader. Для lakehouse workloads (см. М10) это перспективное направление.

Anti-patterns

1. Comet без off-heap конфигурации

# НЕПРАВИЛЬНО -- crash или деградация
spark.plugins=org.apache.spark.CometPlugin
# Нет spark.memory.offHeap.enabled=true
# Нет spark.memory.offHeap.size

Без off-heap конфигурации нативный код не получает память для работы. Всегда устанавливайте offHeap.enabled=true и offHeap.size при использовании Comet.

2. Не проверять fallback rate

Включение Comet без проверки explainFallback — рискованно. Если ваш workload на 70% состоит из неподдерживаемых операций, вы получите overhead от нативных библиотек без ускорения. Всегда проверяйте fallback rate на dev-окружении перед production deployment.

3. Comet для UDF-heavy workloads

Если ваш pipeline построен на PySpark UDF или Java UDF — Comet не ускорит его. Все UDF выполняются на JVM. Рассмотрите pandas UDF через Arrow (М06, М11) или переписывание UDF на built-in functions.

Итоги

Apache DataFusion Comet — Spark-плагин на Rust, который:

  • Перехватывает физический план через CometScanRule и CometExecRule
  • Транслирует operator tree в Protocol Buffer и исполняет через DataFusion
  • Возвращает результаты через Arrow FFI (zero-copy)
  • Обеспечивает 2.2—2.4x ускорение на TPC-H бенчмарках
  • Требует off-heap memory и проверки fallback rate перед production deployment
  • Поддерживает Spark 3.4+ с 97% совместимостью Spark SQL тестов

В следующем уроке мы разберём Apache Gluten — второй нативный движок, использующий C++/Velox или ClickHouse как execution backend.

Comet с точки зрения DataFusion Comet: трансляция планов

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 9. Comet перехватывает физический план Spark в два этапа. Какой этап отвечает за ускорение чтения Parquet?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6