Learning Platform
Глоссарий Troubleshooting
Урок 04.03 · 28 мин
Продвинутый
Task LocalityDelay SchedulingTaskSetManagerPROCESS_LOCALNODE_LOCALRACK_LOCAL

Task Locality и Delay Scheduling

Когда TaskSchedulerImpl получает TaskSet и располагает свободными executor-слотами, он не просто берёт первый попавшийся task и назначает его на первый попавшийся executor. Внутри TaskSetManager выполняется алгоритм delay scheduling — один из самых тонких и наименее понятых механизмов Spark. Именно он определяет: стоит ли подождать освобождения «правильного» executor или лучше запустить task сейчас на «неправильном».

Неправильная настройка этого алгоритма может привести к тому, что десятки свободных executor-ядер простаивают, пока JobScheduler терпеливо ждёт нод с нужными данными. Или, наоборот, Spark будет гонять гигабайты по сети вместо того, чтобы подождать 3 секунды. Понимание деталей делает разницу между 10-минутным и 2-часовым заданием.

Пять уровней локальности

Spark определяет иерархию пяти уровней Task Locality, каждый из которых описывает, насколько «близко» executor к данным, которые task должна обработать.

Иерархия Task Locality (от лучшего к худшему)
PROCESS_LOCALДанные находятся в том же JVM-процессе executor: кэшированный RDD/DataFrame в памяти, Broadcast-переменная. Нет IO вообще. Наивысший приоритет.
NODE_LOCALДанные на том же физическом узле: локальный HDFS DataNode, локальный диск, shuffle-файлы соседнего executor-а на той же машине. Стоимость = disk IO, нет сетевого трафика.
NO_PREFTask не имеет предпочтений по локальности: чтение из S3/GCS/ADLS, JDBC-источники, генераторы данных. Любой executor одинаково хорош.
RACK_LOCALДанные на другом узле, но в той же сетевой стойке (rack). Трафик идёт через top-of-rack switch, не через core switch. Стоимость ниже, чем cross-rack.
ANYДанные доступны с любого узла через сеть. Наихудший вариант — полный сетевой трафик, нет гарантий близости. Fallback-уровень.

Важно: уровень NO_PREF вставлен между NODE_LOCAL и RACK_LOCAL намеренно. Task с NO_PREF не имеет предпочтений, поэтому её можно запустить сразу на любом свободном executor, не ожидая. Она не «деградирует» с более высокого уровня — у неё вообще нет предпочтений по расположению.

Когда возникает каждый уровень

# PROCESS_LOCAL: данные закэшированы в executor memory
df = spark.read.parquet("/data/sales/")
df.cache()
df.count()  # первый проход: данные читаются, кэшируются
df.filter("amount > 100").count()  # второй проход: PROCESS_LOCAL

# NODE_LOCAL: HDFS DataNode co-located с executor
# Spark читает блок с локального DataNode, без сети
df2 = spark.read.parquet("hdfs://namenode:8020/data/")

# NO_PREF: S3 одинаково далеко от всех нод
df3 = spark.read.parquet("s3://bucket/data/")

# RACK_LOCAL: HDFS replication factor=3, данные на нодах в той же стойке
# Исходная нода занята, выбирается реплика в том же rack

# ANY: все нужные ноды заняты, приходится использовать любой executor

Внутреннее устройство TaskSetManager

TaskSetManager — это объект Scala, который управляет одним TaskSet (всеми задачами одной стадии). Класс находится в org.apache.spark.scheduler.TaskSetManager. Для реализации delay scheduling он ведёт несколько ключевых структур данных.

Pending-очереди по уровням локальности

// TaskSetManager.scala (упрощённо)
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
// executorId -> список индексов tasks

private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
// hostname -> список индексов tasks

private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
// rack -> список индексов tasks

private val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
// tasks без предпочтений по локальности

private val allPendingTasks = new ArrayBuffer[Int]
// все незавершённые tasks (для ANY)

