Lesson 05.05 · 24 min
Advanced
Tags: manifest, observability, slim-ci, lineage, cost

manifest.json use cases: docs, Slim CI, observability, lineage, cost attribution

manifest.json is more than a debug artifact. It is an API contract for a whole ecosystem of tools and workflows. This lesson walks through real production use cases and their architecture:

  1. docs site rendering
  2. Slim CI via state:modified
  3. Observability platforms (Elementary)
  4. Lineage extraction for BI catalogs
  5. Cost attribution at project and team level
  6. Project health checks (untested coverage, stale models, deprecated)
  7. dbt Mesh contract enforcement
  8. Query-time meta filtering

Each use case is a production-grade pattern.


Related lessons:
  • Slim CI: state:modified+ deep dive (dbt II)
  • OpenLineage deep dive: automatic data lineage (airflow-course)

Use case 1 — docs site

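dbt docs generate produces two artifacts, manifest.json (project structure) and catalog.json (column types read from the warehouse), next to a static index.html. dbt docs serve starts a simple local Python HTTP server that serves these files; the site itself is a single-page app (AngularJS in the stock dbt-docs) that renders them in the browser.

Architecture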

manifest.json + catalog.json

        v
┌─────────────────────────┐
│  index.html             │
│  + bundled JS/CSS       │
│  + manifest.json (embed)│
│  + catalog.json (embed) │
└─────────────────────────┘

        v
The single-page app loads the manifest as JSON
and renders:
  - Lineage graph (built from child_map/parent_map)
  - Model pages (description, columns, tests, code)
  - Source pages (loaded_at, freshness)
  - Macro pages
  - Exposures (downstream consumers)

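The manifest is consumed client-side. The following React-style sketch of a model page is illustrative only (it is not the actual dbt-docs source, and window.dbt is a hypothetical global):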

const manifest = window.dbt.manifest;
const catalog = window.dbt.catalog;

// Render model page
function ModelPage({ uniqueId }) {
  const node = manifest.nodes[uniqueId];
  const catalogEntry = catalog.nodes[uniqueId];
  
  return (
    <div>
      <h1>{node.name}</h1>
      <p>{node.description}</p>
      <ColumnTable columns={node.columns} catalog={catalogEntry} />
      <CodeBlock>{node.compiled_code}</CodeBlock>
      <LineageGraph
        center={uniqueId}
        parents={manifest.parent_map[uniqueId]}
        children={manifest.child_map[uniqueId]}
      />
      <TestsList tests={getTestsFor(uniqueId, manifest)} />
    </div>
  );
}
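
The column table on a model page combines two sources: descriptions from the manifest and physical types from the catalog. Below is a minimal Python sketch of that merge, assuming the usual artifact layout (`columns` keyed by column name, with `description` in the manifest and `type` in the catalog). The function name `build_column_rows` is made up for illustration.

```python
def build_column_rows(node, catalog_entry):
    """Merge documented columns (manifest) with warehouse types (catalog)."""
    documented = node.get("columns", {})
    physical = (catalog_entry or {}).get("columns", {})

    # Match case-insensitively: some warehouses upper-case column names.
    docs_by_name = {name.lower(): col for name, col in documented.items()}
    types_by_name = {name.lower(): col for name, col in physical.items()}

    rows = []
    for name in sorted(docs_by_name.keys() | types_by_name.keys()):
        rows.append({
            "name": name,
            "type": types_by_name.get(name, {}).get("type", "unknown"),
            "description": docs_by_name.get(name, {}).get("description", ""),
        })
    return rows
```

Columns that exist only in the warehouse show up with an empty description, which makes documentation gaps visible on the page.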

Hosting in production

dbt docs serve is for local development. Production hosting options:

  • dbt Cloud Explorer: managed
  • GitHub Pages: simple and free
  • Internal Netlify/Vercel: proper auth + SSO
  • S3 + CloudFront: corporate setup
  • Self-hosted Astro/Next.js: custom wrapper with auth

# .github/workflows/docs-deploy.yml
- name: Generate docs
  run: |
    dbt docs generate --target prod
    
- name: Deploy to GitHub Pages
  uses: peaceiris/actions-gh-pages@v3
  with:
    publish_dir: ./target
    github_token: ${{ secrets.GITHUB_TOKEN }}

dbt-osmosis pattern

Before docs generate, you can run dbt-osmosis to propagate column descriptions through the DAG automatically:

dbt-osmosis yaml refactor

Effect:

  • If a column is documented in an upstream model (say stg_customers.customer_id) and a column with the same name appears downstream (fct_orders.customer_id), the description is inherited automatically.
  • Can reduce undocumented columns by 60-80% in realistic projects.

This works by reading the manifest to walk the DAG and matching each column to its upstream ancestors.
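
To make the propagation idea concrete, here is a simplified Python sketch of name-based description inheritance over `parent_map`. It is an illustration under stated assumptions, not dbt-osmosis's actual algorithm: it looks only at direct parents and matches columns by exact name. The function name `inherit_descriptions` is hypothetical.

```python
def inherit_descriptions(manifest):
    """Propose descriptions for undocumented columns from direct parents.

    Returns {(node_uid, column_name): description}.
    """
    nodes = manifest["nodes"]
    sources = manifest.get("sources", {})
    proposals = {}

    for uid, node in nodes.items():
        for col_name, col in node.get("columns", {}).items():
            if col.get("description"):
                continue  # already documented, nothing to inherit
            for parent_uid in manifest["parent_map"].get(uid, []):
                parent = nodes.get(parent_uid) or sources.get(parent_uid)
                if not parent:
                    continue
                parent_col = parent.get("columns", {}).get(col_name, {})
                if parent_col.get("description"):
                    proposals[(uid, col_name)] = parent_col["description"]
                    break  # first documented parent wins
    return proposals
```

A real tool would also walk further up the DAG and write the result back into the YAML files; this sketch only computes the proposals.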


Use case 2 — Slim CI via state:modified

The most practical manifest use case for day-to-day work.

Problem

A regular CI run:

dbt build  # builds and tests ALL models

For a project with 2000 models this takes 1-3 hours per PR. Expensive and slow.

Solution: state:modified

# In CI:
dbt build --select "state:modified+" --defer --state baseline/

  • state:modified: only the models that changed relative to the baseline
  • +: plus everything downstream (children)
  • --defer: resolve references to unchanged models against the baseline (avoids rebuilding them)
  • --state baseline/: points at the directory holding the previous manifest

How dbt computes state:modified

1. Load the current manifest (target/manifest.json)
2. Load the baseline manifest (baseline/manifest.json)
3. For each node:
   a. Compare checksums (raw_code)
   b. Compare config (materialized, unique_key, etc.)
   c. Compare upstream (depends_on.nodes)
   d. Compare contract (if enforced)
   e. Compare YAML changes (column descriptions, data_types)
4. If ANYTHING changed -> mark as modified
5. Output node UIDs
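
The steps above can be approximated in a few lines of Python. This is a deliberately reduced sketch, not dbt's real implementation: it covers only new nodes, the body checksum, and the config dict, and it assumes the usual manifest layout where each node carries `checksum.checksum` and `config`. The function name `find_modified` is made up for illustration.

```python
def find_modified(current, baseline):
    """Return {unique_id: reason} for nodes that differ from the baseline."""
    modified = {}
    base_nodes = baseline["nodes"]

    for uid, node in current["nodes"].items():
        old = base_nodes.get(uid)
        if old is None:
            modified[uid] = "new"
            continue
        new_sum = node.get("checksum", {}).get("checksum")
        old_sum = old.get("checksum", {}).get("checksum")
        if new_sum != old_sum:
            modified[uid] = "body"
        elif node.get("config") != old.get("config"):
            modified[uid] = "configs"
    return modified
```

The reasons loosely mirror the `state:new`, `state:modified.body`, and `state:modified.configs` sub-selectors; macro, contract, and description changes are left out of the sketch.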

Setup baseline storage

The baseline manifest is the manifest from the last successful run on the main branch. Storage options:

Option A — GitHub Actions artifacts:

- name: Restore baseline manifest
  uses: dawidd6/action-download-artifact@v3
  with:
    name: dbt-manifest
    branch: main
    path: baseline/
    workflow: dbt-build.yml

- name: Slim CI
  run: |
    dbt build --select "state:modified+" --defer --state baseline/

- name: Save manifest for next runs
  if: github.ref == 'refs/heads/main'
  uses: actions/upload-artifact@v4
  with:
    name: dbt-manifest
    path: target/manifest.json

Option B — S3 / GCS:

- name: Download baseline
  run: |
    aws s3 cp s3://my-bucket/dbt/baseline/manifest.json baseline/

- name: Slim CI
  run: |
    dbt build --select "state:modified+" --defer --state baseline/

Defer mechanism

dbt run --defer --state baseline/ --select my_changed_model

Effect:

  • my_changed_model is built in the CI schema (e.g., pr_42)
  • Any ref() to an unchanged model resolves to the prod schema (via the baseline manifest)
  • Cheap: the whole chain is not rebuilt

-- my_changed_model.sql
SELECT * FROM {{ ref('stg_customers') }}  -- resolves to analytics.stg_customers (prod), not pr_42.stg_customers
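
The resolution rule behind defer can be sketched in Python. This is a simplification under stated assumptions: selected nodes resolve to the current (CI) relation, everything else resolves to the relation recorded in the baseline manifest. Real dbt also checks whether the relation already exists in the current environment unless `--favor-state` is set; that check is omitted here. The function name `resolve_ref` is hypothetical.

```python
def resolve_ref(model_name, selected, current, baseline):
    """Return the relation a ref(model_name) points at under simplified defer."""
    def find(manifest):
        for node in manifest["nodes"].values():
            if node["resource_type"] == "model" and node["name"] == model_name:
                return node
        return None

    node = find(current)
    if node is None:
        raise KeyError(f"unknown model: {model_name}")

    if node["unique_id"] in selected:
        target = node                      # built in this run: CI schema
    else:
        target = find(baseline) or node    # deferred: baseline (prod) relation
    return f'{target["database"]}.{target["schema"]}.{target["alias"]}'
```

With `my_changed_model` selected, its own relation stays in the PR schema while `ref('stg_customers')` resolves to the production relation from the baseline.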

Trade-offs

Slim CI trade-offs

Selection syntax

state:modified              # All changes
state:modified+             # + downstream
+state:modified             # + upstream
state:new                   # New models only
state:modified.body         # Only body changes (raw_code)
state:modified.configs      # Only config changes
state:modified.relation     # database/schema/alias changes
state:modified.persisted_descriptions  # docs changes
state:modified.macros       # macro changes (propagates downstream)
state:modified.contract     # contract changes
result:error+               # Errored in previous run + downstream

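Selectors can be combined. A space between selectors means union, a comma means intersection. The example below selects modified models with their downstream plus everything tagged nightly: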

dbt build --select "state:modified+ tag:nightly" --defer --state baseline/
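
The union/intersection rule is easy to get wrong, so here is a tiny Python sketch of the set semantics: space-separated terms are unioned, comma-separated parts inside one term are intersected. It assumes each atomic selector has already been resolved to a set of node ids; the `combine` helper and the sample ids are made up for illustration.

```python
def combine(expression, resolved):
    """Evaluate a --select expression over pre-resolved selector sets."""
    result = set()
    for term in expression.split():                   # space = union
        parts = [resolved[p] for p in term.split(",")]
        result |= set.intersection(*parts)            # comma = intersection
    return result


resolved = {
    "state:modified+": {"fct_orders", "dim_customers"},
    "tag:nightly": {"dim_customers", "fct_revenue"},
}
union = combine("state:modified+ tag:nightly", resolved)
both = combine("state:modified+,tag:nightly", resolved)
```

Here `union` contains all three models, while `both` contains only `dim_customers`. For Slim CI restricted to nightly models, the comma form is usually the intended one.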

Use case 3 — observability (Elementary)

Elementary is an open-source observability tool built on manifest + run_results + catalog.

Architecture

dbt run/test


target/manifest.json
target/run_results.json
target/catalog.json


elementary CLI:
  - Reads manifest for node metadata
  - Reads run_results for execution status/timing
  - Reads catalog for column info
  - Compares against historical data in the warehouse
  - Detects anomalies (volume, freshness, schema)


Elementary Cloud / Self-hosted UI
  - Dashboard alerts
  - Test failure analysis
  - Performance regression
  - Schema changes

Patterns

1. Test result enrichment:

# Pseudocode of what Elementary does
manifest = load_manifest('target/manifest.json')
run_results = load_run_results('target/run_results.json')

for result in run_results['results']:
    if result['unique_id'].startswith('test.'):
        # Enrich with manifest data
        test_node = manifest['nodes'][result['unique_id']]
        enriched = {
            'test_name': test_node['name'],
            # Generic tests carry test_metadata (not_null, unique, etc.); singular tests do not
            'test_type': (test_node.get('test_metadata') or {}).get('name', 'singular'),
            'column': test_node['column_name'],
            'model': test_node['attached_node'],
            'severity': test_node['config']['severity'],
            'status': result['status'],
            'failures': result.get('failures'),
            'duration': result['execution_time']
        }
        send_to_warehouse(enriched)

2. Anomaly detection:

Elementary stores volume and freshness metrics in the warehouse and compares them with history:

-- Generated by Elementary
WITH historical AS (
    SELECT
        model_unique_id,
        AVG(row_count) AS avg_count,
        STDDEV(row_count) AS stddev
    FROM {{ ref('elementary_volume_history') }}
    WHERE detected_at >= CURRENT_DATE - INTERVAL '30 day'
    GROUP BY 1
),
current AS (
    SELECT
        model_unique_id,
        row_count
    FROM {{ ref('elementary_current_volume') }}
)
SELECT
    c.model_unique_id,
    c.row_count,
    h.avg_count,
    (c.row_count - h.avg_count) / NULLIF(h.stddev, 0) AS z_score
FROM current c
JOIN historical h USING (model_unique_id)
WHERE ABS((c.row_count - h.avg_count) / NULLIF(h.stddev, 0)) > 3  -- 3-sigma threshold

The manifest provides node identity and metadata. Elementary correlates them with warehouse data.
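
The same 3-sigma rule as in the SQL above, as a minimal Python sketch. It assumes the history is a plain list of row counts and uses the sample standard deviation, which is what `STDDEV` computes in most warehouses. The function names are made up for illustration.

```python
from statistics import mean, stdev


def volume_z_score(history, current):
    """Z-score of the current row count against historical counts.

    Returns None when the score is undefined (too little history or
    zero variance), mirroring NULLIF(stddev, 0) in the SQL version.
    """
    if len(history) < 2:
        return None
    sigma = stdev(history)
    if sigma == 0:
        return None
    return (current - mean(history)) / sigma


def is_anomalous(history, current, threshold=3.0):
    z = volume_z_score(history, current)
    return z is not None and abs(z) > threshold
```

For example, with a history of `[100, 102, 98, 101, 99]` a current count of 110 is flagged, while 101 is not.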

3. Lineage-aware alerting:

# If fct_orders fails, alert downstream
failed_model = 'model.jaffle_shop.fct_orders'
downstream = walk_downstream(failed_model, manifest['child_map'])

for affected in downstream:
    if affected.startswith('exposure.'):
        exp = manifest['exposures'][affected]
        send_slack_alert(
            owner=exp['owner']['email'],
            message=f"Upstream model {failed_model} failed. "
                    f"Affects: {exp['url']}"
        )

This is proactive alerting for consumers, not only for the data team.
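
The alerting snippet relies on a `walk_downstream` helper. A minimal sketch of it, assuming `child_map` maps each unique_id to a list of its direct children (as it does in manifest.json), is a breadth-first traversal:

```python
from collections import deque


def walk_downstream(start_uid, child_map):
    """Return every node reachable downstream of start_uid (excluding itself)."""
    visited = set()
    queue = deque(child_map.get(start_uid, []))
    while queue:
        uid = queue.popleft()
        if uid in visited:
            continue  # diamond-shaped DAGs reach the same node twice
        visited.add(uid)
        queue.extend(child_map.get(uid, []))
    visited.discard(start_uid)
    return visited
```

The result includes models, tests, and exposures alike, so callers filter by prefix (`model.`, `exposure.`) as the alerting code does.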


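Use case 4 — lineage extraction for BI catalogs

Goal: BI tools and data catalogs (Tableau, Atlan, DataHub) show where data originates. The manifest contains the full lineage.

OpenLineage format

OpenLineage is a standard for lineage events. dbt-core does not emit OpenLineage events by itself; the openlineage-dbt integration provides a dbt-ol wrapper that runs dbt and builds events from the artifacts: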

# dbt_project.yml
on-run-end:
  - "{{ dbt_lineage.send_run_events() }}"  # hypothetical

# Or with the openlineage-dbt wrapper (dbt-ol): env vars
OPENLINEAGE_URL=https://marquez.example.com
OPENLINEAGE_API_KEY=...

Event format:

{
  "eventType": "COMPLETE",
  "eventTime": "2026-05-19T10:30:00Z",
  "run": {"runId": "..."},
  "job": {
    "namespace": "dbt://jaffle_shop",
    "name": "fct_orders"
  },
  "inputs": [
    {"namespace": "dbt://jaffle_shop", "name": "stg_orders"},
    {"namespace": "dbt://jaffle_shop", "name": "stg_customers"}
  ],
  "outputs": [
    {
      "namespace": "duckdb://jaffle_shop",
      "name": "main.fct_orders",
      "facets": {
        "schema": {
          "fields": [
            {"name": "order_id", "type": "BIGINT"},
            {"name": "amount", "type": "NUMERIC(10,2)"}
          ]
        }
      }
    }
  ]
}

Events sent to a Marquez / DataHub / OpenLineage backend are built by walking the manifest:

def emit_lineage_events(manifest, run_results):
    for result in run_results['results']:
        node = manifest['nodes'].get(result['unique_id'])
        if not node or node['resource_type'] != 'model':
            continue
        
        inputs = [
            {
                'namespace': f"dbt://{manifest['metadata']['project_name']}",
                'name': dep.split('.')[-1]
            }
            for dep in node['depends_on']['nodes']
        ]
        
        output = {
            'namespace': f"warehouse://{node['database']}",
            'name': f"{node['schema']}.{node['alias']}",
            'facets': {
                'schema': {
                    'fields': [
                        {'name': col_name, 'type': col_info.get('data_type', 'unknown')}
                        for col_name, col_info in node.get('columns', {}).items()
                    ]
                }
            }
        }
        
        emit_event({
            'eventType': 'COMPLETE',
            'eventTime': result['timing'][-1]['completed_at'],
            'job': {'name': node['name']},
            'inputs': inputs,
            'outputs': [output]
        })

Atlan / DataHub ingestion

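# Atlan dbt connector (illustrative pseudocode, not the real Atlan SDK:
# the import path and the client.dbt.* helpers below are hypothetical)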
from atlan.client import AtlanClient

client = AtlanClient(...)

manifest = load_manifest('target/manifest.json')

for uid, node in manifest['nodes'].items():
    if node['resource_type'] != 'model':
        continue
    
    asset = client.dbt.create_model(
        guid=uid,
        name=node['name'],
        qualified_name=f"{node['database']}.{node['schema']}.{node['alias']}",
        description=node.get('description'),
        tags=node['tags'],
        owner=node.get('meta', {}).get('owner'),
        upstream_assets=[
            client.dbt.qualify(dep) for dep in node['depends_on']['nodes']
        ]
    )

Result: Atlan shows the model in its catalog with full upstream lineage and docs.


Use case 5 — cost attribution

Goal: understand which teams drive warehouse cost so it can be optimized.

Approach 1 — manifest meta + run_results

from collections import defaultdict

def cost_by_team(manifest, run_results):
    """Attribute run time (a proxy for warehouse cost) to teams via meta.team."""
    by_team = defaultdict(lambda: {'runs': 0, 'duration_seconds': 0})
    
    for result in run_results['results']:
        uid = result['unique_id']
        node = manifest['nodes'].get(uid)
        if not node or node['resource_type'] != 'model':
            continue
        
        team = node.get('meta', {}).get('team', 'unattributed')
        by_team[team]['runs'] += 1
        by_team[team]['duration_seconds'] += result['execution_time']
    
    return dict(by_team)

Approach 2 — Snowflake query history join

More precise, because it uses actual credits per query from Snowflake's ACCOUNT_USAGE views:

WITH dbt_models AS (
    SELECT
        unique_id,
        meta:team::VARCHAR AS team
    FROM {{ ref('dbt_manifest_models') }}  -- pre-loaded from manifest
),
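warehouse_costs AS (
    -- QUERY_HISTORY has no per-query compute credits column;
    -- QUERY_ATTRIBUTION_HISTORY exposes CREDITS_ATTRIBUTED_COMPUTE per query
    SELECT
        QUERY_TAG,  -- expected to carry 'dbt:<unique_id>'
        SUM(CREDITS_ATTRIBUTED_COMPUTE) AS credits
    FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ATTRIBUTION_HISTORY
    WHERE QUERY_TAG LIKE 'dbt:%'
        AND START_TIME >= CURRENT_DATE - INTERVAL '7 days'
    GROUP BY QUERY_TAG
)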
SELECT
    m.team,
    SUM(w.credits) AS total_credits,
    SUM(w.credits) * 4.00 AS estimated_usd  -- assumed credit price; varies by edition and contract
FROM dbt_models m
JOIN warehouse_costs w
  ON SPLIT_PART(w.QUERY_TAG, ':', 2) = m.unique_id
GROUP BY m.team
ORDER BY total_credits DESC

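QUERY_TAG is Snowflake-specific. Note that dbt's query-comment setting, shown below, prepends a comment to the text of every query; it does not set QUERY_TAG:

# dbt_project.yml
query-comment:
  comment: "dbt:{{ node.unique_id }}"

To populate QUERY_TAG itself, use the dbt-snowflake query_tag config (a per-model value such as dbt:<unique_id> typically requires overriding the set_query_tag macro). Either way, the dbt identifier ends up in the query history, which allows joining back to the manifest.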
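
The join between tagged queries and manifest metadata can also be sketched in Python, for example in a notebook over an exported query-history extract. The row shape (`query_tag`, `credits`) and the function name `credits_by_team` are assumptions for illustration; the `dbt:<unique_id>` tag format follows the convention used above.

```python
from collections import defaultdict


def credits_by_team(manifest, query_rows, tag_prefix="dbt:"):
    """Sum credits per team by mapping query tags back to manifest nodes."""
    totals = defaultdict(float)
    for row in query_rows:
        tag = row.get("query_tag") or ""
        if not tag.startswith(tag_prefix):
            continue  # not a dbt-issued query
        uid = tag[len(tag_prefix):]
        node = manifest["nodes"].get(uid) or {}
        team = node.get("meta", {}).get("team", "unattributed")
        totals[team] += row["credits"]
    return dict(totals)
```

Queries whose node is missing from the manifest, or whose node has no `meta.team`, land in the `unattributed` bucket.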

Output dashboard

Team           | Models | Total runs | Avg duration | Credits | USD
---------------|--------|------------|--------------|---------|------
finance        | 25     | 4200       | 12.5s        | 8500    | $34,000
marketing      | 18     | 3100       | 8.3s         | 4200    | $16,800
engineering    | 12     | 1800       | 6.1s         | 1500    | $6,000
unattributed   | 8      | 1200       | 15.2s        | 2800    | $11,200  ← chargeback!

The unattributed row highlights gaps in meta: models without an owner. Push the teams to add meta:

# fct_orders.yml
models:
  - name: fct_orders
    meta:
      team: finance
      cost_center: "FIN-001"

Use case 6 — project health checks

Every CI / nightly job checks invariants:

1. Untested coverage

def check_untested(manifest, threshold=0.8):
    """Fail if test coverage below threshold."""
    models = [
        n for n in manifest['nodes'].values()
        if n['resource_type'] == 'model'
    ]
    tested_uids = {
        t['attached_node']
        for t in manifest['nodes'].values()
        if t['resource_type'] in ('test', 'data_test') and t.get('attached_node')
    }
    
    coverage = sum(
        1 for m in models
        if m['unique_id'] in tested_uids
    ) / len(models) if models else 0
    
    if coverage < threshold:
        raise SystemExit(
            f"Test coverage {coverage:.1%} below threshold {threshold:.1%}"
        )
    return coverage

2. Undocumented public models

def check_public_docs(manifest):
    """All public models must have description + column docs."""
    violations = []
    for uid, node in manifest['nodes'].items():
        if node['resource_type'] != 'model':
            continue
        if node['config'].get('access') != 'public':
            continue
        
        if not node.get('description'):
            violations.append((uid, 'missing_description'))
        
        undocumented_cols = [
            col_name for col_name, col_data in node.get('columns', {}).items()
            if not col_data.get('description')
        ]
        if undocumented_cols:
            violations.append((uid, f'undocumented_columns: {undocumented_cols}'))
    
    if violations:
        for uid, issue in violations:
            print(f"FAIL: {uid}: {issue}")
        raise SystemExit(1)

3. Deprecated models past due

from datetime import date

def check_deprecation_overdue(manifest):
    """Alert on deprecated models past their date."""
    today = date.today()
    overdue = []
    for uid, node in manifest['nodes'].items():
        if node['resource_type'] != 'model':
            continue
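        # deprecation_date is a top-level model property; fall back to config
        deprecation = node.get('deprecation_date') or node['config'].get('deprecation_date')
        if not deprecation:
            continue
        
        # The manifest may store a full ISO datetime; keep only the date part
        deprecation_date = date.fromisoformat(str(deprecation)[:10])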
        if today > deprecation_date:
            overdue.append((uid, deprecation_date))
    
    return overdue

4. Stale models (no runs in N days)

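from datetime import datetime, timedelta, timezone

def find_stale_models(manifest, run_results, days=7):
    """Models missing from the latest run, or whose latest run is older than N days.

    A single run_results.json covers one invocation only; for a true N-day
    window, aggregate results from a run-history table instead.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    generated_at = datetime.fromisoformat(
        run_results['metadata']['generated_at'].replace('Z', '+00:00')
    )
    run_is_recent = generated_at >= cutoff
    
    # Models that ran recently (from current run_results)
    recent_uids = {
        r['unique_id'] for r in run_results['results']
        if run_is_recent and r['status'] != 'skipped'
    }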
    
    stale = []
    for uid, node in manifest['nodes'].items():
        if node['resource_type'] != 'model':
            continue
        if node['config'].get('enabled') is False:
            continue
        if uid not in recent_uids:
            stale.append(uid)
    
    return stale

5. Group/access governance violations

def check_governance(manifest):
    """Check public models in groups, no orphaned access."""
    violations = []
    for uid, node in manifest['nodes'].items():
        if node['resource_type'] != 'model':
            continue
        
        access = node['config'].get('access')
        group = node['config'].get('group')
        
        # Public models must be in a group
        if access == 'public' and not group:
            violations.append((uid, 'public_no_group'))
        
        # Group must exist
        if group:
            group_uid = f"group.{node['package_name']}.{group}"
            if group_uid not in manifest.get('groups', {}):
                violations.append((uid, f'undefined_group: {group}'))
    
    return violations

CI integration

# .github/workflows/dbt-health-check.yml
- name: Generate manifest
  run: dbt parse

- name: Run health checks
  run: |
    python scripts/health_check.py target/manifest.json target/run_results.json

Output:

Health check report
═══════════════════

[x] Test coverage: 85.2% (above 80% threshold)
[x] Public models documentation: 100%
[ ] Deprecated overdue: 2 models
  - model.jaffle_shop.legacy_revenue (deprecation: 2026-01-01)
  - model.jaffle_shop.old_orders (deprecation: 2025-12-15)
[x] Stale models: 3 (below 5 threshold)
[x] Governance violations: 0

Exit code: 1 (deprecated overdue requires attention)
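
A report like the one above can be produced by a small runner that executes each check and derives the exit code. This is a minimal sketch of what `scripts/health_check.py` might contain; the `run_checks` name and the convention that every check returns a list of findings are assumptions, not part of the lesson's code.

```python
def run_checks(checks):
    """Run (label, check) pairs; each check returns a list of findings.

    Returns the report text and a process exit code (1 if anything failed).
    """
    lines = []
    failed = False
    for label, check in checks:
        findings = check()
        mark = "[ ]" if findings else "[x]"
        failed = failed or bool(findings)
        lines.append(f"{mark} {label}: {len(findings)} finding(s)")
        lines.extend(f"  - {finding}" for finding in findings)
    return "\n".join(lines), (1 if failed else 0)
```

In a script, the checks would be wrapped as zero-argument callables (for example with `functools.partial(check_governance, manifest)`), and the returned code passed to `sys.exit` so that CI fails on any finding.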

Use case 7 — dbt Mesh contract enforcement

For cross-project refs:

def find_breaking_consumer_changes(producer_manifest, consumer_manifest):
    """Detect breaking changes for consumers of public producer models."""
    findings = []
    
    # Collect public models in the producer
    public_models = {
        uid: n for uid, n in producer_manifest['nodes'].items()
        if n['resource_type'] == 'model' and n['config'].get('access') == 'public'
    }
    
    # Consumer refs to public models
    for uid, consumer_node in consumer_manifest['nodes'].items():
        for ref in consumer_node.get('refs', []):
            if not ref.get('package'):
                continue  # Same-project ref
            
            # Find the target in the producer manifest
            target_uid = f"model.{ref['package']}.{ref['name']}"
            if ref.get('version'):
                target_uid += f".v{ref['version']}"
            
            if target_uid not in public_models:
                findings.append({
                    'consumer_uid': uid,
                    'issue': f'refs non-public {target_uid}'
                })
                continue
            
            # Check schema compatibility
            producer = public_models[target_uid]
            if producer['config'].get('contract', {}).get('enforced'):
                # Validate consumer expectations
                # ... (deeper check)
                pass
    
    return findings

In dbt Mesh this is a core CI gate for governance.
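
The "deeper check" left open in the function above could compare the contracted columns of a public model between two versions of the producer manifest. Below is a minimal sketch under stated assumptions: columns live under `columns` with an optional `data_type`, removed columns and changed types count as breaking, and added columns do not. The function name `contract_breaking_changes` is hypothetical.

```python
def contract_breaking_changes(old_node, new_node):
    """List breaking column changes between two versions of a model node."""
    def column_types(node):
        return {
            name.lower(): col.get("data_type")
            for name, col in node.get("columns", {}).items()
        }

    old_cols = column_types(old_node)
    new_cols = column_types(new_node)

    findings = []
    for name, old_type in sorted(old_cols.items()):
        if name not in new_cols:
            findings.append(f"removed column: {name}")
        elif old_type and new_cols[name] and new_cols[name] != old_type:
            findings.append(f"type changed: {name} {old_type} -> {new_cols[name]}")
    return findings
```

Running this for every public model referenced by a consumer gives the CI gate a concrete list of changes to block or to route through a new model version.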


Use case 8 — query-time meta filtering

In BI tools, filter exposures by meta:

# Atlan integration
exposures_high_priority = [
    exp for uid, exp in manifest['exposures'].items()
    if exp['meta'].get('priority') == 'P0'
]

for exp in exposures_high_priority:
    notify_oncall_team(exp)

Or in a Slack bot:

# /dbt-impact command
def impact_of_model(uid, manifest):
    downstream = walk_downstream(uid, manifest['child_map'])
    impacted_exposures = [
        manifest['exposures'][d]
        for d in downstream
        if d.startswith('exposure.')
    ]
    return {
        'model': uid,
        'downstream_models': len([d for d in downstream if d.startswith('model.')]),
        'exposures': [
            {'name': e['name'], 'url': e.get('url'), 'owner': e['owner']['email']}
            for e in impacted_exposures
        ]
    }

User asks /dbt-impact fct_orders:

fct_orders impacts:
  - 12 downstream models
  - 3 exposures:
    * Executive Dashboard ([email protected]) -> looker.com/dashboards/42
    * Sales KPIs ([email protected]) -> tableau.com/views/sales
    * Finance Forecast ([email protected]) -> notebook

Key takeaways

  1. docs site: manifest + catalog are rendered by a single-page app. dbt-osmosis enriches column docs.
  2. Slim CI via state:modified: 10-50x speedup. The baseline manifest is critical; defer saves cost.
  3. Elementary observability: manifest for metadata, run_results for status, catalog for columns. Anomaly detection runs against warehouse history.
  4. Lineage extraction: OpenLineage events, Atlan/DataHub ingestion. depends_on + columns + exposures form the full lineage.
  5. Cost attribution via meta.team + a join against Snowflake query history. The dbt identifier reaches the query history through query_tag (or the query comment).
  6. Project health checks: untested coverage, undocumented public models, deprecated overdue, stale, governance violations. Each one is a CI gate.
  7. dbt Mesh governance: cross-project ref validation, contract change detection.
  8. Query-time use cases: Slack bots for impact analysis, BI catalogs for filtering, on-call notifications.
  9. The manifest is an API contract between dbt and the whole tool ecosystem.
  10. Production-grade integration: schema version validation, defensive parsing, caching, telemetry about supported versions.
Knowledge check
A team implements Slim CI. The baseline manifest is stored in S3 and updated after every merge to main. After two weeks they notice that CI takes as long as a full build again. What happened, how do you diagnose it, and how do you fix it?
Answer
**Possible causes** of Slim CI regressing to full-build time are listed below as diagnostic findings.

**Diagnosis steps**:

**1. Check the selection output**:

```bash
dbt ls --select "state:modified+" --defer --state baseline/
```

If the output lists roughly the whole project, something is breaking the state comparison.

**2. Check baseline freshness**:

```bash
# In CI
ls -la baseline/manifest.json
cat baseline/manifest.json | jq '.metadata.generated_at'
cat baseline/manifest.json | jq '.metadata.dbt_version'
cat baseline/manifest.json | jq '.metadata.invocation_id'

# Compare with current
cat target/manifest.json | jq '.metadata.generated_at'
cat target/manifest.json | jq '.metadata.dbt_version'
```

**Diagnostic finding 1: dbt version mismatch**:

```
baseline dbt_version: 1.7.10
current dbt_version: 1.11.0
```

An upgrade across several minor versions changes the manifest schema version. The internal state:modified comparison may then treat everything as modified.

**Fix**:

```bash
# After a dbt upgrade: invalidate the baseline
aws s3 rm s3://bucket/baseline/manifest.json

# The next main run then writes a fresh baseline
```

**Diagnostic finding 2: corrupted partial parse**:

```bash
dbt parse --no-partial-parse  # forces full parse
# Compare manifest result
```

If the selection returns a sensible count after `--no-partial-parse`, then partial_parse.msgpack is corrupted, possibly because of a race condition in an S3 sync.

**Fix**:

```yaml
- name: Clean partial parse cache in CI
  run: |
    rm -rf target/partial_parse.msgpack
    dbt parse
```

**Diagnostic finding 3: common macro modified**:

```bash
dbt ls --select "state:modified.macros+" --defer --state baseline/
```

If this shows a large number of nodes, a central macro was modified and every model using it is marked as modified.

A well-known pattern: a change in `generate_schema_name` or `generate_alias_name` touches everything.

**Fix options**:

- **Option A**: exclude macro-only modifications:

```bash
# Only direct body/config changes
dbt build --select "state:modified.body+ state:modified.configs+ state:new+" \
  --defer --state baseline/
```

Risky: this can skip legitimate downstream changes.

- **Option B**: separate CI jobs:

```yaml
- name: Slim CI — direct changes
  run: dbt build --select "state:modified.body+ state:modified.configs+" --defer --state baseline/

- name: Macro change CI — full rebuild affected
  if: contains(github.event.pull_request.labels.*.name, 'macro-changed')
  run: dbt build --defer --state baseline/  # full
```

The label approach is a manual gate for macro changes.

**Diagnostic finding 4: stale baseline (no longer updated from main)**:

If the baseline in S3 was not updated after recent merges, the current PR shows all of those changes as modified (relative to an ancient baseline).

```bash
cat baseline/manifest.json | jq '.metadata.generated_at'
# 2026-03-01 — stale 2.5 months
```

**Fix: repair the baseline update workflow**:

```yaml
# .github/workflows/dbt-main-build.yml
on:
  push:
    branches: [main]

jobs:
  build-and-baseline:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: dbt build --target prod

      - name: Upload baseline
        if: success()  # IMPORTANT: only if the build succeeded
        run: |
          aws s3 cp target/manifest.json s3://bucket/baseline/manifest.json
          aws s3 cp target/manifest.json s3://bucket/baseline/manifest_$(date +%Y%m%d_%H%M%S).json

      - name: Validate baseline written
        run: |
          # Verify upload
          aws s3 ls s3://bucket/baseline/manifest.json
          # Validate it's recent
          age=$(aws s3api head-object --bucket bucket --key baseline/manifest.json --query 'LastModified')
          echo "Baseline timestamp: $age"
```

If the baseline workflow itself fails (for example, dbt build fails on main and nobody uploads a baseline), PRs end up comparing against an ancient baseline.

**Add a baseline freshness gate**:

```bash
# In Slim CI
baseline_age=$(python -c "
import json, datetime
m = json.load(open('baseline/manifest.json'))
gen = datetime.datetime.fromisoformat(m['metadata']['generated_at'].rstrip('Z'))
age_hours = (datetime.datetime.utcnow() - gen).total_seconds() / 3600
print(int(age_hours))
")

if [ "$baseline_age" -gt 48 ]; then
  echo "WARN: Baseline is $baseline_age hours old. Slim CI may be inaccurate."
fi
```

**Diagnostic finding 5: refactor / mass rename**:

If a developer renamed many models or moved files, those models get new unique_ids: they are marked as new and the old ones as deleted. state:modified+ can then trigger a full downstream rebuild.

**Fix: plan refactors deliberately**:

- Run the refactor in a separate PR where a full build is expected
- Add a CI label `large-refactor` that skips slim mode
- Document the expectation in the PR description

**Diagnostic finding 6: yml-only changes treated as modified**:

```bash
dbt ls --select "state:modified.persisted_descriptions+"
# Tons of nodes — docs propagation from dbt-osmosis updated YAML descriptions
```

dbt-osmosis can regenerate descriptions, and every updated description marks its node as modified. If CI is configured without an exclusion, everything gets triggered.

**Fix**:

```bash
# Exclude doc-only changes
dbt build --select "state:modified.body+ state:modified.configs+ state:modified.contract+" \
  --defer --state baseline/
```

Or use a separate gate for docs-only PRs.

**Comprehensive monitoring**:

```python
# scripts/slim_ci_metrics.py
import json
import subprocess
from datetime import datetime

def report_slim_ci_health(baseline_path, current_path):
    baseline = json.load(open(baseline_path))
    current = json.load(open(current_path))

    baseline_time = datetime.fromisoformat(baseline['metadata']['generated_at'].rstrip('Z'))
    current_time = datetime.fromisoformat(current['metadata']['generated_at'].rstrip('Z'))
    age_hours = (current_time - baseline_time).total_seconds() / 3600

    print(f'Baseline age: {age_hours:.1f} hours')
    print(f'Baseline dbt: {baseline["metadata"]["dbt_version"]}')
    print(f'Current dbt: {current["metadata"]["dbt_version"]}')

    # Run dbt ls per sub-selector to see which kind of change dominates
    for selector in [
        'state:modified',
        'state:modified.body',
        'state:modified.configs',
        'state:modified.macros',
        'state:modified.persisted_descriptions',
        'state:new',
    ]:
        result = subprocess.run(
            ['dbt', 'ls', '--select', f'{selector}+', '--defer', '--state', 'baseline/'],
            capture_output=True, text=True
        )
        # Count non-empty lines so that an empty output counts as 0, not 1
        count = sum(1 for line in result.stdout.splitlines() if line.strip())
        print(f'  {selector}: {count} nodes')
```

The output shows a breakdown that helps identify the root cause.

**Long-term fixes**:

1. **Baseline freshness alerts**: alert if the baseline is more than 48h old.
2. **Schema version compatibility check**: fail loudly on a major dbt upgrade.
3. **CI metric collection**: track "% modified per PR" over time; a spike triggers an investigation.
4. **macro/yml change labels**: separate PRs trigger different CI paths.
5. **Documentation**: a runbook for what to do when Slim CI regresses.
6. **Frequent baseline updates**: even a nightly cron if main is rarely merged.
7. **Multiple baselines**: keep rolling 24h / 1 week / 1 month baselines.

**Production reality**: Slim CI requires ongoing maintenance; it is not set-and-forget. Major dbt upgrades, macro changes, and refactors all break its assumptions, but the fix cycle is proven and well known.

This is **production engineering**: observability into CI itself, not just into dbt models.
Knowledge check
A senior engineer wants a dashboard "Top 10 models by cost x consumer count". Combine manifest, run_results, and Snowflake QUERY_HISTORY. Describe the data pipeline and the architecture.
Answer
**Goal**: rank models by total impact: cost (warehouse credits) plus reach (downstream consumers).

**Pipeline architecture**:

```
┌─────────────────────┐   ┌─────────────────────┐   ┌─────────────────────┐
│ manifest.json       │   │ run_results.json    │   │ Snowflake           │
│ (every dbt run)     │   │ (every dbt run)     │   │ QUERY_HISTORY       │
└──────────┬──────────┘   └──────────┬──────────┘   └──────────┬──────────┘
           │                         │                         │
           v                         v                         v
    [Loader script]           [Loader script]             [SQL query]
           │                         │                         │
           └─────────────────────────┼─────────────────────────┘
                                     │
                                     v
                        ┌─────────────────────────┐
                        │ Analytics warehouse     │
                        │ (Snowflake / DuckDB)    │
                        │                         │
                        │ - models_metadata       │
                        │ - run_history           │
                        │ - cost_per_query        │
                        └────────────┬────────────┘
                                     │
                                     v
                             ┌───────────────┐
                             │ Dashboard     │
                             │ (Metabase/    │
                             │  Looker)      │
                             └───────────────┘
```

**Step 1: Load manifest**:

```python
# Script runs after every dbt run
import json
from collections import deque


def walk_downstream(start_uid, child_map):
    """Return every node reachable from start_uid via child_map (BFS)."""
    visited = set()
    queue = deque([start_uid])
    while queue:
        node = queue.popleft()
        if node in visited:
            continue
        visited.add(node)
        queue.extend(child_map.get(node, []))
    return visited - {start_uid}


def extract_models_metadata(manifest_path):
    with open(manifest_path) as f:
        manifest = json.load(f)

    rows = []
    for uid, node in manifest['nodes'].items():
        if node['resource_type'] != 'model':
            continue

        # Everything downstream of this model: models, tests, exposures
        downstream = walk_downstream(uid, manifest['child_map'])

        rows.append({
            'unique_id': uid,
            'name': node['name'],
            'package': node['package_name'],
            'schema': node['schema'],
            'database': node['database'],
            'materialized': node['config']['materialized'],
            'unique_key': node['config'].get('unique_key'),
            'team': node.get('meta', {}).get('team', 'unattributed'),
            'owner': node.get('meta', {}).get('owner'),
            'is_public': node['config'].get('access') == 'public',
            'has_contract': node['config'].get('contract', {}).get('enforced', False),
            'downstream_models_count': sum(1 for d in downstream if d.startswith('model.')),
            'downstream_exposures_count': sum(1 for d in downstream if d.startswith('exposure.')),
            'downstream_tests_count': sum(1 for d in downstream if d.startswith('test.')),
            'collection_timestamp': manifest['metadata']['generated_at'],
            'invocation_id': manifest['metadata']['invocation_id']
        })

    return rows


# Load into the analytics warehouse.
# load_to_warehouse is your own bulk-load helper (not shown here).
rows = extract_models_metadata('target/manifest.json')
load_to_warehouse('analytics.dbt_metadata.models_metadata', rows)
```

**Step 2: Load run_results**:

```python
import json


def extract_run_results(run_results_path):
    with open(run_results_path) as f:
        rr = json.load(f)

    rows = []
    for result in rr['results']:
        # Keep only model executions; skip tests, seeds, snapshots
        if not result['unique_id'].startswith('model.'):
            continue

        adapter = result.get('adapter_response', {})
        rows.append({
            'invocation_id': rr['metadata']['invocation_id'],
            'unique_id': result['unique_id'],
            'status': result['status'],
            'execution_time_seconds': result['execution_time'],
            'rows_affected': adapter.get('rows_affected'),
            'bytes_processed': adapter.get('bytes_processed'),
            'query_id': adapter.get('query_id'),  # Snowflake QUERY_ID
            'run_at': result['timing'][-1]['completed_at'] if result.get('timing') else None
        })

    return rows


rows = extract_run_results('target/run_results.json')
load_to_warehouse('analytics.dbt_metadata.run_history', rows)
```

**Step 3: Query QUERY_HISTORY for credits**:

```sql
-- NOTE: ACCOUNT_USAGE.QUERY_HISTORY has no per-query CREDITS_USED column
-- (only CREDITS_USED_CLOUD_SERVICES). Treat CREDITS_USED below as a per-query
-- compute estimate you derive yourself, e.g. from execution time and warehouse size.
CREATE OR REPLACE TABLE analytics.dbt_metadata.cost_per_query AS
SELECT
    QUERY_ID,
    QUERY_TAG,
    CREDITS_USED,
    CREDITS_USED * 4.00 AS USD,  -- assumes $4.00 per credit; use your contract rate
    START_TIME,
    EXECUTION_TIME / 1000.0 AS execution_seconds,
    BYTES_SCANNED,
    ROWS_PRODUCED,
    WAREHOUSE_SIZE,
    -- Extract dbt unique_id from query_tag
    CASE
        WHEN QUERY_TAG LIKE 'dbt:%'
        THEN SPLIT_PART(QUERY_TAG, ':', 2)
    END AS dbt_unique_id
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE START_TIME >= CURRENT_DATE - INTERVAL '30 day'
  AND QUERY_TAG LIKE 'dbt:%';
```

**Step 4: Dashboard query**:

```sql
WITH model_costs AS (
    SELECT
        c.dbt_unique_id AS unique_id,
        SUM(c.CREDITS_USED) AS total_credits_30d,
        SUM(c.USD) AS total_usd_30d,
        COUNT(*) AS run_count_30d,
        AVG(c.execution_seconds) AS avg_duration_seconds,
        AVG(c.BYTES_SCANNED) AS avg_bytes_scanned
    FROM analytics.dbt_metadata.cost_per_query c
    WHERE c.START_TIME >= CURRENT_DATE - INTERVAL '30 day'
    GROUP BY c.dbt_unique_id
),
model_metadata AS (
    SELECT
        m.unique_id,
        m.name,
        m.team,
        m.owner,
        m.is_public,
        m.has_contract,
        m.downstream_models_count,
        m.downstream_exposures_count
    FROM analytics.dbt_metadata.models_metadata m
    WHERE m.collection_timestamp = (
        SELECT MAX(collection_timestamp) FROM analytics.dbt_metadata.models_metadata
    )
)
SELECT
    mm.name,
    mm.team,
    mm.owner,
    mc.total_usd_30d,
    mc.run_count_30d,
    mc.avg_duration_seconds,
    mm.downstream_models_count,
    mm.downstream_exposures_count,
    mm.is_public,
    mm.has_contract,
    -- Impact score = cost × reach (exposures weighted 5x)
    mc.total_usd_30d * (mm.downstream_models_count + mm.downstream_exposures_count * 5) AS impact_score
FROM model_metadata mm
LEFT JOIN model_costs mc ON mm.unique_id = mc.unique_id
WHERE mc.total_usd_30d IS NOT NULL
ORDER BY impact_score DESC
LIMIT 10;
```

Output (selected columns):

```
name          | team      | total_usd_30d | downstream_models | downstream_exposures | impact_score
fct_orders    | finance   | $2,840        | 25                | 8                    | 184,600
dim_customers | marketing | $1,250        | 30                | 12                   | 112,500
fct_revenue   | finance   | $1,920        | 18                | 5                    | 82,560
stg_orders    | finance   | $950          | 20                | 0                    | 19,000
...
```

**Step 5: Visualization**:

Dashboard in Metabase / Looker / Tableau:

- Top 10 bar chart (impact_score)
- Cost breakdown by team (pie/stacked bar)
- Trend lines (cost over time per model)
- Filterable by tag, materialization, team
- Drilldown: clicking a model shows its lineage and individual run history

**Step 6: Alerts**:

```yaml
# Pseudo-alert config
alerts:
  - name: "High-cost model regression"
    condition: "daily_cost > 30d_avg * 2"
    notification: "slack:#data-team"

  - name: "Critical model failing"
    condition: "impact_score > 50000 AND last_run_status != 'success'"
    notification: "pagerduty:data-oncall"

  - name: "Untested high-impact model"
    condition: "impact_score > 100000 AND downstream_tests_count = 0"
    notification: "slack:#data-quality"
```

**Step 7: Pipeline scheduling**:

```yaml
# Pseudo-config; translate into Apache Airflow / Dagster jobs
workflows:
  load_manifest:
    trigger: after_dbt_run
    script: scripts/load_manifest.py target/manifest.json

  load_run_results:
    trigger: after_dbt_run
    script: scripts/load_run_results.py target/run_results.json

  refresh_costs:
    schedule: "0 * * * *"  # hourly
    sql: "INSERT INTO cost_per_query SELECT * FROM QUERY_HISTORY WHERE NOT EXISTS ..."

  refresh_dashboard:
    schedule: "0 6 * * *"  # daily 6am
    action: refresh_metabase_card_42
```

**Key data sources**:

| Source | What | Frequency |
|---|---|---|
| manifest.json | Model metadata, lineage | Every dbt run |
| run_results.json | Execution status, durations, query IDs | Every dbt run |
| QUERY_HISTORY | Actual credits, bytes scanned, query plans | Hourly bulk |
| catalog.json | Column types | Daily (after dbt docs generate) |

**Why query_tag is critical**:

```yaml
# dbt_project.yml
query-comment:
  comment: "dbt:{{ node.unique_id }}"
```

This prepends a SQL comment containing the unique_id to every query dbt issues. Note that a query comment is not the same thing as Snowflake's QUERY_TAG: the comment ends up in QUERY_TEXT, while QUERY_TAG is populated through the dbt-snowflake `query_tag` config (or an overridden `set_query_tag` macro). The Step 3 query assumes QUERY_TAG is set to `dbt:<unique_id>`; if you rely on the query comment alone, extract the id from QUERY_TEXT instead. The `query_id` captured in Step 2 can serve as an additional join key.

Without some per-model marker on each query, warehouse cost cannot be attributed reliably to dbt models, so treat this setup as mandatory.

**Considerations**:

**1. Data freshness**: ACCOUNT_USAGE.QUERY_HISTORY can lag by up to about 45 minutes in Snowflake. Real-time cost is not achievable without callback hooks.

**2. Multi-warehouse**: if different models run on different warehouses (development XS vs. prod XL), credit rates differ. Account for warehouse size.

**3. Adapter coverage**: QUERY_HISTORY is the Snowflake pattern. BigQuery uses INFORMATION_SCHEMA.JOBS_BY_PROJECT, Redshift uses STL_QUERY, and DuckDB has no cost (it runs locally). The pattern can be adapted to each adapter.

**4. Privacy**: QUERY_HISTORY contains SQL strings, which may include PII. Restrict access to the analytics.dbt_metadata schema.

**5. Backfill costs**: the first dbt-osmosis docs propagation run incurs a high cost across all models. Exclude it from the baseline calculation.

**6. Cost attribution disputes**: shared models end up attributed to the 'unattributed' team. Enforce `meta.team` in CI:

```python
import json

with open('target/manifest.json') as f:
    manifest = json.load(f)

# Fail the CI job on the first model that has no owning team
for model in manifest['nodes'].values():
    if model['resource_type'] == 'model' and not model.get('meta', {}).get('team'):
        raise ValueError(f'{model["unique_id"]}: missing meta.team')
```

**Production output**: senior managers can answer:
- "Which model costs us most?"
- "Which team should optimize next?"
- "What's the revenue impact if X fails?"
- "Are we under-investing in critical models?"

This transforms dbt from a black box into **business intelligence about itself**.

Foundation: manifest.json + run_results.json + the adapter's QUERY_HISTORY equivalent.
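
As a quick sanity check on the Step 4 ranking, the impact score can be recomputed in plain Python. This is a minimal sketch, not part of the pipeline: `ModelStats` and `rank_by_impact` are hypothetical names introduced for illustration, and the rows are the sample values from the output table, not real measurements. The exposure weight of 5 is taken from the Step 4 query.

```python
from dataclasses import dataclass

EXPOSURE_WEIGHT = 5  # same weight the Step 4 query applies to exposures


@dataclass
class ModelStats:
    """Hypothetical container for one row of the dashboard query."""
    name: str
    total_usd_30d: float
    downstream_models: int
    downstream_exposures: int

    @property
    def impact_score(self) -> float:
        # Impact score = cost x reach, with exposures weighted more heavily
        reach = self.downstream_models + self.downstream_exposures * EXPOSURE_WEIGHT
        return self.total_usd_30d * reach


def rank_by_impact(models, limit=10):
    """Sort models by impact_score, highest first, and keep the top `limit`."""
    return sorted(models, key=lambda m: m.impact_score, reverse=True)[:limit]


# Sample rows copied from the output table above (illustrative values only)
sample = [
    ModelStats("fct_orders", 2840, 25, 8),
    ModelStats("fct_revenue", 1920, 18, 5),
    ModelStats("dim_customers", 1250, 30, 12),
    ModelStats("stg_orders", 950, 20, 0),
]

for m in rank_by_impact(sample):
    print(f"{m.name:<15}{m.impact_score:>10,.0f}")
```

With these inputs dim_customers ranks above fct_revenue: its spend is lower, but its reach (30 models and 12 exposures) is much larger, which is exactly the effect the weighting is meant to surface.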

Check your understanding

Applied
Question 1 of 5. The team wants Slim CI. Which artifacts must be kept for the baseline, and how exactly do you implement state:modified+?
