Learning Platform
Глоссарий Troubleshooting
Урок 15.03 · 30 мин
Продвинутый
async-profilerFlame GraphsSpark UISparkPluginProfilingEvent Log

Профилирование executor

Spark UI показывает, что stage занимает 20 минут. Вы видите, что executor работает, cores загружены. Но где именно теряется время? GC? Сериализация? Конкретный operator? shuffle fetch latency? Без профилировщика вы угадываете. С профилировщиком — видите.

Этот урок о том, как получить flame graph с работающего executor, как читать метрики Spark UI на глубоком уровне и как использовать Event Log для post-mortem анализа.


async-profiler для JVM executor

async-profiler — это low-overhead sampling profiler для JVM. Ключевые преимущества перед другими инструментами:

  1. Нет safepoint bias. Большинство Java profilers (JProfiler, YourKit в sampling режиме) могут снимать stack traces только в safepoint’ах. async-profiler использует AsyncGetCallTrace API — недокументированный internal HotSpot API, который работает в любой момент.
  2. Native frames. Spark активно использует off-heap через sun.misc.Unsafe. async-profiler показывает C++-код внутри JVM и нативных библиотек.
  3. Overhead 0.5–2% на типичных workloads. Можно запускать на production executor.

Установка async-profiler на executor

Для standalone или YARN — можно установить на worker-нодах заранее:

# На каждой worker-ноде:
cd /opt
wget https://github.com/async-profiler/async-profiler/releases/latest/download/async-profiler-3.0-linux-x64.tar.gz
tar xzf async-profiler-3.0-linux-x64.tar.gz
ln -s async-profiler-3.0-linux-x64 async-profiler

Для Kubernetes — добавить в Docker image executor’а:

FROM apache/spark:4.0.0
COPY async-profiler-3.0-linux-x64 /opt/async-profiler

Подключение через spark.executor.extraJavaOptions

Можно запустить async-profiler как Java agent при старте executor:

spark-submit \
  --conf "spark.executor.extraJavaOptions=\
    -agentpath:/opt/async-profiler/lib/libasyncProfiler.so=start,\
    event=cpu,\
    file=/tmp/profile-{pid}.html,\
    interval=10ms,\
    jfr" \
  --conf "spark.driver.extraJavaOptions=\
    -XX:+PreserveFramePointer \
    -XX:+UnlockDiagnosticVMOptions \
    -XX:+DebugNonSafepoints" \
  myapp.jar

Флаги -XX:+PreserveFramePointer и -XX:+DebugNonSafepoints важны: без них async-profiler получает неполные stack traces у JIT-компилированного кода.

Attach к работающему executor (без перезапуска)

Лучший подход при исследовании живого кластера — подключиться к уже работающему процессу:

# Найти PID executor на worker-ноде:
jps -l | grep CoarseGrainedExecutorBackend
# 14823 org.apache.spark.executor.CoarseGrainedExecutorBackend

# CPU profiling 60 секунд, flame graph в HTML:
/opt/async-profiler/profiler.sh \
    -d 60 \
    -e cpu \
    -f /tmp/cpu-$(date +%s).html \
    14823

# Allocation profiling — какие объекты создаются:
/opt/async-profiler/profiler.sh \
    -d 60 \
    -e alloc \
    -f /tmp/alloc-$(date +%s).html \
    14823

# Wall-clock — где thread тратит wall-clock time (включая IO waits):
/opt/async-profiler/profiler.sh \
    -d 60 \
    -e wall \
    --wall-threads 16 \
    -f /tmp/wall-$(date +%s).html \
    14823
CPU vs Wall-clock: разные вопросы

CPU mode

CPU profiling (-e cpu): отвечает на вопрос 'где CPU тратится когда работает'. Не видит IO waits, lock waits, sleep. Используй для: горячие вычисления, сериализация, кодогенерация.
vs

Wall-clock mode

Wall-clock profiling (-e wall): отвечает на вопрос 'где thread тратит wall-clock время'. Видит IO, lock waits, shuffle fetch latency, GC stalls. Используй для: диагностика throughput problems, IO bottlenecks, backpressure.

SparkPlugin API: встроенная профилировочная инфраструктура

Spark 3.x+ предоставляет SparkPlugin API для расширения executor-процесса. В Spark 4.0 появился встроенный профилировщик на базе async-profiler как отдельный артефакт org.apache.spark:spark-profiler_2.13.

