Task Scheduler: Распределение задач по executors
После того как DAG Scheduler разбил Job на stages и сформировал TaskSets, в игру вступает Task Scheduler — компонент, который решает, какой task отправить на какой executor и в каком порядке.
TaskScheduler: от TaskSet к выполнению
TaskScheduler получает TaskSet (набор tasks одной стадии) и распределяет их по доступным executor cores. Каждый executor core может выполнять один task в один момент времени.
DAG Scheduler Task Scheduler Executors
│ │ │
│ TaskSet (200 tasks) │ │
│ ────────────────────> │ Task 0 ───────────────> │ Executor 1, Core 1
│ │ Task 1 ───────────────> │ Executor 1, Core 2
│ │ Task 2 ───────────────> │ Executor 2, Core 1
│ │ ... │
│ │ Task 199 ────────────> │ Executor N, Core M
Scheduling Policies: FIFO vs FAIR
Spark поддерживает две политики планирования для Jobs (не tasks):
FIFO Scheduler (по умолчанию)
Jobs выполняются строго по очереди. Первый Job получает все ресурсы кластера. Следующий Job ждёт полного завершения предыдущего.
# FIFO: Job 1 занимает все executors, Jobs 2-3 ждут
spark.conf.set("spark.scheduler.mode", "FIFO")
# Job 1 -- большой, занимает 10 минут
df1.groupBy("key").count().write.parquet("/out1/")
# Job 2 -- маленький запрос, но ждёт 10 минут...
df2.count()
Проблема FIFO: маленький запрос может ждать часами, если перед ним стоит тяжёлый batch job.
FAIR Scheduler
Jobs делят ресурсы между собой через configurable pools. Каждый pool имеет weight (приоритет) и minShare (минимальные гарантированные ресурсы).
spark.conf.set("spark.scheduler.mode", "FAIR")
# Настройка в fairscheduler.xml:
# Pool "production" - weight=2, minShare=4 cores
# Pool "analytics" - weight=1, minShare=2 cores
<!-- fairscheduler.xml -->
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>2</weight>
<minShare>4</minShare>
</pool>
<pool name="analytics">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
</allocations>
С FAIR scheduling маленький аналитический запрос получит ресурсы сразу, даже если параллельно выполняется большой batch job.
FIFO Scheduler
FAIR Scheduler
Job 1 (67% cores)
Job 2, Job 3 (33% cores)
Data Locality Levels
Data Locality: где запускать task?
Task Scheduler пытается запустить task как можно ближе к данным, которые он обрабатывает. Это называется data locality — и это одна из ключевых оптимизаций Spark.
5 уровней data locality
| Уровень | Описание | Скорость | Когда |
|---|---|---|---|
| PROCESS_LOCAL | Данные в том же executor JVM | Максимальная | Кэшированный RDD/DataFrame |
| NODE_LOCAL | Данные на том же узле (локальный диск, HDFS DataNode) | Высокая | Данные на HDFS с co-located executor |
| NO_PREF | Нет предпочтения | Средняя | Чтение из удалённой БД, S3 |
| RACK_LOCAL | Данные в той же стойке (rack) | Низкая | HDFS в пределах rack |
| ANY | Данные на другом узле кластера | Самая низкая | Последний fallback |
spark.locality.wait
Spark ждёт spark.locality.wait (по умолчанию 3 секунды) перед тем, как согласиться на менее оптимальный уровень locality:
1. Пытаемся PROCESS_LOCAL -- executor с данными занят
2. Ждём 3 секунды...
3. Пробуем NODE_LOCAL -- есть свободный executor на том же узле!
4. Запускаем task с NODE_LOCAL
Можно настроить ожидание для каждого уровня отдельно:
spark.conf.set("spark.locality.wait", "3s") # общий timeout
spark.conf.set("spark.locality.wait.process", "5s") # PROCESS_LOCAL
spark.conf.set("spark.locality.wait.node", "3s") # NODE_LOCAL
spark.conf.set("spark.locality.wait.rack", "1s") # RACK_LOCAL
Для облачных хранилищ (S3, GCS, ADLS) data locality не имеет значения — данные всегда доступны по сети с одинаковой задержкой. Установите spark.locality.wait=0s для кластеров, работающих с cloud storage.
Speculative Execution
Иногда один task выполняется значительно дольше остальных (из-за медленного диска, перегруженного узла, GC pause). Это называется straggler.
Speculative execution (spark.speculation=true) решает эту проблему: Spark запускает дубликат медленного task на другом executor. Первый завершившийся task побеждает, дубликат отменяется.
spark.conf.set("spark.speculation", "true")
spark.conf.set("spark.speculation.multiplier", "1.5") # task считается медленным, если в 1.5x дольше медианы
spark.conf.set("spark.speculation.quantile", "0.75") # запускать speculation после завершения 75% tasks
Когда включать: кластеры с неоднородным hardware, cloud-среды с “noisy neighbors”. Когда не включать: если tasks пишут в non-idempotent хранилища (speculation может создать дубликаты).
Что дальше?
В следующем уроке мы погрузимся в shuffle — самую дорогую операцию Spark, которая определяет границы стадий и является основным источником проблем с производительностью.