Динамическое выделение ресурсов
Статическое выделение ресурсов работает просто: вы задаёте число executor-ов, они запускаются при старте приложения и держатся до его завершения. Подход надёжный, но расточительный: во время фаз ввода-вывода или ожидания shuffle-данных все эти executor-ы просто потребляют память кластера, не выполняя никакой работы.
Dynamic Resource Allocation (DRA) — механизм, позволяющий Spark увеличивать число executor-ов при наличии backlog задач и уменьшать при простое. Класс, реализующий этот механизм — org.apache.spark.ExecutorAllocationManager. Он живёт в процессе драйвера и является одним из самых сложных компонентов Spark с точки зрения взаимодействия с cluster manager-ом.
ExecutorAllocationManager: архитектура
ExecutorAllocationManager создаётся в SparkContext при условии spark.dynamicAllocation.enabled=true. Он регистрирует слушателей на LiveListenerBus, чтобы реагировать на события изменения состояния задач:
// SparkContext.scala (упрощённо)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
b, listenerBus, conf, cleaner, livyServer))
case _ =>
None
}
} else None
_executorAllocationManager.foreach(_.start())
Обратите внимание на тип ExecutorAllocationClient — это интерфейс, который реализуют CoarseGrainedSchedulerBackend (YARN, Kubernetes, Standalone). LocalBackend его не реализует, поэтому в локальном режиме DRA недоступен.
Алгоритм Scale Up: когда добавлять executor-ы
EAM добавляет executor-ы по следующей логике:
Триггер scale up: есть pending tasks (задачи в очереди, которым не хватает слотов).
// ExecutorAllocationManager.scala (упрощённо)
private def updateAndSyncNumExecutorsTarget(now: Long): Int = {
val maxNeeded = maxNumExecutorsNeeded // сколько нужно сейчас
if (addTime == NOT_SET) {
// Первый раз видим pending tasks — устанавливаем таймер
addTime = now + (schedulerBacklogTimeoutS * 1000)
}
if (now >= addTime) {
// Таймаут ожидания прошёл — запрашиваем больше executor-ов
val delta = addExecutors(maxNeeded)
logDebug(s"Will add $delta executors; target = $numExecutorsTarget")
}
numExecutorsTarget
}
Экспоненциальный рост: при первом trigg-ере добавляется 1 executor, при следующем — 2, потом 4, 8 и так далее. Это предотвращает одномоментный запрос огромного числа контейнеров (который перегрузил бы кластер), но быстро масштабируется при устойчивой нагрузке.
Нагрузка поступила:
t=0s: pending_tasks > 0 -> запускаем таймер schedulerBacklogTimeout
t=1s: (schedulerBacklogTimeoutS=1s истёк) -> +1 executor
t=2s: всё ещё pending -> +2 executor (sustainedSchedulerBacklogTimeoutS=1s)
t=3s: всё ещё pending -> +4 executor
t=4s: всё ещё pending -> +8 executor
...
Параметры, управляющие этим процессом:
# Сколько секунд ждать перед ПЕРВЫМ добавлением executor-ов
spark.conf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s") # default: 1s
# Сколько ждать перед ПОСЛЕДУЮЩИМИ добавлениями
spark.conf.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1s") # default: 1s
Алгоритм Scale Down: когда убирать executor-ы
Scale down срабатывает, когда executor простаивает дольше spark.dynamicAllocation.executorIdleTimeout. Но «простой» определяется тонче, чем кажется:
// ExecutorAllocationManager.scala (упрощённо)
private def executorIdle(executorId: String): Boolean = {
// Executor считается idle, если:
// 1. Нет running tasks
// 2. Нет cached blocks (если spark.dynamicAllocation.cachedExecutorIdleTimeout не настроен)
!runningTasks.contains(executorId) &&
!executorsPendingLoss.contains(executorId) &&
!hasStorageBlocks(executorId)
}
Executor с кэшированными данными (RDD, broadcast переменные) не считается idle по умолчанию. Это разумно: удаление executor-а с кэшем потребует пересчёта закэшированных данных. Но это может мешать scale down в долгосрочных приложениях.
# Сколько секунд executor должен быть idle перед удалением
spark.conf.set("spark.dynamicAllocation.executorIdleTimeout", "60s") # default: 60s
# Отдельный таймаут для executor-ов с кэшированными данными
# Если не задан (default) — executor с кэшем никогда не удаляется
spark.conf.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "infinity") # default: infinity
# На практике для K8s/облако рекомендуется:
spark.conf.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "300s")
Процесс удаления: EAM сначала помечает executor для удаления (executorsPendingToRemove), затем вызывает backend.killExecutors(). Cluster manager получает запрос на освобождение контейнера. Если executor в момент запроса начал выполнять task (race condition) — запрос игнорируется.
Граничные условия: min/max executor-ов
# Никогда не опускаться ниже этого числа executor-ов
spark.conf.set("spark.dynamicAllocation.minExecutors", "0") # default: 0
# При 0 — все executor-ы могут быть убраны во время простоя
# Никогда не превышать это число executor-ов
spark.conf.set("spark.dynamicAllocation.maxExecutors", "1000") # default: Integer.MAX_VALUE
# Необходимо задавать явно для защиты кластера
# Начальное число executor-ов при старте приложения
spark.conf.set("spark.dynamicAllocation.initialExecutors", "1") # default: minExecutors
Всегда задавайте spark.dynamicAllocation.maxExecutors явно. Default Integer.MAX_VALUE означает, что при резком росте нагрузки приложение может запросить тысячи executor-ов и исчерпать ресурсы кластера. Хорошее правило — устанавливать maxExecutors в 2-3 раза больше типичного рабочего числа.
Зависимость от shuffle service
Здесь скрывается один из ключевых архитектурных ограничений DRA. Когда executor удаляется, его локальные shuffle-файлы также исчезают. Если эти файлы нужны следующей стадии — произойдёт shuffle fetch failure, DAGScheduler инициирует повтор стадии, что снижает всю выгоду от DRA.
Решений два:
External Shuffle Service (традиционный подход)
External Shuffle Service (ESS) — отдельный процесс (обычно на каждой worker-ноде), который хранит shuffle-файлы вне executor-ов. Executor записывает shuffle-данные в ESS, затем может быть удалён. Следующая стадия читает данные из ESS напрямую.
# YARN: ESS встроен, включается одним параметром
spark.conf.set("spark.shuffle.service.enabled", "true")
spark.conf.set("spark.dynamicAllocation.enabled", "true")
# DRA автоматически работает с YARN's NodeManager shuffle service
# Kubernetes: нужен отдельный deployment (Spark Shuffle Service)
# или Remote Shuffle Service (Celeborn, Uniffle)
ESS — зрелое решение, проверенное в YARN-кластерах, но требует администрирования (обновление вместе со Spark, мониторинг disk space) и не вписывается в ephemeral Kubernetes-архитектуру.
Shuffle Tracking (современный подход)
Начиная с Spark 3.0 появился механизм Shuffle Tracking — альтернатива ESS для случаев, когда его использование нежелательно (особенно в Kubernetes). При shuffle tracking EAM отслеживает, какие executor-ы хранят shuffle-данные, необходимые незавершённым стадиям.
// ExecutorAllocationManager.scala
private val shuffleTracker = if (conf.get(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
new ExecutorShuffleIdsTracker()
} else {
EmptyShuffleTracker
}
Механизм:
- Когда shuffle write завершён, TaskSetManager уведомляет EAM: «executor X записал shuffle-данные для stage Y».
- EAM помечает executor X как «хранящий активные shuffle-данные».
- Executor X не может быть удалён, пока stage Y не завершена (все читатели не прочитали свои shuffle-блоки).
- После завершения downstream stage shuffle-данные считаются неактивными, и executor X снова может быть кандидатом на удаление.
# Включение shuffle tracking (не требует ESS)
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.shuffleTracking.enabled", "true") # default: true (Spark 4.0)
spark.conf.set("spark.dynamicAllocation.shuffleTracking.timeout", "300s") # default: infinity
В Spark 4.0 spark.dynamicAllocation.shuffleTracking.enabled по умолчанию true при включённом DRA без ESS. Параметр shuffleTracking.timeout позволяет удалять executor-ы с shuffle-данными по истечении таймаута (полезно для очень долгих приложений, где shuffle-данные могут не читаться никогда).
С Spark 4.0 при использовании external shuffle service управление удалением shuffle-блоков для деаллоцированных executor-ов берёт на себя ESS. Это снимает нагрузку с driver-а по отслеживанию shuffle-данных и улучшает надёжность при больших объёмах shuffle.
Поведение в Kubernetes
Kubernetes-среда принципиально отличается от YARN. Каждый executor — отдельный Pod. Запрос нового executor-а = создание нового Pod. Удаление executor-а = удаление Pod. Это создаёт специфические проблемы и возможности.
Рекомендуемые настройки для Kubernetes
# Базовые настройки DRA на K8s
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
# Без ESS — shuffle tracking обязателен
spark.conf.set("spark.shuffle.service.enabled", "false")
# Таймаут для shuffle-данных (не держать Pod вечно)
spark.conf.set("spark.dynamicAllocation.shuffleTracking.timeout", "300s")
# Пределы — критически важны для K8s (защита от resource exhaustion)
spark.conf.set("spark.dynamicAllocation.minExecutors", "1")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "50")
# Более консервативный scale down — Pod startup overhead высокий
spark.conf.set("spark.dynamicAllocation.executorIdleTimeout", "120s")
# Executor с кэшем — держим подольше, т.к. пересчёт дороже в K8s
spark.conf.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "600s")
Проблема холодного старта Pod-а
В Kubernetes новый executor-Pod должен пройти стадии: scheduled -> Pending (image pull) -> Running -> RegisteredWithDriver. Это может занять 30-60 секунд. Если задержка слишком высока, EAM может запросить ещё больше executor-ов, думая, что предыдущие не стартовали. Для защиты от этого EAM отслеживает executorsPendingToAdd — executor-ы, которые были запрошены, но ещё не зарегистрировались.
# Держать минимум executor-ов всегда запущенными (избегает cold start)
spark.conf.set("spark.dynamicAllocation.minExecutors", "2")
# Увеличить таймаут для учёта времени старта Pod-а
spark.conf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "5s")
Взаимодействие с Kubernetes Node Affinity
В Kubernetes можно комбинировать DRA с Node Affinity, чтобы executor-ы запускались на определённых типах нод:
spark.conf.set(
"spark.kubernetes.executor.annotation.cluster-autoscaler.kubernetes.io/safe-to-evict",
"false" # Не эвиктировать Pod-ы с shuffle-данными через Cluster Autoscaler
)
Без этой аннотации Kubernetes Cluster Autoscaler может удалить ноду с executor-ом, хранящим shuffle-данные, даже если shuffle tracking включён — это нарушит корректность выполнения.
Диагностика DRA в Spark UI и логах
Spark UI -> Executors tab показывает текущее состояние динамического масштабирования:
| Метрика | Что означает |
|---|---|
| Active Executors | Текущее число выполняющих tasks executor-ов |
| Dead Executors | Executor-ы, которые были удалены (scale down) или потеряны |
| Blacklisted Executors | Executor-ы с повторяющимися ошибками (exclusion list) |
Driver лог при DRA (уровень INFO):
INFO ExecutorAllocationManager: Request 1 executor (executor limit: 10)
INFO ExecutorAllocationManager: Request 2 executor (executor limit: 10)
INFO SparkContext: Requesting executor remove...
INFO ExecutorAllocationManager: Removing executor 3 because it has been idle for 60s
INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove executor 3
При DEBUG:
DEBUG ExecutorAllocationManager: maxNeeded = 8, current = 3, target = 5
DEBUG ExecutorAllocationManager: executor 7 is not removable: has cached blocks
DEBUG ExecutorAllocationManager: executor 9 is idle for 75s > idleTimeout 60s
Попробуй сам
Запуск приложения с DRA и наблюдение за масштабированием:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum
import time
spark = SparkSession.builder \
.appName("dynamic-allocation-demo") \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.dynamicAllocation.minExecutors", "1") \
.config("spark.dynamicAllocation.maxExecutors", "8") \
.config("spark.dynamicAllocation.initialExecutors", "1") \
.config("spark.dynamicAllocation.schedulerBacklogTimeout", "2s") \
.config("spark.dynamicAllocation.executorIdleTimeout", "30s") \
.config("spark.dynamicAllocation.shuffleTracking.enabled", "true") \
.getOrCreate()
sc = spark.sparkContext
print("=== Фаза 1: маленькая нагрузка (1 executor достаточно) ===")
# Простое действие — 1 executor справится
result1 = sc.range(1000).sum()
print(f"Sum: {result1}")
time.sleep(5)
print("=== Фаза 2: большая нагрузка (нужно больше executor-ов) ===")
# Создаём большой DataFrame с shuffle — нужны executor-ы
df = spark.range(0, 10_000_000, numPartitions=64).toDF("id")
result2 = df.groupBy((col("id") % 100).alias("bucket")) \
.agg(spark_sum("id").alias("total")) \
.count()
print(f"Buckets: {result2}")
print("=== Фаза 3: простой (executor-ы будут удалены через 30s) ===")
time.sleep(60)
print("Проверьте Spark UI -> Executors: некоторые executor-ы должны быть Dead")
spark.stop()
Наблюдайте в Spark UI -> Executors за изменением числа активных executor-ов. В фазе 2 число executor-ов должно вырасти, в фазе 3 — уменьшиться обратно.