Структура SparkPlugin

// org.apache.spark.api.plugin.SparkPlugin
trait SparkPlugin {
  def driverPlugin(): DriverPlugin     // Один экземпляр на driver
  def executorPlugin(): ExecutorPlugin // Один экземпляр на каждый executor
}

// org.apache.spark.api.plugin.ExecutorPlugin
trait ExecutorPlugin {
  def init(ctx: PluginContext, extraConf: Map[String, String]): Unit
  def onTaskStart(): Unit   // До каждой task
  def onTaskSucceeded(): Unit
  def onTaskFailed(failureReason: TaskFailedReason): Unit
  def shutdown(): Unit
}

Подключение профилировщика через SparkPlugin

Минимальный пример: executor-plugin, который запускает async-profiler при старте executor и сохраняет flame graph при завершении:

// Подключение встроенного spark-profiler (Spark 4.0):
spark-submit \
  --packages org.apache.spark:spark-profiler_2.13:4.0.0 \
  --conf spark.plugins=org.apache.spark.profiler.CpuProfiler \
  --conf spark.profiler.enabled=true \
  --conf spark.profiler.outputDir=/tmp/spark-profiles \
  --conf spark.profiler.duration=60 \
  myapp.jar

Для Kubernetes — монтируем output в shared PVC:

# spark-submit с kubernetes:
--conf spark.plugins=org.apache.spark.profiler.CpuProfiler
--conf spark.profiler.outputDir=s3://my-bucket/spark-profiles
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.profiles.mount.path=/tmp/profiles

Чтение flame graph: практика

Flame graph — это визуализация агрегированных stack traces. Вертикаль — глубина стека, горизонталь — доля времени (ширина = % samples). Читается снизу вверх (root внизу, hot leaves вверху).

Типичные паттерны в Spark executor:

Паттерн 1: Широкий блок в верхней части
TaskRunner.run
  └─ RDDTask.runTask
      └─ MapPartitionsRDD.compute
          └─ MyUDF.call  ← 65% width = много CPU
              └─ JsonParser.parse  ← 40% width

Интерпретация: UDF тратит ~65% CPU, из которых 40% — в JSON parsing.
Действие: кэшировать parsed структуры, перейти на бинарный формат.
Паттерн 2: GC dominates
JVM GC thread (не в основном треде)
  └─ G1CollectedHeap::collect  ← 30% wall-clock

Интерпретация: GC забирает 30% wall-clock. executor работает треть времени на GC.
Действие: см. урок JVM/GC tuning, проверь heap sizing.
Паттерн 3: Shuffle fetch latency
TaskRunner.run
  └─ ShuffleReader.read
      └─ BlockStoreShuffleReader.fetchIterator
          └─ ShuffleBlockFetcherIterator.next
              └─ SocketChannel.read  ← 80% wall-clock

Wall-clock показывает 80% в SocketChannel.read — executor ждёт shuffle blocks. Это не CPU-проблема, а network или upstream bottleneck.


Spark UI: метрики на глубоком уровне

Spark UI (порт 4040 на driver) содержит метрики, которые большинство инженеров читают поверхностно. Разберём детально.

Страница Stages: скрытые метрики

На странице конкретного stage — нажмите на stage ID. Внутри:

Summary Metrics for 200 Completed Tasks
                    Min    25th    Median    75th    Max
Duration           0.1s    2.1s    2.3s     2.5s    45s
GC Time            0ms     10ms    15ms     20ms    8.2s   ← GC time на task
Peak Memory        120MB   450MB   480MB    520MB   1.8GB  ← peak heap per task
Shuffle Read Size  0B      0B      0B       0B      2.1GB  ← если есть shuffle
Result Size        12B     1.2KB   1.4KB    1.6KB   2.1MB

GC Time. Если медиана GC Time > 5% от Duration task — серьёзная проблема. Если max GC Time >> 75th percentile — skewed GC, вероятен один executor с неоптимальными объектами.

Peak Memory (Execution Memory). Если близко к spark.executor.memory * spark.memory.fraction (дефолт 60% heap) — задачи балансируют на грани spill.

Shuffle Read Size vs Duration. Если время task коррелирует с shuffle read size — bottleneck в network/disk, а не в CPU.

Страница Executors: ключевые колонки

