Learning Platform
Глоссарий Troubleshooting
Урок 17.02 · 20 мин
Продвинутый
ConfigurationTuningMemoryShuffleAQESchedulerSerializationStreamingReference

Справочник internals-конфигов Spark

Этот справочник охватывает конфигурации, непосредственно влияющие на внутреннее устройство и производительность Spark 4.0. Каждый конфиг сопровождается дефолтным значением, механизмом действия и сигналом “когда менять”. Конфиги, изменение которых требует особой осторожности, помечены явно.

Задать конфиг можно тремя способами (убывающий приоритет):

  • программно: spark.conf.set("key", "value")
  • при запуске: spark-submit --conf key=value
  • в spark-defaults.conf

1. Память executor’а

Правильная конфигурация памяти — фундамент стабильной работы в production. Неверный баланс между execution и storage memory — причина большинства OOM и spill-проблем.

Основные параметры

КонфигДефолтЧто делает
spark.executor.memory1gHeap JVM для каждого executor’а. Это основная настройка объёма работы.
spark.executor.memoryOverheadmax(executorMemory * 0.10, minMemoryOverhead)Внепроцессная память (native libs, Netty, PySpark worker). Вычисляется от heap, не суммируется.
spark.executor.memoryOverheadFactor0.10Множитель для вычисления overhead если явное значение не задано.
spark.driver.memory1gHeap JVM драйвера.
spark.driver.memoryOverheadmax(driverMemory * 0.10, minMemoryOverhead)Off-heap overhead драйвера.

Unified Memory Manager

КонфигДефолтЧто делает
spark.memory.fraction0.6Доля heap (после Reserved Memory ~300 MB), отведённая под unified-пул (Execution + Storage). Оставшиеся 0.4 — для пользовательских объектов.
spark.memory.storageFraction0.5Доля unified-пула, которую Storage Memory защищает от вытеснения Execution Memory. Нижняя граница, не фиксированное разделение.
spark.memory.offHeap.enabledfalseВключает off-heap аллоцирование через Unsafe. Снижает GC-давление для больших heap.
spark.memory.offHeap.size0Размер off-heap пула в байтах. Имеет смысл только при offHeap.enabled=true.
WARNING

spark.memory.fraction и spark.memory.storageFraction работают на уровне единственного executor’а, не кластера. Суммарная unified-память одного executor’а: (heapSize - 300MB) * 0.6. Execution Memory и Storage Memory делят эту область динамически — нет жёсткой границы, если одна сторона свободна.

Когда менять:

  • Поднимите spark.memory.fraction до 0.7-0.75 если видите частые spill при небольшом числе кэшированных RDD.
  • Поднимите spark.executor.memory если heap usage >85% по метрикам Spark UI.
  • Включайте offHeap для UnsafeRow-интенсивных join/sort workload’ов с heap >16 GB: GC паузы резко снижаются.

Ссылки на подробное описание

Детали UnifiedMemoryManager описаны в модуле 05 (Memory & Storage Internals).


2. Shuffle

Shuffle — самая дорогая операция. Эти конфиги определяют, сколько памяти тратится на буферы, когда происходит spill, и как данные передаются по сети.

Буферы и сжатие

КонфигДефолтЧто делает
spark.shuffle.file.buffer32kРазмер in-memory буфера для каждого output-файла при записи shuffle. Больший буфер снижает число syscall write.
spark.shuffle.sort.bypassMergeThreshold200При числе output-партиций ниже порога и без map-side aggregation используется bypass merge-sort writer (конкатенация файлов вместо сортировки).
spark.shuffle.compresstrueСжимает shuffle files. Снижает I/O, увеличивает CPU.
spark.shuffle.spill.compresstrueСжимает spill-файлы при сбросе на диск.
spark.shuffle.managersortРеализация ShuffleManager. В Spark 4.x единственное значение — sort (SortShuffleManager).

Fetch и сеть

