Learning Platform
Глоссарий Troubleshooting
Урок 13.04 · 15 мин
Продвинутый
Great ExpectationsData QualityQuality GatesQuarantineExpectation SuiteValidator

Валидация качества данных

Quality Gates в pipeline

В предыдущих уроках мы построили bronze, silver и gold слои. Но pipeline без валидации — бомба замедленного действия: один битый файл в источнике может каскадировать ошибки через все слои.

В этом уроке мы добавим quality gates на каждом переходе:

  • Bronze gate: проверяет сырые данные после загрузки
  • Silver gate: проверяет данные после трансформаций

Для валидации используем Great Expectations (GE) — фреймворк, который мы подробно изучали в модуле М13 (тестирование и качество данных).

Настройка Great Expectations

Используем ephemeral context — без файловой конфигурации gx/. Этот подход проще для pipeline, который запускается как batch job:

import great_expectations as gx

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("EcommerceCapstone_Quality") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Ephemeral context -- конфигурация живёт только в runtime
context = gx.get_context()
data_source = context.data_sources.add_spark("ecommerce_source")

Bronze Quality Gate

Загрузка данных и создание validator

bronze_orders = spark.read.format("delta").load("/data/bronze/orders")

# Создаём data asset и batch
data_asset = data_source.add_dataframe_asset("bronze_orders")
batch_definition = data_asset.add_batch_definition_whole_dataframe(
    "bronze_orders_batch"
)
batch = batch_definition.get_batch(
    batch_parameters={"dataframe": bronze_orders}
)

Expectation suite для Bronze

На bronze-слое проверяем минимальные требования к сырым данным:

# Критические поля не должны быть NULL
suite = gx.ExpectationSuite(name="bronze_orders_suite")

suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)

# Цена в разумном диапазоне (защита от ошибок экспорта)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="price", min_value=0, max_value=100000
    )
)

# Количество -- положительное и разумное
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="quantity", min_value=1, max_value=1000
    )
)

# Статус из допустимого набора
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status",
        value_set=["completed", "pending", "cancelled", "returned"]
    )
)

suite = context.suites.add(suite)

Запуск валидации

validation_definition = gx.ValidationDefinition(
    name="bronze_orders_validation",
    data=batch_definition,
    suite=suite,
)
validation_definition = context.validation_definitions.add(validation_definition)

checkpoint = gx.Checkpoint(
    name="bronze_checkpoint",
    validation_definitions=[validation_definition],
)
checkpoint = context.checkpoints.add(checkpoint)

result = checkpoint.run(batch_parameters={"dataframe": bronze_orders})

if not result.success:
    failed = [
        r.expectation_config.type
        for r in result.run_results.values()
        for vr in [r]
        if not vr.success
    ]
    print(f"BRONZE GATE FAILED: {failed}")
    # В production: raise Exception или отправить alert
else:
    print("BRONZE GATE PASSED")

Silver Quality Gate

После join и deduplication проверяем более строгие правила:

silver_orders = spark.read.format("delta").load("/data/silver/enriched_orders")

silver_asset = data_source.add_dataframe_asset("silver_orders")
silver_batch_def = silver_asset.add_batch_definition_whole_dataframe(
    "silver_orders_batch"
)

silver_suite = gx.ExpectationSuite(name="silver_orders_suite")

# После dedup order_id должен быть уникальным
silver_suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)

# total_amount должен быть положительным
silver_suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="total_amount", min_value=0, strict_min=True
    )
)

# customer_name не NULL (join прошёл успешно)
silver_suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_name")
)

# city не NULL
silver_suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="city")
)

silver_suite = context.suites.add(silver_suite)

Referential Integrity Check

GE не имеет встроенной проверки referential integrity, но мы можем реализовать её через PySpark:

# Все customer_id в orders должны существовать в customers
bronze_customers = spark.read.format("delta").load("/data/bronze/customers")

orphaned_orders = silver_orders.join(
    bronze_customers,
    on="customer_id",
    how="left_anti"  # строки из silver, которых нет в customers
)

orphan_count = orphaned_orders.count()
if orphan_count > 0:
    print(f"WARNING: {orphan_count} orders with unknown customer_id")
    # Quarantine orphaned records
    orphaned_orders.write \
        .format("delta") \
        .mode("append") \
        .save("/data/quarantine/orphaned_orders")
Проверка знанийKnowledge check
ОтветAnswer

Quarantine Pattern

Для некритических проверок реализуем quarantine — изоляцию проблемных строк:

from pyspark.sql.functions import col, lit, current_timestamp

def quarantine_invalid_rows(df, condition, reason, quarantine_path):
    """Отделяет невалидные строки в quarantine table."""
    invalid = df.filter(condition).withColumn(
        "quarantine_reason", lit(reason)
    ).withColumn(
        "quarantined_at", current_timestamp()
    )

    valid = df.filter(~condition)

    if invalid.count() > 0:
        invalid.write \
            .format("delta") \
            .mode("append") \
            .save(quarantine_path)
        print(f"Quarantined {invalid.count()} rows: {reason}")

    return valid

# Пример: quarantine заказов с нулевой суммой
clean_orders = quarantine_invalid_rows(
    silver_orders,
    condition=(col("total_amount") <= 0),
    reason="non_positive_total_amount",
    quarantine_path="/data/quarantine/silver_orders"
)

Мониторинг качества

В production pipeline результаты валидации сохраняются для анализа трендов:

from pyspark.sql import Row
from datetime import datetime

quality_log = spark.createDataFrame([
    Row(
        layer="bronze",
        suite="bronze_orders_suite",
        passed=result.success,
        checked_at=datetime.now(),
        total_expectations=len(result.results),
        failed_expectations=sum(
            1 for r in result.results if not r.success
        ),
    )
])

quality_log.write \
    .format("delta") \
    .mode("append") \
    .save("/data/metadata/quality_log")

Этот лог позволяет строить дашборды качества: «процент проходящих expectations по дням», «самые частые ошибки», «тренд качества источника».

Итоги

Quality gates добавлены:

  • Bronze gate: null-проверки, диапазоны price/quantity, допустимые status
  • Silver gate: уникальность order_id, положительный total_amount, referential integrity
  • Quarantine: изоляция невалидных строк без остановки pipeline
  • Quality log: Delta-таблица для мониторинга трендов качества

В следующем уроке мы завершим pipeline: создадим сервисный слой для gold tables, нарисуем полный DAG и обсудим production deployment.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5