Каждая task может находиться одновременно в нескольких очередях. Например, task с предпочтением к executor exec-1 на ноде worker-3 в rack rack-A будет зарегистрирована в pendingTasksForExecutor("exec-1"), pendingTasksForHost("worker-3"), pendingTasksForRack("rack-A") и allPendingTasks.

Состояние delay scheduling

// Текущий разрешённый уровень локальности
private var currentLocalityIndex = 0

// Время последнего сброса ожидания
private var lastLocalityWaitResetTime = clock.getTimeMillis()

// Вычисленные допустимые уровни для этого TaskSet
private var myLocalityLevels = computeValidLocalityLevels()

// Таймауты для каждого уровня (в порядке убывания приоритета)
private var localityWaits = myLocalityLevels.map(getLocalityWait)

Метод computeValidLocalityLevels() динамически определяет, какие уровни локальности вообще достижимы для данного TaskSet. Если все данные в S3 и ни одна task не имеет preferences, уровни PROCESS_LOCAL и NODE_LOCAL не включаются в myLocalityLevels — и Spark не будет их ждать.

Алгоритм delay scheduling

Когда TaskSchedulerImpl вызывает resourceOffer(execId, host, maxLocality) на TaskSetManager, происходит следующее:

Алгоритм resourceOffer и delay scheduling
Шаг 1TaskSchedulerImpl вызывает resourceOffer для каждого свободного слота. Передаёт executorId, hostname и maxLocality (максимально допустимый уровень).
Шаг 2allowedLocalityLevel = min(currentLocalityLevel, maxLocality). Если текущий уровень лучше, чем может предложить executor — снижаем до maxLocality.
Шаг 3Ищем task в очереди для данного executor/host/rack с учётом allowedLocalityLevel. Если нашли — возвращаем TaskDescription.
Шаг 4Если task не найдена на требуемом уровне: проверяем, истёк ли таймаут текущего уровня. Если истёк — повышаем currentLocalityIndex, сбрасываем таймер. Повторяем поиск.

Ключевой метод — getAllowedLocalityLevel():

// TaskSetManager.scala (упрощённо)
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
  // Поднимаемся по уровням, пока не найдём уровень,
  // для которого таймаут ещё не истёк
  while (currentLocalityIndex < myLocalityLevels.length - 1) {
    val morePreferable = myLocalityLevels(currentLocalityIndex)
    val currTime = curTime - lastLocalityWaitResetTime
    if (currTime >= localityWaits(currentLocalityIndex)) {
      // Таймаут этого уровня истёк — переходим к следующему (менее предпочтительному)
      currentLocalityIndex += 1
      lastLocalityWaitResetTime += localityWaits(currentLocalityIndex - 1)
    } else {
      return morePreferable  // Ещё ждём
    }
  }
  myLocalityLevels(currentLocalityIndex)
}

Этот алгоритм имеет важную тонкость: таймаут измеряется с момента последнего сброса, а не с момента прихода первого pending-task. Когда task успешно запускается на уровне NODE_LOCAL, lastLocalityWaitResetTime сбрасывается — и отсчёт ожидания следующего слота начинается заново.

Параметры spark.locality.wait

Spark предоставляет гранулярный контроль над временем ожидания для каждого уровня локальности:

# Глобальный таймаут (применяется ко всем уровням, если не переопределены)
spark.conf.set("spark.locality.wait", "3s")  # default: 3s

# Переопределение для конкретных уровней
spark.conf.set("spark.locality.wait.process", "3s")   # PROCESS_LOCAL (default: spark.locality.wait)
spark.conf.set("spark.locality.wait.node", "3s")      # NODE_LOCAL (default: spark.locality.wait)
spark.conf.set("spark.locality.wait.rack", "3s")      # RACK_LOCAL (default: spark.locality.wait)
# Для NO_PREF и ANY таймаут не применяется

Если spark.locality.wait.process не задан явно, он наследует значение spark.locality.wait. Каждый уровень имеет независимый таймаут. Цепочка ожидания для task с предпочтением PROCESS_LOCAL при дефолтных настройках:

t=0s:   Ищем PROCESS_LOCAL слот. Свободных нет.
t=3s:   Таймаут PROCESS_LOCAL. Ищем NODE_LOCAL слот. Нашли!
        => Запускаем с NODE_LOCAL, сбрасываем таймер.