КонфигДефолтЧто делает
spark.reducer.maxSizeInFlight48mМаксимальный суммарный объём данных, запрашиваемых reducer’ом у всех mapper’ов одновременно. Ограничивает потребление памяти на стороне reducer при fetch.
spark.shuffle.io.maxRetries3Число повторных попыток при сетевых ошибках во время shuffle fetch.
spark.shuffle.io.retryWait5sПауза между retry. Максимальная задержка: maxRetries * retryWait = 15s.

Push-based shuffle

КонфигДефолтЧто делает
spark.shuffle.push.enabledfalseВключает push-based shuffle: mapper’ы проталкивают блоки на ESS заранее, ESS мержит их. Снижает random IO при fetch. Требует ESS.
spark.shuffle.service.enabledfalseВключает External Shuffle Service. Обязателен для Dynamic Allocation и push-based shuffle.

SQL shuffle

КонфигДефолтЧто делает
spark.sql.shuffle.partitions200Число партиций для shuffle в SQL-операциях (join, groupBy). Это “стартовое” число до коалесценции AQE.
spark.default.parallelismЗависит от кластераДефолтное число партиций для RDD-операций. Для SQL не применяется — там spark.sql.shuffle.partitions.
TIP

spark.sql.shuffle.partitions=200 — историческое значение из эпохи малых кластеров. При AQE (включён по умолчанию) это число автоматически коалесцируется до оптимального. Для больших датасетов без AQE ставьте 2-4 * число ядер кластера. Практическое правило: целевой размер партиции 100-200 MB после shuffle.

Когда менять:

  • Поднимите spark.shuffle.file.buffer до 64k-128k при большом числе map-задач на spinning disk.
  • Поднимите spark.reducer.maxSizeInFlight до 96m-128m при большой пропускной способности сети (10Gbit+) и видимых задержках fetch.
  • Снизьте spark.shuffle.sort.bypassMergeThreshold до 50-100 при workload с малым числом ключей, где сортировка избыточна.

3. Adaptive Query Execution (AQE)

AQE переоптимизирует план во время выполнения на основе реальной статистики shuffle. Включён по умолчанию с Spark 3.2.

Основные переключатели

КонфигДефолтЧто делает
spark.sql.adaptive.enabledtrueГлавный переключатель AQE. При false отключает всю адаптивную оптимизацию.
spark.sql.adaptive.coalescePartitions.enabledtrueКоалесцирует смежные shuffle-партиции малого размера в одну. Уменьшает число задач и overhead планировщика.
spark.sql.adaptive.skewJoin.enabledtrueАвтоматически разбивает skewed-партиции при sort-merge join.
spark.sql.adaptive.localShuffleReader.enabledtrueПри конвертации shuffle-join в broadcast-join использует local reader вместо полного fetch.

Partition coalescing

КонфигДефолтЧто делает
spark.sql.adaptive.advisoryPartitionSizeInBytes64MBЦелевой размер партиции после коалесценции. AQE объединяет партиции, пока их сумма не достигнет этого значения.
spark.sql.adaptive.coalescePartitions.minPartitionSize1MBМинимальный размер партиции после коалесценции. Партиции меньше этого порога принудительно объединяются.
spark.sql.adaptive.coalescePartitions.parallelismFirsttrueПриоритизирует параллелизм над размером: игнорирует advisoryPartitionSizeInBytes и объединяет только при превышении minPartitionSize.
spark.sql.adaptive.coalescePartitions.initialPartitionNumНет (= spark.sql.shuffle.partitions)Начальное число shuffle-партиций до коалесценции. Если не задан, равен spark.sql.shuffle.partitions.

Skew join

КонфигДефолтЧто делает
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes256MBПартиция считается skewed, если её размер превышает этот порог И в skewedPartitionFactor раз больше медианного.
spark.sql.adaptive.skewJoin.skewedPartitionFactor5.0Партиция считается skewed, если её размер в N раз больше медианы. Работает в паре с threshold.
spark.sql.adaptive.forceOptimizeSkewedJoinfalseПринудительно применяет skew-оптимизацию даже если она требует дополнительного shuffle.
NOTE

