Learning Platform
Глоссарий Troubleshooting
Урок 11.06 · 14 мин
Продвинутый
Data Quality PipelineQuarantine PatternMulti-Layer ValidationLakehouseAlerting

Построение pipeline качества данных

Качество данных как архитектурный слой

В предыдущих уроках мы изучили инструменты валидации — pytest для тестирования кода, Great Expectations и Deequ для проверки данных. Теперь вопрос: как встроить валидацию в production pipeline?

Качество данных — это не одноразовая проверка, а архитектурный слой, который работает на каждом этапе обработки.

Validate-on-Read vs Validate-on-Write

Два фундаментальных подхода:

ПодходКогда проверяемПлюсыМинусы
Validate-on-WriteПеред записью в targetПлохие данные не попадают в targetЗадержка pipeline при ошибке
Validate-on-ReadПри чтении из sourceНе блокирует записьПлохие данные уже в target

Рекомендация: используйте validate-on-write как основной подход. Validate-on-read — как дополнительный для legacy данных.

# Validate-on-Write (рекомендуется)
def write_with_validation(df, output_path, expectations_suite):
    """Записывает данные только после валидации."""
    result = validate(df, expectations_suite)

    if result.success:
        df.write.mode("append").parquet(output_path)
        return {"status": "success", "rows": df.count()}
    else:
        quarantine(df, result.failed_rows)
        alert(result.failures)
        return {"status": "failed", "failures": result.failures}

Multi-Layer валидация: Bronze → Silver → Gold

В lakehouse-архитектуре каждый слой имеет свои проверки:

Data Quality Layers
BRONZE (raw ingestion)
Schema validation (типы, имена колонок)
Row count > 0
Primary key NOT NULL
Timestamp в разумном диапазоне
SILVER (cleaned, conformed)
Все Bronze проверки +
Business rules (amount > 0, email valid)
Referential integrity (FK exists)
Uniqueness constraints
Completeness > threshold (99.5%)
GOLD (aggregated, business-ready)
Все Silver проверки +
Aggregation consistency (sum matches)
Cross-table validation
Statistical anomaly detection
Row count vs expected (±20%)

Пример: multi-layer GE suites

# Bronze suite -- минимальные проверки (не блокирует ingestion)
bronze_suite = [
    ExpectColumnToExist("order_id"),
    ExpectColumnToExist("customer_id"),
    ExpectColumnToExist("amount"),
    ExpectColumnToExist("created_at"),
    ExpectTableRowCountToBeBetween(min_value=1),
    ExpectColumnValuesToNotBeNull("order_id"),
]

# Silver suite -- бизнес-правила (блокирует запись)
silver_suite = bronze_suite + [
    ExpectColumnValuesToNotBeNull("customer_id"),
    ExpectColumnValuesToNotBeNull("amount"),
    ExpectColumnValuesToBeBetween("amount", min_value=0.01, max_value=999999.99),
    ExpectColumnValuesToBeUnique("order_id"),
    ExpectColumnValuesToMatchRegex("email", r"^[\w.+-]+@[\w-]+\.[\w.]+$"),
    ExpectColumnValuesToBeInSet("status", ["pending", "completed", "refunded"]),
]

# Gold suite -- агрегационные проверки
gold_suite = [
    ExpectTableRowCountToBeBetween(min_value=100, max_value=10_000_000),
    ExpectColumnMeanToBeBetween("daily_revenue", min_value=1000, max_value=1_000_000),
    ExpectColumnValuesToBeBetween("customer_count", min_value=1),
]

Quarantine Pattern

Quarantine — паттерн изоляции плохих записей. Вместо отбрасывания или остановки pipeline, проблемные строки отправляются в отдельную таблицу для ручного разбора:

from pyspark.sql.functions import col, current_timestamp, lit


def process_with_quarantine(spark, df, rules):
    """Разделяет данные на valid и quarantine."""

    # Определяем условия валидности
    valid_condition = (
        col("order_id").isNotNull() &
        col("amount").between(0.01, 999999.99) &
        col("status").isin("pending", "completed", "refunded")
    )

    # Разделяем данные
    valid_df = df.filter(valid_condition)
    quarantine_df = df.filter(~valid_condition) \
        .withColumn("quarantine_reason", lit("failed_validation")) \
        .withColumn("quarantined_at", current_timestamp())

    return valid_df, quarantine_df


def run_pipeline(spark, input_path):
    raw = spark.read.parquet(input_path)

    valid, quarantine = process_with_quarantine(spark, raw, rules={})

    # Валидные данные -> основная таблица
    valid.write.mode("append").parquet("/data/silver/orders/")

    # Проблемные данные -> quarantine таблица
    if quarantine.count() > 0:
        quarantine.write.mode("append").parquet("/data/quarantine/orders/")
        alert_quarantine(quarantine.count())
