Learning Platform
Глоссарий Troubleshooting
Урок 04.05 · 30 мин
Продвинутый
Dynamic AllocationExecutorAllocationManagerKubernetesShuffle TrackingScale Up/Down

Динамическое выделение ресурсов

Статическое выделение ресурсов работает просто: вы задаёте число executor-ов, они запускаются при старте приложения и держатся до его завершения. Подход надёжный, но расточительный: во время фаз ввода-вывода или ожидания shuffle-данных все эти executor-ы просто потребляют память кластера, не выполняя никакой работы.

Dynamic Resource Allocation (DRA) — механизм, позволяющий Spark увеличивать число executor-ов при наличии backlog задач и уменьшать при простое. Класс, реализующий этот механизм — org.apache.spark.ExecutorAllocationManager. Он живёт в процессе драйвера и является одним из самых сложных компонентов Spark с точки зрения взаимодействия с cluster manager-ом.

ExecutorAllocationManager: архитектура

ExecutorAllocationManager создаётся в SparkContext при условии spark.dynamicAllocation.enabled=true. Он регистрирует слушателей на LiveListenerBus, чтобы реагировать на события изменения состояния задач:

// SparkContext.scala (упрощённо)
_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    schedulerBackend match {
      case b: ExecutorAllocationClient =>
        Some(new ExecutorAllocationManager(
          b, listenerBus, conf, cleaner, livyServer))
      case _ =>
        None
    }
  } else None

_executorAllocationManager.foreach(_.start())

Обратите внимание на тип ExecutorAllocationClient — это интерфейс, который реализуют CoarseGrainedSchedulerBackend (YARN, Kubernetes, Standalone). LocalBackend его не реализует, поэтому в локальном режиме DRA недоступен.

ExecutorAllocationManager: взаимодействие компонентов
LiveListenerBusШина событий Spark. EAM подписывается на SparkListenerTaskStart, SparkListenerTaskEnd, SparkListenerStageSubmitted, SparkListenerStageCompleted.
ExecutorAllocationManagerorg.apache.spark.ExecutorAllocationManager. Живёт в driver-процессе. Принимает события от ListenerBus, хранит счётчики pending/running tasks, принимает решения о scale.
ExecutorAllocationClientИнтерфейс для запроса/освобождения executor-ов. Реализован CoarseGrainedSchedulerBackend. Делегирует к конкретному cluster manager (YARN RM, K8s API).
Cluster ManagerYARN ResourceManager, Kubernetes API server, Standalone master. Принимает запрос на добавление/удаление контейнеров.

Алгоритм Scale Up: когда добавлять executor-ы

EAM добавляет executor-ы по следующей логике:

Триггер scale up: есть pending tasks (задачи в очереди, которым не хватает слотов).

// ExecutorAllocationManager.scala (упрощённо)
private def updateAndSyncNumExecutorsTarget(now: Long): Int = {
  val maxNeeded = maxNumExecutorsNeeded  // сколько нужно сейчас

  if (addTime == NOT_SET) {
    // Первый раз видим pending tasks — устанавливаем таймер
    addTime = now + (schedulerBacklogTimeoutS * 1000)
  }

  if (now >= addTime) {
    // Таймаут ожидания прошёл — запрашиваем больше executor-ов
    val delta = addExecutors(maxNeeded)
    logDebug(s"Will add $delta executors; target = $numExecutorsTarget")
  }
  numExecutorsTarget
}

Экспоненциальный рост: при первом trigg-ере добавляется 1 executor, при следующем — 2, потом 4, 8 и так далее. Это предотвращает одномоментный запрос огромного числа контейнеров (который перегрузил бы кластер), но быстро масштабируется при устойчивой нагрузке.

Нагрузка поступила:
t=0s:    pending_tasks > 0 -> запускаем таймер schedulerBacklogTimeout
t=1s:    (schedulerBacklogTimeoutS=1s истёк) -> +1 executor
t=2s:    всё ещё pending -> +2 executor  (sustainedSchedulerBacklogTimeoutS=1s)
t=3s:    всё ещё pending -> +4 executor
t=4s:    всё ещё pending -> +8 executor
...

Параметры, управляющие этим процессом:

# Сколько секунд ждать перед ПЕРВЫМ добавлением executor-ов
spark.conf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")  # default: 1s

