Learning Platform
Глоссарий Troubleshooting
Урок 04.04 · 26 мин
Продвинутый
Speculative ExecutionStragglerTaskSetManagerspark.speculationProduction

Спекулятивное выполнение: борьба со stragglers

В идеальном мире все tasks одной стадии выполняются примерно за одинаковое время, и стадия завершается, когда заканчивается последняя task. В реальном production-кластере этого почти никогда не происходит. Одна из тысячи tasks может работать в десять раз дольше остальных из-за перегруженного диска, GC-паузы, «шумного соседа» в облаке или неравномерного распределения данных. Такая задача называется straggler («отстающий»). Она блокирует завершение всей стадии — и всего Job.

Speculative execution — механизм, позволяющий Spark не ждать straggler, а запустить его копию на другом executor. Первая завершившаяся копия «побеждает», вторая убивается. С виду просто. На практике — набор тонких компромиссов, которые мы сейчас разберём детально.

Проблема stragglers: почему это серьёзно

Предположим, стадия из 1000 tasks. 999 tasks выполняются по 30 секунд каждая. Одна задача из-за медленного диска выполняется 10 минут. Время завершения стадии — не 30 секунд, а 10 минут. Выигрыш от параллелизма 1000 задач полностью уничтожен одним straggler-ом.

В производственных условиях к причинам straggler относятся:

  • Hardware variability: в облаке VM-ы на одном физическом хосте конкурируют за disk I/O и сеть («noisy neighbor»)
  • GC pressure: один executor попал в длинный Full GC
  • Data skew: одна партиция содержит значительно больше данных, чем другие
  • Disk hotspot: все reads идут к одному DataNode, который перегружен
  • JVM JIT cold start: executor только что запустился, JIT ещё не оптимизировал код
NOTE

Data skew — это отдельная проблема, которую speculative execution решает плохо. Если task медленная потому что обрабатывает 100 млн строк (против 10k у остальных), копия той же задачи на другом executor столкнётся с той же нагрузкой. Для data skew нужны другие инструменты: salting, AQE skew join, repartition.

Архитектура speculation: TaskSchedulerImpl + TaskSetManager

Механизм speculation реализован совместно в двух классах.

TaskSchedulerImpl отвечает за периодический запуск проверки. При старте (если spark.speculation=true), он создаёт scheduled task через ThreadUtils.newDaemonSingleThreadScheduledExecutor:

// TaskSchedulerImpl.scala (упрощённо)
if (conf.get(SPECULATION_ENABLED)) {
  logInfo("Starting speculative execution thread")
  speculationScheduler.scheduleWithFixedDelay(
    () => Utils.tryOrStopSparkContext(sc) {
      checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)
    },
    SPECULATION_INTERVAL_MS,  // spark.speculation.interval, default 100ms
    SPECULATION_INTERVAL_MS,
    TimeUnit.MILLISECONDS
  )
}

Обратите внимание: интервал проверки по умолчанию 100 миллисекунд — это очень частая проверка. Она не является дорогостоящей операцией сама по себе, но при большом числе активных TaskSet-ов может создавать contention на синхронизированных структурах данных (именно это описывает JIRA SPARK-16929).

TaskSetManager реализует метод checkSpeculatableTasks(minTimeToSpeculation), который вызывается TaskSchedulerImpl:

// TaskSetManager.scala (упрощённо)
def checkSpeculatableTasks(minTimeToSpeculation: Long): Boolean = {
  // Zombie TaskSet или единственная задача — speculation бесполезна
  if (isZombie || numTasks == 1) return false

  val (successfulTaskDurations, medianDuration) = {
    val durations = taskInfos.values
      .filter(_.successful)
      .map(_.duration)
      .toArray
    if (durations.length < 1) return false
    (durations, median(durations))
  }

  // Fraction завершённых tasks должна превышать spark.speculation.quantile
  val minFinishedForSpeculation = (numTasks * SPECULATION_QUANTILE).floor.toInt
  if (successfulTaskDurations.length < minFinishedForSpeculation) return false

  var foundTasks = false
  val threshold = max(
    SPECULATION_MULTIPLIER * medianDuration,
    minTimeToSpeculation
  )

  // Проверяем каждую незавершённую task
  for (tid <- runningTasksSet) {
    val info = taskInfos(tid)
    val taskDuration = clock.getTimeMillis() - info.launchTime
    if (taskDuration > threshold &&
        !speculatableTasks.contains(info.index)) {
      logInfo(s"Marking task ${info.index} in stage $stageId as speculative " +
        s"because it ran more than ${taskDuration}ms")
      speculatableTasks += info.index
      foundTasks = true
    }
  }
  foundTasks
}

Параметры speculation

Spark предоставляет тонкую настройку поведения.

# Включить speculation
spark.conf.set("spark.speculation", "true")          # default: false

# Как часто проверять на наличие straggler-ов
spark.conf.set("spark.speculation.interval", "100ms") # default: 100ms

