Learning Platform
Глоссарий Troubleshooting
Урок 11.08 · 10 мин
Средний
Docker LabGreat ExpectationsPySparkData QualityJupyterData Docs

Лабораторная: pipeline качества данных

Обзор лабораторной

В этой лабораторной работе мы соберём end-to-end pipeline качества данных с PySpark и Great Expectations в Docker-среде. Вы:

  1. Запустите Spark + Jupyter + Great Expectations в Docker
  2. Загрузите CSV-данные в Spark DataFrame
  3. Создадите Expectation Suite с правилами валидации
  4. Запустите Checkpoint и проверите результат
  5. Откроете Data Docs — HTML-отчёт по качеству данных
NOTE

Эта лабораторная использует LAB-04 из директории labs/data-quality/. Перед началом убедитесь, что Docker установлен и запущен на вашей машине.

Архитектура лаба

Docker Compose Lab
Docker Compose
Spark Masterspark:4.0.0Port: 8080
Jupyter + GEPySpark + great-expectationsPort: 8888
SparkSession connect
Spark Worker1 core, 1GB
Shared volume: ./data/ → /opt/data/
Shared volume: ./notebooks/ → /opt/notebooks/

Запуск

cd labs/data-quality/
docker compose up -d

Проверьте, что все контейнеры запущены:

docker compose ps
# NAME              STATUS    PORTS
# spark-master      running   8080->8080
# spark-worker      running
# jupyter-ge        running   8888->8888

Откройте Jupyter: http://localhost:8888

Шаг 1: Загрузка данных

В лабе предоставлен CSV-файл orders_sample.csv с тестовыми данными:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DQ-Lab") \
    .master("spark://spark-master:7077") \
    .getOrCreate()

# Загружаем тестовые данные
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/opt/data/orders_sample.csv")

df.printSchema()
# root
#  |-- order_id: string
#  |-- customer_id: string
#  |-- amount: double
#  |-- status: string
#  |-- email: string
#  |-- created_at: timestamp

df.show(5)
# +--------+-----------+------+---------+-------------------+-------------------+
# |order_id|customer_id|amount|status   |email              |created_at         |
# +--------+-----------+------+---------+-------------------+-------------------+
# |ORD-001 |CUST-101   |249.99|completed|[email protected]  |2024-01-15 10:30:00|
# |ORD-002 |CUST-102   |75.50 |pending  |[email protected]       |2024-01-15 11:00:00|
# |ORD-003 |null       |199.00|completed|null               |2024-01-15 12:15:00|
# |ORD-004 |CUST-104   |-50.00|refunded |invalid-email      |2024-01-15 13:00:00|
# |ORD-005 |CUST-105   |320.00|unknown  |[email protected]  |2024-01-15 14:30:00|
# +--------+-----------+------+---------+-------------------+-------------------+

Обратите внимание на намеренные проблемы в данных:

  • ORD-003: customer_id и email — NULL
  • ORD-004: отрицательная сумма, невалидный email
  • ORD-005: статус unknown не из допустимого набора

Шаг 2: Создание Expectation Suite

import great_expectations as gx

# Создаём Data Context
context = gx.get_context()

# Подключаем Spark Data Source
data_source = context.data_sources.add_spark("orders_source")
data_asset = data_source.add_dataframe_asset("orders")
batch_def = data_asset.add_batch_definition_whole_dataframe("full")

# Создаём Expectation Suite
suite = gx.ExpectationSuite(name="orders_quality")

# Правила полноты
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)

# Правила значений
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0.01, max_value=999999.99
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status",
        value_set=["pending", "completed", "refunded"]
    )
)

# Правила формата
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToMatchRegex(
        column="email",
        regex=r"^[\w.+-]+@[\w-]+\.[\w.]+$"
    )
)

# Правила уникальности
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)

suite = context.suites.add(suite)
print(f"Suite '{suite.name}' created with {len(suite.expectations)} expectations")

Шаг 3: Запуск Checkpoint

# Создаём Validation Definition
validation_def = gx.ValidationDefinition(
    name="orders_validation",
    data=batch_def,
    suite=suite,
)
validation_def = context.validation_definitions.add(validation_def)

# Создаём Checkpoint
checkpoint = gx.Checkpoint(
    name="orders_checkpoint",
    validation_definitions=[validation_def],
    actions=[
        gx.checkpoint.UpdateDataDocsAction(name="update_docs"),
    ],
)
checkpoint = context.checkpoints.add(checkpoint)

# Запускаем валидацию
result = checkpoint.run(batch_parameters={"dataframe": df})

# Результат
print(f"Overall success: {result.success}")
# Overall success: False (ожидаемо -- в данных есть проблемы)

