Learning Platform
Глоссарий Troubleshooting
Урок 17.02 · 22 мин
Начальный
Data ValidationSchema ValidationConstraints

Где валидировать данные

Знать шесть DQ dimensions — это полдела. Второй вопрос: где в pipeline ставить проверки? Можно валидировать на ingestion (источник), внутри pipeline (трансформация), на serving (отдача в BI). У каждой стратегии есть плюсы, минусы и подходящие use cases.

Senior DE отличается от junior’а пониманием где ставить проверки, а не только что проверять.


Три места валидации

Три уровня валидации
Источник
1. Ingestion validation
2. Pipeline validation
3. Serving validation
Потребители (BI, ML)

1. Ingestion validation: fail-fast

Где: на этапе загрузки данных в DWH/lake. Это первое место, где данные попадают в наши системы.

Что проверять:

  • Структура (количество колонок, имена, типы).
  • Базовая валидность (форматы дат, строк, чисел).
  • Размер партии (не слишком маленькая, не слишком большая).
  • Контрольные суммы (если есть от источника).

Принцип: fail-fast. Если данные пришли битые, лучше упасть сразу, чем продолжить и испортить downstream-витрины.

# Псевдокод проверки на ingestion
def validate_orders_batch(df):
    if df.empty:
        raise ValueError("Empty batch")
    
    expected_columns = ['order_id', 'customer_id', 'amount', 'created_at', 'status']
    if set(df.columns) != set(expected_columns):
        raise ValueError(f"Schema mismatch: {df.columns}")
    
    if df['amount'].dtype != 'float64':
        raise ValueError(f"amount must be numeric")
    
    # Range check
    if df['amount'].min() < 0:
        raise ValueError(f"Negative amount detected")
    
    # Date check
    if df['created_at'].max() > pd.Timestamp.now() + timedelta(days=1):
        raise ValueError("Future dates in created_at")
    
    return df

Плюсы:

  • Раннее обнаружение проблем.
  • Источник проблем чётко известен.
  • Downstream-pipeline не загрязняется.

Минусы:

  • Падение pipeline может задержать всю аналитику.
  • Иногда плохие данные нужно загрузить, чтобы расследовать.
TIP

Главное правило: на ingestion проверяй структуру (схема, типы, обязательные поля), но не бизнес-логику. Бизнес-логика валидируется в pipeline.


2. Pipeline validation: smart skipping

Где: внутри dbt/Spark/Airflow трансформаций, между моделями.

Что проверять:

  • Бизнес-инварианты (revenue >= 0, конверсия 0-100%).
  • Согласованность между моделями (сумма по staging совпадает с mart).
  • Распределение значений (распределение статусов не сильно изменилось).
  • FK-relationships.

Принцип: smart skipping. Не каждая ошибка должна валить pipeline. Можно пропустить плохую партию и продолжить:

# Псевдокод pipeline-логики
def transform_orders():
    raw = load_orders_yesterday()
    
    issues = check_quality(raw)
    
    if issues['critical'] > 0:
        raise Exception("Critical issues, stopping pipeline")
    
    if issues['warning'] > 0:
        send_alert(f"Warning issues: {issues}")
        # Продолжаем, но логируем
    
    # Фильтруем плохие строки
    clean = raw[raw['amount'] >= 0]
    
    transformed = run_transformations(clean)
    save_to_dwh(transformed)
dbt tests: severity warn vs error, schema tests и custom SQL tests как pipeline validation

В dbt это делается через severity: warn vs error:

columns:
  - name: order_id
    tests:
      - unique:
          severity: error  # упадёт pipeline
      - not_null:
          severity: error
  - name: discount
    tests:
      - dbt_expectations.expect_column_values_to_be_between:
          min_value: 0
          max_value: 100
          severity: warn   # залогирует, но не свалит

3. Serving validation: alert without block

Где: на готовых витринах перед использованием в BI/ML.

Что проверять:

  • Freshness (данные обновились?).
  • Volume (количество строк в пределах нормы).
  • Качество ключевых метрик (revenue по магазинам не сильно отличается от ожидаемого).
  • Anomaly detection.