# Минимальная доля завершённых tasks перед запуском speculation
spark.conf.set("spark.speculation.quantile", "0.75")  # default: 0.75

# Множитель медианы: task считается straggler-ом,
# если её время > multiplier * median_duration
spark.conf.set("spark.speculation.multiplier", "1.5") # default: 1.5

# Минимальное время выполнения task перед тем,
# как она может быть помечена как speculative
spark.conf.set("spark.speculation.minTaskRuntime", "100ms") # default: 100ms
Когда task помечается как speculative
Завершённых tasks: 75%+spark.speculation.quantile=0.75: нужно завершить 75% tasks стадии, прежде чем запускать speculation. До этого медиана нестабильна.
Медиана durationВычисляется по всем успешно завершённым tasks текущего TaskSet. Устойчивее к выбросам, чем среднее.
Порогthreshold = max(multiplier * median, minTaskRuntime). При multiplier=1.5 и median=30s: threshold = max(45s, 100ms) = 45s.
Task 7: 8 минут8 минут >> 45 секунд порога. Task 7 помечается как speculative и добавляется в speculatableTasks.

Механизм запуска speculative copy

Когда task помечена как speculative, следующий вызов resourceOffer() на TaskSetManager увидит её в speculatableTasks. Новый TaskDescription будет создан с тем же index, но другим attemptNumber. Оба экземпляра — оригинальный и speculative — работают параллельно на разных executor-ах.

// TaskSetManager.scala
private def dequeueSpeculativeTask(
    execId: String, host: String, locality: TaskLocality.Value
): Option[(Int, TaskLocality.Value)] = {
  speculatableTasks.retain { index =>
    !successful(index) // убираем уже завершённые задачи
  }
  // Ищем speculatable task с лучшей доступной locality
  for (index <- speculatableTasks if canRunOnHost(index, host)) {
    val tasks = taskAttempts(index)
    // Проверяем, что executor не тот же, что уже запустил эту task
    if (!tasks.exists(t => t.running && t.executorId == execId)) {
      speculatableTasks -= index
      return Some(index, locality)
    }
  }
  None
}

Важная деталь: speculative copy никогда не запускается на том же executor, что оригинал. Если оригинал запущен из-за медленного executor — запустить копию там же бессмысленно.

Когда любой из двух экземпляров завершается успешно:

// TaskSetManager.handleSuccessfulTask
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  // Если task уже отмечена как successful (другой attempt выиграл) — игнорируем
  if (successful(index)) {
    logInfo(s"Ignoring task-done event for ${info} as task ${index} is already completed")
    return
  }
  successful(index) = true
  // Убиваем остальные running attempts той же задачи
  taskAttempts(index).filter(_.running).foreach { taskInfo =>
    if (taskInfo.taskId != tid) {
      sched.backend.killTask(taskInfo.taskId, taskInfo.executorId,
        interruptThread = true, reason = "another attempt succeeded")
    }
  }
}

Efficiency-критерии: Spark 3.x и далее

В ранних версиях Spark speculation мог создавать значительные накладные расходы: запускалось слишком много спекулятивных копий, даже для задач, которые вот-вот завершились бы сами. Начиная с Spark 3.1 появилось понятие speculation efficiency.

spark.speculation.minTaskRuntime

Параметр spark.speculation.minTaskRuntime (default: 100ms) — минимальное время выполнения task, после которого она вообще рассматривается для speculation. Это защита от запуска спекулятивных копий для очень быстрых задач, где разброс в несколько секунд может объясняться нормальным JVM warm-up.

Efficiency factor в Spark 3.x+

В Spark 3.x (и сохранено в 4.0) TaskSetManager использует дополнительную эвристику: задача не помечается как speculative, если количество уже запущенных speculative copies превышает определённый лимит относительно числа executor-ов. Это предотвращает каскадный speculation при высокой нагрузке.

Проверить текущее состояние speculation можно через Spark UI:

Spark UI -> Stages -> [Stage ID]
Tasks tab: столбец "Status" показывет "speculative"
Event Timeline: видно два параллельных attempt для одной task index

Риски: когда speculation опасен

Двойная запись в non-idempotent output

Главный риск speculation — запись в сторонний хранилище, которое не поддерживает идемпотентность:

# ОПАСНО: если speculation включён и оба attempt пишут в Kafka
df.foreachPartition(lambda rows: kafka_producer.send(rows))
# => дубликаты в Kafka при speculation

# ОПАСНО: вставка в базу без дедупликации
df.write.jdbc(url, "table", mode="append")
# => дублированные строки при speculation

# БЕЗОПАСНО: запись в файловую систему (Parquet, ORC, Delta)
# Spark использует временные файлы + атомарный rename
df.write.parquet("/output/")
# => только один attempt завершит rename, второй будет убит

Правило: speculation безопасен только для операций, которые можно повторить без побочных эффектов. Запись в файловые системы (HDFS, S3 с multi-part upload и атомарным commit, Delta Lake) безопасна. Запись в Kafka, HBase, JDBC-insert без upsert — небезопасна.

