Отслеживание lineage данных
Зачем нужен data lineage?
Data lineage — это карта происхождения данных: откуда они пришли, через какие трансформации прошли, и куда записались. Три причины, почему lineage критичен:
1. Compliance (GDPR, SOX)
Регулятор: "Покажите, из каких источников формируется отчёт по клиентам."
Без lineage: "Э-э-э... наверное, из PostgreSQL? Или из S3? Спросим у Васи, он писал pipeline 2 года назад."
С lineage: "Вот граф: CRM → bronze.customers → silver.customers → gold.customer_metrics → report_daily."
2. Impact Analysis
Перед изменением таблицы вы должны знать: кто от неё зависит?
silver.orders (вы хотите удалить колонку `discount`)
↓ читает
gold.revenue_daily (использует discount для расчёта net_revenue)
↓ читает
dashboard_finance (показывает net_revenue C-level)
Без lineage: удалили колонку → dashboard сломался → инцидент.
С lineage: увидели зависимость → мигрировали downstream → безопасно удалили.
3. Debugging
Данные в gold-таблице неправильные. Lineage показывает путь:
Источник ← bronze ← silver ← gold (ошибка здесь)
↑
Вот здесь JOIN потерял строки (LEFT → INNER по ошибке)
OpenLineage: открытый стандарт
OpenLineage — открытый стандарт для сбора lineage-метаданных. Разработан Datakin (ныне часть Astronomer/Atlan). Не привязан к конкретному инструменту.
Модель данных
OpenLineage оперирует тремя сущностями:
- Dataset — таблица, файл, topic (input или output)
- Job — трансформация (Spark job, dbt model, Airflow task)
- Run — конкретный запуск Job (с ID, временем, статусом)
- Facets — метаданные: schema, row count, SQL query, data quality metrics
События жизненного цикла
OpenLineage отправляет события в моменты:
START → Job начал выполнение (inputs known)
RUNNING → промежуточные обновления (опционально)
COMPLETE → Job завершён успешно (outputs known)
FAIL → Job завершён с ошибкой
Spark + OpenLineage
Конфигурация
OpenLineage интегрируется с Spark через SparkListener:
# spark-submit с OpenLineage
spark-submit \
--packages "io.openlineage:openlineage-spark_2.12:1.25.0" \
--conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener" \
--conf "spark.openlineage.transport.type=http" \
--conf "spark.openlineage.transport.url=http://marquez:5000/api/v1/lineage" \
--conf "spark.openlineage.namespace=production" \
my_pipeline.py
Или в spark-defaults.conf:
spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://marquez:5000/api/v1/lineage
spark.openlineage.namespace=production
Что Spark отправляет автоматически
После подключения OpenLineage listener, Spark автоматически отправляет:
- Input datasets — таблицы и файлы, которые Spark читает
- Output datasets — куда Spark записывает результат
- Schema facets — структура входных и выходных данных
- SQL facets — SQL-запрос (если используется Spark SQL)
- Run metadata — время запуска, продолжительность, статус
Никаких изменений в коде pipeline не нужно — listener перехватывает события Spark автоматически.
Apache Atlas
Apache Atlas — система управления метаданными для Hadoop-экосистемы. Предоставляет:
- Type system — определение типов сущностей (hive_table, spark_process, kafka_topic)
- Lineage graph — визуальный граф происхождения данных
- Classification — теги (PII, financial, confidential) для governance
- REST API — программный доступ к метаданным
Архитектура Atlas
Atlas интегрируется с Spark через Atlas Spark Hook — listener, аналогичный OpenLineage, но отправляет метаданные в формате Atlas:
# spark-defaults.conf для Atlas
spark.extraListeners=org.apache.atlas.spark.hook.SparkAtlasEventTracker
atlas.rest.address=http://atlas-server:21000
atlas.cluster.name=production
Atlas хорошо подходит для Hadoop-centric инфраструктуры (HDFS, Hive, HBase, Kafka). Для cloud-native стека рассмотрите OpenLineage + Marquez или Unity Catalog.
Unity Catalog (Databricks)
Unity Catalog — система governance от Databricks. Предоставляет lineage из коробки для всех Spark workloads на Databricks:
- Автоматический lineage — захватывает все read/write без конфигурации
- Column-level lineage — отслеживает не только таблицы, но и отдельные колонки
- Cross-workspace — lineage между разными Databricks workspace
- Интеграция с Delta Lake — линки к конкретным версиям таблиц
-- Unity Catalog: просмотр lineage через SQL
-- (доступно только на Databricks)
SHOW TABLE LINEAGE FOR gold.customer_metrics;
Spark4.0
Spark 4.0 улучшает lineage-интеграцию через Spark Connect — стандартизированный протокол, который упрощает сбор lineage-метаданных из клиентских приложений.
Сравнение инструментов lineage
| Аспект | OpenLineage | Apache Atlas | Unity Catalog |
|---|---|---|---|
| Тип | Открытый стандарт | Открытый проект | Проприетарный |
| Инфраструктура | Marquez / Atlan | Hadoop cluster | Databricks |
| Lineage level | Table + column | Table | Table + column |
| Setup | Spark listener config | Atlas hook + server | Из коробки |
| Стоимость | Бесплатно | Бесплатно | Databricks license |
| Best for | Multi-tool, cloud-native | Hadoop ecosystem | Databricks users |
Практический выбор инструмента lineage
Вопрос 1: Вы на Databricks?
ДА → Unity Catalog (из коробки, ничего настраивать не нужно)
НЕТ ↓
Вопрос 2: У вас Hadoop-кластер с Hive/HBase/Atlas?
ДА → Apache Atlas (нативная интеграция)
НЕТ ↓
Вопрос 3: Cloud-native или multi-tool?
ДА → OpenLineage + Marquez (открытый стандарт, любой инструмент)
Что дальше?
В следующем уроке — лабораторная работа: собираем pipeline качества данных с Great Expectations и Spark в Docker.