Принцип: alert without block — если что-то не так, лучше показать данные с предупреждением, чем не показать ничего. Дашборд с alert полезнее, чем пустой дашборд.

# Великое observability check на serving
checks:
  mart_revenue_daily:
    - freshness: max(day) >= today() - 1
    - volume: count(*) > 1000 AND count(*) < 10000
    - anomaly: revenue diff from yesterday < 50%

Использование: Monte Carlo, Soda, dbt-tests с monitoring.

Сравнение трёх уровней валидации
Ingestion
Pipeline
Serving

Constraints vs Tests

Есть два разных способа гарантировать качество:

Constraints — это правила, которые БД enforce’ит автоматически при INSERT/UPDATE.

CREATE TABLE orders (
  order_id BIGINT PRIMARY KEY,        -- unique constraint
  customer_id BIGINT NOT NULL,         -- not null constraint
  amount DECIMAL(10,2) CHECK (amount >= 0),  -- check constraint
  status VARCHAR(20) CHECK (status IN ('pending', 'paid', 'cancelled')),
  FOREIGN KEY (customer_id) REFERENCES customers(customer_id)
);

Tests — это SQL-проверки, которые мы запускаем явно (например, после загрузки).

-- dbt test
SELECT order_id FROM orders GROUP BY order_id HAVING count(*) > 1;
Constraints vs Tests
Constraints
Tests

В OLTP-системах (Postgres, MySQL для приложений) — constraints обязательны. БД не пускает плохие данные.

В DWH — constraints поддерживаются частично (Snowflake enforce’ит NOT NULL и UNIQUE для primary keys, но не FK). Здесь основной инструмент — tests (dbt, Great Expectations).

В lakehouse (Iceberg, Delta) constraints развиваются: Delta поддерживает CHECK constraints с 2022.

SQL constraints и ACID: как БД enforce'ит NOT NULL, UNIQUE, CHECK и FK на уровне storage

Schema validation

Особый случай — schema validation. Это проверка, что структура данных не изменилась неожиданно.

Источники меняют схему: добавляют колонки, переименовывают, меняют типы. Без schema validation такие изменения ломают pipeline молча.

Где проверять схему:

  1. На ingestion — проверять схему партии при загрузке.
  2. Через Schema Registry (Avro/Protobuf для Kafka) — централизованный реестр схем.
  3. Через dbt sources — описание ожидаемой схемы в YAML.
# dbt source с описанием схемы
sources:
  - name: raw
    tables:
      - name: orders
        columns:
          - name: order_id
            data_type: bigint
            tests:
              - not_null
              - unique
          - name: amount
            data_type: decimal(10,2)
            tests:
              - not_null

dbt не enforce’ит схему сам, но позволяет тестировать наличие колонок и типов.

dbt_expectations дают мощные schema-проверки:

tests:
  - dbt_expectations.expect_table_columns_to_match_ordered_list:
      column_list: ['order_id', 'customer_id', 'amount', 'created_at', 'status']
  - dbt_expectations.expect_column_to_exist:
      column_name: amount
  - dbt_expectations.expect_column_values_to_be_of_type:
      column_name: amount
      column_type: NUMERIC

Streaming validation: особый случай

В стриминговых pipelines (Kafka -> Flink/Spark Streaming) валидация работает иначе:

  • Schema Registry (Confluent) — обязателен. Schema validation на produce.
  • Dead-letter queue (DLQ) — плохие сообщения уходят в отдельную тему для анализа, не блокируя поток.
  • Watermarks — отслеживание timeliness в Flink/Spark Streaming.
Kafka topic: orders -> Flink job -> проверка -> 
                                    ├── valid -> orders_clean topic
                                    └── invalid -> orders_dlq topic

DLQ позволяет валидировать без потери данных: даже если строка не прошла, она сохранена и можно проанализировать.


Идемпотентность и валидация

Связанная концепция — idempotency. Если pipeline можно безопасно перезапустить, то после исправления плохих данных не нужно бояться запустить заново.

