Learning Platform
Глоссарий Troubleshooting
Урок 07.05 · 12 мин
Продвинутый
SparkListenerCustom MetricsAccumulatorV2MetricSourceEvent Hooks

Custom метрики и SparkListener

Встроенные метрики Spark покрывают JVM, executor и shuffle — это инфраструктурный уровень. Но в production вам нужны бизнес-метрики: сколько записей прошло validation, сколько было дубликатов, укладывается ли pipeline в SLA. Для этого в Spark есть три механизма: SparkListener, AccumulatorV2 и custom MetricSource.

SparkListener: перехват событий

SparkListener — интерфейс для перехвата lifecycle-событий Spark-приложения. Вы реализуете callback-методы, и Spark вызывает их при каждом событии.

Доступные события

trait SparkListener {
  def onApplicationStart(event: SparkListenerApplicationStart): Unit
  def onApplicationEnd(event: SparkListenerApplicationEnd): Unit
  def onJobStart(event: SparkListenerJobStart): Unit
  def onJobEnd(event: SparkListenerJobEnd): Unit
  def onStageSubmitted(event: SparkListenerStageSubmitted): Unit
  def onStageCompleted(event: SparkListenerStageCompleted): Unit
  def onTaskStart(event: SparkListenerTaskStart): Unit
  def onTaskEnd(event: SparkListenerTaskEnd): Unit
  def onExecutorAdded(event: SparkListenerExecutorAdded): Unit
  def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit
}

Пример: SLA Monitor

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
import org.apache.spark.scheduler.JobSucceeded
import java.time.{Instant, Duration}

class SLAMonitorListener(maxDurationMinutes: Int) extends SparkListener {
  private val startTimes = scala.collection.mutable.Map[Int, Instant]()

  override def onJobStart(event: SparkListenerJobStart): Unit = {
    startTimes(event.jobId) = Instant.now()
    println(s"[SLA] Job ${event.jobId} started at ${Instant.now()}")
  }

  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    startTimes.get(event.jobId).foreach { startTime =>
      val duration = Duration.between(startTime, Instant.now())
      val minutes = duration.toMinutes

      if (minutes > maxDurationMinutes) {
        // Отправить алерт: SLA violation
        println(s"[SLA VIOLATION] Job ${event.jobId} took $minutes min " +
          s"(limit: $maxDurationMinutes min)")
      } else {
        println(s"[SLA OK] Job ${event.jobId} completed in $minutes min")
      }
    }
  }
}

Регистрация Listener

// Программная регистрация
spark.sparkContext.addSparkListener(new SLAMonitorListener(30))

// Или через конфигурацию
// spark.extraListeners=com.mycompany.SLAMonitorListener
# В PySpark -- через конфигурацию (JAR с listener должен быть в classpath)
spark = (
    SparkSession.builder
    .config("spark.extraListeners", "com.mycompany.SLAMonitorListener")
    .config("spark.jars", "/path/to/my-listeners.jar")
    .getOrCreate()
)
WARNING

Анти-паттерн: println для debugging в production

Не используйте println для вывода метрик в production. Вывод смешивается с логами Spark, теряется при перенаправлении stdout, и не поддерживает структурированный поиск. Вместо этого используйте structured logging (slf4j) или отправляйте метрики в Prometheus через custom MetricSource.

AccumulatorV2: распределённые счётчики

AccumulatorV2 — механизм для агрегации значений со всех executors в driver. Это единственный способ безопасно обновлять глобальный счётчик из distributed tasks.

Встроенные аккумуляторы

# Простой числовой аккумулятор
invalid_records = spark.sparkContext.accumulator(0)

def validate_record(row):
    if row["amount"] < 0:
        invalid_records.add(1)
        return False
    return True

# Использование в DataFrame
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

validate_udf = udf(validate_record, BooleanType())
valid_df = df.filter(validate_udf(df["amount"]))

# После action -- проверяем значение
valid_df.count()
print(f"Invalid records: {invalid_records.value}")

Custom AccumulatorV2 (Scala)

Для сложных агрегаций создайте свой AccumulatorV2:

import org.apache.spark.util.AccumulatorV2

// Аккумулятор для подсчёта записей по категориям
class CategoryCounter extends AccumulatorV2[String, Map[String, Long]] {
  private var counts = Map.empty[String, Long]

  override def isZero: Boolean = counts.isEmpty
  override def copy(): AccumulatorV2[String, Map[String, Long]] = {
    val newAcc = new CategoryCounter()
    newAcc.counts = this.counts
    newAcc
  }
  override def reset(): Unit = counts = Map.empty

  override def add(category: String): Unit = {
    counts = counts + (category -> (counts.getOrElse(category, 0L) + 1))
  }