--- или ---

t=0s:   Ищем PROCESS_LOCAL слот. Нет.
t=3s:   Ищем NODE_LOCAL слот. Нет.
t=6s:   Ищем RACK_LOCAL слот. Нет.
t=9s:   Используем ANY. Запускаем на любом свободном executor.
WARNING

Максимальное ожидание для одной task — сумма всех таймаутов: wait.process + wait.node + wait.rack = 9s при дефолтных настройках. При большом числе partition-задач и занятом кластере это суммируется: если каждая из 1000 задач ждёт по 9s, общая задержка составит до 9000s. На практике таймер общий для всего TaskSet, а не для каждой задачи в отдельности.

Компромисс: ждать vs запустить сразу

Выбор значения spark.locality.wait — это классический инженерный trade-off.

Аргументы в пользу большего ожидания (>3s):

  • Данные большие, и выигрыш от избегания сетевого трафика существенен
  • Кластер HDFS с co-located executor-ами — реальная locality достижима
  • Executor-ы на нужных нодах временно заняты, но скоро освободятся
  • Shuffle-данные на конкретных нодах (между стадиями)

Аргументы в пользу меньшего ожидания (0-1s или 0s):

  • Данные в облачном хранилище (S3, GCS, ADLS) — locality недостижима
  • Кластер с autoscaling: нужные ноды могут никогда не освободиться
  • Данные уже равномерно реплицированы — разница между NODE_LOCAL и ANY минимальна
  • Кластер загружен неравномерно: простаивающие executor-ы есть только на «неправильных» нодах
# Для S3/GCS/ADLS — отключаем ожидание полностью
spark.conf.set("spark.locality.wait", "0s")

# Для HDFS on-premise с co-located DataNode
spark.conf.set("spark.locality.wait", "3s")   # дефолт разумен

# Для смешанного: HDFS, но с autoscaling
spark.conf.set("spark.locality.wait.process", "5s")
spark.conf.set("spark.locality.wait.node", "2s")
spark.conf.set("spark.locality.wait.rack", "0s")  # rack-locality редко даёт выигрыш

Индикаторы в Spark UI

В Spark UI вкладка Stages -> Tasks показывает столбец Locality Level для каждой запущенной task. Интерпретация результатов:

Картина в UIДиагнозДействие
90%+ PROCESS_LOCALКэш работает эффективноВсё хорошо
50%+ NODE_LOCALHDFS locality работаетУмеренно хорошо
Большинство ANYLocality не работаетspark.locality.wait=0s
Много RACK_LOCAL при пустых executor-ахwait слишком большойУменьшить wait
TIP

Быстрый способ проверить locality через код: после завершения stage посмотрите на TaskInfo.taskLocality в SparkListener или используйте spark.sparkContext.statusTracker().getStageInfo(stageId). В production инструментируйте приложение через кастомный SparkListener.

Диагностика через логи

При уровне INFO логирования TaskSchedulerImpl выводит сообщения при каждом изменении уровня локальности:

INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 123,
  worker-3, executor 7, partition 0, NODE_LOCAL, 4567 bytes)

WARN TaskSetManager: Loss was due to stage failure
  (stage 5: nodes without data reached task locality ANY)

При DEBUG-уровне видна детальная картина:

DEBUG TaskSetManager: No tasks for locality PROCESS_LOCAL, checking
  NODE_LOCAL. lastLocalityWaitResetTime=1716300000000, wait=3000ms
DEBUG TaskSetManager: Fell through to ANY after waiting 9012ms

Взаимодействие с кэшированием и shuffle

Понимание locality приобретает особое значение в двух сценариях:

Кэшированные RDD/DataFrame: когда данные закэшированы через cache() или persist(), BlockManager на каждом executor знает, какие партиции у него есть. TaskSetManager получает эту информацию через BlockManagerMasterEndpoint и заполняет pendingTasksForExecutor соответственно. Поэтому повторный проход по кэшированным данным даёт преимущественно PROCESS_LOCAL.

