Observability patterns: parsing run_results for time analytics, cost attribution, performance dashboards
manifest and run_results are raw data. Observability means turning that data into insights. This lesson covers production patterns for observability built on dbt artifacts: time analytics, cost attribution, performance dashboards.
Each pattern is real and has been proven in production.
Pattern 1 — Time analytics
Goal
Answer these questions:
- Which models are slow?
- What is regressing?
- When did the regression start?
- Where are the bottlenecks?
Schema
CREATE TABLE dbt_run_history (
invocation_id VARCHAR PRIMARY KEY,
detected_at TIMESTAMP,
dbt_version VARCHAR,
target VARCHAR,
elapsed_time FLOAT,
threads INTEGER,
total_models INTEGER,
total_tests INTEGER,
successes INTEGER,
errors INTEGER,
fails INTEGER,
skipped INTEGER
);
CREATE TABLE dbt_node_runs (
id SERIAL PRIMARY KEY,
invocation_id VARCHAR,
detected_at TIMESTAMP,
unique_id VARCHAR,
name VARCHAR,
resource_type VARCHAR,
materialized VARCHAR,
status VARCHAR,
execution_time FLOAT,
thread_id VARCHAR,
rows_affected BIGINT,
bytes_processed BIGINT,
query_id VARCHAR,
compile_seconds FLOAT,
execute_seconds FLOAT,
FOREIGN KEY (invocation_id) REFERENCES dbt_run_history(invocation_id)
);
CREATE INDEX idx_node_runs_uid_date ON dbt_node_runs(unique_id, detected_at);
Loader
# loaders/run_results_loader.py
import json
import os
import sys
from datetime import datetime

import psycopg2

# Assumption: the Postgres connection string is provided via the environment
DATABASE_URL = os.environ['DATABASE_URL']


def parse_ts(value):
    # datetime.fromisoformat() rejects a trailing 'Z' before Python 3.11
    return datetime.fromisoformat(value.rstrip('Z'))


def load_run_results(run_results_path, manifest_path):
    with open(run_results_path) as f:
        rr = json.load(f)
    with open(manifest_path) as f:
        m = json.load(f)

    conn = psycopg2.connect(DATABASE_URL)
    cur = conn.cursor()

    # Header: one row per dbt invocation
    metadata = rr['metadata']
    args = rr['args']
    counts = {'success': 0, 'pass': 0, 'fail': 0, 'error': 0, 'skipped': 0, 'warn': 0}
    for r in rr['results']:
        counts[r['status']] = counts.get(r['status'], 0) + 1

    cur.execute("""
        INSERT INTO dbt_run_history (
            invocation_id, detected_at, dbt_version, target,
            elapsed_time, threads, total_models, total_tests,
            successes, errors, fails, skipped
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT DO NOTHING
    """, (
        metadata['invocation_id'],
        metadata['generated_at'],
        metadata['dbt_version'],
        args.get('target', 'unknown'),
        rr['elapsed_time'],
        args.get('threads', 1),
        sum(1 for r in rr['results'] if r['unique_id'].startswith('model.')),
        sum(1 for r in rr['results'] if r['unique_id'].startswith('test.')),
        counts['success'] + counts['pass'],
        counts['error'],
        counts['fail'],
        counts['skipped']
    ))
    if cur.rowcount == 0:
        # Invocation already loaded: stop here so node rows are not duplicated
        conn.close()
        return

    # Per-node results
    for r in rr['results']:
        node = m['nodes'].get(r['unique_id']) or m.get('sources', {}).get(r['unique_id'])
        if not node:
            continue

        # Phase durations
        compile_secs = 0
        execute_secs = 0
        for phase in r.get('timing', []):
            duration = (parse_ts(phase['completed_at']) - parse_ts(phase['started_at'])).total_seconds()
            if phase['name'] == 'compile':
                compile_secs = duration
            elif phase['name'] == 'execute':
                execute_secs = duration

        adapter = r.get('adapter_response') or {}
        cur.execute("""
            INSERT INTO dbt_node_runs (
                invocation_id, detected_at, unique_id, name, resource_type,
                materialized, status, execution_time, thread_id,
                rows_affected, bytes_processed, query_id,
                compile_seconds, execute_seconds
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            metadata['invocation_id'],
            metadata['generated_at'],
            r['unique_id'],
            node['name'],
            node['resource_type'],
            node.get('config', {}).get('materialized'),
            r['status'],
            r['execution_time'],
            r['thread_id'],
            adapter.get('rows_affected'),
            adapter.get('bytes_processed'),
            adapter.get('query_id'),
            compile_secs,
            execute_secs
        ))

    conn.commit()
    conn.close()
    print(f'Loaded {len(rr["results"])} node runs')


if __name__ == '__main__':
    # Usage: python run_results_loader.py <run_results.json> <manifest.json>
    load_run_results(sys.argv[1], sys.argv[2])
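The phase-duration step of the loader can be isolated into a small helper, which makes it easier to test and to harden against missing timestamps. The sketch below is only an illustration: the helper name `phase_durations` and the sample `timing` payload are assumptions, not part of dbt.

```python
from datetime import datetime


def phase_durations(timing):
    """Return {phase_name: seconds} for one result's 'timing' list."""
    durations = {}
    for phase in timing or []:
        started = phase.get("started_at")
        completed = phase.get("completed_at")
        if not started or not completed:
            continue  # tolerate phases without timestamps
        # Swap a trailing 'Z' for an explicit offset so that
        # fromisoformat() also accepts it before Python 3.11.
        start = datetime.fromisoformat(started.replace("Z", "+00:00"))
        end = datetime.fromisoformat(completed.replace("Z", "+00:00"))
        durations[phase["name"]] = (end - start).total_seconds()
    return durations


# Hypothetical sample shaped like a run_results.json 'timing' entry
sample_timing = [
    {"name": "compile",
     "started_at": "2024-01-01T00:00:00.000000Z",
     "completed_at": "2024-01-01T00:00:01.500000Z"},
    {"name": "execute",
     "started_at": "2024-01-01T00:00:01.500000Z",
     "completed_at": "2024-01-01T00:00:31.500000Z"},
]
print(phase_durations(sample_timing))
```

Returning a dict keyed by phase name keeps the helper usable if more phases appear in the artifact; the loader would then read `durations.get('compile', 0)` and `durations.get('execute', 0)`.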
Run in CI
- name: Run dbt
  run: dbt build --target prod
- name: Load observability data
  if: always()
  run: python loaders/run_results_loader.py target/run_results.json target/manifest.json
Analytics queries
1. Slowest models (last 7 days):
SELECT
name,
materialized,
AVG(execution_time) AS avg_duration,
MAX(execution_time) AS max_duration,
AVG(rows_affected) AS avg_rows,
AVG(bytes_processed) / 1e9 AS avg_gb,
COUNT(*) AS runs
FROM dbt_node_runs
WHERE resource_type = 'model'
AND detected_at >= CURRENT_DATE - 7
AND status = 'success'
GROUP BY name, materialized
HAVING AVG(execution_time) > 30
ORDER BY avg_duration DESC
LIMIT 20
2. Performance regression detection:
WITH baseline AS (
    SELECT
        unique_id,
        AVG(execution_time) AS avg_30d,
        STDDEV(execution_time) AS stddev_30d
    FROM dbt_node_runs
    -- Half-open range: BETWEEN ... AND CURRENT_DATE - 1 on a timestamp
    -- column would cut off at yesterday's midnight and drop yesterday's runs
    WHERE detected_at >= CURRENT_DATE - 30
      AND detected_at < CURRENT_DATE
      AND status = 'success'
    GROUP BY 1
    HAVING COUNT(*) >= 7
),
today_runs AS (
    SELECT unique_id, AVG(execution_time) AS current_avg
    FROM dbt_node_runs
    WHERE detected_at >= CURRENT_DATE
      AND status = 'success'
    GROUP BY 1
)
SELECT
    t.unique_id,
    t.current_avg,
    b.avg_30d,
    (t.current_avg - b.avg_30d) / NULLIF(b.avg_30d, 0) * 100 AS pct_change,
    (t.current_avg - b.avg_30d) / NULLIF(b.stddev_30d, 0) AS z_score
FROM today_runs t
JOIN baseline b USING (unique_id)
WHERE t.current_avg > b.avg_30d + 3 * b.stddev_30d -- 3-sigma
ORDER BY pct_change DESC
LIMIT 10
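The 3-sigma rule used by the regression query can also be checked in plain Python, for example in a unit test or a notebook. This is a minimal sketch under stated assumptions: the function name `is_regression` and the sample durations are hypothetical, and `statistics.stdev` is used because it computes the sample standard deviation, as `STDDEV` does in PostgreSQL.

```python
from statistics import mean, stdev


def is_regression(baseline, current, sigmas=3.0, min_runs=7):
    """Return (flag, z_score) for one model.

    baseline: execution times (seconds) of earlier successful runs
    current:  today's average execution time (seconds)
    """
    if len(baseline) < min_runs:
        return False, None  # not enough history, same as HAVING COUNT(*) >= 7
    avg = mean(baseline)
    sd = stdev(baseline)
    if sd == 0:
        # With zero spread the SQL condition reduces to current > avg
        return current > avg, None
    z_score = (current - avg) / sd
    return z_score > sigmas, z_score


# Hypothetical history for one model
history = [10, 11, 9, 10, 12, 10, 11, 9]
print(is_regression(history, 20.0))  # far above the baseline
print(is_regression(history, 11.0))  # within normal variation
```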
3. Compile vs execute breakdown:
SELECT
name,
AVG(compile_seconds) AS avg_compile,
AVG(execute_seconds) AS avg_execute,
AVG(compile_seconds + execute_seconds) AS avg_total,
AVG(compile_seconds) / NULLIF(AVG(compile_seconds + execute_seconds), 0) * 100 AS pct_compile
FROM dbt_node_runs
WHERE resource_type = 'model'
AND detected_at >= CURRENT_DATE - 7
GROUP BY name
HAVING AVG(compile_seconds + execute_seconds) > 10
ORDER BY pct_compile DESC
LIMIT 10
If compile_seconds is much larger than execute_seconds, the model is Jinja-heavy and its compilation step is a candidate for optimization.
Pattern 2 — Cost attribution
Goal
Which team / cost center / model spends the most credits?
Snowflake QUERY_HISTORY pattern
# dbt_project.yml
query-comment:
  comment: "dbt:{{ node.unique_id }}"
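Note that query-comment only adds a SQL comment to the query text, so the marker shows up in QUERY_TEXT; Snowflake does not copy it into QUERY_TAG. The queries in this pattern assume QUERY_TAG is also set to dbt:<unique_id>, for example through the dbt-snowflake query_tag config or an overridden set_query_tag macro.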
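Wherever the marker ends up (a query tag or the comment inside the query text), the unique_id can be pulled out with a small parser before joining to dbt metadata. The sketch below assumes the `dbt:<unique_id>` format from the config above; the function name `extract_unique_id` and the list of resource-type prefixes are illustrative choices.

```python
import re

# Matches the "dbt:<unique_id>" marker used in this lesson, whether it sits in
# a query tag or inside the /* ... */ comment that dbt adds to the query text.
_MARKER = re.compile(r"dbt:((?:model|test|seed|snapshot)\.[\w.\-]+)")


def extract_unique_id(text):
    """Return the dbt unique_id embedded in text, or None if there is none."""
    match = _MARKER.search(text or "")
    return match.group(1) if match else None


print(extract_unique_id("dbt:model.proj.fct_orders"))
print(extract_unique_id("/* dbt:model.proj.fct_orders */\nselect 1"))
print(extract_unique_id("select 1"))
```

Parsing in one place keeps the join key consistent even if the marker format changes later.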
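-- Match warehouse cost to dbt nodes
-- Note: QUERY_HISTORY has no per-query compute credits column (only
-- credits_used_cloud_services). credits_used below stands for a per-query
-- estimate, e.g. credits_attributed_compute from QUERY_ATTRIBUTION_HISTORY.
WITH warehouse_costs AS (
SELECT
SPLIT_PART(query_tag, ':', 2) AS unique_id,
SUM(credits_used) AS credits_30d,
SUM(credits_used) * 4.00 AS usd_30d, -- assumes a price of $4.00 per credit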
SUM(bytes_scanned) AS bytes_30d,
COUNT(*) AS run_count_30d,
AVG(execution_time / 1000.0) AS avg_execution_secs
FROM snowflake.account_usage.query_history
WHERE start_time >= CURRENT_DATE - 30
AND query_tag LIKE 'dbt:%'
GROUP BY 1
),
dbt_metadata AS (
SELECT
unique_id,
name,
meta:team::VARCHAR AS team,
meta:cost_center::VARCHAR AS cost_center,
materialized,
access
FROM dbt_manifest_models -- loaded from manifest.json
)
SELECT
dm.team,
dm.cost_center,
COUNT(*) AS model_count,
SUM(wc.usd_30d) AS total_usd,
AVG(wc.usd_30d) AS avg_usd_per_model,
SUM(wc.run_count_30d) AS total_runs
FROM dbt_metadata dm
LEFT JOIN warehouse_costs wc USING (unique_id)
WHERE wc.usd_30d > 0
GROUP BY 1, 2
ORDER BY total_usd DESC
BigQuery equivalent
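-- Assumes each dbt job carries a dbt_unique_id label.
-- labels is an ARRAY<STRUCT<key, value>>, so it has to be unnested.
SELECT
    (SELECT value FROM UNNEST(labels) WHERE key = 'dbt_unique_id') AS unique_id,
    SUM(total_bytes_billed) / POW(1024, 3) AS gb_billed_30d,
    SUM(total_bytes_billed) / POW(1024, 4) * 6.25 AS usd_30d, -- on-demand price per TiB (assumed), not flat-rate
    COUNT(*) AS runs
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
  AND EXISTS (SELECT 1 FROM UNNEST(labels) WHERE key = 'dbt_unique_id')
GROUP BY 1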
Dashboard layout
┌─────────────────────────────────────────────────────┐
│ dbt Cost Attribution Dashboard (Last 30 days) │
├─────────────────────────────────────────────────────┤
│ Total Cost: $42,500 Top Team: Finance ($15,800) │
├─────────────────────────────────────────────────────┤
│ Cost by Team │ Cost Trend │
│ [pie chart] │ [line chart, days × $] │
├─────────────────────────────────────────────────────┤
│ Top 10 Expensive Models │
│ [bar chart] │
├─────────────────────────────────────────────────────┤
│ Cost Anomalies (Yesterday) │
│ - fct_orders: +35% ($420 vs avg $311) │
│ - dim_users: +50% ($210 vs avg $140) │
└─────────────────────────────────────────────────────┘
Alert on anomalies
-- Daily check
-- credits_used stands for a per-query credit estimate (not a native QUERY_HISTORY column)
WITH daily_costs AS (
    SELECT
        DATE(start_time) AS day,
        SPLIT_PART(query_tag, ':', 2) AS unique_id,
        SUM(credits_used) AS daily_credits
    FROM snowflake.account_usage.query_history
    WHERE start_time >= CURRENT_DATE - 7
      AND query_tag LIKE 'dbt:%'
    GROUP BY 1, 2
),
deltas AS (
    -- LAG has to see the earlier days, so the filter on the current day
    -- is applied in the final SELECT, not here
    SELECT
        day,
        unique_id,
        daily_credits,
        LAG(daily_credits) OVER (PARTITION BY unique_id ORDER BY day) AS prev_credits
    FROM daily_costs
)
SELECT
    unique_id,
    daily_credits,
    prev_credits,
    (daily_credits - prev_credits) / prev_credits * 100 AS pct_change
FROM deltas
WHERE day = CURRENT_DATE
  AND prev_credits > 1 -- ignore noise
  AND (daily_credits - prev_credits) / prev_credits > 0.5 -- >50% spike
ORDER BY pct_change DESC
Pipe to Slack:
# run_query and send_slack are project helpers that are not shown here
spikes = run_query(daily_cost_anomalies_sql)
if spikes:
    lines = [
        f'  {s["unique_id"]}: {s["daily_credits"]:.1f} credits ({s["pct_change"]:+.0f}%)'
        for s in spikes[:5]
    ]
    send_slack('#data-cost', 'Cost spikes detected:\n' + '\n'.join(lines))
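The spike rule itself (previous day above a noise floor, increase above 50%) is easy to reproduce in Python when the daily totals are already in memory. The following is a sketch only: the function name `find_spikes`, the input shape and the sample numbers are assumptions made for illustration.

```python
def find_spikes(daily_credits, min_prev=1.0, threshold=0.5):
    """Flag nodes whose latest day costs much more than the day before.

    daily_credits: {unique_id: [(iso_day, credits), ...]} in any order
    """
    spikes = []
    for unique_id, series in daily_credits.items():
        ordered = sorted(series)  # ISO dates sort chronologically as strings
        if len(ordered) < 2:
            continue  # no previous day to compare with
        prev = ordered[-2][1]
        latest = ordered[-1][1]
        if prev > min_prev and (latest - prev) / prev > threshold:
            spikes.append({
                "unique_id": unique_id,
                "daily_credits": latest,
                "prev_credits": prev,
                "pct_change": (latest - prev) / prev * 100,
            })
    return sorted(spikes, key=lambda s: s["pct_change"], reverse=True)


# Hypothetical daily totals
sample = {
    "model.proj.fct_orders": [("2024-05-01", 2.0), ("2024-05-02", 3.5)],
    "model.proj.dim_users": [("2024-05-01", 0.5), ("2024-05-02", 2.0)],  # below noise floor
    "model.proj.stg_events": [("2024-05-01", 2.0), ("2024-05-02", 2.25)],  # small change
}
for spike in find_spikes(sample):
    print(spike["unique_id"], f'{spike["pct_change"]:+.0f}%')
```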
Pattern 3 — Performance dashboard
Build runtime breakdown
-- Identify slow models, classify them
WITH model_stats AS (
SELECT
unique_id,
name,
materialized,
AVG(execution_time) AS avg_duration,
AVG(rows_affected) AS avg_rows,
AVG(bytes_processed / 1e9) AS avg_gb_processed,
COUNT(*) AS run_count
FROM dbt_node_runs
WHERE resource_type = 'model'
AND detected_at >= CURRENT_DATE - 7
AND status = 'success'
GROUP BY 1, 2, 3
)
SELECT
name,
materialized,
avg_duration,
avg_rows,
avg_gb_processed,
CASE
WHEN avg_duration > 300 THEN 'very_slow'
WHEN avg_duration > 60 THEN 'slow'
WHEN avg_duration > 10 THEN 'medium'
ELSE 'fast'
END AS speed_class,
CASE
WHEN avg_rows < 1000 THEN 'small'
WHEN avg_rows < 1000000 THEN 'medium'
ELSE 'large'
END AS volume_class
FROM model_stats
ORDER BY avg_duration DESC
Bottleneck analysis
-- Find models with high execution time but few rows: likely a query plan issue
SELECT
name,
AVG(execution_time) AS duration,
AVG(rows_affected) AS rows,
AVG(execution_time) / NULLIF(AVG(rows_affected), 0) AS secs_per_row
FROM dbt_node_runs
WHERE resource_type = 'model'
AND materialized != 'view'
AND detected_at >= CURRENT_DATE - 7
GROUP BY 1
ORDER BY secs_per_row DESC
LIMIT 10
A high secs_per_row points to an optimization opportunity (indexes, partitions, or a query rewrite).
Materialization analysis
-- Compare materialization types
SELECT
materialized,
COUNT(*) AS model_count,
AVG(execution_time) AS avg_duration,
SUM(execution_time) AS total_seconds,
AVG(bytes_processed / 1e9) AS avg_gb
FROM dbt_node_runs
WHERE resource_type = 'model'
AND detected_at >= CURRENT_DATE - 7
AND status = 'success'
GROUP BY 1
ORDER BY total_seconds DESC
Output:
materialized | model_count | avg_duration | total_seconds | avg_gb
incremental  | 25          | 45.2         | 1130          | 8.5
table        | 18          | 32.1         | 578           | 4.2
view         | 15          | 0.8          | 12            | 0
ephemeral    | 12          | 2.1          | 25            | 0.3
Incremental models are the most expensive. That is reasonable, since they are tables with merge logic.
Pattern 4 — Failure tracking
Failed runs analysis
-- Failures last 7 days
SELECT
unique_id,
name,
COUNT(*) AS failure_count,
MAX(detected_at) AS last_failure
FROM dbt_node_runs
WHERE status IN ('error', 'fail')
AND detected_at >= CURRENT_DATE - 7
GROUP BY 1, 2
ORDER BY failure_count DESC
Flaky tests (intermittent failures)
WITH test_runs AS (
SELECT
unique_id,
name,
detected_at,
status
FROM dbt_node_runs
WHERE resource_type IN ('test', 'data_test')
AND detected_at >= CURRENT_DATE - 30
)
SELECT
unique_id,
name,
COUNT(*) AS total_runs,
COUNT(*) FILTER (WHERE status = 'fail') AS failures,
COUNT(*) FILTER (WHERE status = 'pass') AS passes,
ROUND((COUNT(*) FILTER (WHERE status = 'fail'))::NUMERIC / COUNT(*) * 100, 1) AS fail_rate -- NUMERIC: PostgreSQL has no ROUND(double precision, int)
FROM test_runs
GROUP BY 1, 2
HAVING COUNT(*) >= 10
AND COUNT(*) FILTER (WHERE status = 'fail') > 0
AND COUNT(*) FILTER (WHERE status = 'fail') < COUNT(*) -- flaky, not always failing
ORDER BY fail_rate DESC
Tests with a fail rate of roughly 10-90% are flaky and need investigation.
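The same flaky-test rule (enough runs, at least one failure, not always failing) can be expressed in a few lines of Python over rows fetched from dbt_node_runs. This is a hedged sketch: the function name `flaky_tests`, the `(unique_id, status)` input shape and the sample data are hypothetical.

```python
from collections import Counter


def flaky_tests(test_runs, min_runs=10):
    """Return {unique_id: fail_rate_percent} for intermittently failing tests.

    test_runs: iterable of (unique_id, status) tuples
    """
    totals = Counter()
    fails = Counter()
    for unique_id, status in test_runs:
        totals[unique_id] += 1
        if status == "fail":
            fails[unique_id] += 1

    result = {}
    for unique_id, total in totals.items():
        # Flaky = fails sometimes, but not on every run
        if total >= min_runs and 0 < fails[unique_id] < total:
            result[unique_id] = round(fails[unique_id] / total * 100, 1)
    return result


# Hypothetical history: a fails 3 of 10, b fails 10 of 10, c has only 5 runs
runs = (
    [("test.proj.a", "fail")] * 3 + [("test.proj.a", "pass")] * 7
    + [("test.proj.b", "fail")] * 10
    + [("test.proj.c", "fail")] + [("test.proj.c", "pass")] * 4
)
print(flaky_tests(runs))
```

A test that fails on every run is excluded on purpose: it is broken rather than flaky and belongs in the failed-runs report instead.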
Pattern 5 — Capacity planning
Project growth
-- Distinct models run per week (a proxy for how many models were added)
WITH weekly_inventory AS (
SELECT
DATE_TRUNC('week', detected_at) AS week,
COUNT(DISTINCT unique_id) AS unique_models,
COUNT(*) AS total_runs
FROM dbt_node_runs
WHERE resource_type = 'model'
GROUP BY 1
)
SELECT * FROM weekly_inventory ORDER BY week
Warehouse utilization
-- Total runtime per day
SELECT
DATE_TRUNC('day', detected_at) AS day,
SUM(execution_time) AS total_seconds,
SUM(execution_time) / 3600 AS total_hours,
COUNT(DISTINCT unique_id) AS unique_models,
COUNT(*) AS total_runs
FROM dbt_node_runs
WHERE detected_at >= CURRENT_DATE - 30
GROUP BY 1
ORDER BY 1
The trend chart shows how dbt usage grows over time. Use it for capacity planning and warehouse sizing.
Pattern 6 — Slack integration
# scripts/notify_failures.py
import json
import os
from collections import defaultdict

import requests

# Assumption: alerts are posted to a Slack incoming webhook taken from the environment
SLACK_WEBHOOK_URL = os.environ.get('SLACK_WEBHOOK_URL')


def send_slack(channel, message):
    # Minimal sender; some webhook types ignore the channel override
    requests.post(SLACK_WEBHOOK_URL, json={'channel': channel, 'text': message}, timeout=10)


def notify_run_failures():
    with open('target/run_results.json') as fh:
        rr = json.load(fh)
    with open('target/manifest.json') as fh:
        m = json.load(fh)

    failures = [
        r for r in rr['results']
        if r['status'] in ('error', 'fail', 'runtime error')
    ]
    if not failures:
        return

    # Group by owner
    by_owner = defaultdict(list)
    for f in failures:
        node = m['nodes'].get(f['unique_id'])
        if not node:
            continue
        owner = node.get('meta', {}).get('owner') or 'unowned'
        by_owner[owner].append({
            'unique_id': f['unique_id'],
            'name': node['name'],
            'status': f['status'],
            # message and compiled_code may be null in the artifact, hence "or ''"
            'message': (f.get('message') or '')[:200],
            'failures': f.get('failures'),
            'compiled_code': (f.get('compiled_code') or '')[:500]
        })

    for owner, items in by_owner.items():
        channel = f'#{owner}' if owner != 'unowned' else '#data-quality'
        message = f'dbt run failures for {owner}:\n\n'
        for item in items[:5]:
            message += f'- `{item["unique_id"]}` ({item["status"]})\n'
            if item.get('failures'):
                message += f'  Failures: {item["failures"]} rows\n'
            if item.get('message'):
                message += f'  {item["message"][:100]}\n\n'
        send_slack(channel, message)


if __name__ == '__main__':
    notify_run_failures()
Owner-targeted notifications reduce noise: each team sees only the failures it owns.
Pattern 7 — Multi-target comparison
Different targets give different perspectives:
-- Compare prod vs ci runtime
WITH target_stats AS (
    SELECT
        h.target,
        n.unique_id,
        AVG(n.execution_time) AS avg_duration,
        AVG(n.rows_affected) AS avg_rows
    FROM dbt_node_runs n
    JOIN dbt_run_history h USING (invocation_id)
    WHERE n.detected_at >= CURRENT_DATE - 7
    GROUP BY 1, 2
),
by_target AS (
    SELECT
        unique_id,
        MAX(avg_duration) FILTER (WHERE target = 'prod') AS prod_duration,
        MAX(avg_duration) FILTER (WHERE target = 'ci') AS ci_duration,
        MAX(avg_rows) FILTER (WHERE target = 'prod') AS prod_rows,
        MAX(avg_rows) FILTER (WHERE target = 'ci') AS ci_rows
    FROM target_stats
    GROUP BY 1
)
SELECT *
FROM by_target
-- Filter on the pivoted columns here: PostgreSQL does not accept SELECT aliases in HAVING
WHERE prod_duration / NULLIF(ci_duration, 0) > 10 -- prod 10x slower than ci
ORDER BY prod_duration DESC
CI uses sample data (fast); prod uses full data (slow). A moderate ratio between the two is expected; an extreme ratio points to a possible scaling issue.
Pattern 8 — Cohort analysis
-- Performance over time per model
SELECT
unique_id,
DATE_TRUNC('week', detected_at) AS week,
AVG(execution_time) AS avg_duration,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY execution_time) AS p95_duration
FROM dbt_node_runs
WHERE resource_type = 'model'
AND unique_id = 'model.proj.fct_orders'
AND detected_at >= CURRENT_DATE - 90
GROUP BY 1, 2
ORDER BY week
The resulting trend chart shows whether fct_orders is gradually slowing down or staying stable.
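When the weekly p95 has to be computed outside the database, it helps to use the same definition as PERCENTILE_CONT, which interpolates linearly between neighbouring values. The sketch below illustrates that definition; the function name `percentile_cont` and the sample durations are assumptions.

```python
def percentile_cont(values, fraction):
    """Linear-interpolation percentile, following SQL PERCENTILE_CONT."""
    ordered = sorted(values)
    if not ordered:
        raise ValueError("percentile of an empty list is undefined")
    # Fractional position of the requested percentile in the sorted list
    rank = fraction * (len(ordered) - 1)
    lower = int(rank)
    upper = min(lower + 1, len(ordered) - 1)
    weight = rank - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * weight


# Hypothetical execution times (seconds) for one model in one week
week_durations = [42.0, 40.0, 45.0, 41.0, 90.0]
print(percentile_cont(week_durations, 0.5))   # median
print(percentile_cont(week_durations, 0.95))  # p95, pulled up by the slow run
```

Tracking p95 next to the average makes occasional slow runs visible even when the weekly mean barely moves.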
Pattern 9 — Test failure clustering
# Find tests that fail together (correlated failures)
from collections import defaultdict

# load_historical_runs is a project helper (not shown) that returns the parsed
# run_results documents for the requested period
historical_runs = load_historical_runs(days=30)

# For each pair of tests, count co-failures
co_failures = defaultdict(int)
fail_counts = defaultdict(int)  # number of runs in which each test failed

for run in historical_runs:
    failed_uids = {
        r['unique_id'] for r in run['results']
        if r['status'] == 'fail' and r['unique_id'].startswith('test.')
    }
    for uid in failed_uids:
        fail_counts[uid] += 1
    # Sorting gives every pair a stable (uid1, uid2) order
    failed_list = sorted(failed_uids)
    for i, uid1 in enumerate(failed_list):
        for uid2 in failed_list[i + 1:]:
            co_failures[(uid1, uid2)] += 1

# Find clusters
correlation_threshold = 0.7
clusters = []
for (uid1, uid2), count in co_failures.items():
    # Share of the more frequently failing test's failures that were joint failures
    correlation = count / max(fail_counts[uid1], fail_counts[uid2])
    if correlation > correlation_threshold:
        clusters.append((uid1, uid2, correlation))
# Output: 'Test X and Test Y fail together 80% of the time, likely the same root cause'
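The code above yields correlated pairs. To report whole groups of tests that share a likely root cause, the pairs can be merged into connected components. One possible way to do this is a small union-find, sketched below; the function name `merge_pairs` and the sample test IDs are hypothetical, and the input is assumed to be the first two fields of each `clusters` entry.

```python
def merge_pairs(pairs):
    """Merge correlated pairs into groups (connected components)."""
    parent = {}

    def find(node):
        # Walk up to the representative, shortening the path on the way
        parent.setdefault(node, node)
        while parent[node] != node:
            parent[node] = parent[parent[node]]
            node = parent[node]
        return node

    for left, right in pairs:
        root_left, root_right = find(left), find(right)
        if root_left != root_right:
            parent[root_right] = root_left

    groups = {}
    for node in parent:
        groups.setdefault(find(node), set()).add(node)
    # Sort members and groups so the output is deterministic
    return sorted((sorted(group) for group in groups.values()), key=lambda g: g[0])


# Hypothetical correlated pairs
pairs = [
    ("test.proj.a", "test.proj.b"),
    ("test.proj.b", "test.proj.c"),
    ("test.proj.x", "test.proj.y"),
]
for group in merge_pairs(pairs):
    print(group)
```

With this input, a, b and c end up in one group even though a and c were never flagged as a pair directly.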
Pattern 10 — Cron job pattern
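# cron_observability.py: long-running poller that checks for new artifacts every 5 minutes
# get_latest_artifact_from_s3, load_run_results, check_regressions, detect_anomalies,
# process_alert_queue and log_error are project helpers that are not shown here;
# load_run_results is assumed to accept the downloaded artifact
import time

loaded_invocations = set()  # invocation_ids already handled by this process

while True:
    try:
        # Process the latest run_results if it is new
        latest_artifact = get_latest_artifact_from_s3()
        if latest_artifact['invocation_id'] not in loaded_invocations:
            load_run_results(latest_artifact)
            loaded_invocations.add(latest_artifact['invocation_id'])
        # Check for regressions
        check_regressions()
        # Anomaly detection
        detect_anomalies()
        # Send alerts
        process_alert_queue()
    except Exception as e:
        # Keep the loop alive; one failed iteration should not stop monitoring
        log_error(e)
    time.sleep(300)  # 5 minutes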
Or use Airflow / Dagster for scheduling.
Architecture overview
┌─────────────────────────┐
│ dbt build (CI / cron) │
└────────────┬────────────┘
│
▼
┌─────────────────────────┐
│ S3 — artifacts │
│ /manifests/<run_id> │
│ /run_results/<run_id> │
│ /catalogs/<run_id> │
└────────────┬────────────┘
│
▼
┌─────────────────────────┐
│ Loader (event-driven │
│ or scheduled) │
└────────────┬────────────┘
│
▼
┌─────────────────────────┐
│ Postgres / Snowflake │
│ - dbt_run_history │
│ - dbt_node_runs │
│ - dbt_test_results │
│ - dbt_source_freshness │
└────────────┬────────────┘
│
▼
┌─────────────────────────┐
│ Metabase / Looker │
│ - Performance dashboard │
│ - Cost dashboard │
│ - Failure tracking │
│ - Capacity planning │
└────────────┬────────────┘
│
▼
┌─────────────────────────┐
│ Slack / PagerDuty │
│ - Alerting │
└─────────────────────────┘
Key takeaways
- Time analytics: store per-node run data in a database, compute baselines, detect regressions.
- Cost attribution: tag queries with the dbt unique_id (query_tag) and join against warehouse QUERY_HISTORY.
- Performance dashboards: bottleneck identification, materialization analysis, capacity planning.
- Failure tracking: flaky tests, failure clustering, root cause analysis.
- Slack integration: owner-targeted notifications that reduce noise.
- Multi-target comparison: prod vs ci insights.
- Cohort analysis: per-model trends over weeks/months.
- Test correlation: finding co-failing tests that share a root cause.
- Architecture: artifacts -> S3 -> loader -> analytics DB -> dashboards -> alerts.
- Elementary as an alternative: built-in observability via a dbt package; the trade-off is less customization but faster setup.