# Сколько ждать перед ПОСЛЕДУЮЩИМИ добавлениями
spark.conf.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1s")  # default: 1s

Алгоритм Scale Down: когда убирать executor-ы

Scale down срабатывает, когда executor простаивает дольше spark.dynamicAllocation.executorIdleTimeout. Но «простой» определяется тонче, чем кажется:

// ExecutorAllocationManager.scala (упрощённо)
private def executorIdle(executorId: String): Boolean = {
  // Executor считается idle, если:
  // 1. Нет running tasks
  // 2. Нет cached blocks (если spark.dynamicAllocation.cachedExecutorIdleTimeout не настроен)
  !runningTasks.contains(executorId) &&
  !executorsPendingLoss.contains(executorId) &&
  !hasStorageBlocks(executorId)
}

Executor с кэшированными данными (RDD, broadcast переменные) не считается idle по умолчанию. Это разумно: удаление executor-а с кэшем потребует пересчёта закэшированных данных. Но это может мешать scale down в долгосрочных приложениях.

# Сколько секунд executor должен быть idle перед удалением
spark.conf.set("spark.dynamicAllocation.executorIdleTimeout", "60s")  # default: 60s

# Отдельный таймаут для executor-ов с кэшированными данными
# Если не задан (default) — executor с кэшем никогда не удаляется
spark.conf.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "infinity")  # default: infinity

# На практике для K8s/облако рекомендуется:
spark.conf.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "300s")

Процесс удаления: EAM сначала помечает executor для удаления (executorsPendingToRemove), затем вызывает backend.killExecutors(). Cluster manager получает запрос на освобождение контейнера. Если executor в момент запроса начал выполнять task (race condition) — запрос игнорируется.

Граничные условия: min/max executor-ов

# Никогда не опускаться ниже этого числа executor-ов
spark.conf.set("spark.dynamicAllocation.minExecutors", "0")      # default: 0
# При 0 — все executor-ы могут быть убраны во время простоя

# Никогда не превышать это число executor-ов
spark.conf.set("spark.dynamicAllocation.maxExecutors", "1000")   # default: Integer.MAX_VALUE
# Необходимо задавать явно для защиты кластера

# Начальное число executor-ов при старте приложения
spark.conf.set("spark.dynamicAllocation.initialExecutors", "1")  # default: minExecutors
TIP

Всегда задавайте spark.dynamicAllocation.maxExecutors явно. Default Integer.MAX_VALUE означает, что при резком росте нагрузки приложение может запросить тысячи executor-ов и исчерпать ресурсы кластера. Хорошее правило — устанавливать maxExecutors в 2-3 раза больше типичного рабочего числа.

Зависимость от shuffle service

Здесь скрывается один из ключевых архитектурных ограничений DRA. Когда executor удаляется, его локальные shuffle-файлы также исчезают. Если эти файлы нужны следующей стадии — произойдёт shuffle fetch failure, DAGScheduler инициирует повтор стадии, что снижает всю выгоду от DRA.

Решений два:

External Shuffle Service (традиционный подход)

External Shuffle Service (ESS) — отдельный процесс (обычно на каждой worker-ноде), который хранит shuffle-файлы вне executor-ов. Executor записывает shuffle-данные в ESS, затем может быть удалён. Следующая стадия читает данные из ESS напрямую.

# YARN: ESS встроен, включается одним параметром
spark.conf.set("spark.shuffle.service.enabled", "true")
spark.conf.set("spark.dynamicAllocation.enabled", "true")
# DRA автоматически работает с YARN's NodeManager shuffle service

# Kubernetes: нужен отдельный deployment (Spark Shuffle Service)
# или Remote Shuffle Service (Celeborn, Uniffle)

ESS — зрелое решение, проверенное в YARN-кластерах, но требует администрирования (обновление вместе со Spark, мониторинг disk space) и не вписывается в ephemeral Kubernetes-архитектуру.

Shuffle Tracking (современный подход)

Начиная с Spark 3.0 появился механизм Shuffle Tracking — альтернатива ESS для случаев, когда его использование нежелательно (особенно в Kubernetes). При shuffle tracking EAM отслеживает, какие executor-ы хранят shuffle-данные, необходимые незавершённым стадиям.