Partition coalescing с дефолтным coalescePartitions.parallelismFirst=true означает, что AQE в первую очередь сохраняет параллелизм и объединяет партиции, только если они меньше minPartitionSize (1 MB). Чтобы активно укрупнять партиции до advisoryPartitionSizeInBytes, установите parallelismFirst=false. Это полезно при очень большом числе исходных партиций и медленных задачах из-за overhead планировщика.

Когда менять:

  • Увеличьте advisoryPartitionSizeInBytes до 128-256 MB при больших агрегациях с малым числом ключей.
  • Снизьте skewedPartitionThresholdInBytes до 64-128 MB при агрессивном skew на малых датасетах.
  • Отключите AQE (enabled=false) только при отладке производительности — иначе нет смысла.

4. Планировщик, локальность, спекуляция

Локальность задач

КонфигДефолтЧто делает
spark.locality.wait3sВремя ожидания перед понижением уровня локальности. Применяется к переходу PROCESS_LOCAL -> NODE_LOCAL, NODE_LOCAL -> RACK_LOCAL.
spark.locality.wait.nodespark.locality.waitОжидание конкретно для уровня NODE_LOCAL. По умолчанию равно базовому spark.locality.wait.
spark.locality.wait.processspark.locality.waitОжидание конкретно для уровня PROCESS_LOCAL.
spark.locality.wait.rackspark.locality.waitОжидание конкретно для уровня RACK_LOCAL.

Спекулятивное выполнение

КонфигДефолтЧто делает
spark.speculationfalseВключает запуск дублирующей задачи при медленно выполняющейся.
spark.speculation.interval100msИнтервал проверки медленных задач.
spark.speculation.multiplier3Задача считается медленной, если её время превышает multiplier * медианное время задач стадии.
spark.speculation.minTaskRuntime100msМинимальное время выполнения задачи до того, как её можно посчитать кандидатом на спекуляцию.
spark.speculation.quantile0.75Доля завершённых задач стадии, необходимая для включения спекуляции. При 0.75 первые три четверти задач должны завершиться раньше.

Провалы задач

КонфигДефолтЧто делает
spark.task.maxFailures4Максимальное число провалов одной задачи до провала стадии.
spark.task.cpus1Число ядер, резервируемых на одну задачу. Для тяжёлых ML-задач увеличивают до 2-4.
spark.blacklist.enabledfalseБлокирует назначение задач на repeatedly failing executor’а или узлы. В Spark 3.1+ переименован в spark.excludeOnFailure.enabled.

Dynamic Allocation

КонфигДефолтЧто делает
spark.dynamicAllocation.enabledfalseВключает динамическое выделение executor’ов.
spark.dynamicAllocation.minExecutors0Минимальное число executor’ов, которые не будут освобождены.
spark.dynamicAllocation.maxExecutorsinfinityВерхняя граница числа executor’ов.
spark.dynamicAllocation.initialExecutors= minExecutorsСтартовое число executor’ов.
spark.dynamicAllocation.executorIdleTimeout60sВремя простоя executor’а без задач до освобождения.
spark.dynamicAllocation.cachedExecutorIdleTimeoutinfinityВремя простоя executor’а с кэшированными данными до освобождения. По умолчанию такие executor’ы не освобождаются.
spark.dynamicAllocation.schedulerBacklogTimeout1sВремя наличия очереди задач до запроса нового executor’а.
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout15sИнтервал последующих запросов executor’ов при продолжающейся очереди.
WARNING

Dynamic Allocation требует spark.shuffle.service.enabled=true (External Shuffle Service) или push-based shuffle. Без ESS при освобождении executor’а теряются его shuffle файлы, и DAGScheduler вынужден перезапускать ShuffleMapStage. В Kubernetes-средах без ESS используют Remote Shuffle Service (RSS) или push-based shuffle.

Когда менять:

  • Снизьте spark.locality.wait до 1s в Kubernetes (там нет rack topology и node locality работает хуже).
  • Включайте spark.speculation только при workload’ах с гарантированной идемпотентностью задач и явным “хвостом” медленных задач.
  • Для Dynamic Allocation задавайте minExecutors больше 0 (например, 2-4) если критична латентность первого запроса.