Shuffle-данные между стадиями: после завершения ShuffleMapStage, shuffle-файлы лежат на дисках конкретных executor-ов (или во внешнем shuffle service). DAGScheduler знает, на каком executor находится каждый shuffle-блок. Если следующая стадия читает shuffle-данные, TaskSetManager может запланировать tasks с учётом NODE_LOCAL (если читающий executor на той же ноде, что пишущий). Именно поэтому иногда имеет смысл небольшое ожидание даже для shuffle-стадий.

Locality при чтении кэшированных данных
Executor 1 (worker-1)Хранит партиции 0, 1, 2 кэшированного DataFrame. TaskSetManager знает об этом из BlockManagerMaster.
Executor 2 (worker-2)Хранит партиции 3, 4, 5 кэшированного DataFrame.
Task 0Назначается на Executor 1 с уровнем PROCESS_LOCAL: данные уже в его памяти.
Task 3Назначается на Executor 2 с уровнем PROCESS_LOCAL: данные уже в его памяти.

Попробуй сам

Следующий код позволяет наблюдать за locality в действии. Сначала запустим без кэша (ожидаем ANY или NODE_LOCAL), затем с кэшем (ожидаем PROCESS_LOCAL):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time

spark = SparkSession.builder \
    .appName("locality-demo") \
    .config("spark.executor.memory", "1g") \
    .config("spark.locality.wait", "3s") \
    .getOrCreate()

sc = spark.sparkContext

# Включаем логирование TaskSetManager
sc.setLogLevel("INFO")

# Шаг 1: Первый проход — данные не кэшированы
df = spark.range(10_000_000).toDF("id")
df2 = df.withColumn("value", col("id") * 2)

print("=== Первый проход (без кэша) ===")
t0 = time.time()
count1 = df2.count()
print(f"Count: {count1}, time: {time.time()-t0:.2f}s")
# В логах: locality будет ANY или NODE_LOCAL

# Шаг 2: Кэшируем
df2.cache()
df2.count()  # прогрев кэша

# Шаг 3: Второй проход — данные в памяти executor-ов
print("=== Второй проход (с кэшем) ===")
t1 = time.time()
count2 = df2.filter(col("value") > 1000).count()
print(f"Count: {count2}, time: {time.time()-t1:.2f}s")
# В логах: locality должна быть преимущественно PROCESS_LOCAL

# Проверка через Spark UI:
# http://localhost:4040/stages/
# Вкладка Tasks -> столбец "Locality Level"

spark.stop()

В Spark UI после выполнения перейдите в Stages, найдите последний stage и откройте список tasks. Сравните столбец Locality Level между первым и вторым проходами. При правильно работающем кэше второй проход должен показать PROCESS_LOCAL для большинства tasks.

Проверка знанийKnowledge check
TaskSetManager находит pending task с предпочтением NODE_LOCAL на worker-5. Свободный executor есть только на worker-9 (уровень ANY). spark.locality.wait.node = 3s. Прошло уже 2 секунды с последнего запуска в этом TaskSet. Что произойдёт при следующем вызове resourceOffer?
ОтветAnswer
TaskSetManager НЕ запустит задачу на worker-9 немедленно. Алгоритм проверяет: с момента lastLocalityWaitResetTime прошло 2 секунды, а spark.locality.wait.node = 3 секунды — таймаут ещё не истёк. currentLocalityIndex остаётся на уровне NODE_LOCAL. resourceOffer вернёт None (задача не назначена). Слот на worker-9 может быть использован для другой task из другого TaskSet или той же stage с уровнем ANY. Только через 1 секунду (суммарно 3s) TaskSetManager повысит currentLocalityIndex до RACK_LOCAL и потом ANY, и тогда task будет назначена на worker-9.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Production-кластер читает данные из Amazon S3 (spark.dynamicAllocation включён, 50 нод). Дефолтный spark.locality.wait=3s. Tasks Stage 1 массово получают уровень ANY. Через 9 секунд после появления pending tasks стадия наконец стартует. Что происходит и как исправить?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6