// ExecutorAllocationManager.scala
private val shuffleTracker = if (conf.get(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
  new ExecutorShuffleIdsTracker()
} else {
  EmptyShuffleTracker
}

Механизм:

  1. Когда shuffle write завершён, TaskSetManager уведомляет EAM: «executor X записал shuffle-данные для stage Y».
  2. EAM помечает executor X как «хранящий активные shuffle-данные».
  3. Executor X не может быть удалён, пока stage Y не завершена (все читатели не прочитали свои shuffle-блоки).
  4. После завершения downstream stage shuffle-данные считаются неактивными, и executor X снова может быть кандидатом на удаление.
# Включение shuffle tracking (не требует ESS)
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.shuffleTracking.enabled", "true")  # default: true (Spark 4.0)
spark.conf.set("spark.dynamicAllocation.shuffleTracking.timeout", "300s")   # default: infinity

В Spark 4.0 spark.dynamicAllocation.shuffleTracking.enabled по умолчанию true при включённом DRA без ESS. Параметр shuffleTracking.timeout позволяет удалять executor-ы с shuffle-данными по истечении таймаута (полезно для очень долгих приложений, где shuffle-данные могут не читаться никогда).

NOTE

С Spark 4.0 при использовании external shuffle service управление удалением shuffle-блоков для деаллоцированных executor-ов берёт на себя ESS. Это снимает нагрузку с driver-а по отслеживанию shuffle-данных и улучшает надёжность при больших объёмах shuffle.

Поведение в Kubernetes

Kubernetes-среда принципиально отличается от YARN. Каждый executor — отдельный Pod. Запрос нового executor-а = создание нового Pod. Удаление executor-а = удаление Pod. Это создаёт специфические проблемы и возможности.

DRA в Kubernetes: scale up и scale down
Driver PodEAM запрашивает executor-ы через Kubernetes API (KubernetesClusterManager). Каждый executor = отдельный Pod.
Kubernetes APIK8s API server создаёт/удаляет Executor Pods через scheduler. Executor Pod имеет resource requests: memory + CPU.
Executor Pod 1Каждый executor Pod имеет lifecycle: Pending -> Running -> Succeeded/Failed. Pod startup time — от 10 до 60 секунд в зависимости от image pull.
Executor Pod 2При scale down: Pod получает SIGTERM, executor завершает текущие tasks, сообщает driver-у, Pod удаляется.
Executor Pod 3Если shuffle tracking включён: Pod с активными shuffle-данными не удаляется до завершения читающей стадии.

Рекомендуемые настройки для Kubernetes

# Базовые настройки DRA на K8s
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.shuffleTracking.enabled", "true")

# Без ESS — shuffle tracking обязателен
spark.conf.set("spark.shuffle.service.enabled", "false")

# Таймаут для shuffle-данных (не держать Pod вечно)
spark.conf.set("spark.dynamicAllocation.shuffleTracking.timeout", "300s")

# Пределы — критически важны для K8s (защита от resource exhaustion)
spark.conf.set("spark.dynamicAllocation.minExecutors", "1")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "50")

# Более консервативный scale down — Pod startup overhead высокий
spark.conf.set("spark.dynamicAllocation.executorIdleTimeout", "120s")

# Executor с кэшем — держим подольше, т.к. пересчёт дороже в K8s
spark.conf.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "600s")

Проблема холодного старта Pod-а

В Kubernetes новый executor-Pod должен пройти стадии: scheduled -> Pending (image pull) -> Running -> RegisteredWithDriver. Это может занять 30-60 секунд. Если задержка слишком высока, EAM может запросить ещё больше executor-ов, думая, что предыдущие не стартовали. Для защиты от этого EAM отслеживает executorsPendingToAdd — executor-ы, которые были запрошены, но ещё не зарегистрировались.

# Держать минимум executor-ов всегда запущенными (избегает cold start)
spark.conf.set("spark.dynamicAllocation.minExecutors", "2")

# Увеличить таймаут для учёта времени старта Pod-а
spark.conf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "5s")

Взаимодействие с Kubernetes Node Affinity

В Kubernetes можно комбинировать DRA с Node Affinity, чтобы executor-ы запускались на определённых типах нод:

spark.conf.set(
    "spark.kubernetes.executor.annotation.cluster-autoscaler.kubernetes.io/safe-to-evict",
    "false"  # Не эвиктировать Pod-ы с shuffle-данными через Cluster Autoscaler
)