Идемпотентность достигается через:

  • MERGE вместо INSERT (нет дубликатов при повторе).
  • Партиционирование (перезапуск переписывает только нужную партицию).
  • Версионирование данных (lakehouse-форматы).

С идемпотентным pipeline стратегия валидации становится: упал тест -> исправили данные -> перезапустили pipeline -> всё хорошо.

Airflow TaskFlow: retry и idempotency как паттерны надёжного pipeline

Дизайн стратегии валидации

Когда строишь pipeline, спроси:

  1. Какие данные критичны? (Revenue, customer PII) -> жёсткие проверки, fail-fast.
  2. Что просто полезно? (метрики маркетинга) -> soft warnings.
  3. Сколько можно потерять? Если 1% битых строк — приемлемо, smart skipping. Если 0% — strict mode.
  4. Кто consumer? Финансовые отчёты -> strict; ML training -> можно мягче, модели терпят шум.
WARNING

Главный анти-паттерн: валидировать всё одинаково. Если каждый тест с severity error — pipeline ломается каждый день из-за мелочей. Если каждый — severity warn — реальные проблемы теряются в потоке warnings. Дизайн валидации требует приоритизации.


Примеры реальных стратегий

Кейс 1: финансовая компания. Revenue не должен потеряться ни на копейку. Стратегия:

  • Constraints в OLTP-БД (NOT NULL, UNIQUE, CHECK).
  • На ingestion: schema validation, hash check каждой партии.
  • В pipeline: data tests на reconciliation (DWH sum == source sum).
  • На serving: алерты при отклонении.

Кейс 2: маркетинговый стартап. Конверсии и кликстрим, потери 0.5-1% приемлемы.

  • Tests с severity warn на 80% проверок.
  • Severity error только на ключи (unique customer_id).
  • DLQ для невалидных событий.
  • Daily reconciliation, не realtime.

Кейс 3: e-commerce. Заказы критичны, события — менее.

  • Strict mode для orders (severity error везде).
  • Smart skipping для browse-событий.
  • Alerting на freshness серверных таблиц.

Попробуй сам

Возьми свой pipeline (или придумай). Выпиши все таблицы и для каждой ответь:

  1. Где валидируется: ingestion / pipeline / serving?
  2. Что проверяется на каждом уровне?
  3. Какие тесты severity error, какие warn?
  4. Что происходит при падении: pipeline стоп, skip partition, alert?

Затем сравни с реальностью: что не проверяется? Какие тесты бесполезны и шумят? Какие критичные тесты отсутствуют? Это даёт реальный план для улучшения DQ в проекте.

NOTE

Этот модуль покрывает DQ на обзорном уровне (dimensions, validation strategies, инструменты). Production monitoring и data observability как самостоятельная практика — runbooks, SLA/SLO для данных, root-cause analysis, lineage-aware alerting, ML anomaly detection в деталях, organization of incident response — будут в отдельном будущем курсе data-quality-course deep-dive платформы.

Проверка знанийKnowledge check
Чем стратегия валидации на ingestion отличается от валидации на serving, и почему смешивать их подходы — анти-паттерн?
ОтветAnswer
Ingestion-валидация — fail-fast: если данные пришли битые (неправильная схема, кривые типы), pipeline останавливается сразу, чтобы не загрязнить downstream. Что проверяется: структурная целостность — схема, типы, форматы, базовая валидность. Цель — поймать поломки источника до того, как они растекутся. Serving-валидация — alert without block: если что-то не так с финальной витриной, лучше показать данные с предупреждением, чем не показать ничего. Что проверяется: freshness, volume, anomaly detection — пользовательский опыт. Цель — информировать, не блокировать. Смешивать — анти-паттерн: если ingestion работает в alert-mode, плохие данные доедут до пользователей и испортят дашборды; если serving работает в fail-fast — каждая аномалия валит всю отчётность, бизнес жалуется на пустые дашборды. Правильная стратегия — разные подходы на разных уровнях: строгая валидация на источнике, гибкая в pipeline (severity warn/error), мягкая на serving (alert).

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 6. Какие три уровня валидации обычно используются в data pipeline?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3