Построение pipeline качества данных
Качество данных как архитектурный слой
В предыдущих уроках мы изучили инструменты валидации — pytest для тестирования кода, Great Expectations и Deequ для проверки данных. Теперь вопрос: как встроить валидацию в production pipeline?
Качество данных — это не одноразовая проверка, а архитектурный слой, который работает на каждом этапе обработки.
Validate-on-Read vs Validate-on-Write
Два фундаментальных подхода:
| Подход | Когда проверяем | Плюсы | Минусы |
|---|---|---|---|
| Validate-on-Write | Перед записью в target | Плохие данные не попадают в target | Задержка pipeline при ошибке |
| Validate-on-Read | При чтении из source | Не блокирует запись | Плохие данные уже в target |
Рекомендация: используйте validate-on-write как основной подход. Validate-on-read — как дополнительный для legacy данных.
# Validate-on-Write (рекомендуется)
def write_with_validation(df, output_path, expectations_suite):
"""Записывает данные только после валидации."""
result = validate(df, expectations_suite)
if result.success:
df.write.mode("append").parquet(output_path)
return {"status": "success", "rows": df.count()}
else:
quarantine(df, result.failed_rows)
alert(result.failures)
return {"status": "failed", "failures": result.failures}
Multi-Layer валидация: Bronze → Silver → Gold
В lakehouse-архитектуре каждый слой имеет свои проверки:
Пример: multi-layer GE suites
# Bronze suite -- минимальные проверки (не блокирует ingestion)
bronze_suite = [
ExpectColumnToExist("order_id"),
ExpectColumnToExist("customer_id"),
ExpectColumnToExist("amount"),
ExpectColumnToExist("created_at"),
ExpectTableRowCountToBeBetween(min_value=1),
ExpectColumnValuesToNotBeNull("order_id"),
]
# Silver suite -- бизнес-правила (блокирует запись)
silver_suite = bronze_suite + [
ExpectColumnValuesToNotBeNull("customer_id"),
ExpectColumnValuesToNotBeNull("amount"),
ExpectColumnValuesToBeBetween("amount", min_value=0.01, max_value=999999.99),
ExpectColumnValuesToBeUnique("order_id"),
ExpectColumnValuesToMatchRegex("email", r"^[\w.+-]+@[\w-]+\.[\w.]+$"),
ExpectColumnValuesToBeInSet("status", ["pending", "completed", "refunded"]),
]
# Gold suite -- агрегационные проверки
gold_suite = [
ExpectTableRowCountToBeBetween(min_value=100, max_value=10_000_000),
ExpectColumnMeanToBeBetween("daily_revenue", min_value=1000, max_value=1_000_000),
ExpectColumnValuesToBeBetween("customer_count", min_value=1),
]
Quarantine Pattern
Quarantine — паттерн изоляции плохих записей. Вместо отбрасывания или остановки pipeline, проблемные строки отправляются в отдельную таблицу для ручного разбора:
from pyspark.sql.functions import col, current_timestamp, lit
def process_with_quarantine(spark, df, rules):
"""Разделяет данные на valid и quarantine."""
# Определяем условия валидности
valid_condition = (
col("order_id").isNotNull() &
col("amount").between(0.01, 999999.99) &
col("status").isin("pending", "completed", "refunded")
)
# Разделяем данные
valid_df = df.filter(valid_condition)
quarantine_df = df.filter(~valid_condition) \
.withColumn("quarantine_reason", lit("failed_validation")) \
.withColumn("quarantined_at", current_timestamp())
return valid_df, quarantine_df
def run_pipeline(spark, input_path):
raw = spark.read.parquet(input_path)
valid, quarantine = process_with_quarantine(spark, raw, rules={})
# Валидные данные -> основная таблица
valid.write.mode("append").parquet("/data/silver/orders/")
# Проблемные данные -> quarantine таблица
if quarantine.count() > 0:
quarantine.write.mode("append").parquet("/data/quarantine/orders/")
alert_quarantine(quarantine.count())
Pipeline с quarantine:
SOURCE → VALIDATE → SPLIT
├→ valid → SILVER TABLE
└→ invalid → QUARANTINE TABLE
↓
Manual review
Fix & replay
Alerting: уведомления при ошибках
Валидация без алертинга бесполезна. Основные паттерны:
Slack webhook
import requests
import json
def alert_slack(webhook_url, message, severity="warning"):
"""Отправляет alert в Slack при ошибке валидации."""
color = "#ff0000" if severity == "error" else "#ffaa00"
payload = {
"attachments": [{
"color": color,
"title": "Data Quality Alert",
"text": message,
"fields": [
{"title": "Pipeline", "value": "orders_daily", "short": True},
{"title": "Severity", "value": severity, "short": True},
]
}]
}
requests.post(webhook_url, json=payload)
Паттерн: severity levels
def handle_validation_result(result, df):
"""Обрабатывает результат валидации по severity."""
if result.critical_failures:
# CRITICAL: pipeline останавливается, PagerDuty alert
alert_pagerduty("Critical DQ failure", result.critical_failures)
raise DataQualityError("Critical validation failed")
if result.warning_failures:
# WARNING: pipeline продолжается, Slack alert
alert_slack(WEBHOOK_URL, f"DQ warnings: {result.warning_failures}")
# SUCCESS: метрики в мониторинг
publish_metrics(result.metrics)
Интеграция с Lakehouse форматами
Delta Lake: проверка перед MERGE
from delta.tables import DeltaTable
def merge_with_validation(spark, new_data, target_path):
"""MERGE с валидацией перед вставкой."""
# 1. Валидация новых данных
result = validate(new_data, silver_suite)
if not result.success:
raise DataQualityError(f"Validation failed: {result.failures}")
# 2. MERGE только если валидация прошла
target = DeltaTable.forPath(spark, target_path)
target.alias("target") \
.merge(new_data.alias("source"), "target.id = source.id") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
Iceberg: проверка перед append
def append_with_validation(spark, df, table_name):
"""Append в Iceberg с валидацией."""
result = validate(df, silver_suite)
if not result.success:
raise DataQualityError(f"Validation failed: {result.failures}")
df.writeTo(table_name).append()
Cross-reference: Подробнее о Delta Lake MERGE и Iceberg таблицах смотрите в модуле M10 (Lakehouse форматы). Здесь мы фокусируемся на интеграции DQ-проверок с этими форматами.
Best Practices
- Validate-on-write по умолчанию — не записывайте данные без проверки
- Quarantine вместо drop — не теряйте данные, изолируйте для анализа
- Severity levels — не все ошибки одинаковы: critical останавливает, warning логирует
- Метрики в мониторинг — completeness, uniqueness, row count как Prometheus/CloudWatch metrics
- DQ как code — expectation suites в Git, ревью как обычный код
Что дальше?
В следующем уроке рассмотрим отслеживание data lineage — как понять, откуда пришли данные и куда они идут, используя OpenLineage, Apache Atlas и Unity Catalog.