Learning Platform
Глоссарий Troubleshooting
Урок 08.04 · 16 мин
Продвинутый
spark.comet.*off-heap memoryDiskManagerEXPLAINfallback diagnosticsIceberg integrationANSI modecompatibility

Конфигурация и тюнинг Comet

Comet управляется параметрами spark.comet.* — поверх стандартной конфигурации Spark. Правильная настройка определяет, какие операторы ускоряются, сколько памяти доступно нативному движку и как диагностировать fallback.

Иерархия конфигурации

Параметры Comet организованы в три уровня:

Уровень 1: включение/выключение

# Основные переключатели
spark.comet.exec.enabled=true           # Замена операторов (CometExecRule)
spark.comet.convert.parquet.enabled=true # Нативный scan (CometScanRule)
spark.comet.exec.shuffle.enabled=true    # Нативный columnar shuffle
NOTE

Без spark.comet.exec.enabled=true Comet только читает данные нативно (scan), но обрабатывает их в JVM. Для полного ускорения нужны оба параметра: scan + exec.

Уровень 2: выбор реализации

# Реализация scan (native_comet | native_datafusion | native_iceberg_compat)
spark.comet.scan.impl=native_comet

# Разрешить потенциально несовместимые выражения
spark.comet.expression.allowIncompatible=false

# Разрешить cast без проверки overflow (ускоряет, но менее безопасно)
spark.comet.cast.allowIncompatible=false

Уровень 3: тюнинг производительности

# Размер batch (количество строк в одном RecordBatch)
spark.comet.batchSize=8192

# Количество строк для columnar-to-row конвертации (для fallback)
spark.comet.columnar.toRow.batchSize=8192

# Интервал обновления метрик через JNI (мс)
spark.comet.metrics.updateIntervalMs=3000

Модель памяти

Comet использует off-heap память для нативного исполнения. Рекомендуемая конфигурация:

# Включить off-heap memory для Spark
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=4g

# Доля off-heap памяти для Comet (0.0–1.0)
spark.comet.memory.overhead.factor=0.2

# Минимальный размер overhead (bytes)
spark.comet.memory.overhead.min=402653184  # 384 MB

Как распределяется память

Модель памяти Comet
Executor Memory (spark.executor.memory)Общий объём памяти executor — делится между JVM heap и off-heap
JVM HeapJVM-heap используется Spark Storage и Execution — подвержен GC-паузам
Off-HeapOff-heap для Comet: Arrow-буферы, агрегация, сортировка, shuffle — без GC overhead
TIP

Off-heap память критична для производительности Comet. При spark.memory.offHeap.enabled=false Comet выделяет буферы в JVM heap, что увеличивает GC pressure — именно то, от чего Comet призван избавить. Всегда включайте off-heap для production.

Когда данные сбрасываются на диск

Если off-heap буфер превышает лимит, DiskManager сбрасывает данные:

# Директория для temp-файлов spill
spark.comet.exec.shuffle.spill.dir=/tmp/comet-spill

# Альтернативно — использовать Spark scratch dirs
spark.local.dir=/mnt/nvme/spark-scratch

DiskManager создаёт temp-файлы в формате Arrow IPC. После завершения query stage файлы удаляются автоматически. На NVMe-дисках spill-overhead минимален — bottleneck обычно в сериализации, а не в I/O.

Диагностика: EXPLAIN и метрики

Расширенный EXPLAIN

Comet добавляет форматы COMET_EXTENDED_EXPLAIN_FORMAT для анализа трансляции:

-- Включить расширенную диагностику Comet
SET spark.comet.explain.verbose.enabled=true;

-- Стандартный EXPLAIN покажет Comet-операторы
EXPLAIN SELECT dept, SUM(salary) FROM employees GROUP BY dept;

Вывод включает:

  • Какие операторы заменены на Comet ([COMET])
  • Причины fallback для незаменённых операторов
  • Protobuf plan summary (при verbose mode)

Метрики в Spark UI

Comet публикует метрики через стандартный Spark Metrics API:

ГруппаМетрики
ScanscanTime, numOutputRows, numOutputBatches, readBytes
ExecexecutionTime, numOutputRows, peakMemoryUsed
ShuffleshuffleWriteTime, shuffleBytesWritten, shuffleRecordsWritten
SpillspillBytes, spillCount, spillReadTime

Метрики обновляются каждые 3 секунды (настраивается через spark.comet.metrics.updateIntervalMs). Значения видны в Spark UI → SQL tab → Query Details.

WARNING

Метрики Comet обновляются периодически, а не в реальном времени. Для коротких запросов (< 3 секунд) итоговые метрики могут быть неточными. Для точного профилирования уменьшите интервал: spark.comet.metrics.updateIntervalMs=500.

Совместимость и fallback guide

Что вызывает fallback

Основные причины fallback на стандартный Spark:

ПричинаПримерРешение
Неподдерживаемое выражениеКастомный Spark UDFПереписать как DataFusion UDF или использовать allowIncompatible
Неподдерживаемый тип данныхCalendarIntervalTypeДождаться поддержки в следующих версиях
Incompatible castString → Date с нестандартным форматомspark.comet.cast.allowIncompatible=true (с осторожностью)
Complex partition expressionUDF в ключе партиционированияВынести UDF за пределы shuffle
Не-Parquet форматORC, CSV, JSON scanКонвертировать данные в Parquet

Проверка совместимости конкретного запроса

-- 1. Выполнить EXPLAIN EXTENDED
EXPLAIN EXTENDED
SELECT ...

-- 2. Искать в выводе строки без [COMET]
-- 3. Для каждого non-Comet оператора — проверить причину:
--    "unsupported expression: ..."
--    "unsupported data type: ..."
--    "incompatible cast: ..."

Стратегия постепенного включения

# Шаг 1: только scan (безопасно, максимальная совместимость)
spark.comet.exec.enabled=false
spark.comet.convert.parquet.enabled=true

# Шаг 2: scan + exec (основное ускорение)
spark.comet.exec.enabled=true
spark.comet.convert.parquet.enabled=true

# Шаг 3: scan + exec + shuffle (полное ускорение)
spark.comet.exec.enabled=true
spark.comet.convert.parquet.enabled=true
spark.comet.exec.shuffle.enabled=true

Интеграция с Iceberg

Comet поддерживает Iceberg-таблицы через два режима:

Режим 1: стандартный Iceberg connector + Comet scan

# Iceberg использует BatchScanExec → CometBatchScanExec
spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog
spark.comet.convert.parquet.enabled=true
spark.comet.scan.impl=native_comet

Comet перехватывает BatchScanExec, созданный Iceberg connector, и читает Parquet-файлы нативно. Iceberg metadata (snapshots, manifest files) обрабатывается стандартным Spark-коннектором.

Режим 2: нативный Iceberg scan (экспериментальный)

# Полностью нативное чтение Iceberg через iceberg-rust
spark.comet.scan.impl=native_iceberg_compat

В этом режиме Comet использует iceberg-rust для чтения Iceberg metadata и Parquet-данных. Это устраняет JVM overhead даже для metadata-операций, но:

  • Доступно с Comet 0.12.0+
  • Требует сборки с feature flag: mvn ... -Piceberg
  • Не все Iceberg-операции поддерживаны (time travel, complex partition transforms)
  • Production рекомендация — использовать режим 1
NOTE

Для большинства production-сценариев режим 1 (стандартный Iceberg connector + Comet scan) — оптимальный выбор. Он сохраняет полную совместимость с Iceberg API при ускорении чтения данных.

ANSI mode (Spark 4.0)

Spark 4.0 включает ANSI mode по умолчанию (spark.sql.ansi.enabled=true), что меняет поведение при overflow и cast:

# Spark 4.0 default
spark.sql.ansi.enabled=true

# Поведение при overflow: ANSI → exception, legacy → silent wrap
# Int32.MAX_VALUE + 1:
#   ANSI: ArithmeticException
#   Legacy: -2147483648 (silent overflow)

Comet 0.14.0 (релиз 2026-03-18) поддерживает ANSI mode — нативные операторы корректно выбрасывают исключения при overflow вместо silent wrap, и теперь сообщения об ошибках точно соответствуют ожидаемому Spark output. Это важно для корректности при миграции на Spark 4.0.

Рекомендации по конфигурации

Минимальная production-конфигурация

# Plugin
spark.plugins=org.apache.comet.CometPlugin
spark.comet.exec.enabled=true
spark.comet.convert.parquet.enabled=true
spark.comet.exec.shuffle.enabled=true

# Memory
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=4g

# Diagnostics
spark.comet.explain.verbose.enabled=true

Конфигурация для максимальной производительности

# Всё из минимальной конфигурации, плюс:

# Увеличить batch size для больших таблиц
spark.comet.batchSize=16384

# NVMe директория для spill
spark.comet.exec.shuffle.spill.dir=/mnt/nvme/comet-spill

# Больше off-heap памяти
spark.memory.offHeap.size=8g

# Разрешить совместимые, но потенциально отличающиеся выражения
# (для запросов, где точность до бита не критична)
spark.comet.expression.allowIncompatible=true

Итоги

  • Иерархия spark.comet.*: включение (exec/scan/shuffle) → реализация (scan.impl, allowIncompatible) → тюнинг (batchSize, memory)
  • Off-heap память обязательна для production — без неё Comet увеличивает GC pressure
  • DiskManager сбрасывает данные в Arrow IPC при превышении off-heap лимита
  • spark.comet.explain.verbose.enabled — ключевой инструмент диагностики fallback
  • Iceberg: два режима — стандартный connector + Comet scan (рекомендуется) и нативный iceberg-rust (экспериментальный)
  • ANSI mode поддерживается в Comet 0.14.0 — корректное поведение при overflow для Spark 4.0

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Параметр spark.comet.exec.enabled=true включает замену операторов. Что произойдёт, если включить только spark.comet.convert.parquet.enabled=true без exec.enabled?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5