5. Сериализация

Сериализатор влияет на скорость shuffle, размер данных в сети и накладные расходы GC.

КонфигДефолтЧто делает
spark.serializerorg.apache.spark.serializer.JavaSerializerСериализатор для shuffle и broadcast. KryoSerializer значительно быстрее и компактнее для большинства типов.
spark.kryo.registrationRequiredfalseПри true Kryo выбрасывает исключение для незарегистрированных классов. Гарантирует явную регистрацию и оптимальную производительность.
spark.kryo.registratorнетFQCN класса реализующего KryoRegistrator для регистрации пользовательских классов.
spark.kryoserializer.buffer64kНачальный размер буфера Kryo на каждый поток сериализации.
spark.kryoserializer.buffer.max64mМаксимальный размер буфера Kryo. Если объект больше — выбрасывается исключение.
spark.kryo.unsafefalseВключает Kryo UnsafeInput/UnsafeOutput для более быстрой (de)сериализации через Unsafe API.
TIP

Для большинства production workload’ов рекомендуется spark.serializer=org.apache.spark.serializer.KryoSerializer. Крио сериализует быстрее Java в 5-10x и выдаёт меньший по размеру output, что прямо ускоряет shuffle. Основное ограничение: кастомные классы нужно регистрировать явно для максимальной эффективности. Для DataFrame/Dataset workload’ов Spark внутри и так использует UnsafeRow — смена сериализатора влияет только на RDD-операции и broadcast.

Когда менять:

  • Поднимайте spark.kryoserializer.buffer.max при Buffer overflow ошибках Kryo (появляются в stacktrace).
  • Устанавливайте kryo.registrationRequired=true в production, если хотите гарантировать отсутствие неожиданных slow-path’ей сериализации.

6. Tungsten и кодогенерация

КонфигДефолтЧто делает
spark.sql.codegen.wholeStagetrueВключает Whole-Stage CodeGen: генерирует единый Java-метод для цепочки операторов. Устраняет virtual dispatch.
spark.sql.codegen.fallbacktrueПри ошибке кодогенерации автоматически переходит к интерпретируемому выполнению.
spark.sql.codegen.maxFields100Максимальное число полей в схеме, при котором включается кодогенерация. При большем числе полей WSCG отключается.
spark.sql.codegen.hugeMethodLimit65535Максимальный размер сгенерированного метода (в байткоде). При превышении Spark разбивает метод.
spark.sql.execution.arrow.pyspark.enabledfalseВключает Apache Arrow для передачи данных между JVM и Python в PySpark. Ускоряет toPandas() и createDataFrame() в 10-100x.

Когда менять:

  • Отключайте wholeStage только для диагностики: spark.conf.set("spark.sql.codegen.wholeStage", "false"). В production всегда должен быть включён.
  • Включайте arrow.pyspark.enabled при любом PySpark workload’е — это почти всегда ускорение без побочных эффектов.

7. SQL join стратегии

КонфигДефолтЧто делает
spark.sql.autoBroadcastJoinThreshold10485760 (10 MB)Таблица меньше этого размера автоматически broadcastится при join. -1 отключает автоматический broadcast.
spark.sql.broadcastTimeout300 (секунд)Таймаут ожидания broadcast-значения. При больших таблицах на медленной сети увеличивают.
spark.sql.join.preferSortMergeJointrueПри включённом предпочитает sort-merge join вместо shuffle hash join для больших таблиц.
spark.sql.files.maxPartitionBytes134217728 (128 MB)Максимальный размер блока данных при чтении файлов. Определяет начальный параллелизм чтения Parquet/ORC.
NOTE

spark.sql.autoBroadcastJoinThreshold основан на размере сжатых данных по статистике Spark Catalog. Если статистики нет (устаревшие или не собраны через ANALYZE TABLE), Spark не знает реальный размер и может не применить broadcast. При AQE threshold может быть переопределён spark.sql.adaptive.autoBroadcastJoinThreshold.


