Профилирование executor
Spark UI показывает, что stage занимает 20 минут. Вы видите, что executor работает, cores загружены. Но где именно теряется время? GC? Сериализация? Конкретный operator? shuffle fetch latency? Без профилировщика вы угадываете. С профилировщиком — видите.
Этот урок о том, как получить flame graph с работающего executor, как читать метрики Spark UI на глубоком уровне и как использовать Event Log для post-mortem анализа.
async-profiler для JVM executor
async-profiler — это low-overhead sampling profiler для JVM. Ключевые преимущества перед другими инструментами:
- Нет safepoint bias. Большинство Java profilers (JProfiler, YourKit в sampling режиме) могут снимать stack traces только в safepoint’ах. async-profiler использует
AsyncGetCallTraceAPI — недокументированный internal HotSpot API, который работает в любой момент. - Native frames. Spark активно использует off-heap через
sun.misc.Unsafe. async-profiler показывает C++-код внутри JVM и нативных библиотек. - Overhead 0.5–2% на типичных workloads. Можно запускать на production executor.
Установка async-profiler на executor
Для standalone или YARN — можно установить на worker-нодах заранее:
# На каждой worker-ноде:
cd /opt
wget https://github.com/async-profiler/async-profiler/releases/latest/download/async-profiler-3.0-linux-x64.tar.gz
tar xzf async-profiler-3.0-linux-x64.tar.gz
ln -s async-profiler-3.0-linux-x64 async-profiler
Для Kubernetes — добавить в Docker image executor’а:
FROM apache/spark:4.0.0
COPY async-profiler-3.0-linux-x64 /opt/async-profiler
Подключение через spark.executor.extraJavaOptions
Можно запустить async-profiler как Java agent при старте executor:
spark-submit \
--conf "spark.executor.extraJavaOptions=\
-agentpath:/opt/async-profiler/lib/libasyncProfiler.so=start,\
event=cpu,\
file=/tmp/profile-{pid}.html,\
interval=10ms,\
jfr" \
--conf "spark.driver.extraJavaOptions=\
-XX:+PreserveFramePointer \
-XX:+UnlockDiagnosticVMOptions \
-XX:+DebugNonSafepoints" \
myapp.jar
Флаги -XX:+PreserveFramePointer и -XX:+DebugNonSafepoints важны: без них async-profiler получает неполные stack traces у JIT-компилированного кода.
Attach к работающему executor (без перезапуска)
Лучший подход при исследовании живого кластера — подключиться к уже работающему процессу:
# Найти PID executor на worker-ноде:
jps -l | grep CoarseGrainedExecutorBackend
# 14823 org.apache.spark.executor.CoarseGrainedExecutorBackend
# CPU profiling 60 секунд, flame graph в HTML:
/opt/async-profiler/profiler.sh \
-d 60 \
-e cpu \
-f /tmp/cpu-$(date +%s).html \
14823
# Allocation profiling — какие объекты создаются:
/opt/async-profiler/profiler.sh \
-d 60 \
-e alloc \
-f /tmp/alloc-$(date +%s).html \
14823
# Wall-clock — где thread тратит wall-clock time (включая IO waits):
/opt/async-profiler/profiler.sh \
-d 60 \
-e wall \
--wall-threads 16 \
-f /tmp/wall-$(date +%s).html \
14823
CPU mode
CPU profiling (-e cpu): отвечает на вопрос 'где CPU тратится когда работает'. Не видит IO waits, lock waits, sleep. Используй для: горячие вычисления, сериализация, кодогенерация.Wall-clock mode
Wall-clock profiling (-e wall): отвечает на вопрос 'где thread тратит wall-clock время'. Видит IO, lock waits, shuffle fetch latency, GC stalls. Используй для: диагностика throughput problems, IO bottlenecks, backpressure.SparkPlugin API: встроенная профилировочная инфраструктура
Spark 3.x+ предоставляет SparkPlugin API для расширения executor-процесса. В Spark 4.0 появился встроенный профилировщик на базе async-profiler как отдельный артефакт org.apache.spark:spark-profiler_2.13.
Структура SparkPlugin
// org.apache.spark.api.plugin.SparkPlugin
trait SparkPlugin {
def driverPlugin(): DriverPlugin // Один экземпляр на driver
def executorPlugin(): ExecutorPlugin // Один экземпляр на каждый executor
}
// org.apache.spark.api.plugin.ExecutorPlugin
trait ExecutorPlugin {
def init(ctx: PluginContext, extraConf: Map[String, String]): Unit
def onTaskStart(): Unit // До каждой task
def onTaskSucceeded(): Unit
def onTaskFailed(failureReason: TaskFailedReason): Unit
def shutdown(): Unit
}
Подключение профилировщика через SparkPlugin
Минимальный пример: executor-plugin, который запускает async-profiler при старте executor и сохраняет flame graph при завершении:
// Подключение встроенного spark-profiler (Spark 4.0):
spark-submit \
--packages org.apache.spark:spark-profiler_2.13:4.0.0 \
--conf spark.plugins=org.apache.spark.profiler.CpuProfiler \
--conf spark.profiler.enabled=true \
--conf spark.profiler.outputDir=/tmp/spark-profiles \
--conf spark.profiler.duration=60 \
myapp.jar
Для Kubernetes — монтируем output в shared PVC:
# spark-submit с kubernetes:
--conf spark.plugins=org.apache.spark.profiler.CpuProfiler
--conf spark.profiler.outputDir=s3://my-bucket/spark-profiles
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.profiles.mount.path=/tmp/profiles
Чтение flame graph: практика
Flame graph — это визуализация агрегированных stack traces. Вертикаль — глубина стека, горизонталь — доля времени (ширина = % samples). Читается снизу вверх (root внизу, hot leaves вверху).
Типичные паттерны в Spark executor:
Паттерн 1: Широкий блок в верхней части
TaskRunner.run
└─ RDDTask.runTask
└─ MapPartitionsRDD.compute
└─ MyUDF.call ← 65% width = много CPU
└─ JsonParser.parse ← 40% width
Интерпретация: UDF тратит ~65% CPU, из которых 40% — в JSON parsing.
Действие: кэшировать parsed структуры, перейти на бинарный формат.
Паттерн 2: GC dominates
JVM GC thread (не в основном треде)
└─ G1CollectedHeap::collect ← 30% wall-clock
Интерпретация: GC забирает 30% wall-clock. executor работает треть времени на GC.
Действие: см. урок JVM/GC tuning, проверь heap sizing.
Паттерн 3: Shuffle fetch latency
TaskRunner.run
└─ ShuffleReader.read
└─ BlockStoreShuffleReader.fetchIterator
└─ ShuffleBlockFetcherIterator.next
└─ SocketChannel.read ← 80% wall-clock
Wall-clock показывает 80% в SocketChannel.read — executor ждёт shuffle blocks. Это не CPU-проблема, а network или upstream bottleneck.
Spark UI: метрики на глубоком уровне
Spark UI (порт 4040 на driver) содержит метрики, которые большинство инженеров читают поверхностно. Разберём детально.
Страница Stages: скрытые метрики
На странице конкретного stage — нажмите на stage ID. Внутри:
Summary Metrics for 200 Completed Tasks
Min 25th Median 75th Max
Duration 0.1s 2.1s 2.3s 2.5s 45s
GC Time 0ms 10ms 15ms 20ms 8.2s ← GC time на task
Peak Memory 120MB 450MB 480MB 520MB 1.8GB ← peak heap per task
Shuffle Read Size 0B 0B 0B 0B 2.1GB ← если есть shuffle
Result Size 12B 1.2KB 1.4KB 1.6KB 2.1MB
GC Time. Если медиана GC Time > 5% от Duration task — серьёзная проблема. Если max GC Time >> 75th percentile — skewed GC, вероятен один executor с неоптимальными объектами.
Peak Memory (Execution Memory). Если близко к spark.executor.memory * spark.memory.fraction (дефолт 60% heap) — задачи балансируют на грани spill.
Shuffle Read Size vs Duration. Если время task коррелирует с shuffle read size — bottleneck в network/disk, а не в CPU.
Страница Executors: ключевые колонки
Executor ID | Address | GC Time | Peak JVM Memory | Active Tasks | Failed Tasks
driver | driver:4040 | 0.1s | 512MB | 0 | 0
1 | worker-1:40001 | 45s | 3.8GB | 4 | 2
2 | worker-2:40001 | 2s | 2.1GB | 4 | 0
Executor 1 с 45s GC и 2 failed tasks — очевидная проблема. Дальше — смотреть GC-логи этого executor (см. следующий урок).
Accumulators: кастомные метрики
Через AccumulatorV2 можно публиковать domain-specific метрики прямо в Spark UI:
from pyspark import AccumulatorParam
# Кастомный счётчик записей с нулевым значением
invalid_records = sc.accumulator(0)
def process_partition(records):
for record in records:
if not validate(record):
invalid_records.add(1)
else:
yield transform(record)
df.rdd.mapPartitions(process_partition).count()
print(f"Invalid records: {invalid_records.value}")
Значения accumulators видны в Spark UI на странице stage в секции “Accumulators”. Это дешевле, чем логировать каждую плохую запись.
Event Log и History Server
Event Log — бинарный журнал всех событий приложения. History Server восстанавливает из него полный Spark UI.
Включение Event Log
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs:///spark-logs
# или для S3:
spark.eventLog.dir=s3://my-bucket/spark-event-logs
spark.eventLog.compress=true
spark.eventLog.compression.codec=zstd # эффективнее gzip
Запуск History Server
# Настройка conf/spark-history-server.conf:
spark.history.fs.logDirectory=hdfs:///spark-logs
spark.history.retainedApplications=100
# Запуск:
./sbin/start-history-server.sh
# UI доступен на port 18080
Анализ Event Log из командной строки
Event Log — это JSON-Lines файл с событиями. Можно анализировать напрямую:
# Найти приложение:
ls hdfs:///spark-logs/ | grep "app-20240115"
# Скачать и распаковать:
hdfs dfs -get /spark-logs/app-20240115-143022-0001 ./app.log.zst
zstd -d app.log.zst -o app.log
# Найти все SparkListenerTaskEnd с ошибками:
grep "SparkListenerTaskEnd" app.log | \
python3 -c "
import sys, json
for line in sys.stdin:
e = json.loads(line)
if e.get('Event') == 'SparkListenerTaskEnd':
info = e.get('Task Info', {})
reason = e.get('Task End Reason', {}).get('Reason', '')
if 'FetchFailed' in reason or 'ExceptionFailure' in reason:
print(f\"Stage {e['Stage ID']} Task {info['Task ID']}: {reason[:100]}\")
" | head -20
Пример вывода:
Stage 3 Task 45: FetchFailed(BlockManagerId(exec-2, worker-3, 40001), ...)
Stage 3 Task 67: ExceptionFailure(java.lang.OutOfMemoryError: Java heap space, ...)
Это даёт точную картину: какие executor теряли данные, какие задачи падали с OOM.
Production debugging: кейс из жизни
Симптом. Stage занимает 18 минут вместо ожидаемых 4. Spark UI показывает 200 tasks, медианное время task = 5s, но несколько tasks занимают 15+ минут.
Шаг 1: Проверить страницу Stage в UI.
Отсортировать tasks по Duration. Увидеть: 5 tasks по 15 минут с GC Time 12 минут. Остальные 195 tasks — GC Time 0.1–0.5 секунды.
Шаг 2: async-profiler на executor с проблемными tasks.
# Найти executor с долгими tasks через UI (Executors tab)
# Подключить async-profiler на том же worker-узле:
/opt/async-profiler/profiler.sh -d 120 -e alloc \
-f /tmp/alloc-exec5.html \
$(jps | grep CoarseGrainedExecutorBackend | awk '{print $1}')
Flame graph allocation показывает: 70% аллокаций — byte[] в UnsafeExternalSorter.insertKVRecord. Это означает, что sorter активно выделяет буферы.
Шаг 3: Проверка spill через метрики.
В Spark UI -> Stage -> Task Metrics: Shuffle Spill (Memory) = 0, Shuffle Spill (Disk) = 8.2 GB у 5 задач. Задачи spill’ят на диск, что вызывает GC давление при частых аллокациях/очистках буферов.
Root cause: Data skew. Пять partition’ов содержат 40x больше данных, чем остальные. Sorter overflow -> spill -> GC pressure -> долгие паузы.
Решение:
# 1. Диагностика распределения:
df.groupBy("partition_key").count().orderBy("count", ascending=False).show(10)
# 2. Salting для skewed join:
from pyspark.sql.functions import concat, lit, floor, rand
salt_buckets = 10
df_salted = df.withColumn("salted_key",
concat(col("partition_key"), lit("_"), (rand() * salt_buckets).cast("int")))
Попробуй сам
1. Локальный CPU profiling.
Запусти любое Spark приложение в local режиме, найди PID CoarseGrainedExecutorBackend через jps, подключи async-profiler:
./bin/spark-submit --master local[4] examples/jars/spark-examples_*.jar 100 &
sleep 10
PID=$(jps | grep CoarseGrainedExecutorBackend | cut -d' ' -f1)
/opt/async-profiler/profiler.sh -d 30 -e cpu -f /tmp/cpu.html $PID
# Открой /tmp/cpu.html в браузере
2. Найди straggler task через Event Log.
Включи spark.eventLog.enabled=true, запусти запрос с намеренным skew:
df = spark.range(1000000).withColumn("key", (col("id") % 5).cast("string"))
# Намеренно создаём skew: одно значение key встречается 900K раз
df.filter(col("key") == "0").groupBy("key").count().show()
Найди в Event Log событие SparkListenerTaskEnd для самого долгого task.
Лабораторная работа
В этой лабораторной вы прогоняете полный цикл профилирования executor-ов: включаете event logs, открываете History Server, снимаете thread dump зависшего task и строите flame graph через async-profiler. Это закрепляет навык отличать реальный CPU-bottleneck от ожидания, разобранный в уроке.
cd labs/internals-toolkit
docker compose up -d
Полное описание и шаги проверки — в labs/internals-toolkit/README.md.