Жизненный цикл executor
Executor в Spark — это долгоживущий JVM-процесс, который запускается Cluster Manager-ом, регистрируется у driver-а, получает и выполняет task-и, периодически отправляет heartbeat-ы, и в конечном счёте завершается: либо штатно при завершении приложения, либо аварийно (OOM, network loss), либо через механизм graceful decommissioning.
Классы, реализующие эту логику: CoarseGrainedExecutorBackend (процесс executor-а) и CoarseGrainedSchedulerBackend$DriverEndpoint (driver-сторона). Понимание их взаимодействия критично для диагностики проблем: «executor lost», «heartbeat timeout», «fetch failed» — все эти ошибки уходят корнями в жизненный цикл executor-а.
Название «Coarse-Grained» историческое: оно противопоставляется «Fine-Grained» режиму (существовавшему в ранних версиях Spark с Mesos), где каждый task запускался в отдельном процессе. В современном Spark CoarseGrained — единственный режим для production.
CoarseGrainedExecutorBackend: точка входа
CoarseGrainedExecutorBackend — это главный класс executor-процесса. Он запускается как отдельный JVM-процесс, с точкой входа в CoarseGrainedExecutorBackend.main(). Cluster Manager передаёт ему параметры через аргументы командной строки или переменные окружения:
# Пример команды запуска executor-а (упрощённо):
java -cp spark.jar \
org.apache.spark.executor.CoarseGrainedExecutorBackend \
--driver-url spark://driver-host:7077 \
--executor-id 5 \
--hostname worker-3 \
--cores 4 \
--app-id app-12345 \
--resourceProfileId 0
CoarseGrainedExecutorBackend реализует RpcEndpoint — он является участником RPC-системы Spark (основанной на Netty). Его взаимодействие с driver-ом происходит через обмен сообщениями.
Фаза 1: Регистрация executor-а
Сразу после запуска executor-процесс отправляет драйверу сообщение RegisterExecutor. Это синхронная операция: executor не может начать выполнять задачи до получения подтверждения.
Рассмотрим ключевые шаги подробнее.
На стороне driver-а (DriverEndpoint.receiveAndReply):
// CoarseGrainedSchedulerBackend.scala (упрощённо)
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, ...) =>
if (executorDataMap.contains(executorId)) {
// Дубликат — отклоняем
executorRef.send(RegisterExecutorFailed(
s"Duplicate executor ID: $executorId"))
} else {
// Регистрируем executor
executorDataMap.put(executorId, ExecutorData(
executorRef, hostname, cores, coresUsed=0, ...
))
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
// Уведомляем SparkListeners (включая EAM, History Server)
listenerBus.post(SparkListenerExecutorAdded(
System.currentTimeMillis(), executorId, executorInfo))
// Планируем tasks на новый executor
makeOffers(executorId)
}
На стороне executor-а после получения RegisteredExecutor:
// CoarseGrainedExecutorBackend.scala (упрощённо)
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
executor = new Executor(
executorId, hostname, env, userClassPath, isLocal = false,
resources = resources)
heartbeater.start() // Запускаем heartbeat поток
Фаза 2: BlockManager регистрация
Отдельно от регистрации executor-а происходит регистрация BlockManager — компонента, управляющего хранением данных на executor-е. BlockManager регистрируется у BlockManagerMaster (который живёт в driver-е).
// BlockManager.scala (упрощённо)
def initialize(appId: String): Unit = {
blockTransferService.init(this)
shuffleClient.init(appId)
val id = BlockManagerId(executorId, blockManagerHost, blockManagerPort, ...)
val idFromMaster = master.registerBlockManager(
id,
diskBlockManager.localDirsString,
maxOnHeapMemory,
maxOffHeapMemory,
storageEndpoint
)
blockManagerId = idFromMaster
}
BlockManagerMaster хранит реестр всех BlockManager-ов в кластере. Это позволяет DAGScheduler и TaskSetManager знать, на каких executor-ах находятся конкретные блоки данных (кэшированные партиции, shuffle-файлы), что используется для вычисления data locality.
Фаза 3: Heartbeats — пульс executor-а
После успешной регистрации executor запускает фоновый поток heartbeat. Каждые spark.executor.heartbeatInterval (default: 10 секунд) он отправляет драйверу сообщение Heartbeat:
// Executor.scala (упрощённо)
private val heartbeater = new Heartbeater(
() => Executor.this.reportHeartBeat(),
"executor-heartbeat",
intervalMs = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
)
private def reportHeartBeat(): Unit = {
// Собираем метрики активных tasks
val tasksMetrics = runningTasks.values()
.map(t => (t.taskId, t.task.metrics.serialize()))
.toArray
val message = Heartbeat(executorId, tasksMetrics, env.blockManager.blockManagerId)
// Отправляем синхронно с таймаутом
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
message,
new RpcTimeout(executorTimeoutMs, EXECUTOR_HEARTBEAT_INTERVAL.key))
if (response.reregisterBlockManager) {
// Драйвер попросил переперегистрировать BlockManager
env.blockManager.reregister()
}
}
На стороне driver-а heartbeat принимает HeartbeatReceiver:
// HeartbeatReceiver.scala (упрощённо)
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId, ...)
context.reply(HeartbeatResponse(reregisterBlockManager = unknownExecutor))
}
HeartbeatReceiver также содержит фоновый поток, проверяющий: не пропустил ли какой-то executor heartbeat дольше spark.network.timeout (default: 120 секунд)?
// HeartbeatReceiver.scala
private def expireDeadHosts(): Unit = {
val now = clock.getTimeMillis()
for ((executorId, lastSeenMs) <- executorLastSeen) {
if (now - lastSeenMs > executorTimeoutMs) {
logWarning(s"Removing executor $executorId with no recent heartbeats: " +
s"${now - lastSeenMs} ms exceeds timeout ${executorTimeoutMs} ms")
scheduler.executorLost(executorId, SlaveLost("..."))
executorLastSeen.remove(executorId)
}
}
}
Критическое соотношение таймаутов
# ВАЖНО: heartbeatInterval ДОЛЖЕН быть значительно меньше network.timeout
spark.conf.set("spark.executor.heartbeatInterval", "10s") # default: 10s
spark.conf.set("spark.network.timeout", "120s") # default: 120s
# Безопасное правило: network.timeout > 3 * heartbeatInterval
# При default: 120s > 3 * 10s = 30s — запас есть
Если spark.executor.heartbeatInterval близко к spark.network.timeout, любая небольшая задержка в сети или GC-пауза вызовет ложную потерю executor-а. В production с GC-паузами на больших heap рекомендуется увеличивать оба значения пропорционально:
# Для executor-ов с большим heap (>32GB) и возможными длинными GC
spark.conf.set("spark.executor.heartbeatInterval", "30s")
spark.conf.set("spark.network.timeout", "360s")
Фаза 4: Потеря executor — реакция системы
Когда executor теряется (HeartbeatReceiver не получил heartbeat за network.timeout, или Cluster Manager сообщил об аварийном завершении Pod/контейнера), происходит цепочка событий:
TaskSchedulerImpl.executorLost():
// TaskSchedulerImpl.scala (упрощённо)
def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
taskSetsByStageIdAndAttempt.values.foreach { taskSets =>
taskSets.values.foreach { tsm =>
tsm.executorLost(executorId, hostname, reason)
}
}
// Уведомляем DAGScheduler
dagScheduler.executorLost(executorId, reason)
// Запрашиваем Cluster Manager (возможно, нужен новый executor)
backend.reviveOffers()
}
Ключевое решение DAGScheduler-а: нужно ли пересчитывать shuffle стадию?
// DAGScheduler.scala (упрощённо)
def executorLost(execId: String, reason: ExecutorLossReason): Unit = {
// Проверяем: хранил ли этот executor shuffle output для активных stages?
val failedEpoch = mapOutputTracker.getEpoch
shuffleIdToMapStage.values.foreach { shuffleStage =>
// Если у потерянного executor-а был shuffle output для этой stage
if (outputLocsByMapId.contains(execId)) {
// Пометить эти outputs как unavailable
mapOutputTracker.removeOutputsOnExecutor(execId)
// Если shuffle stage уже завершена — запустить её снова
if (shuffleStage.isAvailable) {
resubmitFailedStages()
}
}
}
}
Именно поэтому потеря executor-а может вызвать пересчёт не только текущей стадии, но и предыдущей — той, которая писала shuffle. Это проявляется в Spark UI как неожиданное повторное выполнение уже завершённых стадий.
Различение причин потери
Не все потери executor-а одинаковы. Spark различает:
| Причина | Класс | Поведение |
|---|---|---|
| Heartbeat timeout | ExecutorHeartbeatLost | Может быть GC, считается recoverable |
| Cluster Manager kill | ExecutorExited(exitCode) | exitCode != 0 = потеря данных |
| Graceful decommission | ExecutorDecommission | Данные мигрированы, нет потерь |
| OOM kill | ExecutorExited(137) | Нужно увеличить executor memory |
В логах driver-а:
WARN HeartbeatReceiver: Removing executor 5 with no recent heartbeats:
125423 ms exceeds timeout 120000 ms
INFO CoarseGrainedSchedulerBackend: Executor 5 lost:
ExecutorHeartbeatLost(last heartbeat: 125s ago)
INFO TaskSetManager: Re-queueing tasks [42, 43, 44] in stage 7.0 that were
running on lost executor 5
Фаза 5: Graceful Decommissioning
Обычная потеря executor-а — аварийная ситуация с потерей данных. Graceful decommissioning (SPARK-20624) — это механизм плановой остановки executor-а без потери данных. Он особенно важен для облачных кластеров с spot-instances и Kubernetes с preemption.
Деcommission инициируется несколькими способами:
- Cluster Manager уведомляет Spark о предстоящем прекращении ноды (YARN Node decommissioning, K8s preemption notice)
- Operator явно вызывает decommission через REST API driver-а
- EAM при scale down через
decommissionExecutors()
Конфигурация decommissioning
# Включить decommissioning
spark.conf.set("spark.decommission.enabled", "true")
# Миграция cached RDD blocks
spark.conf.set("spark.storage.decommission.enabled", "true")
spark.conf.set("spark.storage.decommission.rddBlocks.enabled", "true")
# Миграция shuffle blocks
spark.conf.set("spark.storage.decommission.shuffleBlocks.enabled", "true")
# Максимальное число параллельных миграций
spark.conf.set("spark.storage.decommission.maxReplicationFailuresPerBlock", "3")
# Таймаут ожидания миграции (после него — принудительное завершение)
spark.conf.set("spark.storage.decommission.replicationRampUpTime", "300s")
Миграция блоков: детали реализации
При получении DecommissionExecutor BlockManager на executor-е начинает процесс BlockManagerDecommissioner:
// BlockManagerDecommissioner.scala (упрощённо)
def start(): Unit = {
if (conf.get(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
migratableShuffleBlocks = fetchMigratableShuffleBlocks()
shuffleMigrationPool.submit(new ShuffleMigrationRunnable())
}
if (conf.get(STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {
migratableRDDBlocks = fetchMigratableRDDBlocks()
rddBlocksMigrationPool.submit(new RDDBlocksMigrationRunnable())
}
}
// Выбор целевого BlockManager для миграции
private def getDestination(blockId: BlockId): Option[BlockManagerId] = {
// Получаем список живых executor-ов из BlockManagerMaster
// Выбираем executor с достаточным свободным местом
// Избегаем текущего executor-а (decommissioned)
blockManager.master.getPeers(blockManager.blockManagerId)
.filterNot(_.isDriver)
.find(canFitBlock(blockId, _))
}
Интересный аспект: DAGScheduler учитывает состояние decommissioning при обработке ошибок. Если fetch failure произошла с executor-а, который был в состоянии decommission — DAGScheduler может быть более терпелив и не немедленно рестартовать stage:
// DAGScheduler.scala
case FetchFailed(bmAddress, shuffleId, mapTaskId, ...) =>
val failedExec = bmAddress.map(_.executorId)
val isDecommissioned = failedExec.exists { execId =>
taskScheduler.getExecutorDecommissionState(execId).isDefined
}
if (isDecommissioned) {
// Executor в decommission — вероятно, блоки мигрируют.
// Ждём завершения миграции вместо немедленного перезапуска стадии.
logInfo(s"Fetch failure from decommissioning executor $failedExec. " +
"Will wait for migration to complete.")
}
Мониторинг жизненного цикла
Spark UI
В Spark UI -> Executors можно наблюдать весь жизненный цикл:
Active Executors: 12 # текущие живые
Dead Executors: 3 # потеряны или деcommissioned
Blacklisted Executors: 1 # в exclusion list после повторных ошибок
В разделе Event Log (если включён) видны все SparkListenerExecutorAdded / SparkListenerExecutorRemoved события с timestamp и причиной.
JMX метрики executor-а
Executor экспортирует JMX метрики через ExecutorMetricsSource:
spark.executor.metrics.JVMHeapMemory # Heap используется
spark.executor.metrics.OnHeapStorageMemory # Storage memory
spark.executor.metrics.OnHeapExecutionMemory # Execution memory
spark.executor.metrics.GarbageCollectionTime # Суммарное время GC
spark.executor.metrics.tasksFailed # Число упавших tasks
Аномальный рост GarbageCollectionTime — ранний признак проблем с памятью до того, как executor начнёт пропускать heartbeat.
Production чеклист
# 1. Убедитесь, что heartbeat << network.timeout
assert heartbeat_interval_ms * 4 < network_timeout_ms, \
"Risk of false executor loss due to GC pauses"
# 2. Для кластеров с spot/preemptible нодами — включить decommission
spark.conf.set("spark.decommission.enabled", "true")
spark.conf.set("spark.storage.decommission.enabled", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.enabled", "true")
# 3. Мониторинг через SparkListener
class ExecutorLifecycleListener(SparkListener):
def onExecutorAdded(self, e):
metrics.increment("executor.added",
tags={"app": e.executorInfo().host()})
def onExecutorRemoved(self, e):
metrics.increment("executor.removed",
tags={"reason": e.reason()})
Попробуй сам
Наблюдаем за lifecycle событиями через кастомный SparkListener:
from pyspark.sql import SparkSession
from pyspark import SparkContext
# PySpark не позволяет напрямую писать SparkListener,
# но можно читать логи через spark.sparkContext.statusTracker()
spark = SparkSession.builder \
.appName("executor-lifecycle-demo") \
.config("spark.executor.heartbeatInterval", "5s") \
.config("spark.network.timeout", "60s") \
.config("spark.executor.instances", "3") \
.getOrCreate()
sc = spark.sparkContext
# Запускаем задание и смотрим на executor-ы
rdd = sc.range(1_000_000, numSlices=12)
result = rdd.map(lambda x: x * 2).sum()
print(f"Sum: {result}")
# Статус executor-ов через StatusTracker
tracker = sc.statusTracker()
# Получаем список активных executor-ов
# (не все методы доступны в Python API, но в Scala полный доступ)
print(f"Active jobs: {tracker.getActiveJobIds()}")
print(f"Active stages: {tracker.getActiveStageIds()}")
# В драйвер-логах (INFO уровень) ищите:
# "Successfully registered with driver" -- успешная регистрация
# "heartbeat from executor" -- периодические heartbeats
# "Removing executor X with no recent heartbeats" -- таймаут
# Для детальной диагностики:
sc.setLogLevel("DEBUG")
# Предупреждение: очень verbose, только для локальной отладки
spark.stop()
# После остановки: в логах должно быть
# "Executor N finished with exit code 0"
# SparkListenerExecutorRemoved с reason="Application shutdown"
В production включите историю событий через spark.eventLog.enabled=true и spark.eventLog.dir. Spark History Server позволяет после завершения приложения просмотреть полный timeline executor-ов, включая все добавления, удаления, потери и decommission события.