Введение
M5.8 покрыл lineage-as-control — schema gating, impact analysis, column-level DIRECT/INDIRECT/MASKING. M7.3 — тот же стек OpenLineage, но через лицо аудитора. Big 4 walkthrough Q4 2026: «Расскажите, как revenue_daily_aggregate вычисляется end-to-end. Какой commit SHA dbt model? Какой column lineage? Какова история прогонов за последние 90 дней? Где evidence каждой трансформации между событиями Kafka и дашбордом?». Без OpenLineage этот ответ занимает недели реконструкции вручную. С proper эмиссией OpenLineage + Marquez-бекендом + S3-архивом — ответ это один Marquez-запрос плюс восстановимое S3-скачивание.
Этот урок — lineage как audit evidence. Не «lineage для operational debugging» (покрыто в DG-курсе), а «lineage как ¶.10 IPE для completeness и accuracy testing».
OpenLineage — recap спецификации
OpenLineage spec 1.46.0 — открытая спецификация для emission lineage; schema 1-1-0 (последняя May 2025); проект LF AI & Data.
Producers (dbt, Spark, Airflow) emit'ят RunEvents в Kafka topic. Два параллельных consumer — Marquez (операционное состояние PostgreSQL) + S3-архив (audit evidence 7 лет). Auditor view читает архив S3 + Marquez UI.
Три типа событий:
- RunEvent —
START/RUNNING/COMPLETE/ABORT/FAIL. Идентифицирует исполнение. - DatasetEvent — событие только датасета (изменение схемы, изменение владения) без прогона.
- JobEvent — событие только job (обновление job definition) без прогона.
Core-поля на RunEvent:
{
"eventType": "COMPLETE",
"eventTime": "2026-09-15T06:05:14.823Z",
"run": {
"runId": "550e8400-e29b-41d4-a716-446655440000",
"facets": {...}
},
"job": {
"namespace": "swr.snowflake.dl_marts",
"name": "fct_driver_earnings_daily_build",
"facets": {
"sourceCodeLocation": {
"type": "git",
"url": "https://github.com/swiftride/dbt-marts.git",
"version": "a3f7b2c4e5d1..."
}
}
},
"inputs": [...],
"outputs": [...],
"producer": "https://github.com/OpenLineage/OpenLineage/tree/1.46.0/integration/dbt"
}
Facets — расширяемая схема; per dataset / job / run. Критичны для аудита:
- columnLineage facet (на выходной датасет) — per-column входы + трансформации.
- dataQualityAssertions facet (на выход) — результаты DQ-проверок inline.
- schema facet — типы колонок на снапшот.
- dataSource facet — origin (Snowflake-аккаунт, S3-бакет).
- storage facet — где хранится (таблица, формат файла).
- pullRequestNumber facet — PR, инициирующий build (введено в 2024).
- sourceCodeLocation facet — git commit SHA.
- testRunFacet — inline-результаты тестов (введено в 2024).
Facet column-level lineage — детали
Согласно M5.8 — columnLineage facet с тремя типами трансформации:
"columnLineage": {
"fields": {
"gross_earnings_usd": {
"inputFields": [
{
"namespace": "swr.snowflake",
"name": "swiftpay.commission_outputs",
"field": "commission_amount_cents",
"transformations": [
{"type": "DIRECT", "subtype": "TRANSFORMATION",
"description": "DIVIDE BY 100.0 to convert cents to USD",
"masking": false}
]
},
{
"namespace": "swr.snowflake",
"name": "stg_trips.trip_records",
"field": "trip_id",
"transformations": [
{"type": "INDIRECT", "subtype": "JOIN",
"description": "Join key only — does not feed value",
"masking": false}
]
}
]
},
"driver_email_hash": {
"inputFields": [
{
"namespace": "swr.aurora",
"name": "drivers.profiles",
"field": "email",
"transformations": [
{"type": "DIRECT", "subtype": "TRANSFORMATION",
"description": "SHA-256 with per-driver salt",
"masking": true}
]
}
]
}
}
}
Три типа трансформации (согласно M5.8):
- DIRECT — column-for-column propagation. Подтипы: IDENTITY (pass-through), TRANSFORMATION (арифметика, cast).
- INDIRECT — используется в WHERE/JOIN/GROUP BY, не подаёт значение напрямую.
- Masking — boolean-флаг; описывает, скрывает ли трансформация оригинал (hash, redact, tokenize).
Релевантность для аудита:
- Completeness testing (¶.10) — аудитор хочет уверенности, что ВСЕ входы, питающие выход, захвачены. Входы DIRECT прослеживают значения данных; входы INDIRECT прослеживают фильтрацию данных. Если аудитор найдёт незахваченный INDIRECT-вход (например, фильтр по
customer_kyc_tierне виден) — возникают вопросы: «Изменялась ли логика фильтра между Q1 и Q3?». - Accuracy testing — даны входы + трансформации, может ли аудитор пересчитать вывод? Трансформации DIRECT позволяют пересчёт. Трансформации INDIRECT позволяют «sample selection» — аудитор может найти все source-записи, вносящие вклад в выходную выборку.
- Соответствие masking — GDPR Art. 25 data minimization; EU AI Act Art. 10 data governance. Аудитор хочет уверенности, что masking/tokenisation колонки не утекают downstream без маскирования. Обход флага masking.
Run events как evidence
Каждое исполнение контроля → RunEvent с facet DataQualityAssertions:
{
"eventType": "COMPLETE",
"eventTime": "2026-09-15T06:00:14.523Z",
"run": {
"runId": "ge-checkpoint-uuid-...",
"facets": {
"swiftrideEvidence": {
"_producer": "swr.evidence.emitter",
"_schemaURL": "https://swr.io/schemas/evidence-v1.json",
"evidenceId": "ev_2026091506000000_ctl-cde-swr-003-002",
"controlId": "CTL-CDE-SWR-003-002",
"cdeId": "CDE-SWR-003",
"evidenceS3Key": "s3://swr-evidence-prod-cde-evidence/cde-swr-003/...",
"signature": "hmac-sha256:f4d2a..."
}
}
},
"outputs": [{
"namespace": "swr.snowflake.dl_marts",
"name": "fct_driver_earnings",
"facets": {
"dataQualityAssertions": {
"assertions": [
{
"assertion": "gross_earnings_usd > 0",
"success": true,
"column": "gross_earnings_usd"
},
{
"assertion": "row count delta vs T-1 within ±5%",
"success": true
}
]
}
}
}]
}
Кастомный facet swiftrideEvidence — указывает обратно на неизменяемое хранилище S3; событие OpenLineage само хранится в Marquez (операционно); первичный evidence в S3 Object Lock. Компрометация Marquez не затрагивает целостность первичного evidence.
Marquez как evidence-бекенд
Marquez 0.51.x (May 2026) — референсный OpenLineage-бекенд LF AI & Data. PostgreSQL первичное хранилище; REST API + UI. Хранит: facets job/run/dataset, версионирование per job (job version = история facets), история прогонов (per-run состояние + inputs/outputs), column-level lineage. Tags + кастомные facets поддерживаются. Операционный слой для запросов lineage; не первичный audit evidence (PostgreSQL изменяем). Для SOX-grade evidence — отдельный S3-экспорт событий OpenLineage параллельно с ingestion Marquez.
Развёртывание Marquez SwiftRide:
- PostgreSQL 15 (Aurora) — managed; backup еженощно; PITR 35 дней.
- Marquez API + Web — на ECS Fargate; auto-scaling; SLA target 99.95%.
- События lineage ingest’ятся через Kafka topic
openlineage.runs(Marquez Kafka consumer). - UI выставлен внутренне (SSO Okta); audit-роль read-only.
Хранилище Marquez = операционный слой. Первичный evidence — S3 Object Lock; Marquez — удобный слой запросов + UI. Состояние PostgreSQL может потеряться (DR-сценарий) → пересборка из архива событий OpenLineage в S3.
Пайплайн архива evidence:
DQ engine → emitter Lambda →
├─→ S3 Object Lock (первичный evidence — JSON evidence-schema v1)
├─→ Kafka openlineage.runs
│ ├─→ Marquez consumer (операционное состояние)
│ └─→ S3 archive consumer (raw-архив OpenLineage event, Object Lock 7 лет)
└─→ Snowflake audit.evidence_index INSERT
Два архива S3 — разные схемы, разные цели:
- Первичный evidence — SwiftRide evidence-schema-v1 (богаче, подписан, денормализован для аудитора).
- Архив OpenLineage — сырой JSON RunEvent (lineage trail; полезен, если evidence-схема SwiftRide мигрирует).
Интеграция с tooling
dbt → OpenLineage
dbt 1.9+ — интеграция openlineage-dbt (Python-пакет). Конфигурация:
# profiles.yml
swiftride-prod:
outputs:
prod:
type: snowflake
...
lineage:
enabled: true
emitter_url: "https://openlineage.swr.internal/api/v1/lineage"
emitter_namespace: "swr.snowflake.dl_marts"
На каждый прогон dbt — emit’ит RunEvent на сборку модели с:
- facet
sourceCodeLocation(git commit SHA из dbt build) - facet
dataQualityAssertionsиз результатов dbt tests - facet
columnLineage(dbt manifest + sqllineage parsing) - facet
pullRequestNumber(если CI build)
Паттерн SwiftRide — dbt Cloud (preferred) автоматически emit’ит; dbt Core (Airflow-оркестрируемый) — явный вызов openlineage-dbt после сборки.
Spark → OpenLineage
Agent openlineage-spark — JVM-агент, инжектируемый в Spark driver. Захватывает план исполнения; emit’ит RunEvent на job Spark.
Паттерн SwiftRide — нагрузки Databricks (модель ECL SwiftCapital, атрибуция SwiftAds). Init-скрипт кластера устанавливает OpenLineage Spark agent; события emit’ятся во внутренний Kafka-топик.
Airflow → OpenLineage
Нативный Airflow 2.7+ — встроенная поддержка OpenLineage (apache-airflow-providers-openlineage). На task instance → RunEvent. Распространение lineage через зависимости task instance.
Паттерн SwiftRide — Airflow (MWAA) — provider сконфигурирован; на запуск DAG → run events на task; оркестрация lineage автоматическая.
Snowflake direct queries → OpenLineage
Snowflake не emit’ит OpenLineage нативно. Паттерны:
- dbt как оркестратор — каждая сборка модели emit’ит через интеграцию dbt OpenLineage.
- Скрапер Snowflake QUERY_HISTORY — отдельный процесс читает
SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY; реконструирует lineage через парсинг SQL (Manta / SQLGlot / sqllineage); emit’ит события OpenLineage. SwiftRide использует SQLGlot-based скрапер для ad-hoc запросов, не покрытых dbt.
Lineage как evidence — призма PCAOB AS 1105
Аудитор использует evidence lineage для двух целей:
1. Completeness testing. «Покажите все входы, питающие revenue_daily_aggregate». Обход графа lineage — список всех upstream-датасетов + колонок. Аудитор сравнивает с проектной документацией (M5.9 control matrix + M4.5 реестр) — должно совпадать. Расхождение = потенциальный незадокументированный source-вход = потенциальный риск material misstatement.
2. Sample selection. Аудитор хочет 30 samples gross_earnings_usd Q3 2026. Lineage прослеживает sample через входы DIRECT обратно к источникам commission_outputs.commission_amount_cents и trip_records.fare_total_cents. Аудитор независимо пересчитывает по source-записям → ожидаемый выход → сравнение с фактическим. Пересчёт верифицирует арифметику; lineage верифицирует идентификацию входов.
Надёжность evidence — слой lineage:
- dbt-emitted events — tier 3 (внутренний с контролями — git-versioned модели, review CODEOWNERS).
- Spark-emitted events — tier 3 аналогично (код job в git, контролируемое развёртывание).
- События скрапера Snowflake QUERY_HISTORY — tier 4 (пост-фактум реконструкция; потенциальные пробелы, если запросы не парсятся).
- Ручные диаграммы lineage — tier 5 (subject manipulation; PCAOB не принимает как первичные).
Паттерны PCAOB inspection 2024-2025 — «lineage theatre» (диаграммы без программной верификации) постоянно отмечается. SwiftRide обязан обеспечить, чтобы все material CDE lineage покрывались автоматической эмиссией, не ручной диаграммой.
Настройка OpenLineage-эмиттера SwiftRide
Пайплайны Snowflake:
- dbt Cloud job
swiftride-prod— интеграция openlineage-dbt включена; на build → ~280 RunEvents (по одному на модель + dbt tests). - Скрапер Snowflake QUERY_HISTORY — Lambda
swr-snowflake-lineage-scraperзапускается каждый час; парсит запросы, не покрытые dbt; emit’ит вспомогательные RunEvents.
Пайплайны Databricks:
- Модель ECL SwiftCapital + атрибуция SwiftAds — Spark-нагрузки на Databricks.
- Init-скрипт кластера устанавливает agent
openlineage-spark(/databricks/init-scripts/install-openlineage.sh). - Конфигурация на уровне workspace — URL эмиттера, указывающий на внутренний endpoint Marquez.
Пайплайны Airflow:
- AWS MWAA Airflow 2.8 — пакет
apache-airflow-providers-openlineageустановлен. - Соединение Airflow
openlineage_defaultсконфигурировано на внутренний Marquez. - На task instance → автоматическая эмиссия RunEvent.
Kafka topic:
openlineage.runs — кластер MSK на 3 брокера; retention 30 дней операционно; tiered storage в S3 после 7 дней; архив постоянный S3 Object Lock 7 лет.
Consumers:
marquez-consumer— Marquez Kafka consumer; ingestion в PostgreSQL.evidence-archive-consumer— Lambda; пишет сырое событие вs3://swr-evidence-prod-openlineage-archive/year=YYYY/month=MM/day=DD/run-{runId}.json; Object Lock 7 лет.
Операционные метрики Q3 2026:
- События OpenLineage emit’нуто: ~12M / квартал
- Задержка запроса Marquez UI: <500ms p99 для запросов job lineage
- S3 PUTs архива событий: ~12M / квартал; ~180 GB роста хранилища (с facets column lineage).
- Стоимость хранения: ~5/месяц Glacier Deep Archive после года 1.
Антипаттерны
Диаграммы lineage без программного источника
Паттерн: диаграммы Confluence нарисованы вручную; «owner просматривает квартально».
Почему плохо: дрейф неизбежен; PCAOB lineage theatre.
Исправление: автоматическая эмиссия из CI/оркестрации; диаграммы Confluence генерируются из Marquez API (если людям нужна визуальная справка); никогда не первичный источник.
Компрометация Marquez → потеря evidence
Паттерн: Marquez PostgreSQL = первичное хранилище evidence; еженощный бэкап.
Почему плохо: PostgreSQL изменяем; DBA с MODIFY permission может переписать историю; PITR 35 дней; 7-летний SOX retention невозможен.
Исправление: Marquez = операционный слой; S3-архив сырых событий OpenLineage = первичный evidence; Marquez восстанавливается из архива S3 при необходимости.
Column lineage отсутствует для material CDE
Паттерн: захвачен table-level lineage; column-level «слишком дорого включать».
Почему плохо: completeness testing PCAOB требует прослеживаемости на уровне колонок для material CDE; выборка sample broken без column lineage; impact analysis ослаблен (M5.8).
Исправление: включить column-level facet для downstream material CDE (dbt-emitted автоматически; Spark agent поддерживается; SQLGlot scraper включить column-parsing mode). Накладные расходы хранилища управляемы (~30% увеличение размера события).
Изменения схемы не отслеживаются
Паттерн: эволюция схемы датасета через DDL; события lineage не emit’ятся (только RunEvents от data pipelines).
Почему плохо: изменения схемы могут тихо сломать downstream; аудитору нужна история эволюции схемы.
Исправление: эмиссия DatasetEvent на изменения схемы — Snowflake Alter Table → Lambda detector → emit DatasetEvent; изменения схемы dbt через build manifest. Аудит-trail эволюции схемы виден в Marquez.
Резюме
- OpenLineage spec v1.46.0 — RunEvents + DatasetEvents + JobEvents; facets расширяемы; column-level lineage с флагами DIRECT/INDIRECT/MASKING.
- Marquez — референсный OpenLineage-бекенд; операционный слой; первичный audit evidence — отдельный S3-архив сырых событий (Object Lock 7 лет).
- Lineage как evidence PCAOB AS 1105 — completeness testing (покрытие входов) + sample selection (пересчёт от источников) + соответствие masking (GDPR / EU AI Act).
- Интеграция: dbt-openlineage, openlineage-spark, OpenLineage provider Airflow; Snowflake direct queries — вспомогательный SQLGlot scraper.
- Tier 3 reliability — dbt/Spark-emitted (контролируемый production-код); tier 4 — Snowflake scraper (пост-фактум); tier 5 — ручные диаграммы (отвергаются PCAOB).
- Настройка SwiftRide — Kafka topic
openlineage.runs; Marquez consumer + S3 archive consumer параллельно; ~12M событий/квартал; column lineage включён для material CDE. - Антипаттерны: диаграммы Confluence как первичные (театр); Marquez = первичный evidence (изменяемый PostgreSQL); column lineage пропущен (пробел completeness); изменения схемы не emit’ятся (audit trail неполный).
В M7.4 разберём issue management workflow — detection → triage → containment → resolution → root cause → preventive action; SLAs per severity; интеграция с Jira / ServiceNow IRM / LogicGate.
OpenLineage spec — RunEvents, DatasetEvents, Facets Data Flow Lineage — трассируемость для аудитора