Learning Platform
Глоссарий Troubleshooting
Урок 04.02 · 18 мин
Средний
TaskTaskSetSchedulingFIFOFAIRData Locality

Task Scheduler: Распределение задач по executors

После того как DAG Scheduler разбил Job на stages и сформировал TaskSets, в игру вступает Task Scheduler — компонент, который решает, какой task отправить на какой executor и в каком порядке.

TaskScheduler: от TaskSet к выполнению

TaskScheduler получает TaskSet (набор tasks одной стадии) и распределяет их по доступным executor cores. Каждый executor core может выполнять один task в один момент времени.

DAG Scheduler           Task Scheduler              Executors
     │                       │                          │
     │  TaskSet (200 tasks)  │                          │
     │ ────────────────────> │  Task 0 ───────────────> │ Executor 1, Core 1
     │                       │  Task 1 ───────────────> │ Executor 1, Core 2
     │                       │  Task 2 ───────────────> │ Executor 2, Core 1
     │                       │  ...                     │
     │                       │  Task 199 ────────────> │ Executor N, Core M

Scheduling Policies: FIFO vs FAIR

Spark поддерживает две политики планирования для Jobs (не tasks):

FIFO Scheduler (по умолчанию)

Jobs выполняются строго по очереди. Первый Job получает все ресурсы кластера. Следующий Job ждёт полного завершения предыдущего.

# FIFO: Job 1 занимает все executors, Jobs 2-3 ждут
spark.conf.set("spark.scheduler.mode", "FIFO")

# Job 1 -- большой, занимает 10 минут
df1.groupBy("key").count().write.parquet("/out1/")

# Job 2 -- маленький запрос, но ждёт 10 минут...
df2.count()

Проблема FIFO: маленький запрос может ждать часами, если перед ним стоит тяжёлый batch job.

FAIR Scheduler

Jobs делят ресурсы между собой через configurable pools. Каждый pool имеет weight (приоритет) и minShare (минимальные гарантированные ресурсы).

spark.conf.set("spark.scheduler.mode", "FAIR")

# Настройка в fairscheduler.xml:
# Pool "production" - weight=2, minShare=4 cores
# Pool "analytics" - weight=1, minShare=2 cores
<!-- fairscheduler.xml -->
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>2</weight>
    <minShare>4</minShare>
  </pool>
  <pool name="analytics">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocations>

С FAIR scheduling маленький аналитический запрос получит ресурсы сразу, даже если параллельно выполняется большой batch job.

Task Scheduling: FIFO vs FAIR

FIFO Scheduler

Job 1 (running -- all cores)
Job 2 (waiting)
Job 3 (waiting)

FAIR Scheduler

Pool: production (w=2)
Job 1 (67% cores)
Pool: analytics (w=1)
Job 2, Job 3 (33% cores)

Data Locality Levels

PROCESS_LOCALFastest
NODE_LOCALFast
NO_PREFMedium
RACK_LOCALSlow
ANYSlowest
spark.locality.wait3s (default)

Data Locality: где запускать task?

Task Scheduler пытается запустить task как можно ближе к данным, которые он обрабатывает. Это называется data locality — и это одна из ключевых оптимизаций Spark.

5 уровней data locality

УровеньОписаниеСкоростьКогда
PROCESS_LOCALДанные в том же executor JVMМаксимальнаяКэшированный RDD/DataFrame
NODE_LOCALДанные на том же узле (локальный диск, HDFS DataNode)ВысокаяДанные на HDFS с co-located executor
NO_PREFНет предпочтенияСредняяЧтение из удалённой БД, S3
RACK_LOCALДанные в той же стойке (rack)НизкаяHDFS в пределах rack
ANYДанные на другом узле кластераСамая низкаяПоследний fallback

spark.locality.wait

Spark ждёт spark.locality.wait (по умолчанию 3 секунды) перед тем, как согласиться на менее оптимальный уровень locality:

1. Пытаемся PROCESS_LOCAL -- executor с данными занят
2. Ждём 3 секунды...
3. Пробуем NODE_LOCAL -- есть свободный executor на том же узле!
4. Запускаем task с NODE_LOCAL

Можно настроить ожидание для каждого уровня отдельно:

spark.conf.set("spark.locality.wait", "3s")           # общий timeout
spark.conf.set("spark.locality.wait.process", "5s")    # PROCESS_LOCAL
spark.conf.set("spark.locality.wait.node", "3s")       # NODE_LOCAL
spark.conf.set("spark.locality.wait.rack", "1s")       # RACK_LOCAL
TIP

Для облачных хранилищ (S3, GCS, ADLS) data locality не имеет значения — данные всегда доступны по сети с одинаковой задержкой. Установите spark.locality.wait=0s для кластеров, работающих с cloud storage.

Проверка знанийKnowledge check
Почему FAIR scheduling важен для многопользовательских кластеров?
ОтветAnswer
В FIFO режиме один большой Job может занять все ресурсы кластера на часы, заставляя всех остальных пользователей ждать. FAIR scheduling разделяет ресурсы между Jobs через pools с настраиваемыми весами и минимальными гарантиями (minShare). Это позволяет аналитикам выполнять короткие интерактивные запросы одновременно с тяжёлыми batch-задачами, не дожидаясь их завершения.

Speculative Execution

Иногда один task выполняется значительно дольше остальных (из-за медленного диска, перегруженного узла, GC pause). Это называется straggler.

Speculative execution (spark.speculation=true) решает эту проблему: Spark запускает дубликат медленного task на другом executor. Первый завершившийся task побеждает, дубликат отменяется.

spark.conf.set("spark.speculation", "true")
spark.conf.set("spark.speculation.multiplier", "1.5")  # task считается медленным, если в 1.5x дольше медианы
spark.conf.set("spark.speculation.quantile", "0.75")    # запускать speculation после завершения 75% tasks

Когда включать: кластеры с неоднородным hardware, cloud-среды с “noisy neighbors”. Когда не включать: если tasks пишут в non-idempotent хранилища (speculation может создать дубликаты).

Проверка знанийKnowledge check
Какой компромисс создаёт увеличение spark.locality.wait с 3s до 30s?
ОтветAnswer
Более длительное ожидание увеличивает шанс получить оптимальный уровень locality (PROCESS_LOCAL или NODE_LOCAL), что ускоряет чтение данных. Но при этом executor cores простаивают во время ожидания -- они могли бы обрабатывать другие tasks. Для небольших datasets выигрыш от locality может быть меньше, чем потеря от простоя. Оптимальное значение зависит от соотношения стоимости I/O и стоимости ожидания.

Что дальше?

В следующем уроке мы погрузимся в shuffle — самую дорогую операцию Spark, которая определяет границы стадий и является основным источником проблем с производительностью.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. В кластере работают 3 long-running Spark-приложения. Приложение A -- интерактивные аналитические запросы, приложения B и C -- batch ETL. Какой планировщик (FIFO vs FAIR) лучше подойдёт и почему?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6