Executor ID  | Address          | GC Time | Peak JVM Memory | Active Tasks | Failed Tasks
driver       | driver:4040      | 0.1s    | 512MB          | 0            | 0
1            | worker-1:40001   | 45s     | 3.8GB          | 4            | 2
2            | worker-2:40001   | 2s      | 2.1GB          | 4            | 0

Executor 1 с 45s GC и 2 failed tasks — очевидная проблема. Дальше — смотреть GC-логи этого executor (см. следующий урок).

Accumulators: кастомные метрики

Через AccumulatorV2 можно публиковать domain-specific метрики прямо в Spark UI:

from pyspark import AccumulatorParam

# Кастомный счётчик записей с нулевым значением
invalid_records = sc.accumulator(0)

def process_partition(records):
    for record in records:
        if not validate(record):
            invalid_records.add(1)
        else:
            yield transform(record)

df.rdd.mapPartitions(process_partition).count()

print(f"Invalid records: {invalid_records.value}")

Значения accumulators видны в Spark UI на странице stage в секции “Accumulators”. Это дешевле, чем логировать каждую плохую запись.


Event Log и History Server

Event Log — бинарный журнал всех событий приложения. History Server восстанавливает из него полный Spark UI.

Включение Event Log

spark.eventLog.enabled=true
spark.eventLog.dir=hdfs:///spark-logs
# или для S3:
spark.eventLog.dir=s3://my-bucket/spark-event-logs
spark.eventLog.compress=true
spark.eventLog.compression.codec=zstd  # эффективнее gzip

Запуск History Server

# Настройка conf/spark-history-server.conf:
spark.history.fs.logDirectory=hdfs:///spark-logs
spark.history.retainedApplications=100

# Запуск:
./sbin/start-history-server.sh
# UI доступен на port 18080

Анализ Event Log из командной строки

Event Log — это JSON-Lines файл с событиями. Можно анализировать напрямую:

# Найти приложение:
ls hdfs:///spark-logs/ | grep "app-20240115"

# Скачать и распаковать:
hdfs dfs -get /spark-logs/app-20240115-143022-0001 ./app.log.zst
zstd -d app.log.zst -o app.log

# Найти все SparkListenerTaskEnd с ошибками:
grep "SparkListenerTaskEnd" app.log | \
    python3 -c "
