manifest.json use cases: docs, Slim CI, observability, lineage, cost attribution
manifest.json is more than a debug artifact. It is the API contract for a whole ecosystem of tools and workflows. This lesson walks through real production use cases and their architecture:
- docs site rendering
- Slim CI via state:modified
- Observability platforms (Elementary, dbt-osmosis)
- Lineage extraction for BI catalogs
- Cost attribution at the project and team levels
- Project health checks (untested coverage, stale models, deprecated models)
Each use case is a production-grade pattern.
Use case 1 — docs site
dbt docs generate produces two data artifacts: manifest.json (project structure) and catalog.json (column types read from the warehouse). dbt docs serve starts a simple local Python HTTP server that serves the docs single-page app (the stock dbt-docs site is an AngularJS app).
Architecture
manifest.json + catalog.json
            │
            v
┌─────────────────────────┐
│ index.html              │
│ + bundled JS/CSS        │
│ + manifest.json (embed) │
│ + catalog.json (embed)  │
└─────────────────────────┘
            │
            v
The single-page app loads the manifest as JSON
and renders:
- Lineage graph (built from child_map/parent_map)
- Model pages (description, columns, tests, code)
- Source pages (loaded_at, freshness)
- Macro pages
- Exposures (downstream consumers)
The manifest is consumed client-side. The React-style component below is an illustrative sketch of which manifest fields feed a model page; window.dbt and the child components are hypothetical names, not the stock dbt-docs code:
const manifest = window.dbt.manifest;
const catalog = window.dbt.catalog;

// Render a model page from the manifest and catalog entries
function ModelPage({ uniqueId }) {
  const node = manifest.nodes[uniqueId];
  const catalogEntry = catalog.nodes[uniqueId];
  return (
    <div>
      <h1>{node.name}</h1>
      <p>{node.description}</p>
      <ColumnTable columns={node.columns} catalog={catalogEntry} />
      <CodeBlock>{node.compiled_code}</CodeBlock>
      <LineageGraph
        center={uniqueId}
        parents={manifest.parent_map[uniqueId]}
        children={manifest.child_map[uniqueId]}
      />
      <TestsList tests={getTestsFor(uniqueId, manifest)} />
    </div>
  );
}
Hosting in production
dbt docs serve is meant for local development. Options for production hosting:
- dbt Cloud Explorer: managed
- GitHub Pages: simple and free
- Internal Netlify/Vercel: proper auth + SSO
- S3 + CloudFront: the corporate setup
- Self-hosted Astro/Next.js: a custom wrapper with auth
# .github/workflows/docs-deploy.yml
- name: Generate docs
  run: |
    dbt docs generate --target prod
- name: Deploy to GitHub Pages
  uses: peaceiris/actions-gh-pages@v3
  with:
    publish_dir: ./target
    github_token: ${{ secrets.GITHUB_TOKEN }}
dbt-osmosis pattern
Before docs generate, you can run dbt-osmosis to propagate descriptions through the DAG automatically:
dbt-osmosis yaml refactor
Effect:
- If a column is documented in stg_customers.id and fct_orders.customer_id references it through a JOIN, the description propagates automatically.
- This can reduce undocumented columns by 60-80% in realistic projects.
It works by reading the manifest and inspecting the SQL to track column lineage.
Use case 2 - Slim CI via state:modified
The most practical day-to-day use of the manifest.
Problem
A regular CI run:
dbt build # builds and tests ALL models
For a project with 2000 models, that is 1-3 hours per PR: expensive and slow.
Solution - state:modified
# In CI:
dbt build --select "state:modified+" --defer --state baseline/
- state:modified: only models that changed relative to the baseline
- +: also everything downstream (children)
- --defer: resolve references to unchanged models from the baseline (saves rebuilding them)
- --state baseline/: points to the directory that holds the previous manifest
How dbt computes state:modified
1. Load the current manifest (target/manifest.json)
2. Load the baseline manifest (baseline/manifest.json)
3. For each node:
   a. Compare body checksums (raw_code)
   b. Compare config (materialized, unique_key, etc.)
   c. Compare upstream macros (depends_on.macros)
   d. Compare the contract (if enforced)
   e. Compare the relation (database/schema/alias) and persisted descriptions
4. If ANY of these changed -> mark the node as modified
5. Output the node unique_ids
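The comparison steps above can be sketched in plain Python. This is a simplified illustration and not dbt's implementation: the function name modified_nodes and the reason labels are assumptions, and only the checksum and config comparisons are covered. dbt itself also handles macros, contracts, and relations, and compares unrendered values so that a different CI schema does not count as a change.

```python
def modified_nodes(current: dict, baseline: dict) -> dict:
    """Return {unique_id: [reasons]} for nodes that differ from the baseline.

    Hypothetical helper: a rough approximation of state:modified.
    """
    changed = {}
    baseline_nodes = baseline.get("nodes", {})
    for uid, node in current.get("nodes", {}).items():
        old = baseline_nodes.get(uid)
        if old is None:
            # Node does not exist in the baseline at all
            changed[uid] = ["new"]
            continue
        reasons = []
        # The checksum object summarizes the node's file contents
        if node.get("checksum") != old.get("checksum"):
            reasons.append("body")
        if node.get("config") != old.get("config"):
            reasons.append("configs")
        if reasons:
            changed[uid] = reasons
    return changed
```

Feeding it two parsed manifest.json dictionaries returns only the nodes worth rebuilding, together with the reason each one was flagged.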
Setup baseline storage
The baseline manifest is the manifest from the last successful run on the main branch. Where to store it:
Option A — GitHub Actions artifacts:
- name: Restore baseline manifest
  uses: dawidd6/action-download-artifact@v3
  with:
    name: dbt-manifest
    branch: main
    path: baseline/
    workflow: dbt-build.yml
- name: Slim CI
  run: |
    dbt build --select "state:modified+" --defer --state baseline/
- name: Save manifest for next runs
  if: github.ref == 'refs/heads/main'
  uses: actions/upload-artifact@v3
  with:
    name: dbt-manifest
    path: target/manifest.json
Option B — S3 / GCS:
- name: Download baseline
  run: |
    aws s3 cp s3://my-bucket/dbt/baseline/manifest.json baseline/
- name: Slim CI
  run: |
    dbt build --select "state:modified+" --defer --state baseline/
Defer mechanism
dbt run --defer --state baseline/ --select my_changed_model
Effect:
- my_changed_model is built in the CI schema (e.g., pr_42)
- Any ref() to an unchanged model resolves to the prod schema (via the baseline manifest)
- Cheap: the whole upstream chain is not rebuilt
-- my_changed_model.sql
SELECT * FROM {{ ref('stg_customers') }} -- resolves to analytics.stg_customers (prod), not pr_42.stg_customers
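The resolution rule behind --defer can be illustrated with a small sketch. The names resolve_ref and relation_name are hypothetical, and the rule is simplified: dbt additionally checks whether the relation already exists in the target schema unless --favor-state is passed, which this sketch ignores.

```python
def relation_name(node: dict) -> str:
    """Fully qualified relation name for a manifest node."""
    return f'{node["database"]}.{node["schema"]}.{node["alias"]}'


def resolve_ref(uid: str, selected: set, current: dict, baseline: dict) -> str:
    """Pick the relation a ref() points to under --defer (simplified)."""
    # Selected nodes are built in this run, so they live in the CI schema.
    # Nodes unknown to the baseline have nothing to defer to.
    if uid in selected or uid not in baseline["nodes"]:
        return relation_name(current["nodes"][uid])
    # Everything else is read from the baseline (production) relation.
    return relation_name(baseline["nodes"][uid])
```

With my_changed_model selected, a ref to stg_customers resolves to the baseline relation while the changed model itself stays in the CI schema.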
Trade-offs
Selection syntax
state:modified # All changes
state:modified+ # + downstream
+state:modified # + upstream
state:new # New models only
state:modified.body # Only body changes (raw_code)
state:modified.configs # Only config changes
state:modified.relation # database/schema/alias changes
state:modified.persisted_descriptions # docs changes
state:modified.macros # macro changes (propagates downstream)
state:modified.contract # contract changes
result:error+ # Errored in previous run + downstream
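Selectors can be combined. Space-separated selectors form a union, so the command below builds modified models with their downstream plus everything tagged nightly; for an intersection, join them with a comma ("state:modified+,tag:nightly"):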
dbt build --select "state:modified+ tag:nightly" --defer --state baseline/
Use case 3 — observability (Elementary)
Elementary is an open source observability tool built on manifest + run_results + catalog.
Architecture
dbt run/test
    │
    ▼
target/manifest.json
target/run_results.json
target/catalog.json
    │
    ▼
elementary CLI:
- Reads manifest for node metadata
- Reads run_results for execution status/timing
- Reads catalog for column info
- Compares against historical data in the warehouse
- Detects anomalies (volume, freshness, schema)
    │
    ▼
Elementary Cloud / Self-hosted UI
- Dashboard alerts
- Test failure analysis
- Performance regression
- Schema changes
Patterns
1. Test result enrichment:
# Pseudocode of what Elementary does; load_manifest, load_run_results,
# and send_to_warehouse are placeholders for your own helpers
manifest = load_manifest('target/manifest.json')
run_results = load_run_results('target/run_results.json')
for result in run_results['results']:
    if result['unique_id'].startswith('test.'):
        # Enrich with manifest data
        test_node = manifest['nodes'][result['unique_id']]
        enriched = {
            'test_name': test_node['name'],
            # not_null, unique, etc.; singular tests have no test_metadata
            'test_type': (test_node.get('test_metadata') or {}).get('name'),
            'column': test_node.get('column_name'),
            'model': test_node.get('attached_node'),
            'severity': test_node['config']['severity'],
            'status': result['status'],
            'failures': result.get('failures'),
            'duration': result['execution_time'],
        }
        send_to_warehouse(enriched)
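The loader helpers used in the pseudocode are not defined in the lesson. A minimal sketch of what they could look like follows; the names load_manifest and load_run_results match the calls above, while the internal _load_json helper and its validation rule are assumptions.

```python
import json


def _load_json(path: str, required_key: str) -> dict:
    """Read a dbt artifact and make sure it has the expected top-level key."""
    with open(path, encoding="utf-8") as fh:
        data = json.load(fh)
    if not isinstance(data, dict) or required_key not in data:
        raise ValueError(f"{path}: missing top-level key {required_key!r}")
    return data


def load_manifest(path: str) -> dict:
    """Load manifest.json; node metadata lives under 'nodes'."""
    return _load_json(path, "nodes")


def load_run_results(path: str) -> dict:
    """Load run_results.json; per-node outcomes live under 'results'."""
    return _load_json(path, "results")
```

Failing fast on a missing key keeps a truncated or mismatched artifact from silently producing an empty report.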
2. Anomaly detection:
Elementary stores volume and freshness metrics in the warehouse and compares them with history:
-- Generated by Elementary
WITH historical AS (
SELECT
model_unique_id,
AVG(row_count) AS avg_count,
STDDEV(row_count) AS stddev
FROM {{ ref('elementary_volume_history') }}
WHERE detected_at >= CURRENT_DATE - INTERVAL '30 day'
GROUP BY 1
),
current AS (
SELECT
model_unique_id,
row_count
FROM {{ ref('elementary_current_volume') }}
)
SELECT
c.model_unique_id,
c.row_count,
h.avg_count,
(c.row_count - h.avg_count) / NULLIF(h.stddev, 0) AS z_score
FROM current c
JOIN historical h USING (model_unique_id)
WHERE ABS((c.row_count - h.avg_count) / NULLIF(h.stddev, 0)) > 3 -- 3-sigma threshold
The manifest provides node identity and metadata; Elementary correlates them with warehouse data.
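The 3-sigma rule from the query can also be expressed in a few lines of Python, which is handy for unit-testing thresholds outside the warehouse. The function names are hypothetical; statistics.stdev computes the sample standard deviation, and the None return mirrors the NULLIF guard against a zero deviation.

```python
from statistics import mean, stdev


def volume_z_score(history, current):
    """Z-score of the current row count against historical counts.

    Returns None when there is too little history or no variance.
    """
    if len(history) < 2:
        return None
    sigma = stdev(history)
    if sigma == 0:
        return None
    return (current - mean(history)) / sigma


def is_anomaly(history, current, threshold=3.0):
    """True when the current value is more than `threshold` sigmas away."""
    z = volume_z_score(history, current)
    return z is not None and abs(z) > threshold
```

A history with no variance yields no score at all, which avoids flagging every change on a perfectly stable table.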
3. Lineage-aware alerting:
# If fct_orders fails, alert downstream consumers.
# walk_downstream and send_slack_alert are helpers you supply.
failed_model = 'model.jaffle_shop.fct_orders'
downstream = walk_downstream(failed_model, manifest['child_map'])
for affected in downstream:
    if affected.startswith('exposure.'):
        exp = manifest['exposures'][affected]
        send_slack_alert(
            owner=exp['owner'].get('email'),
            message=f"Upstream model {failed_model} failed. "
                    f"Affects: {exp.get('url')}"
        )
This alerts consumers proactively, not only the data team.
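The walk_downstream helper is used here without a definition. One possible implementation is a breadth-first traversal of child_map, sketched below; the signature is inferred from the call site and is an assumption.

```python
from collections import deque


def walk_downstream(start: str, child_map: dict) -> list:
    """Return every node reachable from `start`, in breadth-first order.

    The start node itself is not included in the result.
    """
    seen = {start}
    order = []
    queue = deque([start])
    while queue:
        current = queue.popleft()
        for child in child_map.get(current, []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order
```

Tracking visited nodes matters because diamond-shaped DAGs reach the same descendant through several parents.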
Use case 4 - lineage extraction for BI catalogs
Goal: BI tools and data catalogs (Tableau, Atlan, DataHub) show where data originates. The manifest contains the full lineage.
OpenLineage format
OpenLineage is a standard for lineage events. dbt-core does not emit OpenLineage events on its own; the openlineage-dbt integration ships a dbt-ol wrapper command that runs dbt and then builds events from the manifest and run_results:
# dbt_project.yml
on-run-end:
  - "{{ dbt_lineage.send_run_events() }}" # hypothetical
# Or, with the dbt-ol wrapper: environment variables
OPENLINEAGE_URL=https://marquez.example.com
OPENLINEAGE_API_KEY=...
Event format:
{
"eventType": "COMPLETE",
"eventTime": "2026-05-19T10:30:00Z",
"run": {"runId": "..."},
"job": {
"namespace": "dbt://jaffle_shop",
"name": "fct_orders"
},
"inputs": [
{"namespace": "dbt://jaffle_shop", "name": "stg_orders"},
{"namespace": "dbt://jaffle_shop", "name": "stg_customers"}
],
"outputs": [
{
"namespace": "duckdb://jaffle_shop",
"name": "main.fct_orders",
"facets": {
"schema": {
"fields": [
{"name": "order_id", "type": "BIGINT"},
{"name": "amount", "type": "NUMERIC(10,2)"}
]
}
}
}
]
}
For a Marquez / DataHub / OpenLineage backend, the events are built by walking the manifest:
def emit_lineage_events(manifest, run_results):
    # emit_event is a placeholder for your OpenLineage client call
    for result in run_results['results']:
        node = manifest['nodes'].get(result['unique_id'])
        if not node or node['resource_type'] != 'model':
            continue
        inputs = [
            {
                'namespace': f"dbt://{manifest['metadata']['project_name']}",
                'name': dep.split('.')[-1]
            }
            for dep in node['depends_on']['nodes']
        ]
        output = {
            'namespace': f"warehouse://{node['database']}",
            'name': f"{node['schema']}.{node['alias']}",
            'facets': {
                'schema': {
                    'fields': [
                        {'name': col_name, 'type': col_info.get('data_type') or 'unknown'}
                        for col_name, col_info in node.get('columns', {}).items()
                    ]
                }
            }
        }
        timing = result.get('timing') or []
        emit_event({
            'eventType': 'COMPLETE',
            # timing can be empty, e.g. for skipped nodes
            'eventTime': timing[-1]['completed_at'] if timing else None,
            'job': {'name': node['name']},
            'inputs': inputs,
            'outputs': [output]
        })
Atlan / DataHub ingestion
# Pseudocode for an Atlan dbt connector. The import path and the
# client.dbt.* calls are illustrative and may not match the real Atlan SDK.
from atlan.client import AtlanClient
client = AtlanClient(...)
manifest = load_manifest('target/manifest.json')
for uid, node in manifest['nodes'].items():
    if node['resource_type'] != 'model':
        continue
    asset = client.dbt.create_model(
        guid=uid,
        name=node['name'],
        qualified_name=f"{node['database']}.{node['schema']}.{node['alias']}",
        description=node.get('description'),
        tags=node['tags'],
        owner=node.get('meta', {}).get('owner'),
        upstream_assets=[
            client.dbt.qualify(dep) for dep in node['depends_on']['nodes']
        ]
    )
Result: Atlan shows the model in the catalog with full upstream lineage and docs.
Use case 5 — cost attribution
The goal is to see who incurs the warehouse cost so you know where to optimize.
Approach 1 - manifest meta + run_results
from collections import defaultdict

def cost_by_team(manifest, run_results):
    """Attribute run time (a proxy for warehouse cost) to teams via meta.team."""
    by_team = defaultdict(lambda: {'runs': 0, 'duration_seconds': 0})
    for result in run_results['results']:
        uid = result['unique_id']
        node = manifest['nodes'].get(uid)
        if not node or node['resource_type'] != 'model':
            continue
        team = node.get('meta', {}).get('team', 'unattributed')
        by_team[team]['runs'] += 1
        by_team[team]['duration_seconds'] += result['execution_time']
    return dict(by_team)
Approach 2 - Snowflake QUERY_HISTORY join
More precise, because it uses actual credits:
WITH dbt_models AS (
    SELECT
        unique_id,
        meta:team::VARCHAR AS team
    FROM {{ ref('dbt_manifest_models') }} -- pre-loaded from manifest
),
warehouse_costs AS (
    SELECT
        QUERY_TAG, -- expected to hold 'dbt:<unique_id>'
        -- NOTE: verify the credits column for your account; QUERY_HISTORY
        -- reports CREDITS_USED_CLOUD_SERVICES, not per-query compute credits
        SUM(CREDITS_USED) AS credits
    FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
    WHERE QUERY_TAG LIKE 'dbt:%'
      AND START_TIME >= CURRENT_DATE - INTERVAL '7 days'
    GROUP BY QUERY_TAG
)
SELECT
    m.team,
    SUM(w.credits) AS total_credits,
    SUM(w.credits) * 4.00 AS estimated_usd -- assumed credit price; depends on your contract
FROM dbt_models m
JOIN warehouse_costs w
    ON SPLIT_PART(w.QUERY_TAG, ':', 2) = m.unique_id
GROUP BY m.team
ORDER BY total_credits DESC
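Tagging queries from dbt (Snowflake-specific):
# dbt_project.yml
query-comment:
  comment: "dbt:{{ node.unique_id }}"
Note that query-comment prepends this string to the query text as a SQL comment; it does not populate QUERY_TAG by itself. On Snowflake, QUERY_TAG comes from the adapter's query_tag config (per-model values usually require overriding the set_query_tag macro). Once the tag holds dbt:<unique_id>, the query history can be joined to the manifest.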
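The same join can be prototyped in Python before it is moved into the warehouse. The sketch below assumes query history rows have already been exported as (query_tag, credits) pairs and that tags follow the dbt:<unique_id> convention; the function name credits_by_team is hypothetical.

```python
from collections import defaultdict

TAG_PREFIX = "dbt:"


def credits_by_team(query_rows, manifest: dict) -> dict:
    """Sum credits per team from (query_tag, credits) pairs."""
    totals = defaultdict(float)
    for tag, credits in query_rows:
        # Ignore queries that were not tagged by dbt
        if not tag or not tag.startswith(TAG_PREFIX):
            continue
        uid = tag[len(TAG_PREFIX):]
        node = manifest["nodes"].get(uid)
        if node is None:
            # Like the inner join in SQL, unknown nodes are dropped
            continue
        team = (node.get("meta") or {}).get("team", "unattributed")
        totals[team] += credits
    return dict(totals)
```

Models that exist in the manifest but carry no meta.team end up in the unattributed bucket, which is the signal used to chase missing ownership.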
Output dashboard
Team | Models | Total runs | Avg duration | Credits | USD
---------------|--------|------------|--------------|---------|------
finance | 25 | 4200 | 12.5s | 8500 | $34,000
marketing | 18 | 3100 | 8.3s | 4200 | $16,800
engineering | 12 | 1800 | 6.1s | 1500 | $6,000
unattributed | 8 | 1200 | 15.2s | 2800 | $11,200 ← chargeback!
The unattributed row highlights gaps in meta: models without an owner. Push the team to add meta:
# fct_orders.yml
models:
  - name: fct_orders
    meta:
      team: finance
      cost_center: "FIN-001"
Use case 6 — project health checks
Every CI or nightly job checks these invariants:
1. Untested coverage
def check_untested(manifest, threshold=0.8):
    """Fail if test coverage is below the threshold."""
    models = [
        n for n in manifest['nodes'].values()
        if n['resource_type'] == 'model'
    ]
    tested_uids = {
        t['attached_node']
        for t in manifest['nodes'].values()
        if t['resource_type'] in ('test', 'data_test') and t.get('attached_node')
    }
    covered = sum(1 for m in models if m['unique_id'] in tested_uids)
    coverage = covered / len(models) if models else 0
    if coverage < threshold:
        raise SystemExit(
            f"Test coverage {coverage:.1%} below threshold {threshold:.1%}"
        )
    return coverage
2. Undocumented public models
def check_public_docs(manifest):
    """All public models must have a description and column docs."""
    violations = []
    for uid, node in manifest['nodes'].items():
        if node['resource_type'] != 'model':
            continue
        if node['config'].get('access') != 'public':
            continue
        if not node.get('description'):
            violations.append((uid, 'missing_description'))
        undocumented_cols = [
            col_name for col_name, col_data in node.get('columns', {}).items()
            if not col_data.get('description')
        ]
        if undocumented_cols:
            violations.append((uid, f'undocumented_columns: {undocumented_cols}'))
    if violations:
        for uid, issue in violations:
            print(f"FAIL: {uid}: {issue}")
        raise SystemExit(1)
3. Deprecated models past due
from datetime import date, datetime

def check_deprecation_overdue(manifest):
    """Return deprecated models whose deprecation date has passed."""
    today = date.today()
    overdue = []
    for uid, node in manifest['nodes'].items():
        if node['resource_type'] != 'model':
            continue
        # deprecation_date is a model property; fall back to config just in case
        deprecation = node.get('deprecation_date') or node['config'].get('deprecation_date')
        if not deprecation:
            continue
        # The manifest may store a full ISO datetime, not only a date
        deprecation_date = datetime.fromisoformat(str(deprecation)).date()
        if today > deprecation_date:
            overdue.append((uid, deprecation_date))
    return overdue
4. Stale models (no runs in N days)
def find_stale_models(manifest, run_results, days=7):
    """Enabled models that did not run in the supplied run_results.

    A single run_results.json covers one invocation, so honoring `days`
    requires passing results merged from the last N days of runs.
    """
    # Models that ran (from the supplied run_results)
    recent_uids = {
        r['unique_id'] for r in run_results['results']
        if r['status'] != 'skipped'
    }
    stale = []
    for uid, node in manifest['nodes'].items():
        if node['resource_type'] != 'model':
            continue
        if node['config'].get('enabled') is False:
            continue
        if uid not in recent_uids:
            stale.append(uid)
    return stale
5. Group/access governance violations
def check_governance(manifest):
    """Check that public models belong to a group and that groups exist."""
    violations = []
    for uid, node in manifest['nodes'].items():
        if node['resource_type'] != 'model':
            continue
        access = node['config'].get('access')
        group = node['config'].get('group')
        # Public models must be in a group
        if access == 'public' and not group:
            violations.append((uid, 'public_no_group'))
        # The group must exist
        if group:
            group_uid = f"group.{node['package_name']}.{group}"
            if group_uid not in manifest.get('groups', {}):
                violations.append((uid, f'undefined_group: {group}'))
    return violations
CI integration
# .github/workflows/dbt-health-check.yml
- name: Generate manifest
  run: dbt parse
- name: Run health checks
  # dbt parse writes only the manifest; run_results.json must come from an earlier dbt build/run
  run: |
    python scripts/health_check.py target/manifest.json target/run_results.json
Output:
Health check report
═══════════════════
[x] Test coverage: 85.2% (above 80% threshold)
[x] Public models documentation: 100%
[ ] Deprecated overdue: 2 models
- model.jaffle_shop.legacy_revenue (deprecation: 2026-01-01)
- model.jaffle_shop.old_orders (deprecation: 2025-12-15)
[x] Stale models: 3 (below 5 threshold)
[x] Governance violations: 0
Exit code: 1 (deprecated overdue requires attention)
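The lesson references scripts/health_check.py without showing it. A minimal sketch of a runner that turns individual checks into a report and an exit code might look like this; the run_checks name, the check signature (a callable returning a list of violations), and the report wording are assumptions.

```python
import sys


def run_checks(checks, out=None) -> int:
    """Run (name, check) pairs and print a checklist-style report.

    Each check is a zero-argument callable returning a list of violations.
    Returns 0 when every check passes and 1 otherwise.
    """
    if out is None:
        out = sys.stdout
    exit_code = 0
    for name, check in checks:
        violations = list(check())
        mark = " " if violations else "x"
        print(f"[{mark}] {name}: {len(violations)} violation(s)", file=out)
        for violation in violations:
            print(f"    - {violation}", file=out)
        if violations:
            exit_code = 1
    return exit_code
```

In a script, the checks can be bound to a loaded manifest with lambdas, for example ("Governance", lambda: check_governance(manifest)), and the result passed to sys.exit.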
Use case 7 — dbt Mesh contract enforcement
For cross-project refs:
def find_breaking_consumer_changes(producer_manifest, consumer_manifest):
    """Detect breaking changes for consumers of public producer models."""
    findings = []
    # Collect public models in the producer
    public_models = {
        uid: n for uid, n in producer_manifest['nodes'].items()
        if n['resource_type'] == 'model' and n['config'].get('access') == 'public'
    }
    # Consumer refs to public models
    for uid, consumer_node in consumer_manifest['nodes'].items():
        for ref in consumer_node.get('refs', []):
            if not ref.get('package'):
                continue  # Same-project ref
            # Find the target in the producer manifest
            target_uid = f"model.{ref['package']}.{ref['name']}"
            if ref.get('version'):
                target_uid += f".v{ref['version']}"
            if target_uid not in public_models:
                findings.append({
                    'consumer_uid': uid,
                    'issue': f'refs non-public {target_uid}'
                })
                continue
            # Check schema compatibility
            producer = public_models[target_uid]
            if producer['config'].get('contract', {}).get('enforced'):
                # Validate consumer expectations
                # ... (deeper check)
                pass
    return findings
In dbt Mesh, this is a core CI gate for governance.
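The deeper check is left open in the lesson. One plausible direction, shown purely as an assumption, is to compare the contracted columns of the same public model across the previous and the new producer manifest and to flag removed columns and changed data types. The function name and the message format are hypothetical.

```python
def contract_breaking_changes(old_node: dict, new_node: dict) -> list:
    """List breaking column changes between two versions of one model node."""
    findings = []
    old_cols = old_node.get("columns", {})
    new_cols = new_node.get("columns", {})
    for name, old_col in old_cols.items():
        new_col = new_cols.get(name)
        if new_col is None:
            # Consumers selecting this column would break
            findings.append(f"column removed: {name}")
        elif old_col.get("data_type") != new_col.get("data_type"):
            findings.append(
                f"data_type changed: {name} "
                f"({old_col.get('data_type')} -> {new_col.get('data_type')})"
            )
    return findings
```

Added columns are deliberately not reported, since they do not break existing consumers.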
Use case 8 — query-time meta filtering
In BI tools, filter exposures by meta:
# Atlan integration
exposures_high_priority = [
    exp for uid, exp in manifest['exposures'].items()
    if exp['meta'].get('priority') == 'P0'
]
for exp in exposures_high_priority:
    notify_oncall_team(exp)
Or in a Slack bot:
# /dbt-impact command
def impact_of_model(uid, manifest):
    downstream = walk_downstream(uid, manifest['child_map'])
    impacted_exposures = [
        manifest['exposures'][d]
        for d in downstream
        if d.startswith('exposure.')
    ]
    return {
        'model': uid,
        'downstream_models': len([d for d in downstream if d.startswith('model.')]),
        'exposures': [
            {'name': e['name'], 'url': e.get('url'), 'owner': e['owner']['email']}
            for e in impacted_exposures
        ]
    }
User asks /dbt-impact fct_orders:
fct_orders impacts:
- 12 downstream models
- 3 exposures:
  * Executive Dashboard (<owner email>) -> looker.com/dashboards/42
  * Sales KPIs (<owner email>) -> tableau.com/views/sales
  * Finance Forecast (<owner email>) -> notebook
Key takeaways
- docs site: manifest + catalog are rendered by a single-page app. dbt-osmosis enriches column docs.
- Slim CI via state:modified: a 10-50x speedup. The baseline manifest is critical; defer saves cost.
- Elementary observability: manifest for metadata, run_results for status, catalog for columns. Anomaly detection uses warehouse history.
- Lineage extraction: OpenLineage events, Atlan/DataHub ingestion. depends_on + columns + exposures form the full lineage.
- Cost attribution via meta.team + a Snowflake QUERY_HISTORY join, keyed on a QUERY_TAG that carries the model's unique_id.
- Project health checks: untested coverage, undocumented public models, overdue deprecations, stale models, governance violations. Each one is a CI gate.
- dbt Mesh governance: cross-project ref validation, contract change detection.
- Query-time use cases: Slack bots for impact analysis, BI catalogs for filtering, on-call notifications.
- The manifest is the API contract between dbt and the whole tool ecosystem.
- Production-grade integration: schema version validation, defensive parsing, caching, telemetry on supported versions.
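Schema version validation, the first item in the last takeaway, can be sketched as follows. The manifest records its schema URL in metadata.dbt_schema_version; the set of supported versions and both function names below are assumptions to adapt to whatever your tooling has actually been tested against.

```python
import re

# Assumption: manifest schema versions this tooling was tested against
SUPPORTED_MANIFEST_VERSIONS = {11, 12}


def manifest_schema_version(manifest: dict) -> int:
    """Extract the numeric schema version from metadata.dbt_schema_version."""
    url = manifest.get("metadata", {}).get("dbt_schema_version", "")
    match = re.search(r"/manifest/v(\d+)\.json$", url)
    if not match:
        raise ValueError(f"unrecognized dbt_schema_version: {url!r}")
    return int(match.group(1))


def require_supported(manifest: dict, supported=SUPPORTED_MANIFEST_VERSIONS) -> int:
    """Fail fast when the manifest schema is outside the supported set."""
    version = manifest_schema_version(manifest)
    if version not in supported:
        raise RuntimeError(
            f"manifest schema v{version} is not supported "
            f"(supported: {sorted(supported)})"
        )
    return version
```

Calling require_supported right after loading the manifest turns a silent field mismatch after a dbt upgrade into an explicit error.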