Custom метрики и SparkListener
Встроенные метрики Spark покрывают JVM, executor и shuffle — это инфраструктурный уровень. Но в production вам нужны бизнес-метрики: сколько записей прошло validation, сколько было дубликатов, укладывается ли pipeline в SLA. Для этого в Spark есть три механизма: SparkListener, AccumulatorV2 и custom MetricSource.
SparkListener: перехват событий
SparkListener — интерфейс для перехвата lifecycle-событий Spark-приложения. Вы реализуете callback-методы, и Spark вызывает их при каждом событии.
Доступные события
trait SparkListener {
def onApplicationStart(event: SparkListenerApplicationStart): Unit
def onApplicationEnd(event: SparkListenerApplicationEnd): Unit
def onJobStart(event: SparkListenerJobStart): Unit
def onJobEnd(event: SparkListenerJobEnd): Unit
def onStageSubmitted(event: SparkListenerStageSubmitted): Unit
def onStageCompleted(event: SparkListenerStageCompleted): Unit
def onTaskStart(event: SparkListenerTaskStart): Unit
def onTaskEnd(event: SparkListenerTaskEnd): Unit
def onExecutorAdded(event: SparkListenerExecutorAdded): Unit
def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit
}
Пример: SLA Monitor
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
import org.apache.spark.scheduler.JobSucceeded
import java.time.{Instant, Duration}
class SLAMonitorListener(maxDurationMinutes: Int) extends SparkListener {
private val startTimes = scala.collection.mutable.Map[Int, Instant]()
override def onJobStart(event: SparkListenerJobStart): Unit = {
startTimes(event.jobId) = Instant.now()
println(s"[SLA] Job ${event.jobId} started at ${Instant.now()}")
}
override def onJobEnd(event: SparkListenerJobEnd): Unit = {
startTimes.get(event.jobId).foreach { startTime =>
val duration = Duration.between(startTime, Instant.now())
val minutes = duration.toMinutes
if (minutes > maxDurationMinutes) {
// Отправить алерт: SLA violation
println(s"[SLA VIOLATION] Job ${event.jobId} took $minutes min " +
s"(limit: $maxDurationMinutes min)")
} else {
println(s"[SLA OK] Job ${event.jobId} completed in $minutes min")
}
}
}
}
Регистрация Listener
// Программная регистрация
spark.sparkContext.addSparkListener(new SLAMonitorListener(30))
// Или через конфигурацию
// spark.extraListeners=com.mycompany.SLAMonitorListener
# В PySpark -- через конфигурацию (JAR с listener должен быть в classpath)
spark = (
SparkSession.builder
.config("spark.extraListeners", "com.mycompany.SLAMonitorListener")
.config("spark.jars", "/path/to/my-listeners.jar")
.getOrCreate()
)
Анти-паттерн: println для debugging в production
Не используйте println для вывода метрик в production. Вывод смешивается с логами Spark, теряется при перенаправлении stdout, и не поддерживает структурированный поиск. Вместо этого используйте structured logging (slf4j) или отправляйте метрики в Prometheus через custom MetricSource.
AccumulatorV2: распределённые счётчики
AccumulatorV2 — механизм для агрегации значений со всех executors в driver. Это единственный способ безопасно обновлять глобальный счётчик из distributed tasks.
Встроенные аккумуляторы
# Простой числовой аккумулятор
invalid_records = spark.sparkContext.accumulator(0)
def validate_record(row):
if row["amount"] < 0:
invalid_records.add(1)
return False
return True
# Использование в DataFrame
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
validate_udf = udf(validate_record, BooleanType())
valid_df = df.filter(validate_udf(df["amount"]))
# После action -- проверяем значение
valid_df.count()
print(f"Invalid records: {invalid_records.value}")
Custom AccumulatorV2 (Scala)
Для сложных агрегаций создайте свой AccumulatorV2:
import org.apache.spark.util.AccumulatorV2
// Аккумулятор для подсчёта записей по категориям
class CategoryCounter extends AccumulatorV2[String, Map[String, Long]] {
private var counts = Map.empty[String, Long]
override def isZero: Boolean = counts.isEmpty
override def copy(): AccumulatorV2[String, Map[String, Long]] = {
val newAcc = new CategoryCounter()
newAcc.counts = this.counts
newAcc
}
override def reset(): Unit = counts = Map.empty
override def add(category: String): Unit = {
counts = counts + (category -> (counts.getOrElse(category, 0L) + 1))
}
override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit = {
other.value.foreach { case (k, v) =>
counts = counts + (k -> (counts.getOrElse(k, 0L) + v))
}
}
override def value: Map[String, Long] = counts
}
// Использование
val categoryAcc = new CategoryCounter()
spark.sparkContext.register(categoryAcc, "category_counts")
rdd.foreach { record =>
categoryAcc.add(record.category)
}
println(s"Category breakdown: ${categoryAcc.value}")
Custom MetricSource для Prometheus
Чтобы бизнес-метрики были доступны в Prometheus (и Grafana), создайте custom MetricSource:
import org.apache.spark.metrics.source.Source
import com.codahale.metrics.{MetricRegistry, Gauge}
class DataQualityMetricSource extends Source {
override val sourceName: String = "DataQuality"
override val metricRegistry: MetricRegistry = new MetricRegistry()
// Регистрируем gauge для процента valid записей
val validRecordRate = metricRegistry.register(
MetricRegistry.name("validRecordRate"),
new Gauge[Double] {
override def getValue: Double = {
// Возвращает текущее значение метрики
DataQualityTracker.getValidRate()
}
}
)
// Counter для total processed
val processedCounter = metricRegistry.counter(
MetricRegistry.name("recordsProcessed")
)
}
Регистрация MetricSource
// В коде приложения
val metricsSource = new DataQualityMetricSource()
spark.sparkContext.env.metricsSystem.registerSource(metricsSource)
// Или через конфигурацию в metrics.properties
driver.source.dataQuality.class=com.mycompany.DataQualityMetricSource
После регистрации метрика доступна в Prometheus:
# PromQL запрос к custom метрике
metrics_DataQuality_validRecordRate{component="driver"}
Use Cases для Custom Metrics
| Use Case | Механизм | Метрика |
|---|---|---|
| Подсчёт отклонённых записей | AccumulatorV2 | rejected_records_total |
| Время обработки batch | SparkListener (onJobEnd) | batch_processing_time_seconds |
| Процент valid данных | Custom MetricSource + Gauge | data_quality_valid_rate |
| SLA мониторинг | SparkListener (onJobEnd) | sla_violation_count |
| Количество записей по источнику | Custom AccumulatorV2 (Map) | records_by_source |
| Размер output файлов | SparkListener (onStageCompleted) | output_bytes_total |
Интеграция: полный pipeline мониторинга
Рекомендация для production:
- AccumulatorV2 — для значений, которые нужны в коде приложения (validation counts, checksums)
- SparkListener — для event-driven actions (alerting, logging, SLA checks)
- Custom MetricSource — для continuous monitoring через Prometheus/Grafana
Что дальше?
Модуль мониторинга завершён. Вы теперь умеете: диагностировать проблемы через Spark UI, анализировать завершённые приложения через History Server, собирать метрики через Prometheus, визуализировать их в Grafana и создавать custom бизнес-метрики. Следующий шаг — применить эти знания на практике в Docker lab, где вы запустите полный monitoring stack и намеренно сломаете кластер, чтобы увидеть, как метрики отражают проблемы.