Нативный scan и columnar shuffle
Два компонента Comet обеспечивают максимальный прирост производительности: нативный scan (чтение Parquet без JVM overhead) и нативный columnar shuffle (перераспределение данных без row-конвертации). Разберём оба.
CometScanRule
CometScanRule перехватывает Spark-операторы чтения данных и заменяет их на нативные реализации:
class CometScanRule extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
plan.transform {
// Spark v1 data source API
case scan: FileSourceScanExec
if scan.relation.fileFormat.isInstanceOf[ParquetFileFormat] =>
CometScanExec(scan)
// Spark v2 data source API (Iceberg, Delta, etc.)
case scan: BatchScanExec
if isParquetBatchScan(scan) =>
CometBatchScanExec(scan)
}
}
}
CometScanRule работает только с Parquet. Для CSV, ORC и других форматов Comet использует стандартный Spark scan. Это не ограничение — Parquet покрывает подавляющее большинство аналитических хранилищ.
Три реализации scan
Comet предоставляет три backend-а для нативного чтения Parquet, переключаемых через конфигурацию spark.comet.scan.impl:
native_comet (по умолчанию)
Собственный Parquet-ридер Comet, оптимизированный для совместимости со Spark:
- Поддерживает все Spark-специфичные правила чтения (decimal precision, timestamp rebase, int96 calendar)
- Pushdown предикатов на уровень row group
- Projection pushdown (чтение только запрошенных столбцов)
- Конвертация напрямую в Arrow
RecordBatchбез промежуточных буферов
spark.comet.scan.impl=native_comet # по умолчанию
native_datafusion
Использует стандартный DataSourceExec из DataFusion:
- Лучшая поддержка сложных типов:
Map,Array,Struct - Асинхронное I/O через Tokio (pipeline с compute)
- Не все Spark-специфичные quirks воспроизведены — используйте когда совместимость не критична
spark.comet.scan.impl=native_datafusion
native_iceberg_compat
Нативный Iceberg scan через библиотеку iceberg-rust:
- Работает с Iceberg FileScan tasks напрямую, минуя Spark Iceberg connector
- Нативное чтение Parquet-файлов, на которые ссылается Iceberg metadata
- Доступно с Comet 0.12.0+
- Требует сборки с feature flag:
mvn ... -Piceberg
spark.comet.scan.impl=native_iceberg_compat
native_iceberg_compat — экспериментальная функциональность. Не все Iceberg-операции поддержаны (например, time travel и complex partition transforms). Для production используйте стандартный Iceberg connector с native_comet.
CometScanExec vs CometBatchScanExec
Spark имеет два API для источников данных, и Comet поддерживает оба:
| API | Spark-оператор | Comet-замена | Когда используется |
|---|---|---|---|
| DataSource V1 | FileSourceScanExec | CometScanExec | Стандартный Parquet, CSV, ORC |
| DataSource V2 | BatchScanExec | CometBatchScanExec | Iceberg, Delta Lake, кастомные source |
CometScanExec (V1)
Оборачивает FileSourceScanExec — основной случай для чистого Parquet:
case class CometScanExec(
originalScan: FileSourceScanExec,
override val output: Seq[Attribute]
) extends SparkPlan with CometPlan {
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
// 1. Получаем список Parquet-файлов от Spark
val files = originalScan.relation.location.listFiles(...)
// 2. Для каждого split — нативное чтение через JNI
files.map { file =>
Native.readParquet(file.getPath, schema, predicates)
}
}
}
CometBatchScanExec (V2)
Оборачивает BatchScanExec — для Iceberg и других V2-источников:
case class CometBatchScanExec(
originalScan: BatchScanExec
) extends SparkPlan with CometPlan {
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
// V2 API: получаем InputPartitions от Scan
val partitions = originalScan.batch.planInputPartitions()
// Для Iceberg: partition содержит FileScanTask
// с метаданными о Parquet-файлах
partitions.map { partition =>
Native.readPartition(partition, schema)
}
}
}
Выбор между V1 и V2 определяется конфигурацией источника данных, а не пользователем — Comet автоматически использует нужную обёртку.
Нативный Columnar Shuffle
Стандартный Spark shuffle сериализует данные в row-based формат (UnsafeRow) для перераспределения. Comet заменяет это на columnar shuffle — данные остаются в Arrow-формате:
CometShuffleExchangeExec
Замена стандартного ShuffleExchangeExec:
case class CometShuffleExchangeExec(
outputPartitioning: Partitioning,
child: SparkPlan
) extends Exchange with CometPlan {
// Поддерживаемые типы партиционирования:
// - HashPartitioning (самый частый — GROUP BY, JOIN)
// - RangePartitioning (ORDER BY)
// - SinglePartition (финальная агрегация)
// - RoundRobinPartitioning
}
Преимущества columnar shuffle:
- Нет ColumnarToRow/RowToColumnar — данные не конвертируются
- Лучшая компрессия — columnar формат сжимается эффективнее row-based
- Поддержка сложных типов — Array, Struct, Map передаются нативно через Arrow IPC
Поддержка типов в shuffle
Columnar shuffle поддерживает все Arrow-типы, включая вложенные:
Поддерживаемые типы:
Примитивные: Boolean, Int8..Int64, Float32, Float64, Decimal128, Utf8, Binary
Temporal: Date32, Timestamp (микро/нано, с/без timezone)
Complex: Struct, List, Map, LargeUtf8, LargeBinary
Special: Null, Dictionary-encoded
Spill-механизм
При превышении лимита off-heap памяти Comet сбрасывает промежуточные данные на диск:
spark.comet.memory.overhead.factor=0.2 # 20% от executor memory
spark.comet.exec.shuffle.spill.dir=/tmp/comet-spill
Как работает spill
DiskManagerотслеживает потребление off-heap памяти- При достижении порога — данные агрегации или shuffle сбрасываются в temp-файлы в формате Arrow IPC
- При чтении — merge из in-memory и on-disk буферов
- Temp-файлы удаляются по завершении query stage
// DiskManager (упрощённо)
pub struct DiskManager {
spill_dir: PathBuf,
memory_used: AtomicUsize,
memory_limit: usize,
}
impl DiskManager {
pub fn spill_if_needed(
&self,
batch: &RecordBatch,
) -> Result<SpillAction> {
if self.memory_used.load(Ordering::Relaxed) > self.memory_limit {
let path = self.spill_dir.join(format!("spill_{}.arrow", uuid()));
write_ipc(&path, batch)?;
Ok(SpillAction::Spilled(path))
} else {
Ok(SpillAction::InMemory)
}
}
}
Рекомендуется использовать off-heap memory (spark.memory.offHeap.enabled=true) совместно с Comet. Это позволяет DataFusion работать с off-heap буферами напрямую, минуя JVM heap и избегая GC overhead.
Метрики scan и shuffle
Comet обновляет Spark-метрики через JNI каждые ~3 секунды:
| Метрика | Описание |
|---|---|
numOutputRows | Количество строк после фильтрации |
numOutputBatches | Количество Arrow RecordBatch |
scanTime | Время нативного чтения (мс) |
shuffleWriteTime | Время записи shuffle-данных |
shuffleBytesWritten | Объём shuffle-данных (bytes) |
spillBytes | Объём данных, сброшенных на диск |
peakMemoryUsed | Пиковое потребление off-heap памяти |
Метрики доступны в Spark UI → SQL → Query Details → CometScanExec / CometShuffleExchangeExec.
Итоги
-
CometScanRuleзаменяет Parquet scan на нативное чтение — три реализации на выбор -
native_comet— по умолчанию, максимальная Spark-совместимость;native_datafusion— для сложных типов;native_iceberg_compat— для нативного Iceberg (0.12.0+) -
CometScanExec(V1 API) иCometBatchScanExec(V2 API) — автоматический выбор по типу источника - Columnar shuffle: данные остаются в Arrow IPC, нет ColumnarToRow / RowToColumnar конвертации
- Spill через
DiskManagerв формате Arrow IPC при превышении off-heap лимита - Метрики обновляются каждые 3 секунды через JNI — видны в Spark UI