Learning Platform
Глоссарий Troubleshooting
Урок 04.06 · 32 мин
Продвинутый
Executor LifecycleCoarseGrainedExecutorBackendHeartbeatDecommissioningBlockManagerLost Executor

Жизненный цикл executor

Executor в Spark — это долгоживущий JVM-процесс, который запускается Cluster Manager-ом, регистрируется у driver-а, получает и выполняет task-и, периодически отправляет heartbeat-ы, и в конечном счёте завершается: либо штатно при завершении приложения, либо аварийно (OOM, network loss), либо через механизм graceful decommissioning.

Классы, реализующие эту логику: CoarseGrainedExecutorBackend (процесс executor-а) и CoarseGrainedSchedulerBackend$DriverEndpoint (driver-сторона). Понимание их взаимодействия критично для диагностики проблем: «executor lost», «heartbeat timeout», «fetch failed» — все эти ошибки уходят корнями в жизненный цикл executor-а.

Название «Coarse-Grained» историческое: оно противопоставляется «Fine-Grained» режиму (существовавшему в ранних версиях Spark с Mesos), где каждый task запускался в отдельном процессе. В современном Spark CoarseGrained — единственный режим для production.

CoarseGrainedExecutorBackend: точка входа

CoarseGrainedExecutorBackend — это главный класс executor-процесса. Он запускается как отдельный JVM-процесс, с точкой входа в CoarseGrainedExecutorBackend.main(). Cluster Manager передаёт ему параметры через аргументы командной строки или переменные окружения:

# Пример команды запуска executor-а (упрощённо):
java -cp spark.jar \
  org.apache.spark.executor.CoarseGrainedExecutorBackend \
  --driver-url spark://driver-host:7077 \
  --executor-id 5 \
  --hostname worker-3 \
  --cores 4 \
  --app-id app-12345 \
  --resourceProfileId 0

CoarseGrainedExecutorBackend реализует RpcEndpoint — он является участником RPC-системы Spark (основанной на Netty). Его взаимодействие с driver-ом происходит через обмен сообщениями.

Фаза 1: Регистрация executor-а

Сразу после запуска executor-процесс отправляет драйверу сообщение RegisterExecutor. Это синхронная операция: executor не может начать выполнять задачи до получения подтверждения.

Рассмотрим ключевые шаги подробнее.

На стороне driver-а (DriverEndpoint.receiveAndReply):

// CoarseGrainedSchedulerBackend.scala (упрощённо)
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, ...) =>
  if (executorDataMap.contains(executorId)) {
    // Дубликат — отклоняем
    executorRef.send(RegisterExecutorFailed(
      s"Duplicate executor ID: $executorId"))
  } else {
    // Регистрируем executor
    executorDataMap.put(executorId, ExecutorData(
      executorRef, hostname, cores, coresUsed=0, ...
    ))
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    // Уведомляем SparkListeners (включая EAM, History Server)
    listenerBus.post(SparkListenerExecutorAdded(
      System.currentTimeMillis(), executorId, executorInfo))
    // Планируем tasks на новый executor
    makeOffers(executorId)
  }

На стороне executor-а после получения RegisteredExecutor:

// CoarseGrainedExecutorBackend.scala (упрощённо)
case RegisteredExecutor =>
  logInfo("Successfully registered with driver")
  executor = new Executor(
    executorId, hostname, env, userClassPath, isLocal = false,
    resources = resources)
  heartbeater.start()  // Запускаем heartbeat поток

Фаза 2: BlockManager регистрация

Отдельно от регистрации executor-а происходит регистрация BlockManager — компонента, управляющего хранением данных на executor-е. BlockManager регистрируется у BlockManagerMaster (который живёт в driver-е).

// BlockManager.scala (упрощённо)
def initialize(appId: String): Unit = {
  blockTransferService.init(this)
  shuffleClient.init(appId)
  
  val id = BlockManagerId(executorId, blockManagerHost, blockManagerPort, ...)
  val idFromMaster = master.registerBlockManager(
    id,
    diskBlockManager.localDirsString,
    maxOnHeapMemory,
    maxOffHeapMemory,
    storageEndpoint
  )
  blockManagerId = idFromMaster
}

BlockManagerMaster хранит реестр всех BlockManager-ов в кластере. Это позволяет DAGScheduler и TaskSetManager знать, на каких executor-ах находятся конкретные блоки данных (кэшированные партиции, shuffle-файлы), что используется для вычисления data locality.

Фаза 3: Heartbeats — пульс executor-а

После успешной регистрации executor запускает фоновый поток heartbeat. Каждые spark.executor.heartbeatInterval (default: 10 секунд) он отправляет драйверу сообщение Heartbeat:

// Executor.scala (упрощённо)
private val heartbeater = new Heartbeater(
  () => Executor.this.reportHeartBeat(),
  "executor-heartbeat",
  intervalMs = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
)

private def reportHeartBeat(): Unit = {
  // Собираем метрики активных tasks
  val tasksMetrics = runningTasks.values()
    .map(t => (t.taskId, t.task.metrics.serialize()))
    .toArray
  
  val message = Heartbeat(executorId, tasksMetrics, env.blockManager.blockManagerId)
  
  // Отправляем синхронно с таймаутом
  val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
    message,
    new RpcTimeout(executorTimeoutMs, EXECUTOR_HEARTBEAT_INTERVAL.key))
  
  if (response.reregisterBlockManager) {
    // Драйвер попросил переперегистрировать BlockManager
    env.blockManager.reregister()
  }
}

На стороне driver-а heartbeat принимает HeartbeatReceiver:

// HeartbeatReceiver.scala (упрощённо)
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
    val unknownExecutor = !scheduler.executorHeartbeatReceived(
      executorId, taskMetrics, blockManagerId, ...)
    context.reply(HeartbeatResponse(reregisterBlockManager = unknownExecutor))
}

HeartbeatReceiver также содержит фоновый поток, проверяющий: не пропустил ли какой-то executor heartbeat дольше spark.network.timeout (default: 120 секунд)?

// HeartbeatReceiver.scala
private def expireDeadHosts(): Unit = {
  val now = clock.getTimeMillis()
  for ((executorId, lastSeenMs) <- executorLastSeen) {
    if (now - lastSeenMs > executorTimeoutMs) {
      logWarning(s"Removing executor $executorId with no recent heartbeats: " +
        s"${now - lastSeenMs} ms exceeds timeout ${executorTimeoutMs} ms")
      scheduler.executorLost(executorId, SlaveLost("..."))
      executorLastSeen.remove(executorId)
    }
  }
}

Критическое соотношение таймаутов

# ВАЖНО: heartbeatInterval ДОЛЖЕН быть значительно меньше network.timeout
spark.conf.set("spark.executor.heartbeatInterval", "10s")  # default: 10s
spark.conf.set("spark.network.timeout", "120s")             # default: 120s

# Безопасное правило: network.timeout > 3 * heartbeatInterval
# При default: 120s > 3 * 10s = 30s — запас есть

Если spark.executor.heartbeatInterval близко к spark.network.timeout, любая небольшая задержка в сети или GC-пауза вызовет ложную потерю executor-а. В production с GC-паузами на больших heap рекомендуется увеличивать оба значения пропорционально:

# Для executor-ов с большим heap (>32GB) и возможными длинными GC
spark.conf.set("spark.executor.heartbeatInterval", "30s")
spark.conf.set("spark.network.timeout", "360s")

Фаза 4: Потеря executor — реакция системы

Когда executor теряется (HeartbeatReceiver не получил heartbeat за network.timeout, или Cluster Manager сообщил об аварийном завершении Pod/контейнера), происходит цепочка событий:

TaskSchedulerImpl.executorLost():

// TaskSchedulerImpl.scala (упрощённо)
def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
  taskSetsByStageIdAndAttempt.values.foreach { taskSets =>
    taskSets.values.foreach { tsm =>
      tsm.executorLost(executorId, hostname, reason)
    }
  }
  
  // Уведомляем DAGScheduler
  dagScheduler.executorLost(executorId, reason)
  
  // Запрашиваем Cluster Manager (возможно, нужен новый executor)
  backend.reviveOffers()
}

Ключевое решение DAGScheduler-а: нужно ли пересчитывать shuffle стадию?

// DAGScheduler.scala (упрощённо)
def executorLost(execId: String, reason: ExecutorLossReason): Unit = {
  // Проверяем: хранил ли этот executor shuffle output для активных stages?
  val failedEpoch = mapOutputTracker.getEpoch
  
  shuffleIdToMapStage.values.foreach { shuffleStage =>
    // Если у потерянного executor-а был shuffle output для этой stage
    if (outputLocsByMapId.contains(execId)) {
      // Пометить эти outputs как unavailable
      mapOutputTracker.removeOutputsOnExecutor(execId)
      
      // Если shuffle stage уже завершена — запустить её снова
      if (shuffleStage.isAvailable) {
        resubmitFailedStages()
      }
    }
  }
}

Именно поэтому потеря executor-а может вызвать пересчёт не только текущей стадии, но и предыдущей — той, которая писала shuffle. Это проявляется в Spark UI как неожиданное повторное выполнение уже завершённых стадий.