import sys, json
for line in sys.stdin:
    e = json.loads(line)
    if e.get('Event') == 'SparkListenerTaskEnd':
        info = e.get('Task Info', {})
        reason = e.get('Task End Reason', {}).get('Reason', '')
        if 'FetchFailed' in reason or 'ExceptionFailure' in reason:
            print(f\"Stage {e['Stage ID']} Task {info['Task ID']}: {reason[:100]}\")
" | head -20

Пример вывода:

Stage 3 Task 45: FetchFailed(BlockManagerId(exec-2, worker-3, 40001), ...)
Stage 3 Task 67: ExceptionFailure(java.lang.OutOfMemoryError: Java heap space, ...)

Это даёт точную картину: какие executor теряли данные, какие задачи падали с OOM.

Event Log: ключевые типы событий
SparkListenerJobStartФиксирует начало Job: jobId, stageIds, время. Позволяет вычислить total job duration post-mortem.
SparkListenerStageCompletedСодержит StageInfo с агрегированными метриками: taskMetrics (shuffleReadBytes, peakExecutionMemory, gcTime). Полный аналог страницы stage в UI.
SparkListenerTaskEndОдно событие на каждую task. TaskMetrics: duration, gcTime, peakMemory, shuffleReadBytes, shuffleWriteBytes, inputBytes, outputBytes. TaskEndReason: Success / FetchFailed / ExceptionFailure.
SparkListenerExecutorRemovedФиксирует потерю executor. reason: 'Lost' (heartbeat timeout), 'killed' (dynamic allocation), 'exited' (process crash). Критично для диагностики executor churn.

Production debugging: кейс из жизни

Симптом. Stage занимает 18 минут вместо ожидаемых 4. Spark UI показывает 200 tasks, медианное время task = 5s, но несколько tasks занимают 15+ минут.

Шаг 1: Проверить страницу Stage в UI.

Отсортировать tasks по Duration. Увидеть: 5 tasks по 15 минут с GC Time 12 минут. Остальные 195 tasks — GC Time 0.1–0.5 секунды.

Шаг 2: async-profiler на executor с проблемными tasks.

# Найти executor с долгими tasks через UI (Executors tab)
# Подключить async-profiler на том же worker-узле:
/opt/async-profiler/profiler.sh -d 120 -e alloc \
    -f /tmp/alloc-exec5.html \
    $(jps | grep CoarseGrainedExecutorBackend | awk '{print $1}')

Flame graph allocation показывает: 70% аллокаций — byte[] в UnsafeExternalSorter.insertKVRecord. Это означает, что sorter активно выделяет буферы.

Шаг 3: Проверка spill через метрики.

В Spark UI -> Stage -> Task Metrics: Shuffle Spill (Memory) = 0, Shuffle Spill (Disk) = 8.2 GB у 5 задач. Задачи spill’ят на диск, что вызывает GC давление при частых аллокациях/очистках буферов.

Root cause: Data skew. Пять partition’ов содержат 40x больше данных, чем остальные. Sorter overflow -> spill -> GC pressure -> долгие паузы.

Решение:

# 1. Диагностика распределения:
df.groupBy("partition_key").count().orderBy("count", ascending=False).show(10)

# 2. Salting для skewed join:
from pyspark.sql.functions import concat, lit, floor, rand
salt_buckets = 10
df_salted = df.withColumn("salted_key",
    concat(col("partition_key"), lit("_"), (rand() * salt_buckets).cast("int")))

Попробуй сам

1. Локальный CPU profiling.

Запусти любое Spark приложение в local режиме, найди PID CoarseGrainedExecutorBackend через jps, подключи async-profiler:

./bin/spark-submit --master local[4] examples/jars/spark-examples_*.jar 100 &
sleep 10
PID=$(jps | grep CoarseGrainedExecutorBackend | cut -d' ' -f1)
/opt/async-profiler/profiler.sh -d 30 -e cpu -f /tmp/cpu.html $PID
# Открой /tmp/cpu.html в браузере

2. Найди straggler task через Event Log.

Включи spark.eventLog.enabled=true, запусти запрос с намеренным skew:

df = spark.range(1000000).withColumn("key", (col("id") % 5).cast("string"))
# Намеренно создаём skew: одно значение key встречается 900K раз
df.filter(col("key") == "0").groupBy("key").count().show()

Найди в Event Log событие SparkListenerTaskEnd для самого долгого task.

Проверка знанийKnowledge check
Async-profiler в CPU mode на 60 секунд показывает: 95% времени в TaskRunner.run -> MyUDFExec.eval -> JsonParser.parseString. Throughput приложения при этом — 1K events/sec при target 50K. В чём ошибка интерпретации этого профиля и какой инструмент нужен для правильной диагностики?
ОтветAnswer
Ошибка интерпретации: CPU profiler показывает только что делает CPU пока он занят, но не объясняет почему throughput низкий. При 1K events/sec thread может проводить большую часть времени в idle ожидании: shuffle fetch, backpressure от downstream, Kafka lag, IO waits. В этих состояниях CPU не работает — CPU profiler их не покажет. Правильная диагностика: 1) Wall-clock profiling — запустить async-profiler с -e wall. Если 80% времени окажется в SocketChannel.read или LockSupport.park — bottleneck в IO или backpressure, а не в UDF. 2) Spark UI -> Stage metrics: если медиана task duration = 60s, а actual compute = 3s, то 57 секунд — ожидание (shuffle fetch latency, task scheduling delay). 3) Task Metrics в Event Log: сравни 'Duration' с 'Executor Run Time' — разница это overhead (scheduling, serialization, GC). 4) Executor -> GC Time: если GC Time занимает 50% duration — GC bottleneck (но тогда CPU profiler показал бы GC threads). Только после wall-clock анализа станет понятно, является ли UDF реальным bottleneck или это просто место, где CPU тратится во время редких active periods.

Лабораторная работа

В этой лабораторной вы прогоняете полный цикл профилирования executor-ов: включаете event logs, открываете History Server, снимаете thread dump зависшего task и строите flame graph через async-profiler. Это закрепляет навык отличать реальный CPU-bottleneck от ожидания, разобранный в уроке.

cd labs/internals-toolkit
docker compose up -d

Полное описание и шаги проверки — в labs/internals-toolkit/README.md.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. В Spark UI вы видите: stage занимает 15 минут, медиана task duration = 4s, максимальная task duration = 14 минут, GC Time для этой task = 12 минут. Каков следующий правильный шаг диагностики?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4