Справочник internals-конфигов Spark
Этот справочник охватывает конфигурации, непосредственно влияющие на внутреннее устройство и производительность Spark 4.0. Каждый конфиг сопровождается дефолтным значением, механизмом действия и сигналом “когда менять”. Конфиги, изменение которых требует особой осторожности, помечены явно.
Задать конфиг можно тремя способами (убывающий приоритет):
- программно:
spark.conf.set("key", "value") - при запуске:
spark-submit --conf key=value - в
spark-defaults.conf
1. Память executor’а
Правильная конфигурация памяти — фундамент стабильной работы в production. Неверный баланс между execution и storage memory — причина большинства OOM и spill-проблем.
Основные параметры
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.executor.memory | 1g | Heap JVM для каждого executor’а. Это основная настройка объёма работы. |
spark.executor.memoryOverhead | max(executorMemory * 0.10, minMemoryOverhead) | Внепроцессная память (native libs, Netty, PySpark worker). Вычисляется от heap, не суммируется. |
spark.executor.memoryOverheadFactor | 0.10 | Множитель для вычисления overhead если явное значение не задано. |
spark.driver.memory | 1g | Heap JVM драйвера. |
spark.driver.memoryOverhead | max(driverMemory * 0.10, minMemoryOverhead) | Off-heap overhead драйвера. |
Unified Memory Manager
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.memory.fraction | 0.6 | Доля heap (после Reserved Memory ~300 MB), отведённая под unified-пул (Execution + Storage). Оставшиеся 0.4 — для пользовательских объектов. |
spark.memory.storageFraction | 0.5 | Доля unified-пула, которую Storage Memory защищает от вытеснения Execution Memory. Нижняя граница, не фиксированное разделение. |
spark.memory.offHeap.enabled | false | Включает off-heap аллоцирование через Unsafe. Снижает GC-давление для больших heap. |
spark.memory.offHeap.size | 0 | Размер off-heap пула в байтах. Имеет смысл только при offHeap.enabled=true. |
spark.memory.fraction и spark.memory.storageFraction работают на уровне единственного executor’а, не кластера. Суммарная unified-память одного executor’а: (heapSize - 300MB) * 0.6. Execution Memory и Storage Memory делят эту область динамически — нет жёсткой границы, если одна сторона свободна.
Когда менять:
- Поднимите
spark.memory.fractionдо 0.7-0.75 если видите частые spill при небольшом числе кэшированных RDD. - Поднимите
spark.executor.memoryесли heap usage >85% по метрикам Spark UI. - Включайте
offHeapдля UnsafeRow-интенсивных join/sort workload’ов с heap >16 GB: GC паузы резко снижаются.
Ссылки на подробное описание
Детали UnifiedMemoryManager описаны в модуле 05 (Memory & Storage Internals).
2. Shuffle
Shuffle — самая дорогая операция. Эти конфиги определяют, сколько памяти тратится на буферы, когда происходит spill, и как данные передаются по сети.
Буферы и сжатие
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.shuffle.file.buffer | 32k | Размер in-memory буфера для каждого output-файла при записи shuffle. Больший буфер снижает число syscall write. |
spark.shuffle.sort.bypassMergeThreshold | 200 | При числе output-партиций ниже порога и без map-side aggregation используется bypass merge-sort writer (конкатенация файлов вместо сортировки). |
spark.shuffle.compress | true | Сжимает shuffle files. Снижает I/O, увеличивает CPU. |
spark.shuffle.spill.compress | true | Сжимает spill-файлы при сбросе на диск. |
spark.shuffle.manager | sort | Реализация ShuffleManager. В Spark 4.x единственное значение — sort (SortShuffleManager). |
Fetch и сеть
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.reducer.maxSizeInFlight | 48m | Максимальный суммарный объём данных, запрашиваемых reducer’ом у всех mapper’ов одновременно. Ограничивает потребление памяти на стороне reducer при fetch. |
spark.shuffle.io.maxRetries | 3 | Число повторных попыток при сетевых ошибках во время shuffle fetch. |
spark.shuffle.io.retryWait | 5s | Пауза между retry. Максимальная задержка: maxRetries * retryWait = 15s. |
Push-based shuffle
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.shuffle.push.enabled | false | Включает push-based shuffle: mapper’ы проталкивают блоки на ESS заранее, ESS мержит их. Снижает random IO при fetch. Требует ESS. |
spark.shuffle.service.enabled | false | Включает External Shuffle Service. Обязателен для Dynamic Allocation и push-based shuffle. |
SQL shuffle
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.sql.shuffle.partitions | 200 | Число партиций для shuffle в SQL-операциях (join, groupBy). Это “стартовое” число до коалесценции AQE. |
spark.default.parallelism | Зависит от кластера | Дефолтное число партиций для RDD-операций. Для SQL не применяется — там spark.sql.shuffle.partitions. |
spark.sql.shuffle.partitions=200 — историческое значение из эпохи малых кластеров. При AQE (включён по умолчанию) это число автоматически коалесцируется до оптимального. Для больших датасетов без AQE ставьте 2-4 * число ядер кластера. Практическое правило: целевой размер партиции 100-200 MB после shuffle.
Когда менять:
- Поднимите
spark.shuffle.file.bufferдо 64k-128k при большом числе map-задач на spinning disk. - Поднимите
spark.reducer.maxSizeInFlightдо 96m-128m при большой пропускной способности сети (10Gbit+) и видимых задержках fetch. - Снизьте
spark.shuffle.sort.bypassMergeThresholdдо 50-100 при workload с малым числом ключей, где сортировка избыточна.
3. Adaptive Query Execution (AQE)
AQE переоптимизирует план во время выполнения на основе реальной статистики shuffle. Включён по умолчанию с Spark 3.2.
Основные переключатели
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.sql.adaptive.enabled | true | Главный переключатель AQE. При false отключает всю адаптивную оптимизацию. |
spark.sql.adaptive.coalescePartitions.enabled | true | Коалесцирует смежные shuffle-партиции малого размера в одну. Уменьшает число задач и overhead планировщика. |
spark.sql.adaptive.skewJoin.enabled | true | Автоматически разбивает skewed-партиции при sort-merge join. |
spark.sql.adaptive.localShuffleReader.enabled | true | При конвертации shuffle-join в broadcast-join использует local reader вместо полного fetch. |
Partition coalescing
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | Целевой размер партиции после коалесценции. AQE объединяет партиции, пока их сумма не достигнет этого значения. |
spark.sql.adaptive.coalescePartitions.minPartitionSize | 1MB | Минимальный размер партиции после коалесценции. Партиции меньше этого порога принудительно объединяются. |
spark.sql.adaptive.coalescePartitions.parallelismFirst | true | Приоритизирует параллелизм над размером: игнорирует advisoryPartitionSizeInBytes и объединяет только при превышении minPartitionSize. |
spark.sql.adaptive.coalescePartitions.initialPartitionNum | Нет (= spark.sql.shuffle.partitions) | Начальное число shuffle-партиций до коалесценции. Если не задан, равен spark.sql.shuffle.partitions. |
Skew join
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | Партиция считается skewed, если её размер превышает этот порог И в skewedPartitionFactor раз больше медианного. |
spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5.0 | Партиция считается skewed, если её размер в N раз больше медианы. Работает в паре с threshold. |
spark.sql.adaptive.forceOptimizeSkewedJoin | false | Принудительно применяет skew-оптимизацию даже если она требует дополнительного shuffle. |
Partition coalescing с дефолтным coalescePartitions.parallelismFirst=true означает, что AQE в первую очередь сохраняет параллелизм и объединяет партиции, только если они меньше minPartitionSize (1 MB). Чтобы активно укрупнять партиции до advisoryPartitionSizeInBytes, установите parallelismFirst=false. Это полезно при очень большом числе исходных партиций и медленных задачах из-за overhead планировщика.
Когда менять:
- Увеличьте
advisoryPartitionSizeInBytesдо 128-256 MB при больших агрегациях с малым числом ключей. - Снизьте
skewedPartitionThresholdInBytesдо 64-128 MB при агрессивном skew на малых датасетах. - Отключите AQE (
enabled=false) только при отладке производительности — иначе нет смысла.
4. Планировщик, локальность, спекуляция
Локальность задач
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.locality.wait | 3s | Время ожидания перед понижением уровня локальности. Применяется к переходу PROCESS_LOCAL -> NODE_LOCAL, NODE_LOCAL -> RACK_LOCAL. |
spark.locality.wait.node | spark.locality.wait | Ожидание конкретно для уровня NODE_LOCAL. По умолчанию равно базовому spark.locality.wait. |
spark.locality.wait.process | spark.locality.wait | Ожидание конкретно для уровня PROCESS_LOCAL. |
spark.locality.wait.rack | spark.locality.wait | Ожидание конкретно для уровня RACK_LOCAL. |
Спекулятивное выполнение
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.speculation | false | Включает запуск дублирующей задачи при медленно выполняющейся. |
spark.speculation.interval | 100ms | Интервал проверки медленных задач. |
spark.speculation.multiplier | 3 | Задача считается медленной, если её время превышает multiplier * медианное время задач стадии. |
spark.speculation.minTaskRuntime | 100ms | Минимальное время выполнения задачи до того, как её можно посчитать кандидатом на спекуляцию. |
spark.speculation.quantile | 0.75 | Доля завершённых задач стадии, необходимая для включения спекуляции. При 0.75 первые три четверти задач должны завершиться раньше. |
Провалы задач
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.task.maxFailures | 4 | Максимальное число провалов одной задачи до провала стадии. |
spark.task.cpus | 1 | Число ядер, резервируемых на одну задачу. Для тяжёлых ML-задач увеличивают до 2-4. |
spark.blacklist.enabled | false | Блокирует назначение задач на repeatedly failing executor’а или узлы. В Spark 3.1+ переименован в spark.excludeOnFailure.enabled. |
Dynamic Allocation
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.dynamicAllocation.enabled | false | Включает динамическое выделение executor’ов. |
spark.dynamicAllocation.minExecutors | 0 | Минимальное число executor’ов, которые не будут освобождены. |
spark.dynamicAllocation.maxExecutors | infinity | Верхняя граница числа executor’ов. |
spark.dynamicAllocation.initialExecutors | = minExecutors | Стартовое число executor’ов. |
spark.dynamicAllocation.executorIdleTimeout | 60s | Время простоя executor’а без задач до освобождения. |
spark.dynamicAllocation.cachedExecutorIdleTimeout | infinity | Время простоя executor’а с кэшированными данными до освобождения. По умолчанию такие executor’ы не освобождаются. |
spark.dynamicAllocation.schedulerBacklogTimeout | 1s | Время наличия очереди задач до запроса нового executor’а. |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout | 15s | Интервал последующих запросов executor’ов при продолжающейся очереди. |
Dynamic Allocation требует spark.shuffle.service.enabled=true (External Shuffle Service) или push-based shuffle. Без ESS при освобождении executor’а теряются его shuffle файлы, и DAGScheduler вынужден перезапускать ShuffleMapStage. В Kubernetes-средах без ESS используют Remote Shuffle Service (RSS) или push-based shuffle.
Когда менять:
- Снизьте
spark.locality.waitдо 1s в Kubernetes (там нет rack topology и node locality работает хуже). - Включайте
spark.speculationтолько при workload’ах с гарантированной идемпотентностью задач и явным “хвостом” медленных задач. - Для Dynamic Allocation задавайте
minExecutorsбольше 0 (например, 2-4) если критична латентность первого запроса.
5. Сериализация
Сериализатор влияет на скорость shuffle, размер данных в сети и накладные расходы GC.
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.serializer | org.apache.spark.serializer.JavaSerializer | Сериализатор для shuffle и broadcast. KryoSerializer значительно быстрее и компактнее для большинства типов. |
spark.kryo.registrationRequired | false | При true Kryo выбрасывает исключение для незарегистрированных классов. Гарантирует явную регистрацию и оптимальную производительность. |
spark.kryo.registrator | нет | FQCN класса реализующего KryoRegistrator для регистрации пользовательских классов. |
spark.kryoserializer.buffer | 64k | Начальный размер буфера Kryo на каждый поток сериализации. |
spark.kryoserializer.buffer.max | 64m | Максимальный размер буфера Kryo. Если объект больше — выбрасывается исключение. |
spark.kryo.unsafe | false | Включает Kryo UnsafeInput/UnsafeOutput для более быстрой (de)сериализации через Unsafe API. |
Для большинства production workload’ов рекомендуется spark.serializer=org.apache.spark.serializer.KryoSerializer. Крио сериализует быстрее Java в 5-10x и выдаёт меньший по размеру output, что прямо ускоряет shuffle. Основное ограничение: кастомные классы нужно регистрировать явно для максимальной эффективности. Для DataFrame/Dataset workload’ов Spark внутри и так использует UnsafeRow — смена сериализатора влияет только на RDD-операции и broadcast.
Когда менять:
- Поднимайте
spark.kryoserializer.buffer.maxприBuffer overflowошибках Kryo (появляются в stacktrace). - Устанавливайте
kryo.registrationRequired=trueв production, если хотите гарантировать отсутствие неожиданных slow-path’ей сериализации.
6. Tungsten и кодогенерация
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.sql.codegen.wholeStage | true | Включает Whole-Stage CodeGen: генерирует единый Java-метод для цепочки операторов. Устраняет virtual dispatch. |
spark.sql.codegen.fallback | true | При ошибке кодогенерации автоматически переходит к интерпретируемому выполнению. |
spark.sql.codegen.maxFields | 100 | Максимальное число полей в схеме, при котором включается кодогенерация. При большем числе полей WSCG отключается. |
spark.sql.codegen.hugeMethodLimit | 65535 | Максимальный размер сгенерированного метода (в байткоде). При превышении Spark разбивает метод. |
spark.sql.execution.arrow.pyspark.enabled | false | Включает Apache Arrow для передачи данных между JVM и Python в PySpark. Ускоряет toPandas() и createDataFrame() в 10-100x. |
Когда менять:
- Отключайте
wholeStageтолько для диагностики:spark.conf.set("spark.sql.codegen.wholeStage", "false"). В production всегда должен быть включён. - Включайте
arrow.pyspark.enabledпри любом PySpark workload’е — это почти всегда ускорение без побочных эффектов.
7. SQL join стратегии
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | Таблица меньше этого размера автоматически broadcastится при join. -1 отключает автоматический broadcast. |
spark.sql.broadcastTimeout | 300 (секунд) | Таймаут ожидания broadcast-значения. При больших таблицах на медленной сети увеличивают. |
spark.sql.join.preferSortMergeJoin | true | При включённом предпочитает sort-merge join вместо shuffle hash join для больших таблиц. |
spark.sql.files.maxPartitionBytes | 134217728 (128 MB) | Максимальный размер блока данных при чтении файлов. Определяет начальный параллелизм чтения Parquet/ORC. |
spark.sql.autoBroadcastJoinThreshold основан на размере сжатых данных по статистике Spark Catalog. Если статистики нет (устаревшие или не собраны через ANALYZE TABLE), Spark не знает реальный размер и может не применить broadcast. При AQE threshold может быть переопределён spark.sql.adaptive.autoBroadcastJoinThreshold.
8. Structured Streaming
| Конфиг | Дефолт | Что делает |
|---|---|---|
spark.sql.streaming.checkpointLocation | нет | Дефолтная директория checkpoint для streaming запросов. Переопределяется через writeStream.option("checkpointLocation", ...). |
spark.sql.streaming.stateStore.providerClass | org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider | Реализация State Store. Для production используют RocksDB: RocksDBStateStoreProvider. |
spark.sql.streaming.minBatchesToRetain | 100 | Минимальное число batch’ей, хранимых в метаданных checkpoint. Влияет на время восстановления и размер директории checkpoint. |
spark.sql.streaming.noDataMicroBatches.enabled | true | При true Spark запускает micro-batch даже при отсутствии новых данных (для прогресса watermark и триггеров). При false batch пропускается — снижает нагрузку, но watermark продвигается медленнее. |
spark.sql.streaming.maxBatchesToRetainInMemory | 2 | Число batch’ей, кэшируемых в памяти для ускорения StreamingQueryProgress. |
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled | false | При RocksDB state store включает инкрементальные changelog-checkpoint’ы вместо полных снапшотов. Снижает I/O при checkpoint’е. |
Для stateful streaming с большим состоянием (windowed join, session windows) замените дефолтный HDFS-based State Store на RocksDB: spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider. RocksDB хранит state on-disk с compact представлением, не держит всё в heap, и поддерживает changelog checkpoint’ы.
Когда менять:
- Снизьте
minBatchesToRetainдо 10-20 если checkpoint директория растёт слишком быстро и recovery с нуля приемлем. - Отключите
noDataMicroBatches.enabledдля источников с редкими данными чтобы снизить нагрузку на State Store при пустых batch’ах.
9. Быстрая таблица-шпаргалка по симптомам
| Симптом | Первые конфиги для проверки |
|---|---|
| OOM в executor | spark.executor.memory, spark.memory.fraction, spark.memory.offHeap.* |
| Много spill на диск | spark.memory.fraction (вверх), spark.sql.shuffle.partitions (вниз) |
| Медленный shuffle fetch | spark.reducer.maxSizeInFlight, spark.shuffle.io.maxRetries |
| Skewed join | spark.sql.adaptive.skewJoin.*, AQE enabled |
| Много мелких задач после shuffle | spark.sql.adaptive.coalescePartitions.*, advisoryPartitionSizeInBytes |
| Долгий broadcast | spark.sql.broadcastTimeout, spark.sql.autoBroadcastJoinThreshold |
| Медленная сериализация | spark.serializer -> KryoSerializer, spark.kryoserializer.buffer.max |
| Executor’ы долго не освобождаются | spark.dynamicAllocation.executorIdleTimeout |
| Slow tasks в конце стадии | spark.speculation=true, spark.speculation.multiplier |
| PySpark toPandas медленно | spark.sql.execution.arrow.pyspark.enabled=true |