Различение причин потери

Не все потери executor-а одинаковы. Spark различает:

ПричинаКлассПоведение
Heartbeat timeoutExecutorHeartbeatLostМожет быть GC, считается recoverable
Cluster Manager killExecutorExited(exitCode)exitCode != 0 = потеря данных
Graceful decommissionExecutorDecommissionДанные мигрированы, нет потерь
OOM killExecutorExited(137)Нужно увеличить executor memory

В логах driver-а:

WARN HeartbeatReceiver: Removing executor 5 with no recent heartbeats:
  125423 ms exceeds timeout 120000 ms

INFO CoarseGrainedSchedulerBackend: Executor 5 lost:
  ExecutorHeartbeatLost(last heartbeat: 125s ago)

INFO TaskSetManager: Re-queueing tasks [42, 43, 44] in stage 7.0 that were
  running on lost executor 5

Фаза 5: Graceful Decommissioning

Обычная потеря executor-а — аварийная ситуация с потерей данных. Graceful decommissioning (SPARK-20624) — это механизм плановой остановки executor-а без потери данных. Он особенно важен для облачных кластеров с spot-instances и Kubernetes с preemption.

Деcommission инициируется несколькими способами:

  • Cluster Manager уведомляет Spark о предстоящем прекращении ноды (YARN Node decommissioning, K8s preemption notice)
  • Operator явно вызывает decommission через REST API driver-а
  • EAM при scale down через decommissionExecutors()

Конфигурация decommissioning

# Включить decommissioning
spark.conf.set("spark.decommission.enabled", "true")

# Миграция cached RDD blocks
spark.conf.set("spark.storage.decommission.enabled", "true")
spark.conf.set("spark.storage.decommission.rddBlocks.enabled", "true")

# Миграция shuffle blocks
spark.conf.set("spark.storage.decommission.shuffleBlocks.enabled", "true")

# Максимальное число параллельных миграций
spark.conf.set("spark.storage.decommission.maxReplicationFailuresPerBlock", "3")

# Таймаут ожидания миграции (после него — принудительное завершение)
spark.conf.set("spark.storage.decommission.replicationRampUpTime", "300s")

Миграция блоков: детали реализации

При получении DecommissionExecutor BlockManager на executor-е начинает процесс BlockManagerDecommissioner:

