
Observability patterns: parsing run_results for time analytics, cost attribution, and performance dashboards

manifest and run_results are raw data. Observability means turning that data into insights. This lesson covers production patterns for observability built on dbt artifacts: time analytics, cost attribution, and performance dashboards.


Each pattern is a real one, proven in production.


Pattern 1 — Time analytics

Goal

Answer these questions:

  • Which models are slow?
  • What is regressing?
  • When did the regression start?
  • Where are the bottlenecks?

Schema

CREATE TABLE dbt_run_history (
    invocation_id VARCHAR PRIMARY KEY,
    detected_at TIMESTAMP,
    dbt_version VARCHAR,
    target VARCHAR,
    elapsed_time FLOAT,
    threads INTEGER,
    total_models INTEGER,
    total_tests INTEGER,
    successes INTEGER,
    errors INTEGER,
    fails INTEGER,
    skipped INTEGER
);

CREATE TABLE dbt_node_runs (
    id SERIAL PRIMARY KEY,
    invocation_id VARCHAR,
    detected_at TIMESTAMP,
    unique_id VARCHAR,
    name VARCHAR,
    resource_type VARCHAR,
    materialized VARCHAR,
    status VARCHAR,
    execution_time FLOAT,
    thread_id VARCHAR,
    rows_affected BIGINT,
    bytes_processed BIGINT,
    query_id VARCHAR,
    compile_seconds FLOAT,
    execute_seconds FLOAT,
    
    FOREIGN KEY (invocation_id) REFERENCES dbt_run_history(invocation_id)
);

CREATE INDEX idx_node_runs_uid_date ON dbt_node_runs(unique_id, detected_at);

Loader

# loaders/run_results_loader.py
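import json
import os
import psycopg2
from datetime import datetime

# Connection string comes from the environment (the variable name is an assumption)
DATABASE_URL = os.environ['DATABASE_URL']

def load_run_results(run_results_path, manifest_path):
    # Read both artifacts, closing the files once parsed
    with open(run_results_path) as f:
        rr = json.load(f)
    with open(manifest_path) as f:
        m = json.load(f)
    
    conn = psycopg2.connect(DATABASE_URL)
    cur = conn.cursor()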
    
    # Header
    metadata = rr['metadata']
    args = rr['args']
    
    counts = {'success': 0, 'pass': 0, 'fail': 0, 'error': 0, 'skipped': 0, 'warn': 0}
    for r in rr['results']:
        counts[r['status']] = counts.get(r['status'], 0) + 1
    
    cur.execute("""
        INSERT INTO dbt_run_history (
            invocation_id, detected_at, dbt_version, target,
            elapsed_time, threads, total_models, total_tests,
            successes, errors, fails, skipped
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT DO NOTHING
    """, (
        metadata['invocation_id'],
        metadata['generated_at'],
        metadata['dbt_version'],
        args.get('target', 'unknown'),
        rr['elapsed_time'],
        args.get('threads', 1),
        sum(1 for r in rr['results'] if r['unique_id'].startswith('model.')),
        sum(1 for r in rr['results'] if r['unique_id'].startswith('test.')),
        counts['success'] + counts['pass'],
        counts['error'],
        counts['fail'],
        counts['skipped']
    ))
    
    # Per-node results
    for r in rr['results']:
        node = m['nodes'].get(r['unique_id']) or m.get('sources', {}).get(r['unique_id'])
        if not node:
            continue
        
        # Phase durations
        compile_secs = 0
        execute_secs = 0
        for phase in r.get('timing', []):
            start = datetime.fromisoformat(phase['started_at'].rstrip('Z'))
            end = datetime.fromisoformat(phase['completed_at'].rstrip('Z'))
            duration = (end - start).total_seconds()
            if phase['name'] == 'compile':
                compile_secs = duration
            elif phase['name'] == 'execute':
                execute_secs = duration
        
        adapter = r.get('adapter_response', {})
        
        cur.execute("""
            INSERT INTO dbt_node_runs (
                invocation_id, detected_at, unique_id, name, resource_type,
                materialized, status, execution_time, thread_id,
                rows_affected, bytes_processed, query_id,
                compile_seconds, execute_seconds
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            metadata['invocation_id'],
            metadata['generated_at'],
            r['unique_id'],
            node['name'],
            node['resource_type'],
            node.get('config', {}).get('materialized'),
            r['status'],
            r['execution_time'],
            r['thread_id'],
            adapter.get('rows_affected'),
            adapter.get('bytes_processed'),
            adapter.get('query_id'),
            compile_secs,
            execute_secs
        ))
    
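    conn.commit()
    conn.close()
    print(f'Loaded {len(rr["results"])} node runs')

if __name__ == '__main__':
    # Entry point for the CI step: run_results path first, then manifest path
    import sys
    load_run_results(sys.argv[1], sys.argv[2])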
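
The phase-duration step of the loader can be pulled out into a small helper, which makes it easy to test on its own. This is a minimal sketch: the function name `phase_durations` and the sample timestamps are illustrative assumptions, and only the `timing` fields already used by the loader (`name`, `started_at`, `completed_at`) are read.

```python
from datetime import datetime


def phase_durations(timing):
    """Return {phase_name: seconds} for a run_results `timing` list."""
    durations = {}
    for phase in timing or []:
        started = phase.get("started_at")
        completed = phase.get("completed_at")
        if not started or not completed:
            continue  # defensive: skip entries with missing timestamps
        # Swap a trailing 'Z' for an explicit offset so fromisoformat
        # also accepts the value on Python versions before 3.11
        start = datetime.fromisoformat(started.replace("Z", "+00:00"))
        end = datetime.fromisoformat(completed.replace("Z", "+00:00"))
        durations[phase["name"]] = (end - start).total_seconds()
    return durations


# Hypothetical timing entries shaped like those in run_results.json
sample_timing = [
    {"name": "compile",
     "started_at": "2024-01-01T10:00:00.000000Z",
     "completed_at": "2024-01-01T10:00:01.500000Z"},
    {"name": "execute",
     "started_at": "2024-01-01T10:00:01.500000Z",
     "completed_at": "2024-01-01T10:00:31.500000Z"},
]
print(phase_durations(sample_timing))
```

Returning a dict keeps any extra phases visible instead of silently dropping everything except compile and execute.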

Run in CI

- name: Run dbt
  run: dbt build --target prod

- name: Load observability data
  if: always()
  run: python loaders/run_results_loader.py target/run_results.json target/manifest.json

Analytics queries

1. Slowest models (last 7 days):

SELECT
    name,
    materialized,
    AVG(execution_time) AS avg_duration,
    MAX(execution_time) AS max_duration,
    AVG(rows_affected) AS avg_rows,
    AVG(bytes_processed) / 1e9 AS avg_gb,
    COUNT(*) AS runs
FROM dbt_node_runs
WHERE resource_type = 'model'
    AND detected_at >= CURRENT_DATE - 7
    AND status = 'success'
GROUP BY name, materialized
HAVING AVG(execution_time) > 30
ORDER BY avg_duration DESC
LIMIT 20

2. Performance regression detection:

WITH baseline AS (
    SELECT
        unique_id,
        AVG(execution_time) AS avg_30d,
        STDDEV(execution_time) AS stddev_30d
    FROM dbt_node_runs
    WHERE detected_at BETWEEN CURRENT_DATE - 30 AND CURRENT_DATE - 1
        AND status = 'success'
    GROUP BY 1
    HAVING COUNT(*) >= 7
),
current AS (
    SELECT unique_id, AVG(execution_time) AS current_avg
    FROM dbt_node_runs
    WHERE detected_at >= CURRENT_DATE
        AND status = 'success'
    GROUP BY 1
)
SELECT
    c.unique_id,
    c.current_avg,
    b.avg_30d,
    (c.current_avg - b.avg_30d) / b.avg_30d * 100 AS pct_change,
    (c.current_avg - b.avg_30d) / NULLIF(b.stddev_30d, 0) AS z_score
FROM current c
JOIN baseline b USING (unique_id)
WHERE c.current_avg > b.avg_30d + 3 * b.stddev_30d  -- 3-sigma
ORDER BY pct_change DESC
LIMIT 10
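
The same 3-sigma rule can be sketched in Python, for example to unit-test thresholds before wiring the query into an alert. The function name, its parameters, and the sample numbers are assumptions for illustration; `statistics.stdev` is the sample standard deviation, which is what STDDEV computes in Postgres.

```python
from statistics import mean, stdev


def detect_regression(baseline, current, min_runs=7, sigma=3.0):
    """Return regression details, or None when current runs look normal.

    baseline: execution times (seconds) from the reference window
    current:  execution times (seconds) from today's runs
    """
    if len(baseline) < min_runs:
        return None  # too little history, mirrors HAVING COUNT(*) >= 7
    avg, sd = mean(baseline), stdev(baseline)
    current_avg = mean(current)
    if current_avg <= avg + sigma * sd:
        return None  # within the 3-sigma band
    return {
        "current_avg": current_avg,
        "baseline_avg": avg,
        "pct_change": (current_avg - avg) / avg * 100,
        # Same guard as NULLIF(stddev, 0) in the query
        "z_score": (current_avg - avg) / sd if sd else None,
    }


# Hypothetical history for one model
history = [10, 11, 9, 10, 12, 10, 11, 9]
print(detect_regression(history, [30]))
print(detect_regression(history, [10.5]))
```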

3. Compile vs execute breakdown:

SELECT
    name,
    AVG(compile_seconds) AS avg_compile,
    AVG(execute_seconds) AS avg_execute,
    AVG(compile_seconds + execute_seconds) AS avg_total,
    AVG(compile_seconds) / NULLIF(AVG(compile_seconds + execute_seconds), 0) * 100 AS pct_compile
FROM dbt_node_runs
WHERE resource_type = 'model'
    AND detected_at >= CURRENT_DATE - 7
GROUP BY name
HAVING AVG(compile_seconds + execute_seconds) > 10
ORDER BY pct_compile DESC
LIMIT 10

If compile_seconds is much larger than execute_seconds, the model is Jinja-heavy and its templating is a candidate for optimization.


Pattern 2 — Cost attribution

Goal

Which team, cost center, or model spends the most credits?

Snowflake QUERY_HISTORY pattern

# dbt_project.yml
query-comment:
  comment: "dbt:{{ node.unique_id }}"

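Note that query-comment only prepends a SQL comment to the query text; Snowflake does not populate QUERY_TAG from it. The queries below filter on query_tag, so the tag itself must be set separately, for example through the query_tag config of dbt-snowflake (profile or model level) or an override of its set_query_tag macro that writes 'dbt:<unique_id>'.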

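-- Match warehouse cost to dbt nodes.
-- Caveat: stock account_usage.query_history has no per-query credits_used column
-- (only credits_used_cloud_services); treat credits_used below as a per-query
-- compute estimate you derive yourself, e.g. from query_attribution_history.
WITH warehouse_costs AS (
    SELECT
        SPLIT_PART(query_tag, ':', 2) AS unique_id,
        SUM(credits_used) AS credits_30d,
        SUM(credits_used) * 4.00 AS usd_30d,  -- assumes $4.00 per credit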
        SUM(bytes_scanned) AS bytes_30d,
        COUNT(*) AS run_count_30d,
        AVG(execution_time / 1000.0) AS avg_execution_secs
    FROM snowflake.account_usage.query_history
    WHERE start_time >= CURRENT_DATE - 30
        AND query_tag LIKE 'dbt:%'
    GROUP BY 1
),
dbt_metadata AS (
    SELECT
        unique_id,
        name,
        meta:team::VARCHAR AS team,
        meta:cost_center::VARCHAR AS cost_center,
        materialized,
        access
    FROM dbt_manifest_models  -- loaded from manifest.json
)
SELECT
    dm.team,
    dm.cost_center,
    COUNT(*) AS model_count,
    SUM(wc.usd_30d) AS total_usd,
    AVG(wc.usd_30d) AS avg_usd_per_model,
    SUM(wc.run_count_30d) AS total_runs
FROM dbt_metadata dm
LEFT JOIN warehouse_costs wc USING (unique_id)
WHERE wc.usd_30d > 0
GROUP BY 1, 2
ORDER BY total_usd DESC
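
The roll-up by team can also be sketched in Python, which is handy for testing the tag parsing in isolation. Everything here is illustrative: the helper names, the `credits` field, the `unowned` fallback, and the 4.00 USD-per-credit price (taken from the query above) are assumptions, not a fixed schema.

```python
from collections import defaultdict

USD_PER_CREDIT = 4.00  # assumed price per credit, same constant as in the SQL


def unique_id_from_tag(query_tag):
    """'dbt:model.proj.fct_orders' -> 'model.proj.fct_orders', else None."""
    prefix, sep, rest = query_tag.partition(":")
    return rest if prefix == "dbt" and sep and rest else None


def cost_by_team(query_rows, model_meta):
    """Sum USD per team.

    query_rows: dicts with 'query_tag' and 'credits'
    model_meta: unique_id -> meta dict (e.g. {'team': 'finance'})
    """
    totals = defaultdict(float)
    for row in query_rows:
        uid = unique_id_from_tag(row["query_tag"])
        if uid is None:
            continue  # not a dbt-tagged query
        team = model_meta.get(uid, {}).get("team", "unowned")
        totals[team] += row["credits"] * USD_PER_CREDIT
    # Most expensive team first
    return dict(sorted(totals.items(), key=lambda kv: kv[1], reverse=True))


# Hypothetical query history rows and manifest metadata
rows = [
    {"query_tag": "dbt:model.proj.fct_orders", "credits": 2.0},
    {"query_tag": "dbt:model.proj.dim_users", "credits": 0.5},
    {"query_tag": "adhoc", "credits": 9.0},
    {"query_tag": "dbt:model.proj.fct_orders", "credits": 1.0},
]
meta = {"model.proj.fct_orders": {"team": "finance"}}
print(cost_by_team(rows, meta))
```

Models without a team in meta land in a separate bucket rather than disappearing, which makes missing ownership visible on the dashboard.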

BigQuery equivalent

SELECT
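    -- JOBS has no query_tag column; labels is an ARRAY<STRUCT<key, value>>, so read it via UNNEST
    (SELECT l.value FROM UNNEST(labels) AS l WHERE l.key = 'dbt_unique_id') AS unique_id,
    SUM(total_bytes_billed) / POW(1024, 3) AS gb_billed_30d,
    SUM(total_bytes_billed) / POW(1024, 4) * 6.25 AS usd_30d,  -- assumed on-demand rate, USD per TiB
    COUNT(*) AS runs
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
    AND EXISTS (SELECT 1 FROM UNNEST(labels) AS l WHERE l.key = 'dbt_unique_id')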
GROUP BY 1

Dashboard layout

┌─────────────────────────────────────────────────────┐
│ dbt Cost Attribution Dashboard (Last 30 days)      │
├─────────────────────────────────────────────────────┤
│ Total Cost: $42,500    Top Team: Finance ($15,800) │
├─────────────────────────────────────────────────────┤
│ Cost by Team        │ Cost Trend                   │
│ [pie chart]         │ [line chart, days × $]       │
├─────────────────────────────────────────────────────┤
│ Top 10 Expensive Models                            │
│ [bar chart]                                        │
├─────────────────────────────────────────────────────┤
│ Cost Anomalies (Yesterday)                         │
│ - fct_orders: +35% ($420 vs avg $311)              │
│ - dim_users: +50% ($210 vs avg $140)               │
└─────────────────────────────────────────────────────┘

Alerting on anomalies

-- Daily check
WITH daily_costs AS (
    SELECT
        DATE(start_time) AS day,
        SPLIT_PART(query_tag, ':', 2) AS unique_id,
        SUM(credits_used) AS daily_credits
    FROM snowflake.account_usage.query_history
    WHERE start_time >= CURRENT_DATE - 7
    GROUP BY 1, 2
),
deltas AS (
    SELECT
        day,
        unique_id,
        daily_credits,
        LAG(daily_credits) OVER (PARTITION BY unique_id ORDER BY day) AS prev_credits
    FROM daily_costs
)
SELECT
    unique_id,
    daily_credits,
    prev_credits,
    (daily_credits - prev_credits) / prev_credits * 100 AS pct_change
FROM deltas
WHERE day = CURRENT_DATE  -- filter after LAG; filtering inside deltas would leave prev_credits NULL
    AND prev_credits > 1  -- ignore noise
    AND (daily_credits - prev_credits) / prev_credits > 0.5  -- >50% spike
ORDER BY pct_change DESC
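
The day-over-day comparison behind this check can be sketched in plain Python. The input shape (`{unique_id: {date: credits}}`), the function name, and the sample numbers are assumptions for illustration; the thresholds mirror the query (previous day above 1 credit, increase above 50%).

```python
from datetime import date


def find_spikes(daily_credits, today, min_prev=1.0, threshold=0.5):
    """Return spikes for `today`, largest percentage change first.

    daily_credits: {unique_id: {date: credits}}
    """
    spikes = []
    for uid, by_day in daily_credits.items():
        if today not in by_day:
            continue
        earlier = [d for d in by_day if d < today]
        if not earlier:
            continue  # no previous day to compare against
        prev = by_day[max(earlier)]  # most recent earlier day, like LAG ... ORDER BY day
        if prev <= min_prev:
            continue  # ignore noise from very cheap models
        change = (by_day[today] - prev) / prev
        if change > threshold:
            spikes.append({
                "unique_id": uid,
                "daily_credits": by_day[today],
                "prev_credits": prev,
                "pct_change": change * 100,
            })
    return sorted(spikes, key=lambda s: s["pct_change"], reverse=True)


# Hypothetical two-day history for three models
history = {
    "model.proj.a": {date(2024, 1, 1): 10.0, date(2024, 1, 2): 20.0},
    "model.proj.b": {date(2024, 1, 1): 10.0, date(2024, 1, 2): 11.0},
    "model.proj.c": {date(2024, 1, 1): 0.5, date(2024, 1, 2): 5.0},
}
print(find_spikes(history, date(2024, 1, 2)))
```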

Pipe to Slack:

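# run_query and send_slack are project helpers (not shown); run_query returns a list of dicts
spikes = run_query(daily_cost_anomalies_sql)
if spikes:
    # daily_credits is measured in credits, not dollars
    message = 'Cost spikes detected:\n' + '\n'.join(
        f'  {s["unique_id"]}: {s["daily_credits"]:.1f} credits ({s["pct_change"]:+.0f}%)'
        for s in spikes[:5]
    )
    send_slack('#data-cost', message)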

Pattern 3 — Performance dashboard

Build runtime breakdown

-- Identify slow models, classify them
WITH model_stats AS (
    SELECT
        unique_id,
        name,
        materialized,
        AVG(execution_time) AS avg_duration,
        AVG(rows_affected) AS avg_rows,
        AVG(bytes_processed / 1e9) AS avg_gb_processed,
        COUNT(*) AS run_count
    FROM dbt_node_runs
    WHERE resource_type = 'model'
        AND detected_at >= CURRENT_DATE - 7
        AND status = 'success'
    GROUP BY 1, 2, 3
)
SELECT
    name,
    materialized,
    avg_duration,
    avg_rows,
    avg_gb_processed,
    CASE
        WHEN avg_duration > 300 THEN 'very_slow'
        WHEN avg_duration > 60 THEN 'slow'
        WHEN avg_duration > 10 THEN 'medium'
        ELSE 'fast'
    END AS speed_class,
    CASE
        WHEN avg_rows < 1000 THEN 'small'
        WHEN avg_rows < 1000000 THEN 'medium'
        ELSE 'large'
    END AS volume_class
FROM model_stats
ORDER BY avg_duration DESC
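
The two CASE expressions translate directly into small Python functions, which can be reused when classifying models outside the warehouse. This is a sketch with assumed function names. One deliberate difference: in the SQL, a NULL avg_rows fails both comparisons and falls through to 'large', whereas the sketch reports it as 'unknown'.

```python
def speed_class(avg_duration_secs):
    """Bucket an average duration (seconds) using the query's thresholds."""
    if avg_duration_secs > 300:
        return "very_slow"
    if avg_duration_secs > 60:
        return "slow"
    if avg_duration_secs > 10:
        return "medium"
    return "fast"


def volume_class(avg_rows):
    """Bucket an average row count; None means the adapter reported no rows."""
    if avg_rows is None:
        return "unknown"  # the SQL CASE would label this 'large'
    if avg_rows < 1000:
        return "small"
    if avg_rows < 1_000_000:
        return "medium"
    return "large"


print(speed_class(75.0), volume_class(250))
```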

Bottleneck analysis

-- Find models with high execution time but few rows: likely a query plan issue
SELECT
    name,
    AVG(execution_time) AS duration,
    AVG(rows_affected) AS rows,
    AVG(execution_time) / NULLIF(AVG(rows_affected), 0) AS secs_per_row
FROM dbt_node_runs
WHERE resource_type = 'model'
    AND materialized != 'view'
    AND detected_at >= CURRENT_DATE - 7
GROUP BY 1
ORDER BY secs_per_row DESC
LIMIT 10

High secs_per_row = potential optimization (indexes, partitions, query rewrite).

Materialization analysis

-- Compare materialization types
SELECT
    materialized,
    COUNT(*) AS run_count,  -- one row per model run, not per model
    AVG(execution_time) AS avg_duration,
    SUM(execution_time) AS total_seconds,
    AVG(bytes_processed / 1e9) AS avg_gb
FROM dbt_node_runs
WHERE resource_type = 'model'
    AND detected_at >= CURRENT_DATE - 7
    AND status = 'success'
GROUP BY 1
ORDER BY total_seconds DESC

Output:

materialized  | run_count | avg_duration | total_seconds | avg_gb
incremental   | 25        | 45.2         | 1130          | 8.5
table         | 18        | 32.1         | 578           | 4.2
view          | 15        | 0.8          | 12            | 0
ephemeral     | 12        | 2.1          | 25            | 0.3

Incremental models are the most expensive here, which is reasonable: they are tables built with merge logic.


Pattern 4 — Failure tracking

Failed runs analysis

-- Failures last 7 days
SELECT
    unique_id,
    name,
    COUNT(*) AS failure_count,
    MAX(detected_at) AS last_failure
FROM dbt_node_runs
WHERE status IN ('error', 'fail')
    AND detected_at >= CURRENT_DATE - 7
GROUP BY 1, 2
ORDER BY failure_count DESC

Flaky tests (intermittent failures)

WITH test_runs AS (
    SELECT
        unique_id,
        name,
        detected_at,
        status
    FROM dbt_node_runs
    WHERE resource_type IN ('test', 'data_test')
        AND detected_at >= CURRENT_DATE - 30
)
SELECT
    unique_id,
    name,
    COUNT(*) AS total_runs,
    COUNT(*) FILTER (WHERE status = 'fail') AS failures,
    COUNT(*) FILTER (WHERE status = 'pass') AS passes,
    ROUND(COUNT(*) FILTER (WHERE status = 'fail')::FLOAT / COUNT(*) * 100, 1) AS fail_rate
FROM test_runs
GROUP BY 1, 2
HAVING COUNT(*) >= 10
    AND COUNT(*) FILTER (WHERE status = 'fail') > 0
    AND COUNT(*) FILTER (WHERE status = 'fail') < COUNT(*)  -- flaky, not always failing
ORDER BY fail_rate DESC

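The query flags every test whose fail rate is strictly between 0% and 100%. Tests in roughly the 10-90% range are the likeliest to be flaky and need investigation.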
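
The flaky-test filter can be sketched in Python as well, for instance to run it directly over parsed run_results files. The function name and the `(unique_id, status)` input shape are assumptions for illustration; the conditions mirror the HAVING clause (at least 10 runs, some failures, not always failing).

```python
from collections import Counter, defaultdict


def flaky_tests(test_runs, min_runs=10):
    """Return [(unique_id, fail_rate_pct)] for intermittently failing tests.

    test_runs: iterable of (unique_id, status) pairs
    """
    counts = defaultdict(Counter)
    for uid, status in test_runs:
        counts[uid][status] += 1

    flaky = []
    for uid, by_status in counts.items():
        total = sum(by_status.values())
        fails = by_status["fail"]
        # Flaky: enough history, at least one failure, but not every run failing
        if total >= min_runs and 0 < fails < total:
            flaky.append((uid, round(fails / total * 100, 1)))
    return sorted(flaky, key=lambda item: item[1], reverse=True)


# Hypothetical history: test.a is flaky, test.b always fails, test.c always passes
runs = (
    [("test.a", "pass")] * 8 + [("test.a", "fail")] * 2
    + [("test.b", "fail")] * 10
    + [("test.c", "pass")] * 10
    + [("test.d", "fail"), ("test.d", "pass")]  # too few runs
)
print(flaky_tests(runs))
```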


Pattern 5 — Capacity planning

Project growth

-- Distinct models run per week
WITH weekly_inventory AS (
    SELECT
        DATE_TRUNC('week', detected_at) AS week,
        COUNT(DISTINCT unique_id) AS unique_models,
        COUNT(*) AS total_runs
    FROM dbt_node_runs
    WHERE resource_type = 'model'
    GROUP BY 1
)
SELECT * FROM weekly_inventory ORDER BY week

Warehouse utilization

-- Total runtime per day
SELECT
    DATE_TRUNC('day', detected_at) AS day,
    SUM(execution_time) AS total_seconds,
    SUM(execution_time) / 3600 AS total_hours,
    COUNT(DISTINCT unique_id) AS unique_models,
    COUNT(*) AS total_runs
FROM dbt_node_runs
WHERE detected_at >= CURRENT_DATE - 30
GROUP BY 1
ORDER BY 1

The trend chart shows how dbt usage grows over time. Use it for capacity planning and warehouse sizing.


Pattern 6 — Slack integration

# scripts/notify_failures.py
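import json
import os
from collections import defaultdict

import requests

def send_slack(channel, message):
    # Minimal helper (assumption): post through a Slack incoming webhook whose URL
    # is in the environment; some webhook types ignore the channel override
    requests.post(
        os.environ['SLACK_WEBHOOK_URL'],
        json={'channel': channel, 'text': message},
        timeout=10,
    )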

def notify_run_failures():
    rr = json.load(open('target/run_results.json'))
    m = json.load(open('target/manifest.json'))
    
    failures = [
        r for r in rr['results']
        if r['status'] in ('error', 'fail', 'runtime error')
    ]
    
    if not failures:
        return
    
    # Group by owner
    by_owner = defaultdict(list)
    for f in failures:
        node = m['nodes'].get(f['unique_id'])
        if not node:
            continue
        owner = node.get('meta', {}).get('owner') or 'unowned'
        by_owner[owner].append({
            'unique_id': f['unique_id'],
            'name': node['name'],
            'status': f['status'],
            'message': (f.get('message') or '')[:200],  # message can be null
            'failures': f.get('failures'),
            'compiled_code': (f.get('compiled_code') or '')[:500]
        })
    
    for owner, items in by_owner.items():
        channel = f'#{owner}' if owner != 'unowned' else '#data-quality'
        
        message = f'dbt run failures for {owner}:\n\n'
        for item in items[:5]:
            message += f'- `{item["unique_id"]}` ({item["status"]})\n'
            if item.get('failures'):
                message += f'  Failures: {item["failures"]} rows\n'
            if item.get('message'):
                message += f'  {item["message"][:100]}\n\n'
        
        send_slack(channel, message)

Owner-targeted notifications reduce noise.


Pattern 7 — Multi-target comparison

Different targets give different perspectives:

-- Compare prod vs ci runtime
WITH target_stats AS (
    SELECT
        target,
        unique_id,
        AVG(execution_time) AS avg_duration,
        AVG(rows_affected) AS avg_rows
    FROM dbt_node_runs n
    JOIN dbt_run_history h USING (invocation_id)
    WHERE n.detected_at >= CURRENT_DATE - 7
    GROUP BY 1, 2
)
SELECT
    unique_id,
    MAX(avg_duration) FILTER (WHERE target = 'prod') AS prod_duration,
    MAX(avg_duration) FILTER (WHERE target = 'ci') AS ci_duration,
    MAX(avg_rows) FILTER (WHERE target = 'prod') AS prod_rows,
    MAX(avg_rows) FILTER (WHERE target = 'ci') AS ci_rows
FROM target_stats
GROUP BY 1
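HAVING MAX(avg_duration) FILTER (WHERE target = 'prod')
       / NULLIF(MAX(avg_duration) FILTER (WHERE target = 'ci'), 0) > 10  -- prod over 10x slower than ci; Postgres does not accept SELECT aliases in HAVING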
ORDER BY prod_duration DESC

CI runs on sample data (fast); prod runs on full data (slow). A moderate ratio is expected; an extreme one points to a possible scaling issue.


Pattern 8 — Cohort analysis

-- Performance over time per model
SELECT
    unique_id,
    DATE_TRUNC('week', detected_at) AS week,
    AVG(execution_time) AS avg_duration,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY execution_time) AS p95_duration
FROM dbt_node_runs
WHERE resource_type = 'model'
    AND unique_id = 'model.proj.fct_orders'
    AND detected_at >= CURRENT_DATE - 90
GROUP BY 1, 2
ORDER BY week

The resulting trend chart shows whether fct_orders is gradually slowing down or stable.
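
The weekly average and p95 can be reproduced in Python to sanity-check the query against a handful of runs. This is a sketch: the function names and sample data are assumptions. `percentile_cont` uses linear interpolation between the two nearest ordered values, which is how PERCENTILE_CONT is defined, and weeks start on Monday as with DATE_TRUNC('week', ...) in Postgres.

```python
from collections import defaultdict
from datetime import date, timedelta
from statistics import mean


def percentile_cont(values, fraction):
    """Linearly interpolated percentile over a non-empty list."""
    ordered = sorted(values)
    pos = fraction * (len(ordered) - 1)
    lo = int(pos)
    hi = min(lo + 1, len(ordered) - 1)
    return ordered[lo] + (ordered[hi] - ordered[lo]) * (pos - lo)


def weekly_trend(runs):
    """runs: iterable of (run_date, execution_time_secs).

    Returns [(week_start, avg_duration, p95_duration)] ordered by week.
    """
    by_week = defaultdict(list)
    for run_date, secs in runs:
        week_start = run_date - timedelta(days=run_date.weekday())  # Monday
        by_week[week_start].append(secs)
    return [
        (week, mean(vals), percentile_cont(vals, 0.95))
        for week, vals in sorted(by_week.items())
    ]


# Hypothetical runs of one model across two weeks
runs = [
    (date(2024, 1, 1), 10.0),
    (date(2024, 1, 3), 20.0),
    (date(2024, 1, 8), 30.0),
]
for row in weekly_trend(runs):
    print(row)
```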


Pattern 9 — Test failure clustering

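# Find tests that fail together (correlated failures)
from collections import defaultdict

# load_historical_runs is a project helper (not shown) returning parsed run_results dicts
historical_runs = load_historical_runs(days=30)

# For each pair of tests, count co-failures
co_failures = defaultdict(int)
total_runs = defaultdict(int)  # despite the name: number of runs in which each test failed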

for run in historical_runs:
    failed_uids = {
        r['unique_id'] for r in run['results']
        if r['status'] == 'fail' and r['unique_id'].startswith('test.')
    }
    
    for uid in failed_uids:
        total_runs[uid] += 1
    
    failed_list = list(failed_uids)
    for i, uid1 in enumerate(failed_list):
        for uid2 in failed_list[i+1:]:
            pair = tuple(sorted([uid1, uid2]))
            co_failures[pair] += 1

# Find clusters
correlation_threshold = 0.7
clusters = []
for (uid1, uid2), count in co_failures.items():
    correlation = count / max(total_runs[uid1], total_runs[uid2])
    if correlation > correlation_threshold:
        clusters.append((uid1, uid2, correlation))

# Output: 'Test X and Test Y fail together 80% of time — likely same root cause'

Pattern 10 — Cron job pattern

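# cron_observability.py: long-running loop that polls every 5 minutes
import time

# get_latest_artifact_from_s3, check_regressions, detect_anomalies,
# process_alert_queue and log_error are project helpers (not shown);
# load_run_results is assumed here to accept the artifact dict
loaded_invocations = set()  # in-memory only, so it resets on restart

while True:
    try:
        # Process the latest run_results if it is new
        latest_artifact = get_latest_artifact_from_s3()
        if latest_artifact['invocation_id'] not in loaded_invocations:
            load_run_results(latest_artifact)
            loaded_invocations.add(latest_artifact['invocation_id'])
        
        # Check for regressions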
        check_regressions()
        
        # Anomaly detection
        detect_anomalies()
        
        # Send alerts
        process_alert_queue()
    except Exception as e:
        log_error(e)
    
    time.sleep(300)  # 5 minutes

Or use Airflow / Dagster for scheduling.


Architecture overview

┌─────────────────────────┐
│ dbt build (CI / cron)   │
└────────────┬────────────┘
             │
             ▼
┌─────────────────────────┐
│ S3 — artifacts          │
│ /manifests/<run_id>     │
│ /run_results/<run_id>   │
│ /catalogs/<run_id>      │
└────────────┬────────────┘
             │
             ▼
┌─────────────────────────┐
│ Loader (event-driven    │
│ or scheduled)           │
└────────────┬────────────┘
             │
             ▼
┌─────────────────────────┐
│ Postgres / Snowflake    │
│ - dbt_run_history       │
│ - dbt_node_runs         │
│ - dbt_test_results      │
│ - dbt_source_freshness  │
└────────────┬────────────┘
             │
             ▼
┌─────────────────────────┐
│ Metabase / Looker       │
│ - Performance dashboard │
│ - Cost dashboard        │
│ - Failure tracking      │
│ - Capacity planning     │
└────────────┬────────────┘
             │
             ▼
┌─────────────────────────┐
│ Slack / PagerDuty       │
│ - Alerting              │
└─────────────────────────┘

Key takeaways

  1. Time analytics: store per-node run data in a database, compute baselines, detect regressions.
  2. Cost attribution: tag queries with query_tag and join against the warehouse QUERY_HISTORY.
  3. Performance dashboards: bottleneck identification, materialization analysis, capacity planning.
  4. Failure tracking: flaky tests, failure clustering, root cause analysis.
  5. Slack integration: owner-targeted notifications that reduce noise.
  6. Multi-target comparison: prod vs ci insights.
  7. Cohort analysis: per-model trends over weeks and months.
  8. Test correlation: finding co-failing tests that share a root cause.
  9. Architecture: artifacts -> S3 -> loader -> analytics DB -> dashboards -> alerts.
  10. Elementary as an alternative: built-in observability via a dbt package; the trade-off is less customization but faster setup.
Knowledge check
A senior engineer shows a dashboard: the average duration of model X grew 2.5x over 3 weeks. Tests are passing. What do you diagnose, and which artifacts do you use?
Answer
Investigation workflow: confirm the regression from run_results.json (average execution_time per week over 60 days shows a jump in a specific week). Compare manifest snapshots from before and after the jump, taken from the versioned S3 collection; the diff shows what changed in the config (for example, materialized switched from table to incremental with merge). Confirm with git log on the corresponding file. Analyze adapter_response: rows_affected dropped (incremental is working), but bytes_processed and execution_time grew. Pull the query details from Snowflake QUERY_HISTORY and see bytes_spilled_to_local_storage and a full table scan for the MERGE join. Fix: switch the strategy to delete+insert (often faster than MERGE on large tables) or add cluster_by. Artifacts: dbt_node_runs (run_results), manifest.json snapshots, Snowflake QUERY_HISTORY, git history, the final DDL in target/run. Lesson: passing tests do not mean performance is OK; materialization changes need performance testing before merge.
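
The manifest comparison step in this workflow can be sketched as a small diff over node configs. The function name, the list of watched keys, and the sample manifests are assumptions for illustration; the sketch only reads `nodes[...]["config"]`, the same part of manifest.json used by the loader earlier in the lesson.

```python
def diff_node_configs(old_manifest, new_manifest,
                      keys=("materialized", "incremental_strategy", "cluster_by")):
    """Return {unique_id: {key: (old_value, new_value)}} for changed config keys."""
    changes = {}
    old_nodes = old_manifest.get("nodes", {})
    for uid, new_node in new_manifest.get("nodes", {}).items():
        old_node = old_nodes.get(uid)
        if old_node is None:
            continue  # node is new, nothing to compare against
        old_cfg = old_node.get("config", {})
        new_cfg = new_node.get("config", {})
        delta = {
            key: (old_cfg.get(key), new_cfg.get(key))
            for key in keys
            if old_cfg.get(key) != new_cfg.get(key)
        }
        if delta:
            changes[uid] = delta
    return changes


# Hypothetical manifest snapshots from before and after the slowdown
before = {"nodes": {"model.proj.x": {"config": {"materialized": "table"}}}}
after = {"nodes": {
    "model.proj.x": {"config": {"materialized": "incremental",
                                "incremental_strategy": "merge"}},
    "model.proj.y": {"config": {"materialized": "view"}},
}}
print(diff_node_configs(before, after))
```

Restricting the diff to a few performance-relevant keys keeps the output short; widening `keys` is a one-line change if other settings turn out to matter.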
Knowledge check
The team wants to understand which models cost the most. Implement a weekly Slack digest with the top 10 expensive models, a breakdown by team, and week-over-week change.
Answer
Solution architecture: a cost table in the analytics warehouse, populated from Snowflake QUERY_HISTORY (for BigQuery, INFORMATION_SCHEMA.JOBS) by query_tag = 'dbt:<unique_id>'. Daily refresh: aggregate by unique_id, summing credits/USD, bytes_scanned, and run count. Weekly aggregation: SUM for the current week and the previous week, then compute pct_change. Enrichment: join with dbt_metadata.models on unique_id to get team/owner from meta. A Python script (psycopg2 + requests) builds the digest: top 10 by cost, by-team breakdown, and the total with week-over-week change. Format it as Slack blocks with trend indicators. Scheduling: a GitHub Actions schedule on Monday morning. Auto-investigation for models with pct_change above 50: check row growth, git diff, and manifest diff. Per-team Slack channels give targeted accountability. Compared with Elementary: the custom approach gives a business-team structure and custom alerting, while Elementary is built-in. Data sources: Snowflake QUERY_HISTORY (credits), BigQuery INFORMATION_SCHEMA.JOBS (bytes), run_results.json (execution_time for DuckDB), manifest.json (team metadata).
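
The digest-building step of this answer could look roughly like the sketch below. It covers only the top-N ranking and the week-over-week change; the function name, the `{unique_id: usd}` input shape, and the message wording are assumptions, and sending the text to Slack is left to whatever helper the project already uses.

```python
def build_weekly_digest(this_week, last_week, top_n=10):
    """Build digest text from {unique_id: usd} totals for two weeks."""
    lines = ["Top dbt models by cost (this week):"]
    ranked = sorted(this_week.items(), key=lambda kv: kv[1], reverse=True)
    for uid, usd in ranked[:top_n]:
        prev = last_week.get(uid)
        if prev:
            trend = f"{(usd - prev) / prev * 100:+.0f}% WoW"
        else:
            trend = "new"  # no spend recorded last week
        lines.append(f"- {uid}: ${usd:,.2f} ({trend})")
    return "\n".join(lines)


# Hypothetical weekly totals in USD
this_week = {"model.proj.a": 150.0, "model.proj.b": 50.0}
last_week = {"model.proj.a": 100.0}
print(build_weekly_digest(this_week, last_week))
```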
