Introduction
SwiftRide T+12M post-IPO Q4 audit cycle. A Big 4 senior asks the CDO: "An engineer changed the schema of commission_rules.commission_pct from NUMERIC(5,4) to NUMERIC(7,6) last week. What is the impact on CDE-SWR-003 driver_earnings, on the CDE-SWR-001 revenue aggregates, and on the IRS 1099 export?" The CDO opens the Marquez UI, traverses the lineage and identifies 7 downstream artefacts: 4 dbt models, 1 Looker dashboard, 1 IRS export and 1 Reverse ETL job to Salesforce. 5 of them are CDE-marked. Every schema-dependent transformation was re-tested and signed off the same week. The auditor is satisfied.
Without lineage, answering that question takes the CDO Office several days: grep, manual SQL inspection, asking engineers, and hoping no consumer was forgotten. Without a traceable answer, this is a control design deficiency (PCAOB AS 1305): "the entity could not identify the impact of a change on financial reporting".
Lineage is not just an observability tool. In a CDE programme, lineage is an active control mechanism: schema change → automated impact analysis → gate (block if a CDE-marked downstream artefact is untested) → audit-grade evidence trail.
OpenLineage: current state
OpenLineage v1.46.0 is the current release (April 2025). Schema spec 1-1-0; column-level lineage facet, pullRequestNumber facet (PR traceability), sourceCodeLocation facet (file + commit SHA), TestRunFacet (test results inline in lineage events).
Why OpenLineage matters
Standardised emit format: any tool (dbt, Airflow, Spark, Flink, Beam, custom code) emits lineage events in a common schema. A backend (Marquez, DataHub, Atlan, OpenMetadata) consumes the events and builds a lineage graph that is queryable and auditable.
Without a standard, each tool stores lineage internally and cross-tool stitching is painful. The question "what is the end-to-end CDE flow?" cannot be answered without manual reconstruction.
Column-level lineage facets
Per the OpenLineage spec, column-level lineage is emitted as a columnLineage facet on the output dataset:
{
  "namespace": "snowflake://prod",
  "name": "fct_driver_earnings",
  "facets": {
    "columnLineage": {
      "fields": {
        "gross_earnings_usd": {
          "inputFields": [
            {
              "namespace": "snowflake://prod",
              "name": "stg_trips",
              "field": "fare_usd",
              "transformations": [{"type": "DIRECT", "subtype": "IDENTITY"}]
            },
            {
              "namespace": "snowflake://prod",
              "name": "commission_rules",
              "field": "commission_pct",
              "transformations": [{"type": "DIRECT", "subtype": "TRANSFORMATION", "description": "multiplier"}]
            }
          ]
        },
        "driver_email_hash": {
          "inputFields": [
            {
              "namespace": "postgresql://prod",
              "name": "drivers",
              "field": "email",
              "transformations": [{"type": "DIRECT", "subtype": "TRANSFORMATION", "description": "SHA-256 + per-driver salt", "masking": true}]
            }
          ]
        }
      }
    }
  }
}
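There are two transformation types. DIRECT means the input value feeds the output value, with subtypes IDENTITY, TRANSFORMATION and AGGREGATION. INDIRECT means the input is used in WHERE / JOIN / GROUP BY and does not feed the value directly. Masking (PII obfuscation: hash, redact, tokenize) is not a separate type. It is flagged with the boolean masking attribute on the transformation.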
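The following Python sketch reads such a facet. It flattens a columnLineage facet into one row per (output column, input column, transformation). It assumes only the JSON shape shown above. The function name flatten_column_lineage and the row layout are illustrative and are not part of any OpenLineage client API.

```python
from typing import Any, Iterator


def flatten_column_lineage(dataset: dict[str, Any]) -> Iterator[dict[str, Any]]:
    """Yield one row per (output column, input column, transformation) in a columnLineage facet."""
    fields = dataset.get("facets", {}).get("columnLineage", {}).get("fields", {})
    for output_field, spec in fields.items():
        for source in spec.get("inputFields", []):
            # An input without listed transformations is still an edge; report its type as UNKNOWN.
            for t in source.get("transformations") or [{"type": "UNKNOWN"}]:
                yield {
                    "output": f"{dataset['name']}.{output_field}",
                    "input": f"{source['namespace']}/{source['name']}.{source['field']}",
                    "type": t.get("type"),
                    "subtype": t.get("subtype"),
                    "masking": bool(t.get("masking", False)),
                }


facet_doc = {
    "name": "fct_driver_earnings",
    "facets": {"columnLineage": {"fields": {
        "gross_earnings_usd": {"inputFields": [{
            "namespace": "snowflake://prod", "name": "commission_rules", "field": "commission_pct",
            "transformations": [{"type": "DIRECT", "subtype": "TRANSFORMATION"}],
        }]},
        "driver_email_hash": {"inputFields": [{
            "namespace": "postgresql://prod", "name": "drivers", "field": "email",
            "transformations": [{"type": "DIRECT", "subtype": "TRANSFORMATION", "masking": True}],
        }]},
    }}},
}

for row in flatten_column_lineage(facet_doc):
    print(row)
```

Rows in this shape are easy to group by CDE or by transformation type in a CI report.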
Lineage with CDE markers
In the lineage graph, CDE-flagged nodes (clay border, CDE badge) are entities from the CDE registry (M4). Each node exposes a column-level breakdown of its facets: DIRECT / INDIRECT transformations and masking flags. Each edge shows its transformation type and an impact narrative.
This graph is actively queried in CI/CD. When a PR modifies a schema, the lineage backend traverses downstream and reports the result in a PR comment.
Lineage in CI: schema gating
Workflow
- An engineer opens a PR that modifies dbt/models/marts/driver_earnings/commission_rules.sql.
- CI runs dbt build --select state:modified --defer --state prod, where prod is the directory holding the production manifest.
- The dbt model contract checks that the columns exist with the declared types. The build fails on a breaking change without a contract version bump.
- dbt emits an OpenLineage event with the column-level facet.
- The lineage backend (Marquez) consumes the event and queries the downstream graph (5 hops).
- CI runs the lineage analysis script. It identifies all CDE-marked downstream artefacts and flags each one for impact analysis.
- A PR comment is posted automatically: "Downstream CDE-marked artefacts: CDE-SWR-003 (4 DIRECT transformations), CDE-SWR-001 (1 INDIRECT via aggregate), CDE-SWR-007 IRS 1099 (1 DIRECT). Required: re-test each + Data Owner sign-off."
- CODEOWNERS requires Finance Lead + Data Platform Lead approval.
- Each downstream CDE gets a manual review. The Data Owner signs the PR comment to confirm the re-test.
- Merge to main → stg run → manual ServiceNow CR → prod run.
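The auto-posted PR comment in this workflow can be rendered from the analysis results with a few lines of code. The sketch below assumes the lineage analysis yields (cde_id, transformation_type) pairs, one per affected column-level edge. The function name and input shape are hypothetical.

```python
from collections import Counter


def render_pr_comment(edges: list[tuple[str, str]]) -> str:
    """Render the auto-posted PR comment from (cde_id, transformation_type) edges."""
    counts: dict[str, Counter] = {}
    for cde_id, t_type in edges:
        counts.setdefault(cde_id, Counter())[t_type] += 1
    parts = []
    for cde_id, c in counts.items():  # dicts keep first-seen order, so CDEs appear as discovered
        detail = ", ".join(f"{n} {t}" for t, n in sorted(c.items()))
        parts.append(f"{cde_id} ({detail})")
    return (
        "Downstream CDE-marked artefacts: " + "; ".join(parts) + ". "
        "Required: re-test each + Data Owner sign-off."
    )


edges = [("CDE-SWR-003", "DIRECT")] * 4 + [("CDE-SWR-001", "INDIRECT"), ("CDE-SWR-007", "DIRECT")]
print(render_pr_comment(edges))
```

Counting per transformation type keeps the comment short while still telling each Data Owner which kind of re-test to expect.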
Schema gating implementation
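# CI script: lineage impact analysis against the Marquez REST API
import requests

MARQUEZ_URL = "http://marquez:5000"


def cde_id_of(node):
    """Return the CDE id of a Marquez lineage node, or None.

    Assumes the CDE marker is the custom dataset facet cde.cdeId and that it is
    exposed under node["data"]["facets"]; adjust to how your deployment stores it.
    """
    facets = (node.get("data") or {}).get("facets") or {}
    return (facets.get("cde") or {}).get("cdeId")


def get_downstream_cde(dataset_namespace, dataset_name, max_hops=5):
    """Query Marquez lineage and return the CDE-marked datasets downstream of one dataset."""
    start_id = f"dataset:{dataset_namespace}:{dataset_name}"
    response = requests.get(
        f"{MARQUEZ_URL}/api/v1/lineage",
        params={"nodeId": start_id, "depth": max_hops},
        timeout=30,
    )
    response.raise_for_status()
    nodes = {node["id"]: node for node in response.json()["graph"]}

    # The returned graph also contains upstream nodes, so follow outEdges only.
    seen, frontier, cde_downstream = {start_id}, [start_id], []
    while frontier:
        for edge in nodes.get(frontier.pop(), {}).get("outEdges", []):
            child_id = edge["destination"]
            if child_id in seen:
                continue
            seen.add(child_id)
            frontier.append(child_id)
            child = nodes.get(child_id)
            if child and child["type"] == "DATASET" and cde_id_of(child):
                # Column-level facets for each hit (DIRECT / INDIRECT counts) are fetched separately.
                cde_downstream.append({"cde_id": cde_id_of(child), "dataset": child["data"]["name"]})
    return cde_downstream


# Gate: fail CI if any CDE-marked downstream lacks an attestation dated after the PR was opened
def gate_schema_change(modified_models, cde_registry, pr_open_date, namespace="snowflake://prod"):
    """Return (impact, failures); the CI wrapper posts `impact` as a PR comment and exits non-zero on failures.

    cde_registry is the CDE registry client (M4); its get_latest_attestation interface is assumed here.
    """
    impact, failures = {}, []
    for model in modified_models:
        impact[model] = get_downstream_cde(namespace, model)
        for cde_entry in impact[model]:
            attestation = cde_registry.get_latest_attestation(cde_entry["cde_id"])
            if attestation is None or attestation.status != "current" or attestation.date < pr_open_date:
                failures.append(f"CDE {cde_entry['cde_id']} requires a fresh attestation: Data Owner sign-off on the PR")
    return impact, failures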
This pattern, lineage-as-control, is an automated gate. It ensures that an upstream schema change goes through documented impact analysis, downstream re-testing and Data Owner sign-off before merge.
Impact analysis: downstream effects
A schema change can break downstream consumers silently:
- Column drop → SELECT failures downstream.
- Column type narrowing (DECIMAL(38,4) → DECIMAL(10,2)) → rounding loss, so recomputed values diverge.
- Column rename → SELECT failures.
- Semantic change without a rename → values are misinterpreted. For example, fare_total used to be USD and is now EUR: the type is the same but the semantics differ, so the failure is silent.
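Type narrowing can be detected mechanically from a schema diff. The sketch below assumes column types arrive as DECIMAL(p,s) / NUMERIC(p,s) / NUMBER(p,s) strings. It compares integer digits (p - s) and scale separately, because shrinking either one can lose data: integer digits through overflow, scale through rounding. The function name is illustrative.

```python
import re

_DECIMAL = re.compile(r"^(?:DECIMAL|NUMERIC|NUMBER)\((\d+),\s*(\d+)\)$", re.IGNORECASE)


def classify_decimal_change(old: str, new: str) -> str:
    """Classify a DECIMAL/NUMERIC precision change as narrowing, widening or unchanged."""
    m_old, m_new = _DECIMAL.match(old.strip()), _DECIMAL.match(new.strip())
    if not (m_old and m_new):
        raise ValueError(f"not a DECIMAL(p,s) type: {old!r} / {new!r}")
    p0, s0 = map(int, m_old.groups())
    p1, s1 = map(int, m_new.groups())
    int0, int1 = p0 - s0, p1 - s1  # digits left of the decimal point
    issues = []
    if int1 < int0:
        issues.append("integer range reduced (overflow risk)")
    if s1 < s0:
        issues.append("scale reduced (rounding loss)")
    if issues:
        return "narrowing: " + "; ".join(issues)
    return "unchanged" if (int1, s1) == (int0, s0) else "widening: backward-compatible"


print(classify_decimal_change("DECIMAL(38,4)", "DECIMAL(10,2)"))
print(classify_decimal_change("NUMERIC(5,4)", "NUMERIC(7,6)"))
```

The second call is the commission_pct change from the introduction. It keeps one integer digit and gains two decimal places, so it classifies as widening.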
OpenLineage facets help detect these cases:
- DIRECT transformation → a schema change upstream propagates directly, so a downstream rebuild is required.
- INDIRECT transformation → may affect aggregates / joins / filters, so downstream functionality may degrade.
- Masked transformation (masking: true) → a schema change on a masked column must preserve the masking logic. Otherwise there is a PII leak risk.
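These rules can be encoded as a small lookup that CI applies to each column-level edge. In this sketch the action strings paraphrase the bullets above. Masking is read from the transformation's boolean masking attribute, as in the OpenLineage column lineage facet. The function name is hypothetical.

```python
def required_action(transformation: dict) -> str:
    """Map one column-level transformation to the downstream action CI should require."""
    # Masking is checked first: a masked edge must keep its masking logic whatever its type.
    if transformation.get("masking"):
        return "verify masking logic is preserved (PII leak risk)"
    t_type = transformation.get("type")
    if t_type == "DIRECT":
        return "rebuild downstream and re-test values"
    if t_type == "INDIRECT":
        return "review dependent aggregates / joins / filters"
    return "manual review: unknown transformation type"


for t in [
    {"type": "DIRECT", "subtype": "TRANSFORMATION"},
    {"type": "INDIRECT", "subtype": "GROUP_BY"},
    {"type": "DIRECT", "subtype": "TRANSFORMATION", "masking": True},
]:
    print(t, "->", required_action(t))
```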
SwiftRide impact analysis template (filled in by the Data Owner in the PR comment):
Impact analysis: PR #1234, commission_rules.commission_pct DECIMAL(5,4) → DECIMAL(7,6)
Downstream CDE-marked:
  CDE-SWR-003 (driver_earnings_ledger)
    - fct_driver_earnings.gross_earnings_usd (DIRECT, multiplier)
      Impact: rounding precision increased; expected delta <0.0001 USD per row;
        cumulative impact estimated at <$50 daily for the $12M wallet; immaterial
      Re-test: yes; formula parity check rerun on 1000 random rows; 0 mismatches
      Attestation: Sami (SwiftPay Data Steward) signed 2026-08-15
  CDE-SWR-001 (revenue_gmv_aggregates)
    - fct_revenue_daily.net_revenue_usd (INDIRECT via commission revenue offset)
      Impact: rounding propagation; delta <$50 daily on $6-8M revenue; immaterial
      Re-test: yes; cross-system reconciliation Snowflake vs Treasury cash-in run
      Attestation: Carlos (Finance Lead) signed 2026-08-15
  CDE-SWR-007 (irs_1099_nec_box1)
    - box_1_nonemployee_comp (DIRECT, annual SUM)
      Impact: annual aggregate; cumulative rounding gain/loss <$2000 across 50K drivers; immaterial
      Re-test: yes; backfill last 4 quarters; validate against 1099 templates
      Attestation: Tax Compliance Lead signed 2026-08-15
Material weakness risk: NONE (all immaterial)
Approved for merge: 2026-08-15
This document is evidence for PCAOB AS 1305 (control design effectiveness) and AS 1105 (audit evidence). It is stored under S3 Object Lock with 7-year retention.
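The formula parity check cited in the template (rerun on 1000 random rows) can be sketched as follows. The gross-earnings formula fare * (1 - commission_pct) is a hypothetical stand-in for the real model logic, and the row field names are assumed. The per-row tolerance reuses the 0.0001 USD figure from the template. Decimal avoids binary floating-point error in the comparison.

```python
import random
from decimal import Decimal

TOLERANCE = Decimal("0.0001")  # per-row tolerance quoted in the impact analysis template


def expected_gross(fare_usd: Decimal, commission_pct: Decimal) -> Decimal:
    # Hypothetical formula for illustration: the driver keeps the fare minus the commission.
    return fare_usd * (1 - commission_pct)


def parity_check(rows: list[dict], sample_size: int = 1000, seed: int = 42) -> list[dict]:
    """Recompute gross earnings on a random sample and return rows diverging beyond TOLERANCE."""
    sample = random.Random(seed).sample(rows, min(sample_size, len(rows)))
    return [
        r for r in sample
        if abs(expected_gross(r["fare_usd"], r["commission_pct"]) - r["gross_earnings_usd"]) > TOLERANCE
    ]


# Toy rows, not real data: both match the formula exactly.
rows = [
    {"fare_usd": Decimal("23.40"), "commission_pct": Decimal("0.250000"),
     "gross_earnings_usd": Decimal("17.55")},
    {"fare_usd": Decimal("10.00"), "commission_pct": Decimal("0.123456"),
     "gross_earnings_usd": Decimal("8.76544")},
]
print(len(parity_check(rows)))  # 0
```

A fixed seed makes the sampled rows reproducible, so the same sample can be attached to the evidence bundle and re-run by the auditor.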
Marquez as the evidence backend
Marquez is the reference OpenLineage backend and an LF AI & Data project.
Features used by SwiftRide:
- Lineage graph storage: PostgreSQL backend, queryable via REST API + UI.
- Run tracking: each dbt / Airflow / Spark run is a run event with states START / COMPLETE / FAIL and a run duration.
- Dataset versioning: schema versions are tracked and column-level changes are diffed.
- Job versioning: the code version (sourceCodeLocation facet) is tracked per run.
- Tags + custom facets: CDE markers are stored as the custom facet cde.cdeId and are queryable.
Limitations:
- Storage does not enforce immutability natively and relies on PostgreSQL backend safeguards. SOX-grade evidence therefore requires a separate S3 Object Lock archive of significant events: schema changes on CDEs and run results of material pipelines.
- The UI is good for exploration, not for automated reporting. CI integration goes through the REST API.
SwiftRide deployment at T+9M: Marquez is self-hosted with its PostgreSQL backend on Aurora. The UI is exposed internally and to the Big 4 auditor through a read-only role. Events are ingested from the dbt/Airflow integration and a custom Spark integration. Significant events are exported nightly to S3 Object Lock with 7-year retention.
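The nightly export to S3 Object Lock can be sketched through boto3's put_object parameters. This assumes a bucket created with Object Lock enabled. The bucket name, key layout and helper names are hypothetical. In COMPLIANCE mode no user, including the root account, can shorten or remove the retention before the retain-until date. S3 also requires an integrity checksum on uploads that set retention.

```python
import json
from datetime import datetime, timezone


def retain_until(now: datetime, years: int = 7) -> datetime:
    """Add whole years, falling back to 28 Feb when `now` is 29 Feb."""
    try:
        return now.replace(year=now.year + years)
    except ValueError:  # 29 Feb in a non-leap target year
        return now.replace(year=now.year + years, day=28)


def object_lock_put_params(bucket: str, key: str, events: list[dict], now: datetime) -> dict:
    """Build put_object kwargs for an immutable, 7-year archive of lineage events (JSON Lines)."""
    body = "\n".join(json.dumps(e, sort_keys=True) for e in events).encode("utf-8")
    return {
        "Bucket": bucket,
        "Key": key,
        "Body": body,
        "ObjectLockMode": "COMPLIANCE",
        "ObjectLockRetainUntilDate": retain_until(now),
        "ChecksumAlgorithm": "SHA256",  # integrity checksum required when setting Object Lock retention
    }


params = object_lock_put_params(
    bucket="swiftride-lineage-evidence",  # hypothetical bucket
    key="marquez/2026-08-15/significant-events.jsonl",  # hypothetical key layout
    events=[{"eventType": "COMPLETE", "job": {"name": "commission_rules"}}],
    now=datetime(2026, 8, 15, tzinfo=timezone.utc),
)
# With AWS credentials configured: boto3.client("s3").put_object(**params)
```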
OpenLineage emitters
dbt
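The openlineage-dbt integration is part of the OpenLineage project. Run dbt through its dbt-ol wrapper (for example, dbt-ol build) and it emits OpenLineage events for each run:
- START event when the run begins.
- COMPLETE event with a columnLineage facet derived from SQL parsing.
- FAIL event if a test or build step fails, including details of the failed check.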
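# openlineage.yml: read by the OpenLineage Python client that dbt-ol uses
# (alternatively set OPENLINEAGE_URL=http://marquez:5000; OPENLINEAGE_NAMESPACE sets the job namespace)
# Dataset namespaces such as snowflake://... are derived from the dbt adapter connection.
transport:
  type: http
  url: http://marquez:5000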
Airflow
The OpenLineage provider package (apache-airflow-providers-openlineage) emits events per task:
- Task start / complete / fail.
- Custom facets for the DAG run, the task run, and lineage inputs/outputs.
Spark
The OpenLineage Spark integration runs as a Spark listener and emits lineage from Spark jobs. Column-level lineage is derived from the Spark logical plan.
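The sketch below attaches the OpenLineage listener to a PySpark job. The config keys are the ones documented for the OpenLineage Spark integration: spark.extraListeners pointing at io.openlineage.spark.agent.OpenLineageSparkListener, spark.openlineage.transport.*, and spark.openlineage.namespace. The namespace value is hypothetical. The package version is a parameter because it must match the release you deploy.

```python
def openlineage_spark_conf(marquez_url: str, namespace: str, version: str, scala: str = "2.12") -> dict[str, str]:
    """Spark settings that attach the OpenLineage listener and send events to Marquez over HTTP."""
    return {
        "spark.jars.packages": f"io.openlineage:openlineage-spark_{scala}:{version}",
        "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
        "spark.openlineage.transport.type": "http",
        "spark.openlineage.transport.url": marquez_url,
        "spark.openlineage.namespace": namespace,
    }


conf = openlineage_spark_conf("http://marquez:5000", "spark_prod", version="1.46.0")  # hypothetical namespace
for key, value in conf.items():
    print(f"--conf {key}={value}")

# With pyspark installed, the same settings go on the session builder:
# builder = SparkSession.builder.appName("commission_rules_reload")
# for key, value in conf.items():
#     builder = builder.config(key, value)
# spark = builder.getOrCreate()
```

Keeping the settings in one function lets spark-submit flags and notebook sessions share the same lineage configuration.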
Custom Python / API
The openlineage-python client emits events directly:
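import uuid

from openlineage.client import OpenLineageClient
from openlineage.client.event_v2 import Job, Run, RunEvent, RunState

# Transport settings come from openlineage.yml or OPENLINEAGE_* environment variables
client = OpenLineageClient()
client.emit(RunEvent(
    eventType=RunState.COMPLETE,
    eventTime="2026-08-15T06:00:00Z",
    run=Run(runId=str(uuid.uuid4())),  # runId must be a valid UUID
    job=Job(namespace="custom_jobs", name="commission_rules_reload"),
    inputs=[...],   # input Dataset objects (elided)
    outputs=[...],  # output Dataset objects (elided)
))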
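Where installing the client is not an option, the same event can be posted to Marquez as raw JSON over HTTP. This sketch uses only the standard library. The producer URI is a hypothetical placeholder. schemaURL is assumed to point at the OpenLineage 2-0-2 RunEvent definition; match it to the spec version your emitters use. Marquez accepts events on POST /api/v1/lineage.

```python
import json
import urllib.request
import uuid
from datetime import datetime, timezone

PRODUCER = "https://example.com/swiftride/custom-emitter"  # hypothetical producer URI
SCHEMA_URL = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent"  # assumed spec version


def build_run_event(event_type: str, job_namespace: str, job_name: str, run_id: str,
                    inputs: list[dict], outputs: list[dict]) -> dict:
    """Assemble an OpenLineage RunEvent as a plain JSON-serialisable dict."""
    return {
        "eventType": event_type,  # START / COMPLETE / FAIL / ...
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id},
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": inputs,
        "outputs": outputs,
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
    }


def emit(event: dict, marquez_url: str = "http://marquez:5000") -> int:
    """POST the event to Marquez; urlopen raises HTTPError on non-2xx responses."""
    request = urllib.request.Request(
        f"{marquez_url}/api/v1/lineage",
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status


event = build_run_event(
    "COMPLETE", "custom_jobs", "commission_rules_reload", str(uuid.uuid4()),
    inputs=[], outputs=[{"namespace": "snowflake://prod", "name": "commission_rules"}],
)
# emit(event)  # requires a reachable Marquez instance
```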
SwiftRide lineage-guarded migration
Actual T+12M scenario: a precision change on commission_rules.commission_pct (NUMERIC(5,4) → NUMERIC(7,6)). Drivers:
- EU AMLR enforcement update: more granular commission tracking is required for audit;
- Big 4 Q4 readiness: the precision improvement lowers material misstatement risk.
Lineage-guarded flow:
- PR opened: engineer Anna modifies dbt/models/marts/swiftpay/commission_rules.sql and the corresponding YAML contract.
- dbt build in CI:
  - dbt 1.9 contract check: the column precision change is detected. The check would fail if the column were dropped; it passes because precision was widened (backward-compatible).
  - dbt test on the commission_rules model passes.
- OpenLineage event emitted: the dbt integration (dbt-ol) publishes a COMPLETE event with the new column schema.
- Marquez consumes the event: the graph is updated and cde.cdeId tags are traversed downstream.
- The CI lineage analysis script queries the Marquez REST API:
  - CDE-marked downstream artefacts found: fct_driver_earnings, fct_revenue_daily, looker_driver_payouts_dashboard, irs_1099_export, salesforce_etl_payouts (5 of the 7 downstream artefacts).
- A PR comment is posted automatically, listing the 5 CDE-marked downstream artefacts, their transformation facets and the required attestations.
- Data Owners review:
  - Sami (SwiftPay Data Steward) reviews the driver_earnings impact, reruns the formula parity check and signs the PR.
  - Carlos (Finance Lead) reviews the revenue impact and the cross-system reconciliation, then signs.
  - The Tax Compliance Lead reviews the IRS 1099 impact and the backfill validation, then signs.
- CODEOWNERS approval: Finance Lead + Data Platform Lead approve the PR.
- Merge to main → CI runs the stg deploy → manual ServiceNow CR → prod deploy.
- Evidence trail archived to S3:
  - PR commit SHA + impact analysis document.
  - OpenLineage events (Marquez export).
  - Re-test results (formula parity, reconciliation, backfill).
  - Attestations (signed PR comments + Workiva attestation form).
  - ServiceNow CR.
Everything is archived in S3 Object Lock compliance mode with 7-year retention. It is queryable from the Snowflake table audit.evidence_index, linked to control_id CTL-CDE-SWR-003-005 (schema migration).
Audit narrative: "The schema change was governed by a lineage-guarded process. 5 CDE downstream artefacts were identified automatically, and Data Owners reviewed the individual impact analyses. Re-tests passed. The PR was approved by CODEOWNERS (Finance + Data Platform leads) and deployed through the signed CI/CD path. Evidence was retained." This narrative is PCAOB-defensible.
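Before the bundle lands in S3, each artefact can be fingerprinted so that the audit.evidence_index row proves exactly what was archived. In this sketch the manifest fields and helper name are hypothetical. Only the control id comes from this section, and the commit SHA is left as a placeholder.

```python
import hashlib
import json


def build_evidence_manifest(control_id: str, pr_commit_sha: str, artefacts: dict[str, bytes]) -> dict:
    """Fingerprint each evidence artefact with SHA-256 and bind the set to a control id."""
    files = {name: hashlib.sha256(blob).hexdigest() for name, blob in sorted(artefacts.items())}
    # Hash over the per-file digests: it changes if any artefact is added, removed or altered.
    bundle_sha256 = hashlib.sha256(json.dumps(files, sort_keys=True).encode("utf-8")).hexdigest()
    return {
        "control_id": control_id,
        "pr_commit_sha": pr_commit_sha,
        "files": files,
        "bundle_sha256": bundle_sha256,
    }


manifest = build_evidence_manifest(
    "CTL-CDE-SWR-003-005",
    pr_commit_sha="<commit-sha>",  # placeholder
    artefacts={
        "impact_analysis.md": b"Impact analysis: PR #1234 ...",
        "marquez_events.jsonl": b'{"eventType": "COMPLETE"}\n',
    },
)
print(json.dumps(manifest, indent=2))
```

Storing the manifest next to the artefacts, under the same Object Lock retention, lets an auditor re-hash the files years later and compare.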
Anti-patterns
Lineage exists but is not enforced in CI
Pattern: Marquez is deployed and engineers can see lineage, but there is no automated gate. Engineers just "check" manually, or forget.
Why it is bad: lineage observability ≠ lineage control. The audit asks: "How do you ensure that a schema change on a CDE goes through impact analysis?" A manual check is insufficient.
Fix: gate with a CI script + CODEOWNERS. Lineage analysis runs automatically on every PR, results are posted as a PR comment, and Data Owner sign-off is required in PR review.
Column-level lineage on only 50% of pipelines
Pattern: dbt models have column-level lineage; Spark jobs and custom Airflow tasks do not.
Why it is bad: gaps in lineage mean blind spots in impact analysis, so a schema change can propagate undetected.
Fix: emit OpenLineage events from all production pipelines (dbt, Spark, Airflow, custom Python). Track the coverage gap and remediate it.
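Tracking the coverage gap can start with a set difference between the production pipeline inventory and the jobs that actually emitted OpenLineage events over some window. The job names and both input sets below are hypothetical. In practice they might come from an orchestrator inventory and a Marquez job listing.

```python
def lineage_coverage(production_jobs: set[str], emitting_jobs: set[str]) -> tuple[float, list[str]]:
    """Return (coverage ratio, sorted list of production jobs that emit no lineage)."""
    if not production_jobs:
        return 1.0, []
    gaps = sorted(production_jobs - emitting_jobs)
    return 1 - len(gaps) / len(production_jobs), gaps


production = {"dbt.fct_driver_earnings", "dbt.fct_revenue_daily", "spark.trip_enrichment", "airflow.payout_export"}
emitting = {"dbt.fct_driver_earnings", "dbt.fct_revenue_daily"}
ratio, gaps = lineage_coverage(production, emitting)
print(f"coverage {ratio:.0%}; gaps: {gaps}")  # coverage 50%; gaps: ['airflow.payout_export', 'spark.trip_enrichment']
```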
Ephemeral lineage events
Pattern: Marquez stores events with 90-day PostgreSQL retention, and older events are purged.
Why it is bad: SOX evidence retention is 7 years. You cannot reconstruct lineage from 3 years ago for an audit follow-up.
Fix: export significant lineage events (CDE-marked schema changes + material run results) nightly to S3 Object Lock with 7-year retention. Marquez is the operational layer; S3 is the evidence layer.
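The operational/evidence split needs an explicit rule for what counts as significant. This sketch is a conservative filter over OpenLineage event dicts. It keeps every event that touches a CDE-marked dataset, plus terminal run results of material jobs. It assumes the CDE marker is a custom cde facet with a cdeId key, as described for Marquez above. The material_jobs set is a hypothetical input.

```python
TERMINAL_STATES = {"COMPLETE", "FAIL", "ABORT"}


def is_significant(event: dict, material_jobs: set[str]) -> bool:
    """Keep events touching a CDE-marked dataset, plus terminal run results of material jobs."""
    datasets = event.get("inputs", []) + event.get("outputs", [])
    touches_cde = any((ds.get("facets", {}).get("cde") or {}).get("cdeId") for ds in datasets)
    job_name = event.get("job", {}).get("name")
    material_result = job_name in material_jobs and event.get("eventType") in TERMINAL_STATES
    return bool(touches_cde or material_result)


events = [
    {"eventType": "COMPLETE", "job": {"name": "fct_driver_earnings"},
     "outputs": [{"name": "fct_driver_earnings", "facets": {"cde": {"cdeId": "CDE-SWR-003"}}}]},
    {"eventType": "START", "job": {"name": "adhoc_scratch"}, "outputs": [{"name": "tmp_table"}]},
    {"eventType": "FAIL", "job": {"name": "fct_revenue_daily"}, "outputs": []},
]
to_archive = [e for e in events if is_significant(e, material_jobs={"fct_revenue_daily"})]
print(len(to_archive))  # 2
```

Erring towards archiving every CDE-touching event costs some storage but avoids an evidence gap when an auditor asks about a specific run.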
Manual lineage in documentation
Pattern: "Lineage is documented on a Confluence page, updated quarterly."
Why it is bad: documented lineage lags behind the code. The auditor asks about the current state, and the documentation is stale.
Fix: generate lineage automatically from running code through OpenLineage emitters, so documentation is generated rather than hand-maintained.
Summary
- OpenLineage v1.46.0 (current as of April 2025) provides a standardised emit format: column-level facets with DIRECT / INDIRECT transformation types and a masking flag, plus PR, sourceCodeLocation and TestRun facets.
- Marquez (LF AI & Data) is the reference backend. It uses PostgreSQL storage and is queryable via REST API + UI. Events are ingested from the dbt integration, the Airflow OpenLineage provider, the Spark integration and custom Python.
- Lineage-as-control workflow: PR opened → dbt build emits event → Marquez updates graph → CI lineage analysis identifies CDE-marked downstream → PR comment posted automatically with impact requirements → Data Owners sign off → CODEOWNERS approval → merge → deploy.
- Schema gating prevents silent breakage from column drops, type narrowing, renames and semantic changes. In each case, column-level transformation facets identify the affected downstream columns.
- Marquez is the operational layer; S3 Object Lock is the evidence layer (7-year retention) for CDE-marked schema changes and material run results.
- BCBS 239 Principle 2 (data architecture) and Principle 3 (reconciliation to source): lineage provides the required traceability.
- SwiftRide T+12M scenario: the schema change on commission_rules.commission_pct went through a 10-step lineage-guarded process. 5 CDE downstream artefacts were identified, attested and deployed, and the process is PCAOB-defensible.
In M5.9, the capstone lab, we build a full controls matrix for one CDE (Driver Earnings) with at least 12 controls. It uses the objective/activity/evidence structure, DQ dimensions, SoD and lineage-as-control.