Learning Platform
Глоссарий Troubleshooting
Урок 11.05 · 12 мин
Продвинутый
DeequData QualityAnalyzersScalaAWS GlueMetrics Repository

Amazon Deequ: качество данных на Scala

Что такое Deequ?

Amazon Deequ — open-source библиотека для проверки качества данных, разработанная Amazon. Ключевые отличия от Great Expectations:

АспектDeequGreat Expectations
ЯзыкScala (JVM-native)Python
ЭкосистемаAWS Glue, EMRDatabricks, Airflow, любой Python
API стильFluent Scala DSLPython classes
MetricsВстроенный Metrics RepositoryData Docs (HTML)
АвтосуггестииConstraintSuggestionRunnerProfiler

Deequ построен на Spark DataFrame API и выполняет все проверки распределённо. Если ваша команда работает на Scala или использует AWS Glue — Deequ может быть лучшим выбором.

Архитектура Deequ

Deequ Architecture
Deequ
AnalyzersМетрики данных
ChecksПравила валидации
Constraint SuggestionsАвто-генерация
Verification Result
Metrics Repository(time-series)

Четыре основных компонента:

  • Analyzers — вычисляют метрики: completeness, uniqueness, mean, standard deviation
  • Checks — правила валидации, построенные на метриках Analyzers
  • ConstraintSuggestions — автоматическая генерация rules по профилю данных
  • Metrics Repository — хранение метрик во времени для анализа трендов

Analyzers: метрики данных

Analyzers вычисляют статистики по DataFrame. Все примеры на Scala (Deequ — Scala-native библиотека):

// build.sbt
libraryDependencies += "com.amazon.deequ" % "deequ" % "2.0.7-spark-3.5"

// Analyzers
import com.amazon.deequ.analyzers.runners.AnalysisRunner
import com.amazon.deequ.analyzers._

val df = spark.read.parquet("/data/orders/")

val analysisResult = AnalysisRunner
  .onData(df)
  .addAnalyzer(Size())                          // количество строк
  .addAnalyzer(Completeness("customer_id"))     // доля non-null
  .addAnalyzer(Uniqueness("order_id"))          // доля уникальных
  .addAnalyzer(Mean("amount"))                  // среднее
  .addAnalyzer(StandardDeviation("amount"))     // стандартное отклонение
  .addAnalyzer(Compliance("positive_amount",    // доля соответствующих условию
    "amount > 0"))
  .run()

// Вывод метрик
analysisResult.metricMap.foreach { case (analyzer, metric) =>
  println(s"${analyzer}: ${metric.value.getOrElse("N/A")}")
}
// Size: 1500000.0
// Completeness(customer_id): 0.9987
// Uniqueness(order_id): 1.0
// Mean(amount): 247.53
NOTE

Scala-примеры в этом уроке — read-only. Deequ — Scala-native библиотека, запуск примеров требует JVM и sbt. Цель урока — понять архитектуру и ключевые возможности Deequ для работы в JVM/AWS командах.

Checks: правила валидации

Checks определяют правила и порог success/failure:

import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.VerificationSuite

val verificationResult = VerificationSuite()
  .onData(df)
  .addCheck(
    Check(CheckLevel.Error, "orders data quality")
      .hasSize(_ >= 1000)                           // минимум 1000 строк
      .isComplete("order_id")                        // order_id не NULL
      .isComplete("customer_id")                     // customer_id не NULL
      .isUnique("order_id")                          // order_id уникален
      .isContainedIn("status",                       // status из допустимого набора
        Array("pending", "completed", "refunded"))
      .isNonNegative("amount")                       // amount >= 0
      .hasPattern("email",                           // email соответствует regex
        """\w+@\w+\.\w+""".r)
  )
  .run()

// Проверяем результат
val status = verificationResult.status
if (status == CheckStatus.Error) {
  println("DATA QUALITY CHECK FAILED!")
  // Получаем детали по каждому constraint
  verificationResult.checkResults.foreach { case (check, result) =>
    result.constraintResults.foreach { cr =>
      if (cr.status != ConstraintStatus.Success) {
        println(s"  FAILED: ${cr.constraint} -- ${cr.message.getOrElse("")}")
      }
    }
  }
} else {
  println("All checks passed!")
}

