Learning Platform
Глоссарий Troubleshooting
Урок 11.04 · 16 мин
Продвинутый
Great ExpectationsData QualityExpectationSuiteCheckpointValidatorSparkDFExecutionEngine

Great Expectations для Spark

Что такое Great Expectations?

Great Expectations (GE) — Python-фреймворк для валидации, документирования и профилирования данных. GE отвечает на вопрос: «Соответствуют ли данные ожиданиям бизнеса?»

Ключевые преимущества:

  • Декларативные правила — вы описываете что ожидаете, а не как проверять
  • Автодокументация — Data Docs генерируют HTML-отчёт по результатам валидации
  • Интеграция с PySpark — валидация DataFrame прямо в pipeline
  • 200+ встроенных expectations — от null-проверок до статистических распределений

Архитектура GE

GE построен вокруг четырёх концепций:

Great Expectations Architecture
Data Context (центральная конфигурация проекта)
Data SourcesSpark DF · Pandas DF · SQL DB
Expectation Suites[правила]
Checkpoint[запуск валидации]
Validator(проверяет данные)
Data Docs(HTML отчёт)
КомпонентРоль
Data ContextКорневой объект — конфигурация, stores, data sources
Expectation SuiteНабор правил валидации (expectations)
CheckpointСвязка «данные + suite» → запуск валидации
ValidatorДвижок, который применяет expectations к данным
Data DocsHTML-отчёт с результатами валидации

PySpark интеграция

Установка

pip install great-expectations pyspark

Подключение SparkDF Data Source

import great_expectations as gx

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GE-Spark-Demo") \
    .getOrCreate()

# Создаём Data Context
context = gx.get_context()

# Подключаем Spark Data Source
data_source = context.data_sources.add_spark("spark_source")

GE использует SparkDFExecutionEngine для выполнения expectations. Движок работает напрямую с Spark DataFrame — expectations транслируются в Spark SQL операции, сохраняя все оптимизации Catalyst.

Основные Expectations

GE предоставляет более 200 встроенных expectations. Наиболее важные для data pipelines:

Проверки полноты

# Колонка не должна содержать NULL
validator.expect_column_values_to_not_be_null("user_id")

# Колонка должна существовать
validator.expect_column_to_exist("email")

# Набор колонок
validator.expect_table_columns_to_match_set(
    ["user_id", "email", "created_at", "status"]
)

Проверки значений

# Значения в диапазоне
validator.expect_column_values_to_be_between(
    "age", min_value=0, max_value=150
)

# Значения из допустимого набора
validator.expect_column_values_to_be_in_set(
    "status", ["active", "inactive", "suspended"]
)

# Regex-паттерн
validator.expect_column_values_to_match_regex(
    "email", r"^[\w.+-]+@[\w-]+\.[\w.]+$"
)

Проверки уникальности и агрегации

# Уникальность
validator.expect_column_values_to_be_unique("transaction_id")

# Количество строк в диапазоне
validator.expect_table_row_count_to_be_between(
    min_value=1000, max_value=10_000_000
)

# Среднее значение
validator.expect_column_mean_to_be_between(
    "order_amount", min_value=10.0, max_value=1000.0
)
Проверка знанийKnowledge check
Назовите 4 основных компонента архитектуры Great Expectations и объясните роль каждого.
ОтветAnswer
(1) Data Context -- центральная конфигурация проекта: хранит настройки data sources, expectation suites, stores. (2) Expectation Suite -- набор декларативных правил валидации: expect_column_values_to_not_be_null, expect_column_values_to_be_between и другие. (3) Checkpoint -- связывает конкретные данные (DataFrame/таблица) с Expectation Suite и запускает валидацию. (4) Data Docs -- HTML-отчёт, автоматически генерируемый по результатам валидации, с визуализацией pass/fail для каждого expectation.

Создание Expectation Suite программно

Полный пример создания suite и валидации Spark DataFrame:

import great_expectations as gx

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DQ-Pipeline").getOrCreate()

# 1. Загружаем данные
df = spark.read.parquet("/data/orders/2024-01-15/")

# 2. Создаём Data Context и Data Source
context = gx.get_context()
data_source = context.data_sources.add_spark("orders_source")

# 3. Создаём Data Asset из DataFrame
data_asset = data_source.add_dataframe_asset("orders")
batch_definition = data_asset.add_batch_definition_whole_dataframe("full_batch")

