Где валидировать данные
Знать шесть DQ dimensions — это полдела. Второй вопрос: где в pipeline ставить проверки? Можно валидировать на ingestion (источник), внутри pipeline (трансформация), на serving (отдача в BI). У каждой стратегии есть плюсы, минусы и подходящие use cases.
Senior DE отличается от junior’а пониманием где ставить проверки, а не только что проверять.
Три места валидации
1. Ingestion validation: fail-fast
Где: на этапе загрузки данных в DWH/lake. Это первое место, где данные попадают в наши системы.
Что проверять:
- Структура (количество колонок, имена, типы).
- Базовая валидность (форматы дат, строк, чисел).
- Размер партии (не слишком маленькая, не слишком большая).
- Контрольные суммы (если есть от источника).
Принцип: fail-fast. Если данные пришли битые, лучше упасть сразу, чем продолжить и испортить downstream-витрины.
# Псевдокод проверки на ingestion
def validate_orders_batch(df):
if df.empty:
raise ValueError("Empty batch")
expected_columns = ['order_id', 'customer_id', 'amount', 'created_at', 'status']
if set(df.columns) != set(expected_columns):
raise ValueError(f"Schema mismatch: {df.columns}")
if df['amount'].dtype != 'float64':
raise ValueError(f"amount must be numeric")
# Range check
if df['amount'].min() < 0:
raise ValueError(f"Negative amount detected")
# Date check
if df['created_at'].max() > pd.Timestamp.now() + timedelta(days=1):
raise ValueError("Future dates in created_at")
return df
Плюсы:
- Раннее обнаружение проблем.
- Источник проблем чётко известен.
- Downstream-pipeline не загрязняется.
Минусы:
- Падение pipeline может задержать всю аналитику.
- Иногда плохие данные нужно загрузить, чтобы расследовать.
Главное правило: на ingestion проверяй структуру (схема, типы, обязательные поля), но не бизнес-логику. Бизнес-логика валидируется в pipeline.
2. Pipeline validation: smart skipping
Где: внутри dbt/Spark/Airflow трансформаций, между моделями.
Что проверять:
- Бизнес-инварианты (revenue >= 0, конверсия 0-100%).
- Согласованность между моделями (сумма по staging совпадает с mart).
- Распределение значений (распределение статусов не сильно изменилось).
- FK-relationships.
Принцип: smart skipping. Не каждая ошибка должна валить pipeline. Можно пропустить плохую партию и продолжить:
# Псевдокод pipeline-логики
def transform_orders():
raw = load_orders_yesterday()
issues = check_quality(raw)
if issues['critical'] > 0:
raise Exception("Critical issues, stopping pipeline")
if issues['warning'] > 0:
send_alert(f"Warning issues: {issues}")
# Продолжаем, но логируем
# Фильтруем плохие строки
clean = raw[raw['amount'] >= 0]
transformed = run_transformations(clean)
save_to_dwh(transformed)
dbt tests: severity warn vs error, schema tests и custom SQL tests как pipeline validation
В dbt это делается через severity: warn vs error:
columns:
- name: order_id
tests:
- unique:
severity: error # упадёт pipeline
- not_null:
severity: error
- name: discount
tests:
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
max_value: 100
severity: warn # залогирует, но не свалит
3. Serving validation: alert without block
Где: на готовых витринах перед использованием в BI/ML.
Что проверять:
- Freshness (данные обновились?).
- Volume (количество строк в пределах нормы).
- Качество ключевых метрик (revenue по магазинам не сильно отличается от ожидаемого).
- Anomaly detection.
Принцип: alert without block — если что-то не так, лучше показать данные с предупреждением, чем не показать ничего. Дашборд с alert полезнее, чем пустой дашборд.
# Великое observability check на serving
checks:
mart_revenue_daily:
- freshness: max(day) >= today() - 1
- volume: count(*) > 1000 AND count(*) < 10000
- anomaly: revenue diff from yesterday < 50%
Использование: Monte Carlo, Soda, dbt-tests с monitoring.
Constraints vs Tests
Есть два разных способа гарантировать качество:
Constraints — это правила, которые БД enforce’ит автоматически при INSERT/UPDATE.
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY, -- unique constraint
customer_id BIGINT NOT NULL, -- not null constraint
amount DECIMAL(10,2) CHECK (amount >= 0), -- check constraint
status VARCHAR(20) CHECK (status IN ('pending', 'paid', 'cancelled')),
FOREIGN KEY (customer_id) REFERENCES customers(customer_id)
);
Tests — это SQL-проверки, которые мы запускаем явно (например, после загрузки).
-- dbt test
SELECT order_id FROM orders GROUP BY order_id HAVING count(*) > 1;
В OLTP-системах (Postgres, MySQL для приложений) — constraints обязательны. БД не пускает плохие данные.
В DWH — constraints поддерживаются частично (Snowflake enforce’ит NOT NULL и UNIQUE для primary keys, но не FK). Здесь основной инструмент — tests (dbt, Great Expectations).
В lakehouse (Iceberg, Delta) constraints развиваются: Delta поддерживает CHECK constraints с 2022.
SQL constraints и ACID: как БД enforce'ит NOT NULL, UNIQUE, CHECK и FK на уровне storageSchema validation
Особый случай — schema validation. Это проверка, что структура данных не изменилась неожиданно.
Источники меняют схему: добавляют колонки, переименовывают, меняют типы. Без schema validation такие изменения ломают pipeline молча.
Где проверять схему:
- На ingestion — проверять схему партии при загрузке.
- Через Schema Registry (Avro/Protobuf для Kafka) — централизованный реестр схем.
- Через dbt sources — описание ожидаемой схемы в YAML.
# dbt source с описанием схемы
sources:
- name: raw
tables:
- name: orders
columns:
- name: order_id
data_type: bigint
tests:
- not_null
- unique
- name: amount
data_type: decimal(10,2)
tests:
- not_null
dbt не enforce’ит схему сам, но позволяет тестировать наличие колонок и типов.
dbt_expectations дают мощные schema-проверки:
tests:
- dbt_expectations.expect_table_columns_to_match_ordered_list:
column_list: ['order_id', 'customer_id', 'amount', 'created_at', 'status']
- dbt_expectations.expect_column_to_exist:
column_name: amount
- dbt_expectations.expect_column_values_to_be_of_type:
column_name: amount
column_type: NUMERIC
Streaming validation: особый случай
В стриминговых pipelines (Kafka -> Flink/Spark Streaming) валидация работает иначе:
- Schema Registry (Confluent) — обязателен. Schema validation на produce.
- Dead-letter queue (DLQ) — плохие сообщения уходят в отдельную тему для анализа, не блокируя поток.
- Watermarks — отслеживание timeliness в Flink/Spark Streaming.
Kafka topic: orders -> Flink job -> проверка ->
├── valid -> orders_clean topic
└── invalid -> orders_dlq topic
DLQ позволяет валидировать без потери данных: даже если строка не прошла, она сохранена и можно проанализировать.
Идемпотентность и валидация
Связанная концепция — idempotency. Если pipeline можно безопасно перезапустить, то после исправления плохих данных не нужно бояться запустить заново.
Идемпотентность достигается через:
- MERGE вместо INSERT (нет дубликатов при повторе).
- Партиционирование (перезапуск переписывает только нужную партицию).
- Версионирование данных (lakehouse-форматы).
С идемпотентным pipeline стратегия валидации становится: упал тест -> исправили данные -> перезапустили pipeline -> всё хорошо.
Airflow TaskFlow: retry и idempotency как паттерны надёжного pipelineДизайн стратегии валидации
Когда строишь pipeline, спроси:
- Какие данные критичны? (Revenue, customer PII) -> жёсткие проверки, fail-fast.
- Что просто полезно? (метрики маркетинга) -> soft warnings.
- Сколько можно потерять? Если 1% битых строк — приемлемо, smart skipping. Если 0% — strict mode.
- Кто consumer? Финансовые отчёты -> strict; ML training -> можно мягче, модели терпят шум.
Главный анти-паттерн: валидировать всё одинаково. Если каждый тест с severity error — pipeline ломается каждый день из-за мелочей. Если каждый — severity warn — реальные проблемы теряются в потоке warnings. Дизайн валидации требует приоритизации.
Примеры реальных стратегий
Кейс 1: финансовая компания. Revenue не должен потеряться ни на копейку. Стратегия:
- Constraints в OLTP-БД (NOT NULL, UNIQUE, CHECK).
- На ingestion: schema validation, hash check каждой партии.
- В pipeline: data tests на reconciliation (DWH sum == source sum).
- На serving: алерты при отклонении.
Кейс 2: маркетинговый стартап. Конверсии и кликстрим, потери 0.5-1% приемлемы.
- Tests с severity warn на 80% проверок.
- Severity error только на ключи (unique customer_id).
- DLQ для невалидных событий.
- Daily reconciliation, не realtime.
Кейс 3: e-commerce. Заказы критичны, события — менее.
- Strict mode для orders (severity error везде).
- Smart skipping для browse-событий.
- Alerting на freshness серверных таблиц.
Попробуй сам
Возьми свой pipeline (или придумай). Выпиши все таблицы и для каждой ответь:
- Где валидируется: ingestion / pipeline / serving?
- Что проверяется на каждом уровне?
- Какие тесты severity error, какие warn?
- Что происходит при падении: pipeline стоп, skip partition, alert?
Затем сравни с реальностью: что не проверяется? Какие тесты бесполезны и шумят? Какие критичные тесты отсутствуют? Это даёт реальный план для улучшения DQ в проекте.
Этот модуль покрывает DQ на обзорном уровне (dimensions, validation strategies, инструменты). Production monitoring и data observability как самостоятельная практика — runbooks, SLA/SLO для данных, root-cause analysis, lineage-aware alerting, ML anomaly detection в деталях, organization of incident response — будут в отдельном будущем курсе data-quality-course deep-dive платформы.