Лабораторная: pipeline качества данных
Обзор лабораторной
В этой лабораторной работе мы соберём end-to-end pipeline качества данных с PySpark и Great Expectations в Docker-среде. Вы:
- Запустите Spark + Jupyter + Great Expectations в Docker
- Загрузите CSV-данные в Spark DataFrame
- Создадите Expectation Suite с правилами валидации
- Запустите Checkpoint и проверите результат
- Откроете Data Docs — HTML-отчёт по качеству данных
Эта лабораторная использует LAB-04 из директории labs/data-quality/. Перед началом убедитесь, что Docker установлен и запущен на вашей машине.
Архитектура лаба
Запуск
cd labs/data-quality/
docker compose up -d
Проверьте, что все контейнеры запущены:
docker compose ps
# NAME STATUS PORTS
# spark-master running 8080->8080
# spark-worker running
# jupyter-ge running 8888->8888
Откройте Jupyter: http://localhost:8888
Шаг 1: Загрузка данных
В лабе предоставлен CSV-файл orders_sample.csv с тестовыми данными:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("DQ-Lab") \
.master("spark://spark-master:7077") \
.getOrCreate()
# Загружаем тестовые данные
df = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("/opt/data/orders_sample.csv")
df.printSchema()
# root
# |-- order_id: string
# |-- customer_id: string
# |-- amount: double
# |-- status: string
# |-- email: string
# |-- created_at: timestamp
df.show(5)
# +--------+-----------+------+---------+-------------------+-------------------+
# |order_id|customer_id|amount|status |email |created_at |
# +--------+-----------+------+---------+-------------------+-------------------+
# |ORD-001 |CUST-101 |249.99|completed|[email protected] |2024-01-15 10:30:00|
# |ORD-002 |CUST-102 |75.50 |pending |[email protected] |2024-01-15 11:00:00|
# |ORD-003 |null |199.00|completed|null |2024-01-15 12:15:00|
# |ORD-004 |CUST-104 |-50.00|refunded |invalid-email |2024-01-15 13:00:00|
# |ORD-005 |CUST-105 |320.00|unknown |[email protected] |2024-01-15 14:30:00|
# +--------+-----------+------+---------+-------------------+-------------------+
Обратите внимание на намеренные проблемы в данных:
ORD-003:customer_idиemail— NULLORD-004: отрицательная сумма, невалидный emailORD-005: статусunknownне из допустимого набора
Шаг 2: Создание Expectation Suite
import great_expectations as gx
# Создаём Data Context
context = gx.get_context()
# Подключаем Spark Data Source
data_source = context.data_sources.add_spark("orders_source")
data_asset = data_source.add_dataframe_asset("orders")
batch_def = data_asset.add_batch_definition_whole_dataframe("full")
# Создаём Expectation Suite
suite = gx.ExpectationSuite(name="orders_quality")
# Правила полноты
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)
# Правила значений
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="amount", min_value=0.01, max_value=999999.99
)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeInSet(
column="status",
value_set=["pending", "completed", "refunded"]
)
)
# Правила формата
suite.add_expectation(
gx.expectations.ExpectColumnValuesToMatchRegex(
column="email",
regex=r"^[\w.+-]+@[\w-]+\.[\w.]+$"
)
)
# Правила уникальности
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
suite = context.suites.add(suite)
print(f"Suite '{suite.name}' created with {len(suite.expectations)} expectations")
Шаг 3: Запуск Checkpoint
# Создаём Validation Definition
validation_def = gx.ValidationDefinition(
name="orders_validation",
data=batch_def,
suite=suite,
)
validation_def = context.validation_definitions.add(validation_def)
# Создаём Checkpoint
checkpoint = gx.Checkpoint(
name="orders_checkpoint",
validation_definitions=[validation_def],
actions=[
gx.checkpoint.UpdateDataDocsAction(name="update_docs"),
],
)
checkpoint = context.checkpoints.add(checkpoint)
# Запускаем валидацию
result = checkpoint.run(batch_parameters={"dataframe": df})
# Результат
print(f"Overall success: {result.success}")
# Overall success: False (ожидаемо -- в данных есть проблемы)
Шаг 4: Анализ результатов
# Детали по каждому expectation
for vr in result.run_results.values():
for er in vr.results:
status = "PASS" if er.success else "FAIL"
print(f" [{status}] {er.expectation_config.type}")
if not er.success:
print(f" Unexpected: {er.result.get('unexpected_count', 'N/A')} rows")
# Ожидаемый вывод:
# [PASS] expect_column_values_to_not_be_null (order_id)
# [FAIL] expect_column_values_to_not_be_null (customer_id)
# Unexpected: 1 rows
# [FAIL] expect_column_values_to_be_between (amount)
# Unexpected: 1 rows
# [FAIL] expect_column_values_to_be_in_set (status)
# Unexpected: 1 rows
# [FAIL] expect_column_values_to_match_regex (email)
# Unexpected: 2 rows
# [PASS] expect_column_values_to_be_unique (order_id)
Шаг 5: Data Docs
Data Docs — HTML-отчёт, автоматически сгенерированный GE:
context.open_data_docs()
В Docker-среде Data Docs доступны по адресу, указанному в терминале. Отчёт покажет:
- Summary — общий pass/fail для suite
- Per-expectation details — какие правила нарушены, примеры проблемных строк
- Statistics — процент строк, прошедших каждую проверку
Troubleshooting
Частые проблемы
| Проблема | Решение |
|---|---|
docker compose up — ошибка порта 8080 | Другой сервис занимает порт. Измените в docker-compose.yml: "8081:8080" |
| Jupyter не подключается к Spark | Проверьте docker compose logs spark-master. Master должен быть running |
ModuleNotFoundError: great_expectations | Зайдите в контейнер: docker exec -it jupyter-ge pip install great-expectations |
| Spark OOM при загрузке данных | Тестовый CSV маленький, но если подставили свои данные — ограничьте: df.limit(10000) |
| Data Docs не открываются | В Docker Data Docs сохраняются в /opt/notebooks/gx/uncommitted/data_docs/. Скопируйте на хост через shared volume |
Остановка лаба
# Остановить и удалить контейнеры
docker compose down
# Остановить, удалить контейнеры и volumes
docker compose down -v
Итоги модуля
В модуле M13 «Testing & Data Quality» мы изучили:
- Unit-тестирование PySpark с pytest и SparkSession fixture
- Интеграционное тестирование с Testcontainers и тестовой пирамидой
- spark-testing-base — готовые utilities vs нативный pytest
- Great Expectations — декларативная валидация данных с Data Docs
- Amazon Deequ — Scala-native DQ с Metrics Repository
- DQ pipelines — multi-layer валидация, quarantine pattern, alerting
- Data lineage — OpenLineage, Atlas, Unity Catalog
- Hands-on lab — end-to-end DQ pipeline в Docker
Качество данных — не опциональная фича, а фундаментальное требование production data engineering. Тесты ловят баги в коде, а DQ-проверки ловят аномалии в данных. Вместе они гарантируют, что ваш pipeline надёжен.