Pipeline с quarantine:

  SOURCE → VALIDATE → SPLIT
                        ├→ valid    → SILVER TABLE
                        └→ invalid  → QUARANTINE TABLE

                                    Manual review
                                    Fix & replay
Проверка знанийKnowledge check
Что такое quarantine pattern и когда его использовать вместо остановки pipeline?
ОтветAnswer
Quarantine pattern -- изоляция невалидных записей в отдельную таблицу вместо остановки всего pipeline или потери данных. Когда использовать: (1) pipeline обрабатывает миллионы строк и 0.1% невалидных не должны блокировать остальные 99.9%; (2) невалидные записи нужны для анализа причин (data forensics); (3) после исправления источника данные можно replay из quarantine. Остановка pipeline предпочтительна, когда невалидные данные сигнализируют о системной ошибке (пустой файл, schema break, неправильный source).

Alerting: уведомления при ошибках

Валидация без алертинга бесполезна. Основные паттерны:

Slack webhook

import requests
import json


def alert_slack(webhook_url, message, severity="warning"):
    """Отправляет alert в Slack при ошибке валидации."""
    color = "#ff0000" if severity == "error" else "#ffaa00"

    payload = {
        "attachments": [{
            "color": color,
            "title": "Data Quality Alert",
            "text": message,
            "fields": [
                {"title": "Pipeline", "value": "orders_daily", "short": True},
                {"title": "Severity", "value": severity, "short": True},
            ]
        }]
    }

    requests.post(webhook_url, json=payload)

Паттерн: severity levels

def handle_validation_result(result, df):
    """Обрабатывает результат валидации по severity."""
    if result.critical_failures:
        # CRITICAL: pipeline останавливается, PagerDuty alert
        alert_pagerduty("Critical DQ failure", result.critical_failures)
        raise DataQualityError("Critical validation failed")

    if result.warning_failures:
        # WARNING: pipeline продолжается, Slack alert
        alert_slack(WEBHOOK_URL, f"DQ warnings: {result.warning_failures}")

    # SUCCESS: метрики в мониторинг
    publish_metrics(result.metrics)

Интеграция с Lakehouse форматами

Delta Lake: проверка перед MERGE

from delta.tables import DeltaTable

def merge_with_validation(spark, new_data, target_path):
    """MERGE с валидацией перед вставкой."""

    # 1. Валидация новых данных
    result = validate(new_data, silver_suite)
    if not result.success:
        raise DataQualityError(f"Validation failed: {result.failures}")

    # 2. MERGE только если валидация прошла
    target = DeltaTable.forPath(spark, target_path)
    target.alias("target") \
        .merge(new_data.alias("source"), "target.id = source.id") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

Iceberg: проверка перед append

def append_with_validation(spark, df, table_name):
    """Append в Iceberg с валидацией."""
    result = validate(df, silver_suite)
    if not result.success:
        raise DataQualityError(f"Validation failed: {result.failures}")

    df.writeTo(table_name).append()
TIP

Cross-reference: Подробнее о Delta Lake MERGE и Iceberg таблицах смотрите в модуле M10 (Lakehouse форматы). Здесь мы фокусируемся на интеграции DQ-проверок с этими форматами.

Проверка знанийKnowledge check
Какие проверки выполняются на каждом слое lakehouse (bronze, silver, gold)? Приведите по 2 примера для каждого слоя.
ОтветAnswer
Bronze (raw ingestion): (1) schema validation -- проверка имён и типов колонок; (2) primary key NOT NULL -- order_id не должен быть NULL. Silver (cleaned): (1) business rules -- amount между 0.01 и 999999.99; (2) uniqueness -- order_id уникален, нет дубликатов. Gold (aggregated): (1) aggregation consistency -- сумма daily_revenue в ожидаемом диапазоне; (2) statistical anomaly detection -- row count не отклоняется более чем на 20% от среднего. Каждый слой включает проверки предыдущих слоёв.

Best Practices

  1. Validate-on-write по умолчанию — не записывайте данные без проверки
  2. Quarantine вместо drop — не теряйте данные, изолируйте для анализа
  3. Severity levels — не все ошибки одинаковы: critical останавливает, warning логирует
  4. Метрики в мониторинг — completeness, uniqueness, row count как Prometheus/CloudWatch metrics
  5. DQ как code — expectation suites в Git, ревью как обычный код

Что дальше?

В следующем уроке рассмотрим отслеживание data lineage — как понять, откуда пришли данные и куда они идут, используя OpenLineage, Apache Atlas и Unity Catalog.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. В чём разница между validate-on-read и validate-on-write подходами к качеству данных?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8