Learning Platform
Глоссарий Troubleshooting
Урок 08.03 · 17 мин
Продвинутый
CometScanRuleCometScanExecCometBatchScanExecnative_cometnative_datafusionnative_iceberg_compatcolumnar shufflespill

Нативный scan и columnar shuffle

Два компонента Comet обеспечивают максимальный прирост производительности: нативный scan (чтение Parquet без JVM overhead) и нативный columnar shuffle (перераспределение данных без row-конвертации). Разберём оба.

CometScanRule

CometScanRule перехватывает Spark-операторы чтения данных и заменяет их на нативные реализации:

class CometScanRule extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = {
    plan.transform {
      // Spark v1 data source API
      case scan: FileSourceScanExec
          if scan.relation.fileFormat.isInstanceOf[ParquetFileFormat] =>
        CometScanExec(scan)

      // Spark v2 data source API (Iceberg, Delta, etc.)
      case scan: BatchScanExec
          if isParquetBatchScan(scan) =>
        CometBatchScanExec(scan)
    }
  }
}
NOTE

CometScanRule работает только с Parquet. Для CSV, ORC и других форматов Comet использует стандартный Spark scan. Это не ограничение — Parquet покрывает подавляющее большинство аналитических хранилищ.

Три реализации scan

Comet предоставляет три backend-а для нативного чтения Parquet, переключаемых через конфигурацию spark.comet.scan.impl:

Три пути нативного scan
Parquet файлВходной Parquet-файл — основной формат аналитических хранилищ
spark.comet.scan.impl
native_cometСобственный Parquet-ридер Comet — максимальная совместимость со Spark quirks (decimal, timestamp rebase)
native_datafusionСтандартный DataSourceExec DataFusion — лучше для сложных типов, асинхронное I/O через Tokio
native_iceberg_compatНативный Iceberg scan через iceberg-rust — минует Spark Iceberg connector, доступен с 0.12.0
Arrow RecordBatchРезультат чтения — Arrow RecordBatch в off-heap памяти, без JVM heap аллокаций

native_comet (по умолчанию)

Собственный Parquet-ридер Comet, оптимизированный для совместимости со Spark:

  • Поддерживает все Spark-специфичные правила чтения (decimal precision, timestamp rebase, int96 calendar)
  • Pushdown предикатов на уровень row group
  • Projection pushdown (чтение только запрошенных столбцов)
  • Конвертация напрямую в Arrow RecordBatch без промежуточных буферов
spark.comet.scan.impl=native_comet  # по умолчанию

native_datafusion

Использует стандартный DataSourceExec из DataFusion:

  • Лучшая поддержка сложных типов: Map, Array, Struct
  • Асинхронное I/O через Tokio (pipeline с compute)
  • Не все Spark-специфичные quirks воспроизведены — используйте когда совместимость не критична
spark.comet.scan.impl=native_datafusion

native_iceberg_compat

Нативный Iceberg scan через библиотеку iceberg-rust:

  • Работает с Iceberg FileScan tasks напрямую, минуя Spark Iceberg connector
  • Нативное чтение Parquet-файлов, на которые ссылается Iceberg metadata
  • Доступно с Comet 0.12.0+
  • Требует сборки с feature flag: mvn ... -Piceberg
spark.comet.scan.impl=native_iceberg_compat
WARNING

native_iceberg_compat — экспериментальная функциональность. Не все Iceberg-операции поддержаны (например, time travel и complex partition transforms). Для production используйте стандартный Iceberg connector с native_comet.

CometScanExec vs CometBatchScanExec

Spark имеет два API для источников данных, и Comet поддерживает оба:

APISpark-операторComet-заменаКогда используется
DataSource V1FileSourceScanExecCometScanExecСтандартный Parquet, CSV, ORC
DataSource V2BatchScanExecCometBatchScanExecIceberg, Delta Lake, кастомные source

CometScanExec (V1)

Оборачивает FileSourceScanExec — основной случай для чистого Parquet:

case class CometScanExec(
    originalScan: FileSourceScanExec,
    override val output: Seq[Attribute]
) extends SparkPlan with CometPlan {

  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    // 1. Получаем список Parquet-файлов от Spark
    val files = originalScan.relation.location.listFiles(...)
    // 2. Для каждого split — нативное чтение через JNI
    files.map { file =>
      Native.readParquet(file.getPath, schema, predicates)
    }
  }
}

CometBatchScanExec (V2)

Оборачивает BatchScanExec — для Iceberg и других V2-источников:

case class CometBatchScanExec(
    originalScan: BatchScanExec
) extends SparkPlan with CometPlan {

  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    // V2 API: получаем InputPartitions от Scan
    val partitions = originalScan.batch.planInputPartitions()
    // Для Iceberg: partition содержит FileScanTask
    // с метаданными о Parquet-файлах
    partitions.map { partition =>
      Native.readPartition(partition, schema)
    }
  }
}

