Apache DataFusion Comet
Что такое Comet
Apache DataFusion Comet — Spark-плагин, написанный на Rust, который заменяет JVM-execution на нативное исполнение через Apache DataFusion. Comet был создан в Apple и передан в Apache Software Foundation в марте 2024 года.
Spark3.4 Comet поддерживает Spark 3.4.3—3.5.8 (полная поддержка) и Spark 4.0.1 (экспериментальная). Java 11/17, Scala 2.12/2.13.
Последний релиз — 0.14.0 (18 марта 2026), 189 PR от 21 контрибьютора. Ключевые улучшения: расширенная нативная Iceberg-интеграция (per-partition plan serialization, vended credentials) и нативная columnar-to-row конвертация по умолчанию (Rust вместо JVM). Comet проходит 97% тестов Spark SQL (24,000+ тестов по состоянию на v0.9.0), что делает его одним из наиболее совместимых нативных ускорителей.
Архитектура
Comet интегрируется через Spark Plugin API (см. урок L01) и перехватывает физический план в два этапа:
CometScanRule: ускорение чтения
Первый этап — замена scan-операторов:
FileSourceScanExec(DataSource v1) заменяется наCometScanExecBatchScanExec(DataSource v2) также заменяется наCometScanExec
CometScanExec декодирует Parquet нативно — Rust-код читает Parquet-файлы напрямую в Arrow RecordBatch, минуя Java Parquet reader и JVM-аллокации. Для scan-heavy queries это один из главных источников ускорения.
CometExecRule: ускорение исполнения
Второй этап — замена execution операторов. CometExecRule обходит план снизу вверх (bottom-up traversal) и заменяет поддерживаемые операторы на Comet-эквиваленты. Подряд идущие Comet-операторы объединяются в единый CometNativeExec node.
Pipeline трансляции
Полный путь от Spark-плана к нативному исполнению:
Spark PhysicalPlan
-> CometScanRule (scan acceleration)
-> CometExecRule (exec acceleration, bottom-up traversal)
-> Supported: CometNativeExec (ProtoBuf -> JNI -> DataFusion)
-> Unsupported: stays on Spark JVM (reason stored on node)
-> Arrow RecordBatch via Arrow FFI
CometNativeExec — ключевой узел. Он:
- Сериализует operator tree в Protocol Buffer формат
- Вызывает нативный Rust-код через JNI (Java Native Interface)
- Rust-код десериализует ProtoBuf в DataFusion ExecutionPlan
- DataFusion выполняет план с векторизацией и SIMD
- Результат — Arrow RecordBatch — возвращается в JVM через Arrow FFI (zero-copy transfer)
Fallback behavior
Fallback — критическая концепция для production использования Comet. Если оператор, выражение или тип данных не поддерживается нативно, Comet не заменяет его — он остаётся на JVM.
Диагностика fallback
Включите диагностику, чтобы видеть причины fallback:
spark.comet.explainFallback.enabled=true
Пример вывода:
WARN CometExecRule: Comet cannot execute operator SortMergeJoinExec:
Reason: Sort-merge join not supported, use spark.comet.exec.replaceSortMergeJoin=true
to replace with hash join
WARN CometExecRule: Comet cannot execute expression CurrentTimestamp:
Reason: Expression not supported in native execution
Причина fallback хранится на узле Spark-плана — это позволяет инструментам мониторинга автоматически отслеживать fallback rate.
Стратегия Comet: avoid partial replacement
Comet намеренно избегает частичной замены одного оператора, если это приведёт к C2R (ColumnarToRow) конвертации на входе следующего JVM-оператора. Вместо этого Comet оставляет всю цепочку на JVM, если не может заменить её целиком. Это предотвращает ситуацию, когда overhead конвертации нивелирует выигрыш от нативного исполнения (см. L01 KnowledgeCheck о частичной замене).
Конфигурация
Полный пример подключения Comet через spark-submit:
spark-submit \
--jars comet-spark-spark3.5_2.12-0.14.0.jar \
--conf spark.driver.extraClassPath=comet-spark-spark3.5_2.12-0.14.0.jar \
--conf spark.executor.extraClassPath=comet-spark-spark3.5_2.12-0.14.0.jar \
--conf spark.plugins=org.apache.spark.CometPlugin \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=16g \
--conf spark.comet.explainFallback.enabled=true \
your_spark_app.py
Разберём каждый параметр:
| Параметр | Назначение |
|---|---|
spark.plugins=org.apache.spark.CometPlugin | Активация Comet через Spark Plugin API |
spark.shuffle.manager=...CometShuffleManager | Замена shuffle на Comet-native (Arrow IPC batches) |
spark.memory.offHeap.enabled=true | Критично! Нативное исполнение требует off-heap память |
spark.memory.offHeap.size=16g | Размер off-heap для нативных операций. Без этого — crash или OOM |
spark.comet.explainFallback.enabled=true | Диагностика: какие операторы не поддержаны |
Дополнительные параметры
# Замена SortMergeJoin на hash join (Comet не поддерживает SMJ нативно)
spark.comet.exec.replaceSortMergeJoin=true
# Экспериментальный нативный Parquet reader на DataFusion
spark.comet.scan.impl=native_datafusion
CometShuffleManager использует Arrow IPC формат для shuffle-данных вместо Java serialization. Это устраняет сериализационный overhead на shuffle — одном из самых дорогих этапов Spark pipeline.
Off-heap memory — обязательная настройка. Включение Comet без spark.memory.offHeap.enabled=true и адекватного offHeap.size приведёт к crash или катастрофической деградации производительности. Нативный код (Rust/DataFusion) аллоцирует память вне JVM heap — без off-heap конфигурации эта память будет ограничена.
Бенчмарки
TPC-H результаты
Официальные бенчмарки Comet 0.11.0 на Spark 3.5.3:
| Метрика | Значение |
|---|---|
| Общее ускорение | 2.2—2.4x vs vanilla Spark |
| Hardware | AMD Ryzen 7950X 16-core, 128GB RAM, NVMe SSD |
| Режим | Single-node |
| Dataset | TPC-H standard scale factor |
Scan-heavy queries показывают наибольшее ускорение: нативный Parquet reader в Rust обходит Java Parquet reader на чтении, декодировании и десжатии данных. Queries с complex aggregations и hash joins также значительно ускоряются.
UDF-heavy queries не получают ускорения: пользовательские UDF выполняются на JVM, и Comet не может их оптимизировать. Если ваш workload на 80% состоит из UDF-вычислений, Comet не поможет.
Детальные per-query результаты доступны на официальной странице бенчмарков Comet.
Production adoption
Comet используется в production крупными компаниями:
- Apple — создатели Comet, используют для внутренних data pipelines
- eBay — контрибьюторы в open-source
- Kuaishou (Kwai) — китайская видео-платформа, scale data processing
- Airbnb — data engineering workloads
- TikTok/ByteDance — контрибьюторы
- Huawei, Alibaba — enterprise adoption
Native Iceberg scan
Начиная с версии 0.10.0, Comet поддерживает экспериментальный native Iceberg scan через iceberg-rust. Это позволяет читать Iceberg-таблицы нативно, минуя Java Iceberg reader. Для lakehouse workloads (см. М10) это перспективное направление.
Anti-patterns
1. Comet без off-heap конфигурации
# НЕПРАВИЛЬНО -- crash или деградация
spark.plugins=org.apache.spark.CometPlugin
# Нет spark.memory.offHeap.enabled=true
# Нет spark.memory.offHeap.size
Без off-heap конфигурации нативный код не получает память для работы. Всегда устанавливайте offHeap.enabled=true и offHeap.size при использовании Comet.
2. Не проверять fallback rate
Включение Comet без проверки explainFallback — рискованно. Если ваш workload на 70% состоит из неподдерживаемых операций, вы получите overhead от нативных библиотек без ускорения. Всегда проверяйте fallback rate на dev-окружении перед production deployment.
3. Comet для UDF-heavy workloads
Если ваш pipeline построен на PySpark UDF или Java UDF — Comet не ускорит его. Все UDF выполняются на JVM. Рассмотрите pandas UDF через Arrow (М06, М11) или переписывание UDF на built-in functions.
Итоги
Apache DataFusion Comet — Spark-плагин на Rust, который:
- Перехватывает физический план через CometScanRule и CometExecRule
- Транслирует operator tree в Protocol Buffer и исполняет через DataFusion
- Возвращает результаты через Arrow FFI (zero-copy)
- Обеспечивает 2.2—2.4x ускорение на TPC-H бенчмарках
- Требует off-heap memory и проверки fallback rate перед production deployment
- Поддерживает Spark 3.4+ с 97% совместимостью Spark SQL тестов
В следующем уроке мы разберём Apache Gluten — второй нативный движок, использующий C++/Velox или ClickHouse как execution backend.
Comet с точки зрения DataFusion Comet: трансляция планов