// BlockManagerDecommissioner.scala (упрощённо)
def start(): Unit = {
  if (conf.get(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
    migratableShuffleBlocks = fetchMigratableShuffleBlocks()
    shuffleMigrationPool.submit(new ShuffleMigrationRunnable())
  }
  
  if (conf.get(STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {
    migratableRDDBlocks = fetchMigratableRDDBlocks()
    rddBlocksMigrationPool.submit(new RDDBlocksMigrationRunnable())
  }
}

// Выбор целевого BlockManager для миграции
private def getDestination(blockId: BlockId): Option[BlockManagerId] = {
  // Получаем список живых executor-ов из BlockManagerMaster
  // Выбираем executor с достаточным свободным местом
  // Избегаем текущего executor-а (decommissioned)
  blockManager.master.getPeers(blockManager.blockManagerId)
    .filterNot(_.isDriver)
    .find(canFitBlock(blockId, _))
}

Интересный аспект: DAGScheduler учитывает состояние decommissioning при обработке ошибок. Если fetch failure произошла с executor-а, который был в состоянии decommission — DAGScheduler может быть более терпелив и не немедленно рестартовать stage:

// DAGScheduler.scala
case FetchFailed(bmAddress, shuffleId, mapTaskId, ...) =>
  val failedExec = bmAddress.map(_.executorId)
  val isDecommissioned = failedExec.exists { execId =>
    taskScheduler.getExecutorDecommissionState(execId).isDefined
  }
  if (isDecommissioned) {
    // Executor в decommission — вероятно, блоки мигрируют.
    // Ждём завершения миграции вместо немедленного перезапуска стадии.
    logInfo(s"Fetch failure from decommissioning executor $failedExec. " +
      "Will wait for migration to complete.")
  }

Мониторинг жизненного цикла

Spark UI

В Spark UI -> Executors можно наблюдать весь жизненный цикл:

Active Executors: 12       # текущие живые
Dead Executors: 3          # потеряны или деcommissioned
Blacklisted Executors: 1   # в exclusion list после повторных ошибок

В разделе Event Log (если включён) видны все SparkListenerExecutorAdded / SparkListenerExecutorRemoved события с timestamp и причиной.

JMX метрики executor-а

Executor экспортирует JMX метрики через ExecutorMetricsSource:

spark.executor.metrics.JVMHeapMemory          # Heap используется
spark.executor.metrics.OnHeapStorageMemory    # Storage memory
spark.executor.metrics.OnHeapExecutionMemory  # Execution memory
spark.executor.metrics.GarbageCollectionTime  # Суммарное время GC
spark.executor.metrics.tasksFailed            # Число упавших tasks

Аномальный рост GarbageCollectionTime — ранний признак проблем с памятью до того, как executor начнёт пропускать heartbeat.

Production чеклист

# 1. Убедитесь, что heartbeat << network.timeout
assert heartbeat_interval_ms * 4 < network_timeout_ms, \
    "Risk of false executor loss due to GC pauses"

# 2. Для кластеров с spot/preemptible нодами — включить decommission
spark.conf.set("spark.decommission.enabled", "true")
spark.conf.set("spark.storage.decommission.enabled", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.enabled", "true")

# 3. Мониторинг через SparkListener
class ExecutorLifecycleListener(SparkListener):
    def onExecutorAdded(self, e):
        metrics.increment("executor.added",
                          tags={"app": e.executorInfo().host()})
    def onExecutorRemoved(self, e):
        metrics.increment("executor.removed",
                          tags={"reason": e.reason()})

Попробуй сам

Наблюдаем за lifecycle событиями через кастомный SparkListener:

from pyspark.sql import SparkSession
from pyspark import SparkContext

# PySpark не позволяет напрямую писать SparkListener,
# но можно читать логи через spark.sparkContext.statusTracker()

spark = SparkSession.builder \
    .appName("executor-lifecycle-demo") \
    .config("spark.executor.heartbeatInterval", "5s") \
    .config("spark.network.timeout", "60s") \
    .config("spark.executor.instances", "3") \
    .getOrCreate()

sc = spark.sparkContext

# Запускаем задание и смотрим на executor-ы
rdd = sc.range(1_000_000, numSlices=12)
result = rdd.map(lambda x: x * 2).sum()
print(f"Sum: {result}")

# Статус executor-ов через StatusTracker
tracker = sc.statusTracker()

# Получаем список активных executor-ов
# (не все методы доступны в Python API, но в Scala полный доступ)
print(f"Active jobs: {tracker.getActiveJobIds()}")
print(f"Active stages: {tracker.getActiveStageIds()}")

# В драйвер-логах (INFO уровень) ищите:
# "Successfully registered with driver" -- успешная регистрация
# "heartbeat from executor" -- периодические heartbeats
# "Removing executor X with no recent heartbeats" -- таймаут

# Для детальной диагностики:
sc.setLogLevel("DEBUG")
# Предупреждение: очень verbose, только для локальной отладки

spark.stop()

# После остановки: в логах должно быть
# "Executor N finished with exit code 0"
# SparkListenerExecutorRemoved с reason="Application shutdown"
TIP

В production включите историю событий через spark.eventLog.enabled=true и spark.eventLog.dir. Spark History Server позволяет после завершения приложения просмотреть полный timeline executor-ов, включая все добавления, удаления, потери и decommission события.

Проверка знанийKnowledge check
Spark-приложение запущено на Kubernetes с preemptible нодами. Нода с тремя executor-ами получила уведомление о preemption. spark.decommission.enabled=false. Что произойдёт с tasks, которые выполнялись на этих executor-ах, и с данными, которые они хранили (кэш и shuffle output)?
ОтветAnswer
Без decommission всё произойдёт аварийно. Когда нода будет остановлена: 1) Pod-ы трёх executor-ов получат SIGKILL. 2) HeartbeatReceiver на driver-е через spark.network.timeout (120s по умолчанию) зафиксирует потерю. 3) Все tasks, выполнявшиеся на этих executor-ах, будут помечены как FAILED и переплановываться на оставшихся executor-ах. 4) Кэшированные RDD-партиции потеряны безвозвратно — при обращении к ним они будут пересчитаны с нуля. 5) Shuffle output для завершённых стадий потерян — DAGScheduler инициирует повторный запуск тех ShuffleMapStage, чьи данные хранились на потерянных executor-ах. Это дополнительные часы работы. С spark.decommission.enabled=true K8s мог бы уведомить Spark заблаговременно (через SIGTERM), executor успел бы мигрировать блоки, и только текущие tasks нужно было бы перезапустить.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. spark.executor.heartbeatInterval=10s, spark.network.timeout=120s. Executor 7 попал в длинный Full GC — пауза 95 секунд. Что произойдёт с executor-ом и выполняемыми на нём tasks?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6