Great Expectations для Spark
Что такое Great Expectations?
Great Expectations (GE) — Python-фреймворк для валидации, документирования и профилирования данных. GE отвечает на вопрос: «Соответствуют ли данные ожиданиям бизнеса?»
Ключевые преимущества:
- Декларативные правила — вы описываете что ожидаете, а не как проверять
- Автодокументация — Data Docs генерируют HTML-отчёт по результатам валидации
- Интеграция с PySpark — валидация DataFrame прямо в pipeline
- 200+ встроенных expectations — от null-проверок до статистических распределений
Архитектура GE
GE построен вокруг четырёх концепций:
| Компонент | Роль |
|---|---|
| Data Context | Корневой объект — конфигурация, stores, data sources |
| Expectation Suite | Набор правил валидации (expectations) |
| Checkpoint | Связка «данные + suite» → запуск валидации |
| Validator | Движок, который применяет expectations к данным |
| Data Docs | HTML-отчёт с результатами валидации |
PySpark интеграция
Установка
pip install great-expectations pyspark
Подключение SparkDF Data Source
import great_expectations as gx
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("GE-Spark-Demo") \
.getOrCreate()
# Создаём Data Context
context = gx.get_context()
# Подключаем Spark Data Source
data_source = context.data_sources.add_spark("spark_source")
GE использует SparkDFExecutionEngine для выполнения expectations. Движок работает напрямую с Spark DataFrame — expectations транслируются в Spark SQL операции, сохраняя все оптимизации Catalyst.
Основные Expectations
GE предоставляет более 200 встроенных expectations. Наиболее важные для data pipelines:
Проверки полноты
# Колонка не должна содержать NULL
validator.expect_column_values_to_not_be_null("user_id")
# Колонка должна существовать
validator.expect_column_to_exist("email")
# Набор колонок
validator.expect_table_columns_to_match_set(
["user_id", "email", "created_at", "status"]
)
Проверки значений
# Значения в диапазоне
validator.expect_column_values_to_be_between(
"age", min_value=0, max_value=150
)
# Значения из допустимого набора
validator.expect_column_values_to_be_in_set(
"status", ["active", "inactive", "suspended"]
)
# Regex-паттерн
validator.expect_column_values_to_match_regex(
"email", r"^[\w.+-]+@[\w-]+\.[\w.]+$"
)
Проверки уникальности и агрегации
# Уникальность
validator.expect_column_values_to_be_unique("transaction_id")
# Количество строк в диапазоне
validator.expect_table_row_count_to_be_between(
min_value=1000, max_value=10_000_000
)
# Среднее значение
validator.expect_column_mean_to_be_between(
"order_amount", min_value=10.0, max_value=1000.0
)
Создание Expectation Suite программно
Полный пример создания suite и валидации Spark DataFrame:
import great_expectations as gx
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DQ-Pipeline").getOrCreate()
# 1. Загружаем данные
df = spark.read.parquet("/data/orders/2024-01-15/")
# 2. Создаём Data Context и Data Source
context = gx.get_context()
data_source = context.data_sources.add_spark("orders_source")
# 3. Создаём Data Asset из DataFrame
data_asset = data_source.add_dataframe_asset("orders")
batch_definition = data_asset.add_batch_definition_whole_dataframe("full_batch")
# 4. Определяем Expectation Suite
suite = context.suites.add(
gx.ExpectationSuite(name="orders_quality_suite")
)
# Добавляем expectations
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="amount", min_value=0.01, max_value=999999.99
)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeInSet(
column="status", value_set=["pending", "completed", "refunded"]
)
)
suite.add_expectation(
gx.expectations.ExpectTableRowCountToBeBetween(
min_value=100, max_value=10_000_000
)
)
# 5. Сохраняем suite
suite = context.suites.add(suite)
Checkpoint: запуск валидации
Checkpoint — это исполняемая единица GE. Он связывает данные (batch) с Expectation Suite и запускает валидацию:
# 6. Создаём и запускаем Checkpoint
checkpoint = context.checkpoints.add(
gx.Checkpoint(
name="orders_daily_checkpoint",
validation_definitions=[
gx.ValidationDefinition(
name="orders_validation",
data=batch_definition,
suite=suite,
)
],
actions=[
gx.checkpoint.UpdateDataDocsAction(
name="update_data_docs",
)
],
)
)
# 7. Запуск валидации
result = checkpoint.run(batch_parameters={"dataframe": df})
# 8. Проверяем результат
if not result.success:
failed = [
r for r in result.run_results.values()
if not r.success
]
print(f"VALIDATION FAILED: {len(failed)} checks failed")
# Отправить alert, не записывать данные
else:
print("VALIDATION PASSED: all checks passed")
# Записать данные в target
df.write.mode("append").parquet("/data/gold/orders/")
Интеграция в Spark Pipeline
Ключевой паттерн — validate before write. GE валидация встраивается между transform и write:
def run_orders_pipeline(spark, input_path, output_path):
"""Pipeline с GE валидацией перед записью."""
# 1. READ
raw = spark.read.parquet(input_path)
# 2. TRANSFORM
cleaned = raw \
.filter("amount > 0") \
.dropDuplicates(["order_id"]) \
.withColumn("processed_at", current_timestamp())
# 3. VALIDATE (GE)
context = gx.get_context()
result = context.checkpoints.get("orders_daily_checkpoint").run(
batch_parameters={"dataframe": cleaned}
)
if not result.success:
raise ValueError(
f"Data quality check failed for {input_path}. "
f"See Data Docs for details."
)
# 4. WRITE (только если валидация прошла)
cleaned.write \
.mode("append") \
.partitionBy("date") \
.parquet(output_path)
return cleaned.count()
Pipeline flow:
READ → TRANSFORM → VALIDATE → WRITE
↓ fail
ALERT + STOP
(не пишем плохие данные)
Validate before write — золотое правило. Никогда не записывайте данные без валидации. GE checkpoint между transform и write гарантирует, что в target попадают только данные, прошедшие все проверки. Если валидация падает — pipeline останавливается, отправляется alert, плохие данные не портят downstream таблицы.
Data Docs: автодокументация
Data Docs — HTML-отчёт, автоматически генерируемый GE при каждом запуске checkpoint:
- Expectation Suite страница — список всех правил с описаниями
- Validation Result страница — pass/fail для каждого expectation, с примерами failing rows
- История валидаций — тренды pass rate по дням
# Открыть Data Docs в браузере
context.open_data_docs()
Data Docs можно разместить на S3/GCS для командного доступа:
# great_expectations.yml
stores:
data_docs_store:
class_name: TupleS3StoreBackend
bucket: my-data-docs-bucket
prefix: great_expectations/data_docs/
Что дальше?
В следующем уроке рассмотрим Amazon Deequ — альтернативный фреймворк для качества данных, ориентированный на Scala и JVM-экосистему.