Без этой аннотации Kubernetes Cluster Autoscaler может удалить ноду с executor-ом, хранящим shuffle-данные, даже если shuffle tracking включён — это нарушит корректность выполнения.

Диагностика DRA в Spark UI и логах

Spark UI -> Executors tab показывает текущее состояние динамического масштабирования:

МетрикаЧто означает
Active ExecutorsТекущее число выполняющих tasks executor-ов
Dead ExecutorsExecutor-ы, которые были удалены (scale down) или потеряны
Blacklisted ExecutorsExecutor-ы с повторяющимися ошибками (exclusion list)

Driver лог при DRA (уровень INFO):

INFO ExecutorAllocationManager: Request 1 executor (executor limit: 10)
INFO ExecutorAllocationManager: Request 2 executor (executor limit: 10)
INFO SparkContext: Requesting executor remove...
INFO ExecutorAllocationManager: Removing executor 3 because it has been idle for 60s
INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove executor 3

При DEBUG:

DEBUG ExecutorAllocationManager: maxNeeded = 8, current = 3, target = 5
DEBUG ExecutorAllocationManager: executor 7 is not removable: has cached blocks
DEBUG ExecutorAllocationManager: executor 9 is idle for 75s > idleTimeout 60s

Попробуй сам

Запуск приложения с DRA и наблюдение за масштабированием:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum
import time

spark = SparkSession.builder \
    .appName("dynamic-allocation-demo") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "1") \
    .config("spark.dynamicAllocation.maxExecutors", "8") \
    .config("spark.dynamicAllocation.initialExecutors", "1") \
    .config("spark.dynamicAllocation.schedulerBacklogTimeout", "2s") \
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s") \
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true") \
    .getOrCreate()

sc = spark.sparkContext

print("=== Фаза 1: маленькая нагрузка (1 executor достаточно) ===")
# Простое действие — 1 executor справится
result1 = sc.range(1000).sum()
print(f"Sum: {result1}")
time.sleep(5)

print("=== Фаза 2: большая нагрузка (нужно больше executor-ов) ===")
# Создаём большой DataFrame с shuffle — нужны executor-ы
df = spark.range(0, 10_000_000, numPartitions=64).toDF("id")
result2 = df.groupBy((col("id") % 100).alias("bucket")) \
            .agg(spark_sum("id").alias("total")) \
            .count()
print(f"Buckets: {result2}")

print("=== Фаза 3: простой (executor-ы будут удалены через 30s) ===")
time.sleep(60)
print("Проверьте Spark UI -> Executors: некоторые executor-ы должны быть Dead")

spark.stop()

Наблюдайте в Spark UI -> Executors за изменением числа активных executor-ов. В фазе 2 число executor-ов должно вырасти, в фазе 3 — уменьшиться обратно.

Проверка знанийKnowledge check
Приложение использует DRA с maxExecutors=20, executorIdleTimeout=60s. После завершения shuffle-интенсивной стадии все 20 executor-ов простаивают. Shuffle tracking включён. Через 60 секунд EAM должен убрать idle executor-ы. Однако 5 executor-ов продолжают работать. В чём причина и как проверить?
ОтветAnswer
Эти 5 executor-ов, вероятно, хранят shuffle-данные для ещё не начавшейся downstream стадии (или стадии в процессе выполнения). При включённом shuffle tracking EAM помечает executor-ы с активными shuffle-данными как 'не removable', даже если tasks на них не выполняются. Как проверить: 1) Spark UI -> Executors: посмотрите на Storage Memory — если там есть данные, executor хранит что-то; 2) Driver лог: ищите 'executor X is not removable: has shuffle data for stage Y'; 3) Spark UI -> Stages: найдите стадию со статусом 'Pending' или 'Active', которая читает shuffle из этих executor-ов. Решение: либо подождать завершения downstream стадии, либо установить spark.dynamicAllocation.shuffleTracking.timeout=300s для принудительного удаления через таймаут.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Spark-приложение с DRA запущено на Kubernetes без External Shuffle Service. spark.dynamicAllocation.shuffleTracking.enabled=true. Stage 3 (ShuffleMapStage) завершилась, её данные нужны Stage 4. EAM хочет убрать idle executor-ы. Какие executor-ы он НЕ может убрать?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6