Спекулятивное выполнение: борьба со stragglers
В идеальном мире все tasks одной стадии выполняются примерно за одинаковое время, и стадия завершается, когда заканчивается последняя task. В реальном production-кластере этого почти никогда не происходит. Одна из тысячи tasks может работать в десять раз дольше остальных из-за перегруженного диска, GC-паузы, «шумного соседа» в облаке или неравномерного распределения данных. Такая задача называется straggler («отстающий»). Она блокирует завершение всей стадии — и всего Job.
Speculative execution — механизм, позволяющий Spark не ждать straggler, а запустить его копию на другом executor. Первая завершившаяся копия «побеждает», вторая убивается. С виду просто. На практике — набор тонких компромиссов, которые мы сейчас разберём детально.
Проблема stragglers: почему это серьёзно
Предположим, стадия из 1000 tasks. 999 tasks выполняются по 30 секунд каждая. Одна задача из-за медленного диска выполняется 10 минут. Время завершения стадии — не 30 секунд, а 10 минут. Выигрыш от параллелизма 1000 задач полностью уничтожен одним straggler-ом.
В производственных условиях к причинам straggler относятся:
- Hardware variability: в облаке VM-ы на одном физическом хосте конкурируют за disk I/O и сеть («noisy neighbor»)
- GC pressure: один executor попал в длинный Full GC
- Data skew: одна партиция содержит значительно больше данных, чем другие
- Disk hotspot: все reads идут к одному DataNode, который перегружен
- JVM JIT cold start: executor только что запустился, JIT ещё не оптимизировал код
Data skew — это отдельная проблема, которую speculative execution решает плохо. Если task медленная потому что обрабатывает 100 млн строк (против 10k у остальных), копия той же задачи на другом executor столкнётся с той же нагрузкой. Для data skew нужны другие инструменты: salting, AQE skew join, repartition.
Архитектура speculation: TaskSchedulerImpl + TaskSetManager
Механизм speculation реализован совместно в двух классах.
TaskSchedulerImpl отвечает за периодический запуск проверки. При старте (если spark.speculation=true), он создаёт scheduled task через ThreadUtils.newDaemonSingleThreadScheduledExecutor:
// TaskSchedulerImpl.scala (упрощённо)
if (conf.get(SPECULATION_ENABLED)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleWithFixedDelay(
() => Utils.tryOrStopSparkContext(sc) {
checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)
},
SPECULATION_INTERVAL_MS, // spark.speculation.interval, default 100ms
SPECULATION_INTERVAL_MS,
TimeUnit.MILLISECONDS
)
}
Обратите внимание: интервал проверки по умолчанию 100 миллисекунд — это очень частая проверка. Она не является дорогостоящей операцией сама по себе, но при большом числе активных TaskSet-ов может создавать contention на синхронизированных структурах данных (именно это описывает JIRA SPARK-16929).
TaskSetManager реализует метод checkSpeculatableTasks(minTimeToSpeculation), который вызывается TaskSchedulerImpl:
// TaskSetManager.scala (упрощённо)
def checkSpeculatableTasks(minTimeToSpeculation: Long): Boolean = {
// Zombie TaskSet или единственная задача — speculation бесполезна
if (isZombie || numTasks == 1) return false
val (successfulTaskDurations, medianDuration) = {
val durations = taskInfos.values
.filter(_.successful)
.map(_.duration)
.toArray
if (durations.length < 1) return false
(durations, median(durations))
}
// Fraction завершённых tasks должна превышать spark.speculation.quantile
val minFinishedForSpeculation = (numTasks * SPECULATION_QUANTILE).floor.toInt
if (successfulTaskDurations.length < minFinishedForSpeculation) return false
var foundTasks = false
val threshold = max(
SPECULATION_MULTIPLIER * medianDuration,
minTimeToSpeculation
)
// Проверяем каждую незавершённую task
for (tid <- runningTasksSet) {
val info = taskInfos(tid)
val taskDuration = clock.getTimeMillis() - info.launchTime
if (taskDuration > threshold &&
!speculatableTasks.contains(info.index)) {
logInfo(s"Marking task ${info.index} in stage $stageId as speculative " +
s"because it ran more than ${taskDuration}ms")
speculatableTasks += info.index
foundTasks = true
}
}
foundTasks
}
Параметры speculation
Spark предоставляет тонкую настройку поведения.
# Включить speculation
spark.conf.set("spark.speculation", "true") # default: false
# Как часто проверять на наличие straggler-ов
spark.conf.set("spark.speculation.interval", "100ms") # default: 100ms
# Минимальная доля завершённых tasks перед запуском speculation
spark.conf.set("spark.speculation.quantile", "0.75") # default: 0.75
# Множитель медианы: task считается straggler-ом,
# если её время > multiplier * median_duration
spark.conf.set("spark.speculation.multiplier", "1.5") # default: 1.5
# Минимальное время выполнения task перед тем,
# как она может быть помечена как speculative
spark.conf.set("spark.speculation.minTaskRuntime", "100ms") # default: 100ms
Механизм запуска speculative copy
Когда task помечена как speculative, следующий вызов resourceOffer() на TaskSetManager увидит её в speculatableTasks. Новый TaskDescription будет создан с тем же index, но другим attemptNumber. Оба экземпляра — оригинальный и speculative — работают параллельно на разных executor-ах.
// TaskSetManager.scala
private def dequeueSpeculativeTask(
execId: String, host: String, locality: TaskLocality.Value
): Option[(Int, TaskLocality.Value)] = {
speculatableTasks.retain { index =>
!successful(index) // убираем уже завершённые задачи
}
// Ищем speculatable task с лучшей доступной locality
for (index <- speculatableTasks if canRunOnHost(index, host)) {
val tasks = taskAttempts(index)
// Проверяем, что executor не тот же, что уже запустил эту task
if (!tasks.exists(t => t.running && t.executorId == execId)) {
speculatableTasks -= index
return Some(index, locality)
}
}
None
}
Важная деталь: speculative copy никогда не запускается на том же executor, что оригинал. Если оригинал запущен из-за медленного executor — запустить копию там же бессмысленно.
Когда любой из двух экземпляров завершается успешно:
// TaskSetManager.handleSuccessfulTask
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
val info = taskInfos(tid)
val index = info.index
// Если task уже отмечена как successful (другой attempt выиграл) — игнорируем
if (successful(index)) {
logInfo(s"Ignoring task-done event for ${info} as task ${index} is already completed")
return
}
successful(index) = true
// Убиваем остальные running attempts той же задачи
taskAttempts(index).filter(_.running).foreach { taskInfo =>
if (taskInfo.taskId != tid) {
sched.backend.killTask(taskInfo.taskId, taskInfo.executorId,
interruptThread = true, reason = "another attempt succeeded")
}
}
}
Efficiency-критерии: Spark 3.x и далее
В ранних версиях Spark speculation мог создавать значительные накладные расходы: запускалось слишком много спекулятивных копий, даже для задач, которые вот-вот завершились бы сами. Начиная с Spark 3.1 появилось понятие speculation efficiency.
spark.speculation.minTaskRuntime
Параметр spark.speculation.minTaskRuntime (default: 100ms) — минимальное время выполнения task, после которого она вообще рассматривается для speculation. Это защита от запуска спекулятивных копий для очень быстрых задач, где разброс в несколько секунд может объясняться нормальным JVM warm-up.
Efficiency factor в Spark 3.x+
В Spark 3.x (и сохранено в 4.0) TaskSetManager использует дополнительную эвристику: задача не помечается как speculative, если количество уже запущенных speculative copies превышает определённый лимит относительно числа executor-ов. Это предотвращает каскадный speculation при высокой нагрузке.
Проверить текущее состояние speculation можно через Spark UI:
Spark UI -> Stages -> [Stage ID]
Tasks tab: столбец "Status" показывет "speculative"
Event Timeline: видно два параллельных attempt для одной task index
Риски: когда speculation опасен
Двойная запись в non-idempotent output
Главный риск speculation — запись в сторонний хранилище, которое не поддерживает идемпотентность:
# ОПАСНО: если speculation включён и оба attempt пишут в Kafka
df.foreachPartition(lambda rows: kafka_producer.send(rows))
# => дубликаты в Kafka при speculation
# ОПАСНО: вставка в базу без дедупликации
df.write.jdbc(url, "table", mode="append")
# => дублированные строки при speculation
# БЕЗОПАСНО: запись в файловую систему (Parquet, ORC, Delta)
# Spark использует временные файлы + атомарный rename
df.write.parquet("/output/")
# => только один attempt завершит rename, второй будет убит
Правило: speculation безопасен только для операций, которые можно повторить без побочных эффектов. Запись в файловые системы (HDFS, S3 с multi-part upload и атомарным commit, Delta Lake) безопасна. Запись в Kafka, HBase, JDBC-insert без upsert — небезопасна.
При использовании Spark Structured Streaming speculative execution должен быть отключён. Гарантии exactly-once semantics в Structured Streaming основаны на детерминированном порядке операций. Speculation нарушает эти гарантии.
Отладка через логи
При включённом speculation driver-лог покажет:
INFO TaskSetManager: Marking task 142 in stage 3.0 as speculative
because it ran more than 45123 ms (threshold 45000 ms,
median 30000 ms, multiplier 1.5)
INFO TaskSetManager: Starting task 142.1 in stage 3.0 (TID 891,
worker-5, executor 12, partition 142, NODE_LOCAL, 2048 bytes)
INFO TaskSetManager: Killing speculative task 142.0 (TID 777) on
executor 8 because task 142.1 finished first
Suffix .1 после номера task означает второй attempt (нумерация с нуля). Если в логах много строк «Marking task N as speculative» — это сигнал либо к расследованию hardware, либо к проверке data skew.
Sequence: жизнь speculative task
Настройка для production-кластеров
Сценарий 1: Облачный кластер с noisy neighbors (рекомендовано)
spark.conf.set("spark.speculation", "true")
spark.conf.set("spark.speculation.quantile", "0.75")
spark.conf.set("spark.speculation.multiplier", "2.0") # менее агрессивно
spark.conf.set("spark.speculation.minTaskRuntime", "60s") # не трогать быстрые задачи
Сценарий 2: HDFS on-premise с гетерогенными нодами
spark.conf.set("spark.speculation", "true")
spark.conf.set("spark.speculation.quantile", "0.9") # ждать 90% завершения
spark.conf.set("spark.speculation.multiplier", "3.0") # только явные выбросы
Сценарий 3: Data pipeline с JDBC/Kafka sink
# Отключить полностью — риск дубликатов
spark.conf.set("spark.speculation", "false")
# Вместо этого — расследовать straggler через Spark UI
Сценарий 4: Delta Lake writes
spark.conf.set("spark.speculation", "true")
# Delta Lake использует transaction log + атомарный commit
# Speculation безопасен: только один attempt попадёт в transaction log
Попробуй сам
Симулируем straggler и наблюдаем speculation в действии:
from pyspark.sql import SparkSession
import time, random
spark = SparkSession.builder \
.appName("speculation-demo") \
.config("spark.speculation", "true") \
.config("spark.speculation.quantile", "0.5") \
.config("spark.speculation.multiplier", "1.5") \
.config("spark.speculation.minTaskRuntime", "1s") \
.config("spark.executor.instances", "4") \
.getOrCreate()
sc = spark.sparkContext
def slow_partition(partition_id, rows):
"""Один partition специально замедлен."""
processed = list(rows)
if partition_id == 2: # Симулируем straggler
time.sleep(30) # Пауза 30 секунд
return iter(processed)
# Создаём RDD с 8 партициями
rdd = sc.range(0, 800, numSlices=8)
# mapPartitionsWithIndex чтобы получить partition_id
result = rdd.mapPartitionsWithIndex(
lambda i, it: slow_partition(i, it)
).count()
print(f"Результат: {result}")
print("Проверьте Spark UI -> Stages -> Tasks")
print("Должны видеть task 2 со статусом 'speculative'")
print("И attempt task 2.1 на другом executor")
spark.stop()
После запуска откройте Spark UI на http://localhost:4040. В разделе Stages -> Tasks вы должны увидеть:
- Задачи 0-7, большинство завершены за секунды
- Task 2 со статусом
RUNNINGзначительно дольше медианы - Через несколько секунд появится Task 2 (attempt 1) со статусом
speculativeна другом executor - Одна из них победит и получит статус
SUCCESS, вторая —KILLED