Выбор между V1 и V2 определяется конфигурацией источника данных, а не пользователем — Comet автоматически использует нужную обёртку.

Нативный Columnar Shuffle

Стандартный Spark shuffle сериализует данные в row-based формат (UnsafeRow) для перераспределения. Comet заменяет это на columnar shuffle — данные остаются в Arrow-формате:

Spark shuffle vs Comet columnar shuffle
Стандартный Spark shuffleRow-based shuffle: двойная конвертация columnar↔row снижает пропускную способность
Comet columnar shuffleColumnar shuffle: данные остаются в Arrow IPC — нет row-конвертации, лучшая компрессия

CometShuffleExchangeExec

Замена стандартного ShuffleExchangeExec:

case class CometShuffleExchangeExec(
    outputPartitioning: Partitioning,
    child: SparkPlan
) extends Exchange with CometPlan {
  // Поддерживаемые типы партиционирования:
  // - HashPartitioning (самый частый — GROUP BY, JOIN)
  // - RangePartitioning (ORDER BY)
  // - SinglePartition (финальная агрегация)
  // - RoundRobinPartitioning
}

Преимущества columnar shuffle:

  • Нет ColumnarToRow/RowToColumnar — данные не конвертируются
  • Лучшая компрессия — columnar формат сжимается эффективнее row-based
  • Поддержка сложных типов — Array, Struct, Map передаются нативно через Arrow IPC

Поддержка типов в shuffle

Columnar shuffle поддерживает все Arrow-типы, включая вложенные:

Поддерживаемые типы:
  Примитивные: Boolean, Int8..Int64, Float32, Float64, Decimal128, Utf8, Binary
  Temporal:    Date32, Timestamp (микро/нано, с/без timezone)
  Complex:     Struct, List, Map, LargeUtf8, LargeBinary
  Special:     Null, Dictionary-encoded

Spill-механизм

При превышении лимита off-heap памяти Comet сбрасывает промежуточные данные на диск:

spark.comet.memory.overhead.factor=0.2  # 20% от executor memory
spark.comet.exec.shuffle.spill.dir=/tmp/comet-spill

Как работает spill

  1. DiskManager отслеживает потребление off-heap памяти
  2. При достижении порога — данные агрегации или shuffle сбрасываются в temp-файлы в формате Arrow IPC
  3. При чтении — merge из in-memory и on-disk буферов
  4. Temp-файлы удаляются по завершении query stage
// DiskManager (упрощённо)
pub struct DiskManager {
    spill_dir: PathBuf,
    memory_used: AtomicUsize,
    memory_limit: usize,
}

impl DiskManager {
    pub fn spill_if_needed(
        &self,
        batch: &RecordBatch,
    ) -> Result<SpillAction> {
        if self.memory_used.load(Ordering::Relaxed) > self.memory_limit {
            let path = self.spill_dir.join(format!("spill_{}.arrow", uuid()));
            write_ipc(&path, batch)?;
            Ok(SpillAction::Spilled(path))
        } else {
            Ok(SpillAction::InMemory)
        }
    }
}
TIP

Рекомендуется использовать off-heap memory (spark.memory.offHeap.enabled=true) совместно с Comet. Это позволяет DataFusion работать с off-heap буферами напрямую, минуя JVM heap и избегая GC overhead.

Метрики scan и shuffle

Comet обновляет Spark-метрики через JNI каждые ~3 секунды:

МетрикаОписание
numOutputRowsКоличество строк после фильтрации
numOutputBatchesКоличество Arrow RecordBatch
scanTimeВремя нативного чтения (мс)
shuffleWriteTimeВремя записи shuffle-данных
shuffleBytesWrittenОбъём shuffle-данных (bytes)
spillBytesОбъём данных, сброшенных на диск
peakMemoryUsedПиковое потребление off-heap памяти

Метрики доступны в Spark UI → SQL → Query Details → CometScanExec / CometShuffleExchangeExec.

Итоги

  • CometScanRule заменяет Parquet scan на нативное чтение — три реализации на выбор
  • native_comet — по умолчанию, максимальная Spark-совместимость; native_datafusion — для сложных типов; native_iceberg_compat — для нативного Iceberg (0.12.0+)
  • CometScanExec (V1 API) и CometBatchScanExec (V2 API) — автоматический выбор по типу источника
  • Columnar shuffle: данные остаются в Arrow IPC, нет ColumnarToRow / RowToColumnar конвертации
  • Spill через DiskManager в формате Arrow IPC при превышении off-heap лимита
  • Метрики обновляются каждые 3 секунды через JNI — видны в Spark UI

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Comet предоставляет три реализации scan (native_comet, native_datafusion, native_iceberg_compat). Какая используется по умолчанию и почему?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5