Валидация качества данных
Quality Gates в pipeline
В предыдущих уроках мы построили bronze, silver и gold слои. Но pipeline без валидации — бомба замедленного действия: один битый файл в источнике может каскадировать ошибки через все слои.
В этом уроке мы добавим quality gates на каждом переходе:
- Bronze gate: проверяет сырые данные после загрузки
- Silver gate: проверяет данные после трансформаций
Для валидации используем Great Expectations (GE) — фреймворк, который мы подробно изучали в модуле М13 (тестирование и качество данных).
Настройка Great Expectations
Используем ephemeral context — без файловой конфигурации gx/. Этот подход проще для pipeline, который запускается как batch job:
import great_expectations as gx
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("EcommerceCapstone_Quality") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Ephemeral context -- конфигурация живёт только в runtime
context = gx.get_context()
data_source = context.data_sources.add_spark("ecommerce_source")
Bronze Quality Gate
Загрузка данных и создание validator
bronze_orders = spark.read.format("delta").load("/data/bronze/orders")
# Создаём data asset и batch
data_asset = data_source.add_dataframe_asset("bronze_orders")
batch_definition = data_asset.add_batch_definition_whole_dataframe(
"bronze_orders_batch"
)
batch = batch_definition.get_batch(
batch_parameters={"dataframe": bronze_orders}
)
Expectation suite для Bronze
На bronze-слое проверяем минимальные требования к сырым данным:
# Критические поля не должны быть NULL
suite = gx.ExpectationSuite(name="bronze_orders_suite")
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)
# Цена в разумном диапазоне (защита от ошибок экспорта)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="price", min_value=0, max_value=100000
)
)
# Количество -- положительное и разумное
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="quantity", min_value=1, max_value=1000
)
)
# Статус из допустимого набора
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeInSet(
column="status",
value_set=["completed", "pending", "cancelled", "returned"]
)
)
suite = context.suites.add(suite)
Запуск валидации
validation_definition = gx.ValidationDefinition(
name="bronze_orders_validation",
data=batch_definition,
suite=suite,
)
validation_definition = context.validation_definitions.add(validation_definition)
checkpoint = gx.Checkpoint(
name="bronze_checkpoint",
validation_definitions=[validation_definition],
)
checkpoint = context.checkpoints.add(checkpoint)
result = checkpoint.run(batch_parameters={"dataframe": bronze_orders})
if not result.success:
failed = [
r.expectation_config.type
for r in result.run_results.values()
for vr in [r]
if not vr.success
]
print(f"BRONZE GATE FAILED: {failed}")
# В production: raise Exception или отправить alert
else:
print("BRONZE GATE PASSED")
Silver Quality Gate
После join и deduplication проверяем более строгие правила:
silver_orders = spark.read.format("delta").load("/data/silver/enriched_orders")
silver_asset = data_source.add_dataframe_asset("silver_orders")
silver_batch_def = silver_asset.add_batch_definition_whole_dataframe(
"silver_orders_batch"
)
silver_suite = gx.ExpectationSuite(name="silver_orders_suite")
# После dedup order_id должен быть уникальным
silver_suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
# total_amount должен быть положительным
silver_suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="total_amount", min_value=0, strict_min=True
)
)
# customer_name не NULL (join прошёл успешно)
silver_suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_name")
)
# city не NULL
silver_suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="city")
)
silver_suite = context.suites.add(silver_suite)
Referential Integrity Check
GE не имеет встроенной проверки referential integrity, но мы можем реализовать её через PySpark:
# Все customer_id в orders должны существовать в customers
bronze_customers = spark.read.format("delta").load("/data/bronze/customers")
orphaned_orders = silver_orders.join(
bronze_customers,
on="customer_id",
how="left_anti" # строки из silver, которых нет в customers
)
orphan_count = orphaned_orders.count()
if orphan_count > 0:
print(f"WARNING: {orphan_count} orders with unknown customer_id")
# Quarantine orphaned records
orphaned_orders.write \
.format("delta") \
.mode("append") \
.save("/data/quarantine/orphaned_orders")
Quarantine Pattern
Для некритических проверок реализуем quarantine — изоляцию проблемных строк:
from pyspark.sql.functions import col, lit, current_timestamp
def quarantine_invalid_rows(df, condition, reason, quarantine_path):
"""Отделяет невалидные строки в quarantine table."""
invalid = df.filter(condition).withColumn(
"quarantine_reason", lit(reason)
).withColumn(
"quarantined_at", current_timestamp()
)
valid = df.filter(~condition)
if invalid.count() > 0:
invalid.write \
.format("delta") \
.mode("append") \
.save(quarantine_path)
print(f"Quarantined {invalid.count()} rows: {reason}")
return valid
# Пример: quarantine заказов с нулевой суммой
clean_orders = quarantine_invalid_rows(
silver_orders,
condition=(col("total_amount") <= 0),
reason="non_positive_total_amount",
quarantine_path="/data/quarantine/silver_orders"
)
Мониторинг качества
В production pipeline результаты валидации сохраняются для анализа трендов:
from pyspark.sql import Row
from datetime import datetime
quality_log = spark.createDataFrame([
Row(
layer="bronze",
suite="bronze_orders_suite",
passed=result.success,
checked_at=datetime.now(),
total_expectations=len(result.results),
failed_expectations=sum(
1 for r in result.results if not r.success
),
)
])
quality_log.write \
.format("delta") \
.mode("append") \
.save("/data/metadata/quality_log")
Этот лог позволяет строить дашборды качества: «процент проходящих expectations по дням», «самые частые ошибки», «тренд качества источника».
Итоги
Quality gates добавлены:
- Bronze gate: null-проверки, диапазоны price/quantity, допустимые status
- Silver gate: уникальность order_id, положительный total_amount, referential integrity
- Quarantine: изоляция невалидных строк без остановки pipeline
- Quality log: Delta-таблица для мониторинга трендов качества
В следующем уроке мы завершим pipeline: создадим сервисный слой для gold tables, нарисуем полный DAG и обсудим production deployment.