Amazon Deequ: качество данных на Scala
Что такое Deequ?
Amazon Deequ — open-source библиотека для проверки качества данных, разработанная Amazon. Ключевые отличия от Great Expectations:
| Аспект | Deequ | Great Expectations |
|---|---|---|
| Язык | Scala (JVM-native) | Python |
| Экосистема | AWS Glue, EMR | Databricks, Airflow, любой Python |
| API стиль | Fluent Scala DSL | Python classes |
| Metrics | Встроенный Metrics Repository | Data Docs (HTML) |
| Автосуггестии | ConstraintSuggestionRunner | Profiler |
Deequ построен на Spark DataFrame API и выполняет все проверки распределённо. Если ваша команда работает на Scala или использует AWS Glue — Deequ может быть лучшим выбором.
Архитектура Deequ
Четыре основных компонента:
- Analyzers — вычисляют метрики: completeness, uniqueness, mean, standard deviation
- Checks — правила валидации, построенные на метриках Analyzers
- ConstraintSuggestions — автоматическая генерация rules по профилю данных
- Metrics Repository — хранение метрик во времени для анализа трендов
Analyzers: метрики данных
Analyzers вычисляют статистики по DataFrame. Все примеры на Scala (Deequ — Scala-native библиотека):
// build.sbt
libraryDependencies += "com.amazon.deequ" % "deequ" % "2.0.7-spark-3.5"
// Analyzers
import com.amazon.deequ.analyzers.runners.AnalysisRunner
import com.amazon.deequ.analyzers._
val df = spark.read.parquet("/data/orders/")
val analysisResult = AnalysisRunner
.onData(df)
.addAnalyzer(Size()) // количество строк
.addAnalyzer(Completeness("customer_id")) // доля non-null
.addAnalyzer(Uniqueness("order_id")) // доля уникальных
.addAnalyzer(Mean("amount")) // среднее
.addAnalyzer(StandardDeviation("amount")) // стандартное отклонение
.addAnalyzer(Compliance("positive_amount", // доля соответствующих условию
"amount > 0"))
.run()
// Вывод метрик
analysisResult.metricMap.foreach { case (analyzer, metric) =>
println(s"${analyzer}: ${metric.value.getOrElse("N/A")}")
}
// Size: 1500000.0
// Completeness(customer_id): 0.9987
// Uniqueness(order_id): 1.0
// Mean(amount): 247.53
Scala-примеры в этом уроке — read-only. Deequ — Scala-native библиотека, запуск примеров требует JVM и sbt. Цель урока — понять архитектуру и ключевые возможности Deequ для работы в JVM/AWS командах.
Checks: правила валидации
Checks определяют правила и порог success/failure:
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.VerificationSuite
val verificationResult = VerificationSuite()
.onData(df)
.addCheck(
Check(CheckLevel.Error, "orders data quality")
.hasSize(_ >= 1000) // минимум 1000 строк
.isComplete("order_id") // order_id не NULL
.isComplete("customer_id") // customer_id не NULL
.isUnique("order_id") // order_id уникален
.isContainedIn("status", // status из допустимого набора
Array("pending", "completed", "refunded"))
.isNonNegative("amount") // amount >= 0
.hasPattern("email", // email соответствует regex
"""\w+@\w+\.\w+""".r)
)
.run()
// Проверяем результат
val status = verificationResult.status
if (status == CheckStatus.Error) {
println("DATA QUALITY CHECK FAILED!")
// Получаем детали по каждому constraint
verificationResult.checkResults.foreach { case (check, result) =>
result.constraintResults.foreach { cr =>
if (cr.status != ConstraintStatus.Success) {
println(s" FAILED: ${cr.constraint} -- ${cr.message.getOrElse("")}")
}
}
}
} else {
println("All checks passed!")
}
Два уровня Check:
- CheckLevel.Error — критическая ошибка, pipeline должен остановиться
- CheckLevel.Warning — предупреждение, логировать но продолжать
ConstraintSuggestions: автоматический профилинг
Deequ может автоматически предложить constraints на основе анализа данных:
import com.amazon.deequ.suggestions.ConstraintSuggestionRunner
import com.amazon.deequ.suggestions.Rules
val suggestionResult = ConstraintSuggestionRunner()
.onData(df)
.addConstraintRules(Rules.DEFAULT)
.run()
// Deequ анализирует данные и предлагает правила
suggestionResult.constraintSuggestions.foreach { suggestion =>
println(s"Column: ${suggestion.columnName}")
println(s" Suggestion: ${suggestion.description}")
println(s" Scala code: ${suggestion.codeForConstraint}")
}
// Column: order_id
// Suggestion: 'order_id' is not null
// Scala code: .isComplete("order_id")
// Column: amount
// Suggestion: 'amount' has no negative values
// Scala code: .isNonNegative("amount")
Это удобно для bootstrap — запустите на production данных, получите начальный набор constraints, затем доработайте вручную.
Metrics Repository: мониторинг во времени
Уникальная фича Deequ — Metrics Repository. Метрики сохраняются с timestamp, что позволяет отслеживать тренды:
import com.amazon.deequ.repository.InMemoryMetricsRepository
import com.amazon.deequ.repository.ResultKey
val repository = new InMemoryMetricsRepository()
val resultKey = ResultKey(System.currentTimeMillis(), Map("dataset" -> "orders"))
// Запуск анализа с сохранением метрик
AnalysisRunner
.onData(df)
.addAnalyzer(Completeness("customer_id"))
.addAnalyzer(Mean("amount"))
.useRepository(repository)
.saveOrAppendResult(resultKey)
.run()
// Позже: загрузка истории метрик
val history = repository.load()
.forAnalyzers(Seq(Completeness("customer_id")))
.getSuccessMetricsAsDataFrame(spark)
history.show()
// +----------+-------------------+-----+
// | entity | instance | value|
// +----------+-------------------+-----+
// | Column | customer_id | 0.999|
// +----------+-------------------+-----+
В production используйте FileSystemMetricsRepository (S3/HDFS) вместо InMemoryMetricsRepository.
Сравнение: когда Deequ vs Great Expectations
| Критерий | Выбирайте Deequ | Выбирайте GE |
|---|---|---|
| Язык команды | Scala/Java | Python |
| Облако | AWS (Glue, EMR) | Любое |
| Метрики | Нужен time-series мониторинг | Достаточно pass/fail отчётов |
| Автопрофилинг | ConstraintSuggestions | Profiler (менее развит) |
| Экосистема | JVM-heavy stack | Python data stack |
| Документация | Средняя | Отличная |
| Community | Меньше | Больше |
Что дальше?
В следующем уроке разберём построение pipeline качества данных — как встроить валидацию в multi-layer lakehouse архитектуру (bronze → silver → gold).