8. Structured Streaming

КонфигДефолтЧто делает
spark.sql.streaming.checkpointLocationнетДефолтная директория checkpoint для streaming запросов. Переопределяется через writeStream.option("checkpointLocation", ...).
spark.sql.streaming.stateStore.providerClassorg.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProviderРеализация State Store. Для production используют RocksDB: RocksDBStateStoreProvider.
spark.sql.streaming.minBatchesToRetain100Минимальное число batch’ей, хранимых в метаданных checkpoint. Влияет на время восстановления и размер директории checkpoint.
spark.sql.streaming.noDataMicroBatches.enabledtrueПри true Spark запускает micro-batch даже при отсутствии новых данных (для прогресса watermark и триггеров). При false batch пропускается — снижает нагрузку, но watermark продвигается медленнее.
spark.sql.streaming.maxBatchesToRetainInMemory2Число batch’ей, кэшируемых в памяти для ускорения StreamingQueryProgress.
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabledfalseПри RocksDB state store включает инкрементальные changelog-checkpoint’ы вместо полных снапшотов. Снижает I/O при checkpoint’е.
TIP

Для stateful streaming с большим состоянием (windowed join, session windows) замените дефолтный HDFS-based State Store на RocksDB: spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider. RocksDB хранит state on-disk с compact представлением, не держит всё в heap, и поддерживает changelog checkpoint’ы.

Когда менять:

  • Снизьте minBatchesToRetain до 10-20 если checkpoint директория растёт слишком быстро и recovery с нуля приемлем.
  • Отключите noDataMicroBatches.enabled для источников с редкими данными чтобы снизить нагрузку на State Store при пустых batch’ах.

9. Быстрая таблица-шпаргалка по симптомам

СимптомПервые конфиги для проверки
OOM в executorspark.executor.memory, spark.memory.fraction, spark.memory.offHeap.*
Много spill на дискspark.memory.fraction (вверх), spark.sql.shuffle.partitions (вниз)
Медленный shuffle fetchspark.reducer.maxSizeInFlight, spark.shuffle.io.maxRetries
Skewed joinspark.sql.adaptive.skewJoin.*, AQE enabled
Много мелких задач после shufflespark.sql.adaptive.coalescePartitions.*, advisoryPartitionSizeInBytes
Долгий broadcastspark.sql.broadcastTimeout, spark.sql.autoBroadcastJoinThreshold
Медленная сериализацияspark.serializer -> KryoSerializer, spark.kryoserializer.buffer.max
Executor’ы долго не освобождаютсяspark.dynamicAllocation.executorIdleTimeout
Slow tasks в конце стадииspark.speculation=true, spark.speculation.multiplier
PySpark toPandas медленноspark.sql.execution.arrow.pyspark.enabled=true
Проверка знанийKnowledge check
Кластер с AQE включён показывает 200 задач по 1 KB каждая после большого shuffle. Какие конфиги нужно проверить и что означает такая ситуация?
ОтветAnswer
Ситуация означает, что AQE не смог эффективно коалесцировать партиции. Первое: проверить spark.sql.adaptive.coalescePartitions.parallelismFirst -- если true (дефолт), AQE коалесцирует только партиции меньше minPartitionSize (1 MB по умолчанию), а 1 KB > порога не достигает. Нет: 1 KB < 1 MB, значит partitions должны были слиться. Реальная проблема может быть в том, что coalescePartitions.enabled=false или AQE не применился к этому шагу (например, это RDD shuffle, а не SQL). Второе: убедиться что spark.sql.adaptive.enabled=true и spark.sql.adaptive.coalescePartitions.enabled=true. Третье: увеличить spark.sql.adaptive.advisoryPartitionSizeInBytes до 128-256 MB и установить coalescePartitions.parallelismFirst=false чтобы агрессивно объединять мелкие партиции. Для RDD shuffle AQE не применяется -- там нужно вручную задавать spark.default.parallelism.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 2