Lesson 06.08 · 28 min
Advanced
OpenLineage · Column-level Lineage · Schema Gating · Impact Analysis · Marquez · CDE Markers

Introduction

SwiftRide, T+12M post-IPO, Q4 audit cycle. A Big 4 senior asks the CDO: "Last week an engineer changed the schema of commission_rules.commission_pct from NUMERIC(5,4) to NUMERIC(7,6). What is the impact on CDE-SWR-003 driver_earnings, on the CDE-SWR-001 revenue aggregates, and on the IRS 1099 export?" The CDO opens the Marquez UI, traverses the lineage and identifies 7 downstream artefacts: 4 dbt models, 1 Looker dashboard, 1 IRS export and 1 Reverse ETL job to Salesforce. 5 of them are CDE-marked. Every schema-dependent transformation was re-tested and signed off the same week. The auditor is satisfied.

Without lineage, answering this question takes the CDO Office several days: grep, manual SQL inspection, asking engineers, and hoping no consumer was forgotten. Without a traceable answer, this is a control design deficiency (PCAOB AS 1305): "the entity could not identify the impact of the change on financial reporting".

Lineage is not just an observability tool. In a CDE programme, lineage is an active control mechanism: schema change → automatic impact analysis → gate (block the merge if a CDE-marked downstream artefact is untested) → audit-grade evidence trail.

OpenLineage: current state

OpenLineage v1.46.0 is the current release (April 2025). Schema spec 1-1-0; column-level lineage facet, pullRequestNumber facet (PR traceability), sourceCodeLocation facet (file + commit SHA), TestRunFacet (test results inline in lineage events).

Why OpenLineage matters

Standardised emit format: lineage events from any tool (dbt, Airflow, Spark, Flink, Beam, custom code) are emitted in one common schema. A backend (Marquez, DataHub, Atlan, OpenMetadata) consumes the events and builds a lineage graph that can be queried and audited.

Without a standard, each tool stores lineage internally, cross-tool stitching is painful, and the question "what is the end-to-end CDE flow?" cannot be answered without manual reconstruction.

Column-level lineage facets

Per the OpenLineage spec, column-level lineage is emitted as a columnLineage facet on the output dataset. Each input field names its source dataset in "name" and the source column in "field":

{
  "namespace": "snowflake://prod",
  "name": "fct_driver_earnings",
  "facets": {
    "columnLineage": {
      "fields": {
        "gross_earnings_usd": {
          "inputFields": [
            {
              "namespace": "snowflake://prod",
              "name": "stg_trips",
              "field": "fare_usd",
              "transformations": [{"type": "DIRECT", "subtype": "TRANSFORMATION", "description": "fare_usd * (1 - commission_pct)", "masking": false}]
            },
            {
              "namespace": "snowflake://prod",
              "name": "commission_rules",
              "field": "commission_pct",
              "transformations": [{"type": "DIRECT", "subtype": "TRANSFORMATION", "description": "multiplier", "masking": false}]
            }
          ]
        },
        "driver_email_hash": {
          "inputFields": [
            {
              "namespace": "postgresql://prod",
              "name": "drivers",
              "field": "email",
              "transformations": [{"type": "DIRECT", "subtype": "TRANSFORMATION", "description": "SHA-256 + per-driver salt", "masking": true}]
            }
          ]
        }
      }
    }
  }
}

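Transformation types: DIRECT (the input column's value feeds the output value, as identity, transformation or aggregation) and INDIRECT (the column is used in WHERE / JOIN / GROUP BY and does not feed the value directly). PII obfuscation (hash, redact, tokenize) is not a third type in the spec: it is recorded as `masking: true` on a transformation. This lesson refers to such masked transformations as MASKING for short.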
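To make the type/flag distinction concrete, here is a minimal Python sketch that checks a columnLineage facet for transformation entries the spec would not recognise, such as a `"type": "MASKING"` entry. The subtype lists reflect our reading of the OpenLineage column lineage facet spec and should be confirmed against the spec version your emitters produce; `validate_column_lineage` is a hypothetical helper, not part of any OpenLineage library.

```python
# Sketch: validate transformation entries in a columnLineage facet.
# Assumption: the subtype lists mirror the column lineage facet spec as we
# understand it; confirm against the spec version your emitters produce.
ALLOWED_SUBTYPES = {
    "DIRECT": {"IDENTITY", "TRANSFORMATION", "AGGREGATION"},
    "INDIRECT": {"JOIN", "GROUP_BY", "FILTER", "SORT", "WINDOW", "CONDITIONAL"},
}


def validate_column_lineage(facet: dict) -> list:
    """Return a list of problems found in the transformations of a columnLineage facet."""
    problems = []
    for out_col, spec in facet.get("fields", {}).items():
        for inp in spec.get("inputFields", []):
            src = f"{inp['namespace']}/{inp['name']}.{inp['field']}"
            for t in inp.get("transformations", []):
                ttype, subtype = t.get("type"), t.get("subtype")
                if ttype not in ALLOWED_SUBTYPES:
                    # e.g. "MASKING" is not a type; masking is the boolean flag checked below
                    problems.append(f"{out_col} <- {src}: unknown type {ttype!r}")
                elif subtype is not None and subtype not in ALLOWED_SUBTYPES[ttype]:
                    problems.append(f"{out_col} <- {src}: unknown {ttype} subtype {subtype!r}")
                if not isinstance(t.get("masking", False), bool):
                    problems.append(f"{out_col} <- {src}: masking must be a boolean")
    return problems
```

Run against an emitter's output in CI, a non-empty list means the facet will not be interpreted the way the impact analysis expects.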

Interactive lineage with CDE markers

[Figure: Lineage with CDE markers, schema change impact gate. Nodes: OLTP source (PostgreSQL · trips); OLTP config (PostgreSQL · commission_ru…, CDE-SWR-006); staging (dbt · stg_trips); mart (dbt · fct_driver_earnings, CDE-SWR-003); BI (Looker · Driver Payouts Da…); regulatory consumer (IRS 1099-NEC export, CDE-SWR-007). Edges: four DIRECT, one INDIRECT. Legend: DIRECT, INDIRECT, MASKING, CDE = Critical Data Element; CDE-flagged nodes have a clay-coloured border.]
Node: MART · dbt · fct_driver_earnings · CDE-SWR-003

  • driver_id (INDIRECT). Upstream: stg_trips.driver_id + KYC join. Indirect: join key for the earnings aggregation.
  • gross_earnings_usd (DIRECT, CDE). Upstream: stg_trips.fare_usd × (1 - commission_pct). CDE and central output: a DIRECT transformation that depends on both `fare_usd` (DIRECT) and `commission_pct` (DIRECT). A schema change to either upstream column requires an impact analysis.
  • driver_email_hash (MASKING). Upstream: pg-drivers.email. Masked transformation: PII obfuscated with SHA-256 + per-driver salt, supporting GDPR Art. 25 data minimisation.

CDE-flagged nodes (clay border, CDE badge) are entities registered in the CDE registry (M4). Each node carries a column-level breakdown with facets; each edge carries its transformation type and an impact narrative.
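The column-level breakdown in the node panel boils down to one lookup: which output columns of a dataset are fed by a given upstream column. A minimal sketch over a columnLineage facet dict shaped like the JSON example earlier in this lesson; the helper name is hypothetical.

```python
def affected_output_columns(facet, namespace, dataset, field):
    """Map each output column fed by the given input column to its (type, masking) pairs."""
    hits = {}
    for out_col, spec in facet.get("fields", {}).items():
        for inp in spec.get("inputFields", []):
            # An input field is identified by dataset namespace, dataset name and column name.
            if (inp["namespace"], inp["name"], inp["field"]) == (namespace, dataset, field):
                hits.setdefault(out_col, []).extend(
                    (t.get("type"), t.get("masking", False)) for t in inp.get("transformations", [])
                )
    return hits
```

For commission_pct this returns only gross_earnings_usd, the column the panel marks as needing an impact analysis.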

This graph is queried actively in CI/CD: when a PR modifies a schema, the lineage backend traverses downstream and the results are reported in a PR comment.
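The downstream traversal itself is a bounded breadth-first search. The sketch below assumes the graph has already been flattened into dataset-to-dataset edges and that CDE IDs come from the CDE registry; in Marquez's own graph, job nodes sit between datasets, so hop counts there differ. All names are hypothetical.

```python
from collections import deque


def downstream_cde(edges, cde_tags, start, max_hops=5):
    """Return {node: (cde_id, hops)} for CDE-tagged nodes reachable from start within max_hops.

    edges: iterable of (upstream, downstream) dataset names.
    cde_tags: mapping dataset name -> CDE id, for CDE-registered datasets only.
    """
    children = {}
    for up, down in edges:
        children.setdefault(up, []).append(down)
    found, seen, queue = {}, {start}, deque([(start, 0)])
    while queue:
        node, hops = queue.popleft()
        if hops == max_hops:
            continue  # do not expand beyond the hop limit
        for child in children.get(node, []):
            if child not in seen:
                seen.add(child)
                queue.append((child, hops + 1))
                if child in cde_tags:
                    found[child] = (cde_tags[child], hops + 1)
    return found
```

Breadth-first order means each node is reported with its shortest hop distance, which is the number a reviewer usually wants in the PR comment.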

Lineage in CI: schema gating

Workflow

  1. The engineer opens a PR modifying dbt/models/marts/driver_earnings/commission_rules.sql.
  2. CI runs dbt build --select state:modified --defer --state prod (prod is assumed here to be the directory holding the production manifest; state: selection needs --state).
  3. The dbt model contract checks that columns exist and have the declared types; the build fails on a breaking change without a contract version bump.
  4. dbt emits an OpenLineage event with the column-level facet.
  5. The lineage backend (Marquez) consumes the event and queries the downstream graph (5 hops).
  6. CI runs a lineage analysis script: it identifies every CDE-marked downstream artefact and flags each one for impact analysis.
  7. A PR comment is posted automatically: "Downstream CDE-marked artefacts: CDE-SWR-003 (4 DIRECT transformations), CDE-SWR-001 (1 INDIRECT via aggregate), CDE-SWR-007 IRS 1099 (1 DIRECT). Required: re-test each + Data Owner sign-off."
  8. CODEOWNERS requires Finance Lead + Data Platform Lead approval.
  9. Manual review per downstream CDE; the Data Owner signs the PR comment, confirming the re-test.
  10. Merge to main → stg run → manual ServiceNow CR → prod run.
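Step 7 is plain text rendering once the impact data exists. A hedged sketch of how the comment body might be assembled; the function name and the input shape are assumptions.

```python
from collections import Counter


def render_impact_comment(pr_number, change, impacts):
    """Render the auto-posted PR comment for CDE-marked downstream artefacts.

    impacts: list of dicts with keys "cde_id", "dataset" and "transformations"
    (a list of transformation type strings such as "DIRECT" or "INDIRECT").
    """
    lines = [f"Impact analysis, PR #{pr_number}: {change}", "", "Downstream CDE-marked artefacts:"]
    for item in sorted(impacts, key=lambda i: i["cde_id"]):
        # Summarise e.g. ["DIRECT"] * 4 as "4 DIRECT".
        counts = Counter(item["transformations"])
        summary = ", ".join(f"{n} {t}" for t, n in sorted(counts.items()))
        lines.append(f"- {item['cde_id']} ({item['dataset']}): {summary}")
    lines += ["", "Required: re-test each artefact + Data Owner sign-off."]
    return "\n".join(lines)
```

Sorting by CDE ID keeps the comment stable across CI reruns, so reviewers can diff two versions of it.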

Implementing schema gating

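# CI script: lineage impact analysis.
# Assumptions: Marquez runs at http://marquez:5000; its lineage response includes
# dataset facets; each CDE dataset carries a custom facet "cde" with key "cdeId";
# cde_registry, fail_ci and post_pr_comment are supplied by the CI environment (hypothetical).
from collections import deque

import requests

MARQUEZ_URL = "http://marquez:5000"


def get_downstream_cde(dataset_namespace, dataset_name, max_hops=5):
    """Query Marquez lineage and return the CDE-marked datasets downstream of one dataset."""
    start_id = f"dataset:{dataset_namespace}:{dataset_name}"
    response = requests.get(
        f"{MARQUEZ_URL}/api/v1/lineage",
        params={"nodeId": start_id, "depth": max_hops},
        timeout=30,
    )
    response.raise_for_status()
    nodes = {node["id"]: node for node in response.json()["graph"]}

    # The graph holds upstream and downstream nodes; follow outEdges only.
    cde_downstream, seen, queue = [], {start_id}, deque([start_id])
    while queue:
        for edge in nodes.get(queue.popleft(), {}).get("outEdges", []):
            node_id = edge["destination"]
            if node_id in seen:
                continue
            seen.add(node_id)
            queue.append(node_id)
            node = nodes.get(node_id, {})
            facets = (node.get("data") or {}).get("facets") or {}
            cde_id = facets.get("cde", {}).get("cdeId")
            if node.get("type") == "DATASET" and cde_id:
                cde_downstream.append({
                    "cde_id": cde_id,
                    "dataset": node["data"]["name"],
                    "transformation_facets": facets.get("columnLineage"),
                })
    return cde_downstream


# Fail CI if any CDE-marked downstream artefact lacks a current attestation.
def gate_schema_change(modified_datasets, cde_registry, pr_open_date):
    impact, missing = {}, []
    for namespace, name in modified_datasets:
        impact[name] = get_downstream_cde(namespace, name)
        for cde_entry in impact[name]:
            attestation = cde_registry.get_latest_attestation(cde_entry["cde_id"])
            if attestation is None or attestation.status != "current" or attestation.date < pr_open_date:
                missing.append(cde_entry["cde_id"])
    post_pr_comment(impact)  # post first so Data Owners see the impact even when CI fails
    if missing:
        fail_ci(f"CDE {', '.join(sorted(set(missing)))} require a fresh attestation: Data Owner sign-off on the PR")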

This pattern, lineage-as-control, is an automated gate ensuring that an upstream schema change goes through a documented impact analysis, downstream re-tests and Data Owner sign-off before merge.
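The attestation rule inside the gate is easiest to test as a pure function, separate from the Marquez and CI calls. A minimal sketch, assuming each attestation has a CDE ID, a status string and a signing date; the `Attestation` type and `blocking_cdes` are hypothetical.

```python
from dataclasses import dataclass
from datetime import date


@dataclass
class Attestation:
    cde_id: str
    status: str  # e.g. "current" or "revoked"
    signed_on: date


def blocking_cdes(cde_ids, attestations, pr_opened_on):
    """Return CDE ids that block merge: no attestation, not current, or signed before the PR opened."""
    latest = {}
    for a in attestations:
        # Keep only the most recent attestation per CDE.
        if a.cde_id not in latest or a.signed_on > latest[a.cde_id].signed_on:
            latest[a.cde_id] = a
    blocked = []
    for cde_id in cde_ids:
        a = latest.get(cde_id)
        if a is None or a.status != "current" or a.signed_on < pr_opened_on:
            blocked.append(cde_id)
    return blocked
```

An attestation signed before the PR opened is treated as stale because it cannot have covered the change under review.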

Impact analysis: downstream effects

A schema change can break downstream consumers silently:

  • Column drop → SELECT failures downstream.
  • Column type narrowing (DECIMAL(38,4) → DECIMAL(10,2)) → rounding loss and possible overflow; recomputed values diverge.
  • Column rename → SELECT failures.
  • Column semantic change without rename → values misinterpreted (for example, fare_total was USD and is now EUR: same type, different semantics, silent failure).
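Whether a fixed-point type change is widening or narrowing can be checked mechanically before any data is touched. A minimal sketch that compares precision and scale of `NUMERIC`/`DECIMAL`/`NUMBER(p,s)` types; it only judges representability, so a semantic change such as USD → EUR still comes out as "unchanged" and needs a human. The function name is hypothetical.

```python
import re

_DECIMAL = re.compile(r"^\s*(?:NUMERIC|DECIMAL|NUMBER)\s*\(\s*(\d+)\s*,\s*(\d+)\s*\)\s*$", re.IGNORECASE)


def classify_decimal_change(old, new):
    """Classify a fixed-point type change as 'widening', 'narrowing' or 'unchanged'."""
    m_old, m_new = _DECIMAL.match(old), _DECIMAL.match(new)
    if not (m_old and m_new):
        raise ValueError(f"unsupported type pair: {old!r} -> {new!r}")
    p1, s1 = map(int, m_old.groups())
    p2, s2 = map(int, m_new.groups())
    if (p1, s1) == (p2, s2):
        return "unchanged"
    # Fewer fractional digits means rounding loss; fewer integer digits (p - s) means overflow risk.
    if s2 < s1 or (p2 - s2) < (p1 - s1):
        return "narrowing"
    return "widening"
```

Note that DECIMAL(10,2) → DECIMAL(10,4) counts as narrowing: the scale grows, but the integer part shrinks from 8 to 6 digits.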

OpenLineage facets help detect:

  • DIRECT transformation → a schema change upstream propagates directly; downstream rebuild required.
  • INDIRECT transformation → may affect aggregates / joins / filters; downstream functionality may degrade.
  • MASKING (masked transformation) → a schema change on a masked column must preserve the masking logic; otherwise there is a PII leak risk.
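The same mapping can be written down as a lookup that a CI script applies to every affected input column. A sketch with the action wording taken from the list above and the masking flag read from the transformation entry; the helper name is hypothetical.

```python
def required_actions(transformation):
    """Map one column-lineage transformation entry to the follow-up a schema change triggers."""
    actions = []
    if transformation.get("type") == "DIRECT":
        actions.append("rebuild downstream model and rerun value-parity checks")
    elif transformation.get("type") == "INDIRECT":
        actions.append("re-verify joins, filters and aggregates that use the column")
    # Masking is a flag, so it adds to (rather than replaces) the DIRECT/INDIRECT action.
    if transformation.get("masking", False):
        actions.append("confirm masking logic still applies (PII leak risk)")
    return actions
```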

SwiftRide impact analysis template (filled in by the Data Owner in a PR comment):

Impact analysis, PR #1234: commission_rules.commission_pct DECIMAL(5,4) → DECIMAL(7,6)

Downstream CDE-marked:
  CDE-SWR-003 (driver_earnings_ledger)
    - fct_driver_earnings.gross_earnings_usd (DIRECT, multiplier)
      Impact: rounding precision increased; expected delta <0.0001 USD per row;
              cumulative impact estimated at <$50 daily for the $12M wallet: immaterial
      Re-test: yes; formula parity check rerun on 1000 random rows; 0 mismatches
      Attestation: Sami (SwiftPay Data Steward) signed 2026-08-15

  CDE-SWR-001 (revenue_gmv_aggregates)
    - fct_revenue_daily.net_revenue_usd (INDIRECT via commission revenue offset)
      Impact: rounding propagation; delta <$50 daily on $6-8M revenue: immaterial
      Re-test: yes; cross-system reconciliation run, Snowflake vs Treasury cash-in
      Attestation: Carlos (Finance Lead) signed 2026-08-15

  CDE-SWR-007 (irs_1099_nec_box1)
    - box_1_nonemployee_comp (DIRECT, annual SUM)
      Impact: annual aggregate; cumulative rounding gain/loss <$2000 across 50K drivers: immaterial
      Re-test: yes; backfill of the last 4 quarters; validated against 1099 templates
      Attestation: Tax Compliance Lead signed 2026-08-15

Material weakness risk: NONE (all immaterial)
Approved for merge: 2026-08-15
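The "formula parity check" in the template recomputes the output from its inputs and compares it with what is stored. A minimal sketch for gross_earnings_usd, assuming the formula from the lineage panel (fare_usd × (1 - commission_pct)) and, as an assumption not stated in the source, rounding to cents with banker's rounding; the row format and helper names are hypothetical.

```python
import random
from decimal import ROUND_HALF_EVEN, Decimal


def gross_earnings(fare_usd: Decimal, commission_pct: Decimal) -> Decimal:
    """Reference formula: fare_usd * (1 - commission_pct), rounded to cents (assumed rule)."""
    return (fare_usd * (1 - commission_pct)).quantize(Decimal("0.01"), rounding=ROUND_HALF_EVEN)


def parity_mismatches(rows, sample_size=1000, tolerance=Decimal("0.0001"), seed=0):
    """Recompute gross_earnings_usd on a random sample; return rows that differ beyond tolerance."""
    rng = random.Random(seed)  # fixed seed so the sample can be reproduced for the auditor
    sample = rng.sample(rows, min(sample_size, len(rows)))
    return [
        r for r in sample
        if abs(gross_earnings(r["fare_usd"], r["commission_pct"]) - r["gross_earnings_usd"]) > tolerance
    ]
```

Using `Decimal` rather than floats keeps the recomputation exact, so any mismatch reflects a real formula or rounding difference rather than binary floating-point noise.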

This document is evidence for PCAOB AS 1305 (control design effectiveness) + AS 1105 (audit evidence). It is stored under S3 Object Lock with 7y retention.

Marquez as the evidence backend

Marquez v0.51.x · current as of 2026-05

Marquez is the reference OpenLineage backend, an LF AI & Data project.

Features SwiftRide uses:

  • Lineage graph storage: PostgreSQL backend; queryable via REST API + UI.
  • Run tracking: every dbt / Airflow / Spark run is a run event with START / COMPLETE / FAIL states and a run duration.
  • Dataset versioning: schema versions tracked; column-level changes diffed.
  • Job versioning: code version (sourceCodeLocation facet) tracked per run.
  • Tags + custom facets: CDE markers are stored as the custom facet cde.cdeId and are queryable.
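A sketch of what attaching and reading the CDE marker could look like on an OpenLineage dataset dict. OpenLineage facets carry `_producer` and `_schemaURL` fields; the facet key `cde`, the `cdeId` field and the URLs in the usage are SwiftRide-specific assumptions, and both helpers are hypothetical.

```python
def with_cde_facet(dataset: dict, cde_id: str, producer: str, schema_url: str) -> dict:
    """Return a copy of an OpenLineage dataset dict with a custom 'cde' facet attached."""
    facets = dict(dataset.get("facets", {}))  # copy so the caller's dict is not mutated
    facets["cde"] = {"_producer": producer, "_schemaURL": schema_url, "cdeId": cde_id}
    return {**dataset, "facets": facets}


def cde_id_of(dataset: dict):
    """Read the CDE id back from the custom facet, or None if the dataset is not CDE-marked."""
    return dataset.get("facets", {}).get("cde", {}).get("cdeId")
```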

Limitations:

  • Storage does not enforce immutability natively; it relies on PostgreSQL backend safeguards. SOX-grade evidence requires a separate S3 Object Lock archive of significant events (schema changes on CDEs + run results of material pipelines).
  • The UI is good for exploration, not for automated reporting; CI integration goes through the REST API.

SwiftRide deployment (T+9M): Marquez self-hosted; PostgreSQL backend on Aurora; UI exposed internally and to the Big 4 auditor through a read-only role; events ingested from the dbt-airflow integration and a custom Spark integration; nightly export of significant events to S3 Object Lock with 7y retention.
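A hedged sketch of the nightly export step: one JSON object per significant event, written with S3 Object Lock in compliance mode. It assumes the bucket was created with Object Lock enabled and uses boto3's `put_object` Object Lock parameters; the key layout and bucket name are hypothetical, and "7y" is interpreted as seven calendar years from the archive time.

```python
import hashlib
import json
from datetime import datetime, timezone


def retain_until(archived_at: datetime, years: int = 7) -> datetime:
    """Retention end date; a Feb 29 start rolls to Feb 28 in a non-leap target year."""
    try:
        return archived_at.replace(year=archived_at.year + years)
    except ValueError:
        return archived_at.replace(year=archived_at.year + years, day=28)


def object_lock_put_kwargs(bucket, event, archived_at):
    """Build boto3 put_object arguments for one lineage event under compliance-mode Object Lock."""
    body = json.dumps(event, sort_keys=True).encode("utf-8")
    # Content-addressed key: the digest doubles as an integrity reference in the evidence index.
    key = f"lineage-events/{archived_at:%Y/%m/%d}/{hashlib.sha256(body).hexdigest()}.json"
    return {
        "Bucket": bucket,
        "Key": key,
        "Body": body,
        "ChecksumAlgorithm": "SHA256",
        "ObjectLockMode": "COMPLIANCE",
        "ObjectLockRetainUntilDate": retain_until(archived_at),
    }


def archive_event(bucket, event, archived_at=None):
    """Upload one event; requires boto3 and AWS credentials at call time."""
    import boto3  # imported lazily so the helpers above work without boto3 installed

    archived_at = archived_at or datetime.now(timezone.utc)
    boto3.client("s3").put_object(**object_lock_put_kwargs(bucket, event, archived_at))
```

Compliance mode is the right choice here because, unlike governance mode, no user (including the root account) can shorten the retention or delete the object before the date passes.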

OpenLineage emitters

dbt

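openlineage-dbt integration (the `dbt-ol` wrapper around the dbt CLI, published by the OpenLineage project); emits OpenLineage events per dbt run:

  • START event when the run begins.
  • COMPLETE event with a columnLineage facet, derived from SQL parsing.
  • FAIL event if a test/build fails; includes failed expectation details.
# openlineage.yml: transport config read by the OpenLineage client inside dbt-ol
# (profiles.yml has no openlineage key; the prod connection stays there unchanged)
transport:
  type: http
  url: http://marquez:5000

# CI invocation; OPENLINEAGE_NAMESPACE sets the job namespace:
# OPENLINEAGE_NAMESPACE=snowflake://prod dbt-ol build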

Airflow

OpenLineage provider package; emits events per task:

  • Task start / complete / fail.
  • Custom facets for DAG run, task run, lineage inputs/outputs.

Spark

OpenLineage-Spark integration; emits lineage from Spark jobs and SQL queries; column-level lineage is derived from Spark's logical plan.

Custom Python / API

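openlineage-python client; emits events directly:

from uuid import uuid4

from openlineage.client import OpenLineageClient
from openlineage.client.run import Job, Run, RunEvent, RunState

# OpenLineageClient() picks up its transport from openlineage.yml or OPENLINEAGE_URL.
client = OpenLineageClient()
client.emit(RunEvent(
    eventType=RunState.COMPLETE,
    eventTime="2026-08-15T06:00:00Z",
    run=Run(runId=str(uuid4())),  # runId must be a UUID
    job=Job(namespace="custom_jobs", name="commission_rules_reload"),
    producer="https://github.com/swiftride/commission-rules-reload",  # hypothetical producer URI
    inputs=[...],   # input datasets (elided)
    outputs=[...],  # output datasets (elided)
))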

SwiftRide lineage-guarded migration

Actual T+12M scenario: a precision change to commission_rules.commission_pct (NUMERIC(5,4) → NUMERIC(7,6)). Drivers:

  • ECB AMLR enforcement update: more granular commission tracking is required for audit;
  • Big 4 Q4 readiness: the precision improvement reduces material misstatement risk.

Lineage-guarded flow:

  1. PR opened: engineer Anna modifies dbt/models/marts/swiftpay/commission_rules.sql and the corresponding YAML contract.
  2. dbt build in CI:
    • dbt 1.9 contract check: the column precision change is detected; the check would fail if the column were dropped, and passes because precision was widened (backward-compatible).
    • dbt test on the commission_rules model passes.
  3. OpenLineage event emitted: the dbt OpenLineage integration publishes a COMPLETE event with the new column schema.
  4. Marquez consumes the event: the graph is updated and cde.cdeId tags are traversed downstream.
  5. The CI lineage analysis script queries the Marquez REST API:
    • Downstream artefacts found: fct_driver_earnings, fct_revenue_daily, looker_driver_payouts_dashboard, irs_1099_export, salesforce_etl_payouts.
    • 5 of 5 are CDE-marked.
  6. PR comment posted automatically, listing the 5 downstream artefacts, their transformation facets and the required attestations.
  7. Data Owners review:
    • Sami (SwiftPay Data Steward) reviews the driver_earnings impact, reruns the formula parity check and signs the PR.
    • Carlos (Finance Lead) reviews the revenue impact and the cross-system reconciliation, and signs.
    • Tax Compliance Lead reviews the IRS 1099 impact and the backfill validation, and signs.
  8. CODEOWNERS approval: Finance Lead + Data Platform Lead approve the PR.
  9. Merge to main → CI runs the stg deploy → manual ServiceNow CR → prod deploy.
  10. Evidence trail archived to S3:
    • PR commit SHA + impact analysis document.
    • OpenLineage events (Marquez export).
    • Re-test results (formula parity, reconciliation, backfill).
    • Attestations (signed PR comments + Workiva attestation form).
    • ServiceNow CR.

Everything is archived under S3 Object Lock in compliance mode with 7y retention, and is queryable through the Snowflake table audit.evidence_index (linked to control_id CTL-CDE-SWR-003-005, schema migration).
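A sketch of one possible audit.evidence_index record: it ties the control ID and the PR commit SHA to a SHA-256 digest of each archived artefact, so an auditor can verify later that the archived files were not replaced. The record layout and the helper are assumptions; only the control ID comes from the scenario.

```python
import hashlib


def evidence_manifest(control_id, pr_commit_sha, artifacts):
    """Build an evidence index record with one SHA-256 digest per artefact.

    artifacts: mapping of artefact name -> file content as bytes.
    """
    return {
        "control_id": control_id,
        "pr_commit_sha": pr_commit_sha,
        # Sorted names give a deterministic record for the same evidence set.
        "artifacts": {name: hashlib.sha256(content).hexdigest()
                      for name, content in sorted(artifacts.items())},
    }
```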

Audit narrative: "The schema change was governed by a lineage-guarded process; 5 CDE downstream artefacts were identified automatically; Data Owners reviewed individual impact analyses; re-tests passed; the PR was approved via CODEOWNERS by the Finance and Data Platform leads; it was deployed through a signed CI/CD path; evidence was retained." PCAOB-defensible.

Knowledge check
SwiftRide T+15M post-IPO: an engineer changes the type of column `fct_trip_records.surge_multiplier` (DECIMAL(5,3) → DECIMAL(7,5)). An OpenLineage event is emitted; Marquez identifies the downstream artefacts `fct_revenue_daily`, `fct_driver_earnings`, `looker_pricing_dashboard`, `pricing_ml_feature_store`, `eu_ai_act_disclosure_export`. 4 of 5 are CDE-marked. The engineer claims: "precision widening is backward-compatible, no impact analysis needed". What is the PCAOB-defensible response?
Answer
The engineer is wrong: backward compatibility (type widening) ≠ no impact analysis. Required steps: (1) DIRECT transformations downstream (fct_revenue_daily, fct_driver_earnings): precision widening propagates higher precision and rounding behaviour may differ; formula parity checks must be rerun on random samples and the cumulative impact estimated. The delta is then assessed for materiality under SAB 99 (quantitative and qualitative factors, no bright-line threshold); a material delta is escalated for material-weakness evaluation and disclosure. (2) INDIRECT via the ML feature store: `pricing_ml_feature_store` consumes surge_multiplier for model training; the precision change may shift feature distributions → model retraining may be required → AI Act Art. 10 documentation update if a high-risk classification is confirmed. (3) MASKING does not apply here, but AI Act Art. 13 transparency requires `eu_ai_act_disclosure_export` to be updated if there is a material change to the pricing transparency disclosure. (4) Every downstream artefact requires Data Owner sign-off documenting impact + re-test + attestation; archived 7y. (5) PCAOB AS 1305: if the engineer pushes without an impact analysis, this is a control design deficiency ("the entity could not identify the impact of the change on financial reporting") and a material weakness candidate. (6) The audit narrative is defensible only if the lineage-guarded gate was triggered, the impact analysis document was filled in, 4 Data Owners signed off, re-tests were recorded, the PR was merged with CODEOWNERS approval, and the evidence was archived. Backward compatibility addresses CODE, not BUSINESS LOGIC: values can propagate differently downstream even when the schema is technically compatible.

Anti-patterns

Lineage exists but is not enforced in CI

Pattern: Marquez is deployed and engineers can see lineage, but there is no automated gate; engineers just "check" manually (or forget).

Why it is bad: lineage observability ≠ lineage control. The auditor asks: "How do you ensure that a schema change on a CDE goes through impact analysis?" A manual check is insufficient.

Fix: CI script + CODEOWNERS gating; lineage analysis runs automatically on every PR; results are posted as a PR comment; Data Owner sign-off is required in PR review.

Column-level lineage on only 50% of pipelines

Pattern: dbt models have column-level lineage; Spark jobs and custom Airflow tasks do not.

Why it is bad: gaps in lineage mean blind spots in impact analysis; a schema change can propagate undetected.

Fix: emit OpenLineage events from all production pipelines (dbt, Spark, Airflow, custom Python). Track coverage gaps and remediate them.
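Coverage gaps can only be remediated if they are measured. A minimal sketch that compares a production job inventory against the jobs seen in the lineage backend, grouped by pipeline type; the inventory source, job names and the helper are hypothetical.

```python
def lineage_coverage(production_jobs, jobs_with_events):
    """Per pipeline type: how many production jobs emitted OpenLineage events, and which did not.

    production_jobs: mapping job name -> pipeline type (e.g. "dbt", "spark", "custom").
    jobs_with_events: set of job names seen in the lineage backend.
    """
    report = {}
    for job, kind in production_jobs.items():
        entry = report.setdefault(kind, {"covered": 0, "total": 0, "gaps": []})
        entry["total"] += 1
        if job in jobs_with_events:
            entry["covered"] += 1
        else:
            entry["gaps"].append(job)
    return report
```

The "gaps" lists become the remediation backlog; an empty list for every pipeline type is the evidence that coverage is complete.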

Ephemeral lineage events

Pattern: Marquez stores events with 90-day PostgreSQL retention; older events are purged.

Why it is bad: SOX evidence retention is 7y; lineage from 3 years ago cannot be reconstructed for an audit follow-up.

Fix: export significant lineage events (CDE-marked schema changes + material run results) daily/nightly to S3 Object Lock with 7y retention. Marquez is the operational layer; S3 is the evidence layer.
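A sketch of the selection step before export, under two stated assumptions: an output event carrying a `schema` facet on a CDE dataset is treated as a schema-change candidate (detecting an actual diff would need the previous version), and "material run results" means COMPLETE or FAIL events of jobs on a material-pipeline list. The helper name is hypothetical.

```python
def significant_events(events, cde_datasets, material_jobs):
    """Select OpenLineage events worth archiving as evidence.

    cde_datasets: set of (namespace, name) pairs for CDE-marked datasets.
    material_jobs: set of job names whose run results are material.
    """
    selected = []
    for ev in events:
        cde_schema_change = any(
            (o.get("namespace"), o.get("name")) in cde_datasets and "schema" in o.get("facets", {})
            for o in ev.get("outputs", [])
        )
        material_run = (ev.get("job", {}).get("name") in material_jobs
                        and ev.get("eventType") in {"COMPLETE", "FAIL"})
        if cde_schema_change or material_run:
            selected.append(ev)
    return selected
```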

Manual lineage in documentation

Pattern: "lineage is documented on a Confluence page, updated quarterly".

Why it is bad: documented lineage lags behind the code; the auditor asks about the current state and the documentation is stale.

Fix: automated lineage from running code (OpenLineage emitters); documentation is generated, not hand-maintained.

Summary

  • OpenLineage v1.46.0 (current, April 2025): standardised emit format; column-level facets with DIRECT / INDIRECT transformation types plus a masking flag (MASKING); PR + sourceCodeLocation + TestRun facets.
  • Marquez (LF AI & Data): reference backend; PostgreSQL storage; queryable via REST API + UI; events ingested from the dbt OpenLineage integration, the Airflow OpenLineage provider, the Spark integration and custom Python.
  • Lineage-as-control workflow: PR opened → dbt build emits event → Marquez updates graph → CI lineage analysis identifies CDE-marked downstream artefacts → PR comment posted automatically with impact requirements → Data Owners sign off → CODEOWNERS approval → merge → deploy.
  • Schema gating prevents silent breakage from column drops, type narrowing, renames and semantic changes: the affected downstream columns are identified through column-level transformation facets.
  • Marquez = operational layer; S3 Object Lock = evidence layer (7y retention) for CDE-marked schema changes + material run results.
  • BCBS 239 Principle 2 (data architecture) + Principle 3 (reconciliation to source): lineage provides the required traceability.
  • SwiftRide T+12M scenario: the schema change on commission_rules.commission_pct went through a 10-step lineage-guarded process; 5 CDE downstream artefacts were identified, attested and deployed; PCAOB-defensible.

In M5.9 (capstone lab) we will build a full controls matrix for one CDE (Driver Earnings) with at least 12 controls, using the objective/activity/evidence structure + DQ dimensions + SoD + lineage-as-control.