DANGER

При использовании Spark Structured Streaming speculative execution должен быть отключён. Гарантии exactly-once semantics в Structured Streaming основаны на детерминированном порядке операций. Speculation нарушает эти гарантии.

Отладка через логи

При включённом speculation driver-лог покажет:

INFO TaskSetManager: Marking task 142 in stage 3.0 as speculative
  because it ran more than 45123 ms (threshold 45000 ms,
  median 30000 ms, multiplier 1.5)
INFO TaskSetManager: Starting task 142.1 in stage 3.0 (TID 891,
  worker-5, executor 12, partition 142, NODE_LOCAL, 2048 bytes)
INFO TaskSetManager: Killing speculative task 142.0 (TID 777) on
  executor 8 because task 142.1 finished first

Suffix .1 после номера task означает второй attempt (нумерация с нуля). Если в логах много строк «Marking task N as speculative» — это сигнал либо к расследованию hardware, либо к проверке data skew.

Sequence: жизнь speculative task

Настройка для production-кластеров

Сценарий 1: Облачный кластер с noisy neighbors (рекомендовано)

spark.conf.set("spark.speculation", "true")
spark.conf.set("spark.speculation.quantile", "0.75")
spark.conf.set("spark.speculation.multiplier", "2.0")  # менее агрессивно
spark.conf.set("spark.speculation.minTaskRuntime", "60s")  # не трогать быстрые задачи

Сценарий 2: HDFS on-premise с гетерогенными нодами

spark.conf.set("spark.speculation", "true")
spark.conf.set("spark.speculation.quantile", "0.9")   # ждать 90% завершения
spark.conf.set("spark.speculation.multiplier", "3.0") # только явные выбросы

Сценарий 3: Data pipeline с JDBC/Kafka sink

# Отключить полностью — риск дубликатов
spark.conf.set("spark.speculation", "false")
# Вместо этого — расследовать straggler через Spark UI

Сценарий 4: Delta Lake writes

spark.conf.set("spark.speculation", "true")
# Delta Lake использует transaction log + атомарный commit
# Speculation безопасен: только один attempt попадёт в transaction log

Попробуй сам

Симулируем straggler и наблюдаем speculation в действии:

from pyspark.sql import SparkSession
import time, random

spark = SparkSession.builder \
    .appName("speculation-demo") \
    .config("spark.speculation", "true") \
    .config("spark.speculation.quantile", "0.5") \
    .config("spark.speculation.multiplier", "1.5") \
    .config("spark.speculation.minTaskRuntime", "1s") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

sc = spark.sparkContext

def slow_partition(partition_id, rows):
    """Один partition специально замедлен."""
    processed = list(rows)
    if partition_id == 2:  # Симулируем straggler
        time.sleep(30)     # Пауза 30 секунд
    return iter(processed)

# Создаём RDD с 8 партициями
rdd = sc.range(0, 800, numSlices=8)

# mapPartitionsWithIndex чтобы получить partition_id
result = rdd.mapPartitionsWithIndex(
    lambda i, it: slow_partition(i, it)
).count()

print(f"Результат: {result}")
print("Проверьте Spark UI -> Stages -> Tasks")
print("Должны видеть task 2 со статусом 'speculative'")
print("И attempt task 2.1 на другом executor")

spark.stop()

После запуска откройте Spark UI на http://localhost:4040. В разделе Stages -> Tasks вы должны увидеть:

  • Задачи 0-7, большинство завершены за секунды
  • Task 2 со статусом RUNNING значительно дольше медианы
  • Через несколько секунд появится Task 2 (attempt 1) со статусом speculative на другом executor
  • Одна из них победит и получит статус SUCCESS, вторая — KILLED
Проверка знанийKnowledge check
В production Stage содержит 200 tasks. spark.speculation.quantile=0.75, spark.speculation.multiplier=1.5. После завершения 150 tasks (75%), медиана составила 45 секунд. Task 88 работает уже 9 минут. Task 99 работает уже 1 минуту. Какая из них будет помечена как speculative, и почему?
ОтветAnswer
Только Task 88 (9 минут = 540 секунд) будет помечена как speculative. Порог для speculation: threshold = 1.5 * 45s = 67.5 секунды. Task 88 (540s) >> 67.5s — помечается. Task 99 (60s) < 67.5s — не помечается, её время укладывается в допустимый диапазон. Quantile условие выполнено: 150/200 = 75% >= 0.75. Speculation запустит копию Task 88 на другом executor. Если копия завершится раньше оригинала — оригинал будет убит. Если Task 88 медленная из-за data skew (большая партиция), копия столкнётся с той же проблемой и не даст выигрыша.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Spark-pipeline пишет результат в Apache Kafka. spark.speculation=true. Stage 5 содержит 100 tasks; 80 завершились. Task 33 работает в 4 раза дольше медианы. Speculation запустит копию Task 33. Какая проблема возникнет?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6