  override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit = {
    other.value.foreach { case (k, v) =>
      counts = counts + (k -> (counts.getOrElse(k, 0L) + v))
    }
  }

  override def value: Map[String, Long] = counts
}

// Использование
val categoryAcc = new CategoryCounter()
spark.sparkContext.register(categoryAcc, "category_counts")

rdd.foreach { record =>
  categoryAcc.add(record.category)
}

println(s"Category breakdown: ${categoryAcc.value}")
Проверка знанийKnowledge check
Почему для подсчёта invalid records в distributed Spark job нельзя использовать обычную переменную (var counter = 0), а нужен Accumulator?
ОтветAnswer
Обычная переменная существует только в driver JVM. Когда closure с этой переменной отправляется на executor, каждый executor получает свою копию. Обновления копии на executor не видны в driver -- переменная в driver всегда останется 0. AccumulatorV2 решает это: каждый executor обновляет свою локальную копию, а после завершения task Spark merge'ит все значения обратно в driver. Это единственный safe mechanism для distributed aggregation обратно в driver.

Custom MetricSource для Prometheus

Чтобы бизнес-метрики были доступны в Prometheus (и Grafana), создайте custom MetricSource:

import org.apache.spark.metrics.source.Source
import com.codahale.metrics.{MetricRegistry, Gauge}

class DataQualityMetricSource extends Source {
  override val sourceName: String = "DataQuality"
  override val metricRegistry: MetricRegistry = new MetricRegistry()

  // Регистрируем gauge для процента valid записей
  val validRecordRate = metricRegistry.register(
    MetricRegistry.name("validRecordRate"),
    new Gauge[Double] {
      override def getValue: Double = {
        // Возвращает текущее значение метрики
        DataQualityTracker.getValidRate()
      }
    }
  )

  // Counter для total processed
  val processedCounter = metricRegistry.counter(
    MetricRegistry.name("recordsProcessed")
  )
}

Регистрация MetricSource

// В коде приложения
val metricsSource = new DataQualityMetricSource()
spark.sparkContext.env.metricsSystem.registerSource(metricsSource)

// Или через конфигурацию в metrics.properties
driver.source.dataQuality.class=com.mycompany.DataQualityMetricSource

После регистрации метрика доступна в Prometheus:

# PromQL запрос к custom метрике
metrics_DataQuality_validRecordRate{component="driver"}

Use Cases для Custom Metrics

Use CaseМеханизмМетрика
Подсчёт отклонённых записейAccumulatorV2rejected_records_total
Время обработки batchSparkListener (onJobEnd)batch_processing_time_seconds
Процент valid данныхCustom MetricSource + Gaugedata_quality_valid_rate
SLA мониторингSparkListener (onJobEnd)sla_violation_count
Количество записей по источникуCustom AccumulatorV2 (Map)records_by_source
Размер output файловSparkListener (onStageCompleted)output_bytes_total

Интеграция: полный pipeline мониторинга

Custom Metrics: полный pipeline
Application Code
AccumulatorV2
Driver (post-job analysis)
SparkListener
Structured Logging (ELK/Loki)
Custom MetricSource
Prometheus
Grafana
Alerts

Рекомендация для production:

  1. AccumulatorV2 — для значений, которые нужны в коде приложения (validation counts, checksums)
  2. SparkListener — для event-driven actions (alerting, logging, SLA checks)
  3. Custom MetricSource — для continuous monitoring через Prometheus/Grafana
Проверка знанийKnowledge check
Вы хотите мониторить, сколько записей ваш ETL pipeline отклоняет на этапе validation, и видеть это в Grafana dashboard в реальном времени. Какой механизм (или комбинацию) вы выберете?
ОтветAnswer
Для Grafana в реальном времени нужен Prometheus, значит нужен Custom MetricSource с Gauge или Counter. AccumulatorV2 доступен только после завершения job (в driver), а SparkListener оперирует событиями, а не числовыми метриками для Prometheus. Оптимальная комбинация: AccumulatorV2 для подсчёта rejected records внутри tasks, а Custom MetricSource с Gauge, который читает значение AccumulatorV2 и экспортирует в Prometheus. Так метрика обновляется в реальном времени и видна в Grafana.

Что дальше?

Модуль мониторинга завершён. Вы теперь умеете: диагностировать проблемы через Spark UI, анализировать завершённые приложения через History Server, собирать метрики через Prometheus, визуализировать их в Grafana и создавать custom бизнес-метрики. Следующий шаг — применить эти знания на практике в Docker lab, где вы запустите полный monitoring stack и намеренно сломаете кластер, чтобы увидеть, как метрики отражают проблемы.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 6. Зачем нужны custom метрики, если Spark уже экспортирует JVM, executor и shuffle метрики?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5