Шаг 4: Анализ результатов

# Детали по каждому expectation
for vr in result.run_results.values():
    for er in vr.results:
        status = "PASS" if er.success else "FAIL"
        print(f"  [{status}] {er.expectation_config.type}")
        if not er.success:
            print(f"         Unexpected: {er.result.get('unexpected_count', 'N/A')} rows")

# Ожидаемый вывод:
# [PASS] expect_column_values_to_not_be_null (order_id)
# [FAIL] expect_column_values_to_not_be_null (customer_id)
#        Unexpected: 1 rows
# [FAIL] expect_column_values_to_be_between (amount)
#        Unexpected: 1 rows
# [FAIL] expect_column_values_to_be_in_set (status)
#        Unexpected: 1 rows
# [FAIL] expect_column_values_to_match_regex (email)
#        Unexpected: 2 rows
# [PASS] expect_column_values_to_be_unique (order_id)

Шаг 5: Data Docs

Data Docs — HTML-отчёт, автоматически сгенерированный GE:

context.open_data_docs()

В Docker-среде Data Docs доступны по адресу, указанному в терминале. Отчёт покажет:

  • Summary — общий pass/fail для suite
  • Per-expectation details — какие правила нарушены, примеры проблемных строк
  • Statistics — процент строк, прошедших каждую проверку
Data Docs Report
Expectation Suite: orders_quality
Status: FAILED (4 of 6 expectations failed)
order_id not null
100% pass
customer_id not null
80% pass
amount between 0.01-999K
80% pass
status in valid set
80% pass
email matches regex
60% pass
order_id unique
100% pass
Проверка знанийKnowledge check
Какие 4 шага составляют DQ-pipeline в лабораторной? Что происходит на каждом шаге?
ОтветAnswer
(1) Загрузка данных -- читаем CSV в Spark DataFrame через spark.read.csv(). (2) Создание Expectation Suite -- определяем правила валидации: not_be_null, be_between, be_in_set, match_regex, be_unique. (3) Запуск Checkpoint -- связываем DataFrame с Suite и запускаем валидацию; получаем result с pass/fail для каждого expectation. (4) Анализ через Data Docs -- GE генерирует HTML-отчёт с детальной визуализацией: какие правила нарушены, сколько строк failed, примеры проблемных данных.

Troubleshooting

Частые проблемы

ПроблемаРешение
docker compose up — ошибка порта 8080Другой сервис занимает порт. Измените в docker-compose.yml: "8081:8080"
Jupyter не подключается к SparkПроверьте docker compose logs spark-master. Master должен быть running
ModuleNotFoundError: great_expectationsЗайдите в контейнер: docker exec -it jupyter-ge pip install great-expectations
Spark OOM при загрузке данныхТестовый CSV маленький, но если подставили свои данные — ограничьте: df.limit(10000)
Data Docs не открываютсяВ Docker Data Docs сохраняются в /opt/notebooks/gx/uncommitted/data_docs/. Скопируйте на хост через shared volume

Остановка лаба

# Остановить и удалить контейнеры
docker compose down

# Остановить, удалить контейнеры и volumes
docker compose down -v
Проверка знанийKnowledge check
Почему в тестовых данных лаба намеренно добавлены проблемные записи (NULL, отрицательные суммы, невалидные email)?
ОтветAnswer
Проблемные записи добавлены намеренно, чтобы: (1) показать, как Great Expectations обнаруживает различные типы ошибок -- NULL в обязательных полях, значения вне диапазона, невалидные форматы, неизвестные значения enum; (2) продемонстрировать Data Docs отчёт с FAIL-результатами и примерами failing rows; (3) дать практику анализа результатов валидации. В production данные всегда содержат аномалии -- задача DQ pipeline их находить и обрабатывать.

Итоги модуля

В модуле M13 «Testing & Data Quality» мы изучили:

  1. Unit-тестирование PySpark с pytest и SparkSession fixture
  2. Интеграционное тестирование с Testcontainers и тестовой пирамидой
  3. spark-testing-base — готовые utilities vs нативный pytest
  4. Great Expectations — декларативная валидация данных с Data Docs
  5. Amazon Deequ — Scala-native DQ с Metrics Repository
  6. DQ pipelines — multi-layer валидация, quarantine pattern, alerting
  7. Data lineage — OpenLineage, Atlas, Unity Catalog
  8. Hands-on lab — end-to-end DQ pipeline в Docker

Качество данных — не опциональная фича, а фундаментальное требование production data engineering. Тесты ловят баги в коде, а DQ-проверки ловят аномалии в данных. Вместе они гарантируют, что ваш pipeline надёжен.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Из каких компонентов состоит Docker lab для data quality pipeline?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8