Два уровня Check:

  • CheckLevel.Error — критическая ошибка, pipeline должен остановиться
  • CheckLevel.Warning — предупреждение, логировать но продолжать

ConstraintSuggestions: автоматический профилинг

Deequ может автоматически предложить constraints на основе анализа данных:

import com.amazon.deequ.suggestions.ConstraintSuggestionRunner
import com.amazon.deequ.suggestions.Rules

val suggestionResult = ConstraintSuggestionRunner()
  .onData(df)
  .addConstraintRules(Rules.DEFAULT)
  .run()

// Deequ анализирует данные и предлагает правила
suggestionResult.constraintSuggestions.foreach { suggestion =>
  println(s"Column: ${suggestion.columnName}")
  println(s"  Suggestion: ${suggestion.description}")
  println(s"  Scala code: ${suggestion.codeForConstraint}")
}
// Column: order_id
//   Suggestion: 'order_id' is not null
//   Scala code: .isComplete("order_id")
// Column: amount
//   Suggestion: 'amount' has no negative values
//   Scala code: .isNonNegative("amount")

Это удобно для bootstrap — запустите на production данных, получите начальный набор constraints, затем доработайте вручную.

Metrics Repository: мониторинг во времени

Уникальная фича Deequ — Metrics Repository. Метрики сохраняются с timestamp, что позволяет отслеживать тренды:

import com.amazon.deequ.repository.InMemoryMetricsRepository
import com.amazon.deequ.repository.ResultKey

val repository = new InMemoryMetricsRepository()
val resultKey = ResultKey(System.currentTimeMillis(), Map("dataset" -> "orders"))

// Запуск анализа с сохранением метрик
AnalysisRunner
  .onData(df)
  .addAnalyzer(Completeness("customer_id"))
  .addAnalyzer(Mean("amount"))
  .useRepository(repository)
  .saveOrAppendResult(resultKey)
  .run()

// Позже: загрузка истории метрик
val history = repository.load()
  .forAnalyzers(Seq(Completeness("customer_id")))
  .getSuccessMetricsAsDataFrame(spark)

history.show()
// +----------+-------------------+-----+
// | entity   | instance          | value|
// +----------+-------------------+-----+
// | Column   | customer_id       | 0.999|
// +----------+-------------------+-----+

В production используйте FileSystemMetricsRepository (S3/HDFS) вместо InMemoryMetricsRepository.

Проверка знанийKnowledge check
В чём главное отличие Metrics Repository в Deequ от Data Docs в Great Expectations?
ОтветAnswer
Metrics Repository в Deequ хранит числовые метрики (completeness, mean, uniqueness) как time-series -- каждый запуск сохраняет значения с timestamp, что позволяет отслеживать тренды и аномалии во времени (например, 'completeness customer_id упала с 0.999 до 0.95 за последний месяц'). Data Docs в Great Expectations -- это HTML-отчёт с визуализацией pass/fail для каждого expectation, но без встроенного хранения метрик во времени. Для time-series мониторинга в GE нужны внешние инструменты (Prometheus, custom stores).

Сравнение: когда Deequ vs Great Expectations

КритерийВыбирайте DeequВыбирайте GE
Язык командыScala/JavaPython
ОблакоAWS (Glue, EMR)Любое
МетрикиНужен time-series мониторингДостаточно pass/fail отчётов
АвтопрофилингConstraintSuggestionsProfiler (менее развит)
ЭкосистемаJVM-heavy stackPython data stack
ДокументацияСредняяОтличная
CommunityМеньшеБольше
Проверка знанийKnowledge check
Когда Deequ предпочтительнее Great Expectations? Назовите 3 сценария.
ОтветAnswer
Deequ предпочтительнее в трёх сценариях: (1) Scala-first команда -- Deequ имеет native Scala API с fluent DSL, не нужен Python. (2) AWS Glue pipeline -- Deequ нативно интегрируется с AWS Glue ETL jobs через Scala/Java. (3) Нужен time-series мониторинг метрик -- Metrics Repository хранит completeness, uniqueness, mean как time-series, позволяя отслеживать тренды и ставить алерты на аномалии без внешних инструментов.

Что дальше?

В следующем уроке разберём построение pipeline качества данных — как встроить валидацию в multi-layer lakehouse архитектуру (bronze → silver → gold).

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какие четыре основных компонента составляют архитектуру Amazon Deequ?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8