Task Locality и Delay Scheduling
Когда TaskSchedulerImpl получает TaskSet и располагает свободными executor-слотами, он не просто берёт первый попавшийся task и назначает его на первый попавшийся executor. Внутри TaskSetManager выполняется алгоритм delay scheduling — один из самых тонких и наименее понятых механизмов Spark. Именно он определяет: стоит ли подождать освобождения «правильного» executor или лучше запустить task сейчас на «неправильном».
Неправильная настройка этого алгоритма может привести к тому, что десятки свободных executor-ядер простаивают, пока JobScheduler терпеливо ждёт нод с нужными данными. Или, наоборот, Spark будет гонять гигабайты по сети вместо того, чтобы подождать 3 секунды. Понимание деталей делает разницу между 10-минутным и 2-часовым заданием.
Пять уровней локальности
Spark определяет иерархию пяти уровней Task Locality, каждый из которых описывает, насколько «близко» executor к данным, которые task должна обработать.
Важно: уровень NO_PREF вставлен между NODE_LOCAL и RACK_LOCAL намеренно. Task с NO_PREF не имеет предпочтений, поэтому её можно запустить сразу на любом свободном executor, не ожидая. Она не «деградирует» с более высокого уровня — у неё вообще нет предпочтений по расположению.
Когда возникает каждый уровень
# PROCESS_LOCAL: данные закэшированы в executor memory
df = spark.read.parquet("/data/sales/")
df.cache()
df.count() # первый проход: данные читаются, кэшируются
df.filter("amount > 100").count() # второй проход: PROCESS_LOCAL
# NODE_LOCAL: HDFS DataNode co-located с executor
# Spark читает блок с локального DataNode, без сети
df2 = spark.read.parquet("hdfs://namenode:8020/data/")
# NO_PREF: S3 одинаково далеко от всех нод
df3 = spark.read.parquet("s3://bucket/data/")
# RACK_LOCAL: HDFS replication factor=3, данные на нодах в той же стойке
# Исходная нода занята, выбирается реплика в том же rack
# ANY: все нужные ноды заняты, приходится использовать любой executor
Внутреннее устройство TaskSetManager
TaskSetManager — это объект Scala, который управляет одним TaskSet (всеми задачами одной стадии). Класс находится в org.apache.spark.scheduler.TaskSetManager. Для реализации delay scheduling он ведёт несколько ключевых структур данных.
Pending-очереди по уровням локальности
// TaskSetManager.scala (упрощённо)
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
// executorId -> список индексов tasks
private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
// hostname -> список индексов tasks
private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
// rack -> список индексов tasks
private val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
// tasks без предпочтений по локальности
private val allPendingTasks = new ArrayBuffer[Int]
// все незавершённые tasks (для ANY)
Каждая task может находиться одновременно в нескольких очередях. Например, task с предпочтением к executor exec-1 на ноде worker-3 в rack rack-A будет зарегистрирована в pendingTasksForExecutor("exec-1"), pendingTasksForHost("worker-3"), pendingTasksForRack("rack-A") и allPendingTasks.
Состояние delay scheduling
// Текущий разрешённый уровень локальности
private var currentLocalityIndex = 0
// Время последнего сброса ожидания
private var lastLocalityWaitResetTime = clock.getTimeMillis()
// Вычисленные допустимые уровни для этого TaskSet
private var myLocalityLevels = computeValidLocalityLevels()
// Таймауты для каждого уровня (в порядке убывания приоритета)
private var localityWaits = myLocalityLevels.map(getLocalityWait)
Метод computeValidLocalityLevels() динамически определяет, какие уровни локальности вообще достижимы для данного TaskSet. Если все данные в S3 и ни одна task не имеет preferences, уровни PROCESS_LOCAL и NODE_LOCAL не включаются в myLocalityLevels — и Spark не будет их ждать.
Алгоритм delay scheduling
Когда TaskSchedulerImpl вызывает resourceOffer(execId, host, maxLocality) на TaskSetManager, происходит следующее:
Ключевой метод — getAllowedLocalityLevel():
// TaskSetManager.scala (упрощённо)
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
// Поднимаемся по уровням, пока не найдём уровень,
// для которого таймаут ещё не истёк
while (currentLocalityIndex < myLocalityLevels.length - 1) {
val morePreferable = myLocalityLevels(currentLocalityIndex)
val currTime = curTime - lastLocalityWaitResetTime
if (currTime >= localityWaits(currentLocalityIndex)) {
// Таймаут этого уровня истёк — переходим к следующему (менее предпочтительному)
currentLocalityIndex += 1
lastLocalityWaitResetTime += localityWaits(currentLocalityIndex - 1)
} else {
return morePreferable // Ещё ждём
}
}
myLocalityLevels(currentLocalityIndex)
}
Этот алгоритм имеет важную тонкость: таймаут измеряется с момента последнего сброса, а не с момента прихода первого pending-task. Когда task успешно запускается на уровне NODE_LOCAL, lastLocalityWaitResetTime сбрасывается — и отсчёт ожидания следующего слота начинается заново.
Параметры spark.locality.wait
Spark предоставляет гранулярный контроль над временем ожидания для каждого уровня локальности:
# Глобальный таймаут (применяется ко всем уровням, если не переопределены)
spark.conf.set("spark.locality.wait", "3s") # default: 3s
# Переопределение для конкретных уровней
spark.conf.set("spark.locality.wait.process", "3s") # PROCESS_LOCAL (default: spark.locality.wait)
spark.conf.set("spark.locality.wait.node", "3s") # NODE_LOCAL (default: spark.locality.wait)
spark.conf.set("spark.locality.wait.rack", "3s") # RACK_LOCAL (default: spark.locality.wait)
# Для NO_PREF и ANY таймаут не применяется
Если spark.locality.wait.process не задан явно, он наследует значение spark.locality.wait. Каждый уровень имеет независимый таймаут. Цепочка ожидания для task с предпочтением PROCESS_LOCAL при дефолтных настройках:
t=0s: Ищем PROCESS_LOCAL слот. Свободных нет.
t=3s: Таймаут PROCESS_LOCAL. Ищем NODE_LOCAL слот. Нашли!
=> Запускаем с NODE_LOCAL, сбрасываем таймер.
--- или ---
t=0s: Ищем PROCESS_LOCAL слот. Нет.
t=3s: Ищем NODE_LOCAL слот. Нет.
t=6s: Ищем RACK_LOCAL слот. Нет.
t=9s: Используем ANY. Запускаем на любом свободном executor.
Максимальное ожидание для одной task — сумма всех таймаутов: wait.process + wait.node + wait.rack = 9s при дефолтных настройках. При большом числе partition-задач и занятом кластере это суммируется: если каждая из 1000 задач ждёт по 9s, общая задержка составит до 9000s. На практике таймер общий для всего TaskSet, а не для каждой задачи в отдельности.
Компромисс: ждать vs запустить сразу
Выбор значения spark.locality.wait — это классический инженерный trade-off.
Аргументы в пользу большего ожидания (>3s):
- Данные большие, и выигрыш от избегания сетевого трафика существенен
- Кластер HDFS с co-located executor-ами — реальная locality достижима
- Executor-ы на нужных нодах временно заняты, но скоро освободятся
- Shuffle-данные на конкретных нодах (между стадиями)
Аргументы в пользу меньшего ожидания (0-1s или 0s):
- Данные в облачном хранилище (S3, GCS, ADLS) — locality недостижима
- Кластер с autoscaling: нужные ноды могут никогда не освободиться
- Данные уже равномерно реплицированы — разница между NODE_LOCAL и ANY минимальна
- Кластер загружен неравномерно: простаивающие executor-ы есть только на «неправильных» нодах
# Для S3/GCS/ADLS — отключаем ожидание полностью
spark.conf.set("spark.locality.wait", "0s")
# Для HDFS on-premise с co-located DataNode
spark.conf.set("spark.locality.wait", "3s") # дефолт разумен
# Для смешанного: HDFS, но с autoscaling
spark.conf.set("spark.locality.wait.process", "5s")
spark.conf.set("spark.locality.wait.node", "2s")
spark.conf.set("spark.locality.wait.rack", "0s") # rack-locality редко даёт выигрыш
Индикаторы в Spark UI
В Spark UI вкладка Stages -> Tasks показывает столбец Locality Level для каждой запущенной task. Интерпретация результатов:
| Картина в UI | Диагноз | Действие |
|---|---|---|
| 90%+ PROCESS_LOCAL | Кэш работает эффективно | Всё хорошо |
| 50%+ NODE_LOCAL | HDFS locality работает | Умеренно хорошо |
| Большинство ANY | Locality не работает | spark.locality.wait=0s |
| Много RACK_LOCAL при пустых executor-ах | wait слишком большой | Уменьшить wait |
Быстрый способ проверить locality через код: после завершения stage посмотрите на TaskInfo.taskLocality в SparkListener или используйте spark.sparkContext.statusTracker().getStageInfo(stageId). В production инструментируйте приложение через кастомный SparkListener.
Диагностика через логи
При уровне INFO логирования TaskSchedulerImpl выводит сообщения при каждом изменении уровня локальности:
INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 123,
worker-3, executor 7, partition 0, NODE_LOCAL, 4567 bytes)
WARN TaskSetManager: Loss was due to stage failure
(stage 5: nodes without data reached task locality ANY)
При DEBUG-уровне видна детальная картина:
DEBUG TaskSetManager: No tasks for locality PROCESS_LOCAL, checking
NODE_LOCAL. lastLocalityWaitResetTime=1716300000000, wait=3000ms
DEBUG TaskSetManager: Fell through to ANY after waiting 9012ms
Взаимодействие с кэшированием и shuffle
Понимание locality приобретает особое значение в двух сценариях:
Кэшированные RDD/DataFrame: когда данные закэшированы через cache() или persist(), BlockManager на каждом executor знает, какие партиции у него есть. TaskSetManager получает эту информацию через BlockManagerMasterEndpoint и заполняет pendingTasksForExecutor соответственно. Поэтому повторный проход по кэшированным данным даёт преимущественно PROCESS_LOCAL.
Shuffle-данные между стадиями: после завершения ShuffleMapStage, shuffle-файлы лежат на дисках конкретных executor-ов (или во внешнем shuffle service). DAGScheduler знает, на каком executor находится каждый shuffle-блок. Если следующая стадия читает shuffle-данные, TaskSetManager может запланировать tasks с учётом NODE_LOCAL (если читающий executor на той же ноде, что пишущий). Именно поэтому иногда имеет смысл небольшое ожидание даже для shuffle-стадий.
Попробуй сам
Следующий код позволяет наблюдать за locality в действии. Сначала запустим без кэша (ожидаем ANY или NODE_LOCAL), затем с кэшем (ожидаем PROCESS_LOCAL):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time
spark = SparkSession.builder \
.appName("locality-demo") \
.config("spark.executor.memory", "1g") \
.config("spark.locality.wait", "3s") \
.getOrCreate()
sc = spark.sparkContext
# Включаем логирование TaskSetManager
sc.setLogLevel("INFO")
# Шаг 1: Первый проход — данные не кэшированы
df = spark.range(10_000_000).toDF("id")
df2 = df.withColumn("value", col("id") * 2)
print("=== Первый проход (без кэша) ===")
t0 = time.time()
count1 = df2.count()
print(f"Count: {count1}, time: {time.time()-t0:.2f}s")
# В логах: locality будет ANY или NODE_LOCAL
# Шаг 2: Кэшируем
df2.cache()
df2.count() # прогрев кэша
# Шаг 3: Второй проход — данные в памяти executor-ов
print("=== Второй проход (с кэшем) ===")
t1 = time.time()
count2 = df2.filter(col("value") > 1000).count()
print(f"Count: {count2}, time: {time.time()-t1:.2f}s")
# В логах: locality должна быть преимущественно PROCESS_LOCAL
# Проверка через Spark UI:
# http://localhost:4040/stages/
# Вкладка Tasks -> столбец "Locality Level"
spark.stop()
В Spark UI после выполнения перейдите в Stages, найдите последний stage и откройте список tasks. Сравните столбец Locality Level между первым и вторым проходами. При правильно работающем кэше второй проход должен показать PROCESS_LOCAL для большинства tasks.