# 4. Определяем Expectation Suite
suite = context.suites.add(
    gx.ExpectationSuite(name="orders_quality_suite")
)

# Добавляем expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0.01, max_value=999999.99
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status", value_set=["pending", "completed", "refunded"]
    )
)
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=100, max_value=10_000_000
    )
)

# 5. Сохраняем suite
suite = context.suites.add(suite)

Checkpoint: запуск валидации

Checkpoint — это исполняемая единица GE. Он связывает данные (batch) с Expectation Suite и запускает валидацию:

# 6. Создаём и запускаем Checkpoint
checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name="orders_daily_checkpoint",
        validation_definitions=[
            gx.ValidationDefinition(
                name="orders_validation",
                data=batch_definition,
                suite=suite,
            )
        ],
        actions=[
            gx.checkpoint.UpdateDataDocsAction(
                name="update_data_docs",
            )
        ],
    )
)

# 7. Запуск валидации
result = checkpoint.run(batch_parameters={"dataframe": df})

# 8. Проверяем результат
if not result.success:
    failed = [
        r for r in result.run_results.values()
        if not r.success
    ]
    print(f"VALIDATION FAILED: {len(failed)} checks failed")
    # Отправить alert, не записывать данные
else:
    print("VALIDATION PASSED: all checks passed")
    # Записать данные в target
    df.write.mode("append").parquet("/data/gold/orders/")

Интеграция в Spark Pipeline

Ключевой паттерн — validate before write. GE валидация встраивается между transform и write:

def run_orders_pipeline(spark, input_path, output_path):
    """Pipeline с GE валидацией перед записью."""

    # 1. READ
    raw = spark.read.parquet(input_path)

    # 2. TRANSFORM
    cleaned = raw \
        .filter("amount > 0") \
        .dropDuplicates(["order_id"]) \
        .withColumn("processed_at", current_timestamp())

    # 3. VALIDATE (GE)
    context = gx.get_context()
    result = context.checkpoints.get("orders_daily_checkpoint").run(
        batch_parameters={"dataframe": cleaned}
    )

    if not result.success:
        raise ValueError(
            f"Data quality check failed for {input_path}. "
            f"See Data Docs for details."
        )

    # 4. WRITE (только если валидация прошла)
    cleaned.write \
        .mode("append") \
        .partitionBy("date") \
        .parquet(output_path)

    return cleaned.count()
Pipeline flow:

  READ → TRANSFORM → VALIDATE → WRITE
                        ↓ fail
                    ALERT + STOP
                    (не пишем плохие данные)
TIP

Validate before write — золотое правило. Никогда не записывайте данные без валидации. GE checkpoint между transform и write гарантирует, что в target попадают только данные, прошедшие все проверки. Если валидация падает — pipeline останавливается, отправляется alert, плохие данные не портят downstream таблицы.

Data Docs: автодокументация

Data Docs — HTML-отчёт, автоматически генерируемый GE при каждом запуске checkpoint:

  • Expectation Suite страница — список всех правил с описаниями
  • Validation Result страница — pass/fail для каждого expectation, с примерами failing rows
  • История валидаций — тренды pass rate по дням
# Открыть Data Docs в браузере
context.open_data_docs()

Data Docs можно разместить на S3/GCS для командного доступа:

# great_expectations.yml
stores:
  data_docs_store:
    class_name: TupleS3StoreBackend
    bucket: my-data-docs-bucket
    prefix: great_expectations/data_docs/
Проверка знанийKnowledge check
Что такое паттерн 'validate before write' и почему он критичен для production data pipelines?
ОтветAnswer
Validate before write -- паттерн, при котором GE валидация встраивается между transform и write этапами pipeline. Данные записываются в target ТОЛЬКО если все expectations пройдены. Если валидация падает -- pipeline останавливается, отправляется alert, плохие данные не попадают в downstream таблицы. Это критично, потому что без валидации ошибка в данных (NULL в ключевом поле, выход за диапазон, дубликаты) может незаметно испортить аналитические отчёты и ML-модели, а обнаружится только через дни.

Что дальше?

В следующем уроке рассмотрим Amazon Deequ — альтернативный фреймворк для качества данных, ориентированный на Scala и JVM-экосистему.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 6. Какие четыре основных компонента составляют архитектуру Great Expectations?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8