Конфигурация и тюнинг Comet
Comet управляется параметрами spark.comet.* — поверх стандартной конфигурации Spark. Правильная настройка определяет, какие операторы ускоряются, сколько памяти доступно нативному движку и как диагностировать fallback.
Иерархия конфигурации
Параметры Comet организованы в три уровня:
Уровень 1: включение/выключение
# Основные переключатели
spark.comet.exec.enabled=true # Замена операторов (CometExecRule)
spark.comet.convert.parquet.enabled=true # Нативный scan (CometScanRule)
spark.comet.exec.shuffle.enabled=true # Нативный columnar shuffle
Без spark.comet.exec.enabled=true Comet только читает данные нативно (scan), но обрабатывает их в JVM. Для полного ускорения нужны оба параметра: scan + exec.
Уровень 2: выбор реализации
# Реализация scan (native_comet | native_datafusion | native_iceberg_compat)
spark.comet.scan.impl=native_comet
# Разрешить потенциально несовместимые выражения
spark.comet.expression.allowIncompatible=false
# Разрешить cast без проверки overflow (ускоряет, но менее безопасно)
spark.comet.cast.allowIncompatible=false
Уровень 3: тюнинг производительности
# Размер batch (количество строк в одном RecordBatch)
spark.comet.batchSize=8192
# Количество строк для columnar-to-row конвертации (для fallback)
spark.comet.columnar.toRow.batchSize=8192
# Интервал обновления метрик через JNI (мс)
spark.comet.metrics.updateIntervalMs=3000
Модель памяти
Comet использует off-heap память для нативного исполнения. Рекомендуемая конфигурация:
# Включить off-heap memory для Spark
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=4g
# Доля off-heap памяти для Comet (0.0–1.0)
spark.comet.memory.overhead.factor=0.2
# Минимальный размер overhead (bytes)
spark.comet.memory.overhead.min=402653184 # 384 MB
Как распределяется память
Off-heap память критична для производительности Comet. При spark.memory.offHeap.enabled=false Comet выделяет буферы в JVM heap, что увеличивает GC pressure — именно то, от чего Comet призван избавить. Всегда включайте off-heap для production.
Когда данные сбрасываются на диск
Если off-heap буфер превышает лимит, DiskManager сбрасывает данные:
# Директория для temp-файлов spill
spark.comet.exec.shuffle.spill.dir=/tmp/comet-spill
# Альтернативно — использовать Spark scratch dirs
spark.local.dir=/mnt/nvme/spark-scratch
DiskManager создаёт temp-файлы в формате Arrow IPC. После завершения query stage файлы удаляются автоматически. На NVMe-дисках spill-overhead минимален — bottleneck обычно в сериализации, а не в I/O.
Диагностика: EXPLAIN и метрики
Расширенный EXPLAIN
Comet добавляет форматы COMET_EXTENDED_EXPLAIN_FORMAT для анализа трансляции:
-- Включить расширенную диагностику Comet
SET spark.comet.explain.verbose.enabled=true;
-- Стандартный EXPLAIN покажет Comet-операторы
EXPLAIN SELECT dept, SUM(salary) FROM employees GROUP BY dept;
Вывод включает:
- Какие операторы заменены на Comet (
[COMET]) - Причины fallback для незаменённых операторов
- Protobuf plan summary (при verbose mode)
Метрики в Spark UI
Comet публикует метрики через стандартный Spark Metrics API:
| Группа | Метрики |
|---|---|
| Scan | scanTime, numOutputRows, numOutputBatches, readBytes |
| Exec | executionTime, numOutputRows, peakMemoryUsed |
| Shuffle | shuffleWriteTime, shuffleBytesWritten, shuffleRecordsWritten |
| Spill | spillBytes, spillCount, spillReadTime |
Метрики обновляются каждые 3 секунды (настраивается через spark.comet.metrics.updateIntervalMs). Значения видны в Spark UI → SQL tab → Query Details.
Метрики Comet обновляются периодически, а не в реальном времени. Для коротких запросов (< 3 секунд) итоговые метрики могут быть неточными. Для точного профилирования уменьшите интервал: spark.comet.metrics.updateIntervalMs=500.
Совместимость и fallback guide
Что вызывает fallback
Основные причины fallback на стандартный Spark:
| Причина | Пример | Решение |
|---|---|---|
| Неподдерживаемое выражение | Кастомный Spark UDF | Переписать как DataFusion UDF или использовать allowIncompatible |
| Неподдерживаемый тип данных | CalendarIntervalType | Дождаться поддержки в следующих версиях |
| Incompatible cast | String → Date с нестандартным форматом | spark.comet.cast.allowIncompatible=true (с осторожностью) |
| Complex partition expression | UDF в ключе партиционирования | Вынести UDF за пределы shuffle |
| Не-Parquet формат | ORC, CSV, JSON scan | Конвертировать данные в Parquet |
Проверка совместимости конкретного запроса
-- 1. Выполнить EXPLAIN EXTENDED
EXPLAIN EXTENDED
SELECT ...
-- 2. Искать в выводе строки без [COMET]
-- 3. Для каждого non-Comet оператора — проверить причину:
-- "unsupported expression: ..."
-- "unsupported data type: ..."
-- "incompatible cast: ..."
Стратегия постепенного включения
# Шаг 1: только scan (безопасно, максимальная совместимость)
spark.comet.exec.enabled=false
spark.comet.convert.parquet.enabled=true
# Шаг 2: scan + exec (основное ускорение)
spark.comet.exec.enabled=true
spark.comet.convert.parquet.enabled=true
# Шаг 3: scan + exec + shuffle (полное ускорение)
spark.comet.exec.enabled=true
spark.comet.convert.parquet.enabled=true
spark.comet.exec.shuffle.enabled=true
Интеграция с Iceberg
Comet поддерживает Iceberg-таблицы через два режима:
Режим 1: стандартный Iceberg connector + Comet scan
# Iceberg использует BatchScanExec → CometBatchScanExec
spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog
spark.comet.convert.parquet.enabled=true
spark.comet.scan.impl=native_comet
Comet перехватывает BatchScanExec, созданный Iceberg connector, и читает Parquet-файлы нативно. Iceberg metadata (snapshots, manifest files) обрабатывается стандартным Spark-коннектором.
Режим 2: нативный Iceberg scan (экспериментальный)
# Полностью нативное чтение Iceberg через iceberg-rust
spark.comet.scan.impl=native_iceberg_compat
В этом режиме Comet использует iceberg-rust для чтения Iceberg metadata и Parquet-данных. Это устраняет JVM overhead даже для metadata-операций, но:
- Доступно с Comet 0.12.0+
- Требует сборки с feature flag:
mvn ... -Piceberg - Не все Iceberg-операции поддерживаны (time travel, complex partition transforms)
- Production рекомендация — использовать режим 1
Для большинства production-сценариев режим 1 (стандартный Iceberg connector + Comet scan) — оптимальный выбор. Он сохраняет полную совместимость с Iceberg API при ускорении чтения данных.
ANSI mode (Spark 4.0)
Spark 4.0 включает ANSI mode по умолчанию (spark.sql.ansi.enabled=true), что меняет поведение при overflow и cast:
# Spark 4.0 default
spark.sql.ansi.enabled=true
# Поведение при overflow: ANSI → exception, legacy → silent wrap
# Int32.MAX_VALUE + 1:
# ANSI: ArithmeticException
# Legacy: -2147483648 (silent overflow)
Comet 0.14.0 (релиз 2026-03-18) поддерживает ANSI mode — нативные операторы корректно выбрасывают исключения при overflow вместо silent wrap, и теперь сообщения об ошибках точно соответствуют ожидаемому Spark output. Это важно для корректности при миграции на Spark 4.0.
Рекомендации по конфигурации
Минимальная production-конфигурация
# Plugin
spark.plugins=org.apache.comet.CometPlugin
spark.comet.exec.enabled=true
spark.comet.convert.parquet.enabled=true
spark.comet.exec.shuffle.enabled=true
# Memory
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=4g
# Diagnostics
spark.comet.explain.verbose.enabled=true
Конфигурация для максимальной производительности
# Всё из минимальной конфигурации, плюс:
# Увеличить batch size для больших таблиц
spark.comet.batchSize=16384
# NVMe директория для spill
spark.comet.exec.shuffle.spill.dir=/mnt/nvme/comet-spill
# Больше off-heap памяти
spark.memory.offHeap.size=8g
# Разрешить совместимые, но потенциально отличающиеся выражения
# (для запросов, где точность до бита не критична)
spark.comet.expression.allowIncompatible=true
Итоги
- Иерархия
spark.comet.*: включение (exec/scan/shuffle) → реализация (scan.impl, allowIncompatible) → тюнинг (batchSize, memory) - Off-heap память обязательна для production — без неё Comet увеличивает GC pressure
-
DiskManagerсбрасывает данные в Arrow IPC при превышении off-heap лимита -
spark.comet.explain.verbose.enabled— ключевой инструмент диагностики fallback - Iceberg: два режима — стандартный connector + Comet scan (рекомендуется) и нативный iceberg-rust (экспериментальный)
- ANSI mode поддерживается в Comet 0.14.0 — корректное поведение при overflow для Spark 4.0