Lesson 06.01 · 24 min
Advanced
run-results · artifacts · observability

run_results.json: schema v6, status, timing, adapter_response

target/run_results.json is the execution log of a dbt invocation. After each dbt command (run, test, build, seed, snapshot, compile), dbt writes the execution results here; dbt source freshness writes to target/sources.json instead. The manifest describes what dbt should do; run_results records what actually happened.

A senior engineer should know the structure of run_results by heart: the schema v6 fields, the status enum, the timing structure, and adapter_response. They are the foundation for observability tools, cost tracking, and performance dashboards.

Related lesson: logs/dbt.log: reading, filtering, and common patterns (dbt I)

When it is created

run_results.json is written at the end of every dbt invocation:

dbt run        -> target/run_results.json
dbt test       -> target/run_results.json (overwrites)
dbt build      -> target/run_results.json
dbt source freshness -> target/sources.json (separate artifact)
dbt parse      -> does NOT create run_results (parsing only)

Note: every invocation overwrites run_results.json. For observability tooling, copy it immediately after each run.
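A minimal sketch of that copy step, assuming a local archive directory (`archive_dir` and the `run_results_<invocation_id>.json` naming are illustrative choices, not dbt conventions). Naming the copy after `metadata.invocation_id` gives each invocation its own file and keeps it joinable with the matching manifest:

```python
import json
import shutil
from pathlib import Path


def archive_run_results(target_dir: str, archive_dir: str) -> Path:
    """Copy <target_dir>/run_results.json to <archive_dir>/run_results_<invocation_id>.json."""
    src = Path(target_dir) / "run_results.json"
    with src.open() as fh:
        invocation_id = json.load(fh)["metadata"]["invocation_id"]

    dest_dir = Path(archive_dir)
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest = dest_dir / f"run_results_{invocation_id}.json"
    # Copy before the next dbt command overwrites the file
    shutil.copy2(src, dest)
    return dest
```

In CI this would run right after each dbt command, e.g. `archive_run_results('target', 'artifacts/run_results')`; replacing the local copy with an upload to object storage does not change the idea.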


Top-level structure

{
  "metadata": {
    "dbt_schema_version": "https://schemas.getdbt.com/dbt/run-results/v6.json",
    "dbt_version": "1.11.0",
    "generated_at": "2026-05-19T11:23:14.812345Z",
    "invocation_id": "a3f5e8c2-...",
    "env": {}
  },
  "results": [
    {
      "status": "success",
      "timing": [...],
      "thread_id": "Thread-1 (worker)",
      "execution_time": 12.456,
      "adapter_response": {...},
      "message": null,
      "failures": null,
      "unique_id": "model.jaffle_shop.fct_orders",
      "compiled": true,
      "compiled_code": "select ...",
      "relation_name": "\"jaffle_shop\".\"main\".\"fct_orders\""
    }
  ],
  "elapsed_time": 234.567,
  "args": {
    "which": "run",
    "rpc_method": null,
    "log_format": "default",
    "select": ["state:modified+"],
    "exclude": null,
    "vars": {},
    ...
  }
}

Top-level keys:

  • metadata: invocation info (metadata.invocation_id matches manifest.metadata.invocation_id)
  • results: array of per-node results
  • elapsed_time: total wall-clock time of the invocation, in seconds
  • args: command arguments, flags, and vars

metadata

{
  "dbt_schema_version": "https://schemas.getdbt.com/dbt/run-results/v6.json",
  "dbt_version": "1.11.0",
  "generated_at": "2026-05-19T11:23:14.812345Z",
  "invocation_id": "a3f5e8c2-...",
  "env": {
    "DBT_USER": "alice",
    "DBT_TARGET": "prod"
  }
}
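  • dbt_schema_version: URL of the formal JSON schema (v6 for dbt 1.6+)
  • invocation_id: matches manifest.metadata.invocation_id and links run_results to the manifest produced by the same invocation.
  • env: custom environment variables; only variables prefixed with DBT_ENV_CUSTOM_ENV_ are recorded, keyed by name with the prefix stripped ({} if none are set)
  • generated_at: UTC timestamp of when the artifact was written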
TIP

`invocation_id` is critical for observability tooling: use it to match run_results to the correct manifest. A mismatch means the two artifacts come from different invocations and have drifted apart.


results array

Each result is the execution outcome of one node:

{
  "status": "success",
  "timing": [
    {
      "name": "compile",
      "started_at": "2026-05-19T11:23:01.123456Z",
      "completed_at": "2026-05-19T11:23:01.234567Z"
    },
    {
      "name": "execute",
      "started_at": "2026-05-19T11:23:01.234567Z",
      "completed_at": "2026-05-19T11:23:13.580123Z"
    }
  ],
  "thread_id": "Thread-2 (worker)",
  "execution_time": 12.456,
  "adapter_response": {
    "_message": "MERGE 1500 0",
    "code": "MERGE",
    "rows_affected": 1500,
    "bytes_processed": 124567890,
    "query_id": "01abc..."
  },
  "message": null,
  "failures": null,
  "unique_id": "model.jaffle_shop.fct_orders",
  "compiled": true,
  "compiled_code": "select ...",
  "relation_name": "\"jaffle_shop\".\"main\".\"fct_orders\""
}

Each field is described in detail below.


status — execution outcome

"status": "success"

Possible values:

  • success: the node executed successfully
  • error: execution failed (SQL error, adapter exception)
  • skipped: selected but not executed (e.g., an upstream node failed)
  • pass: tests only; the assertion passed
  • fail: tests only; the assertion failed (rows returned)
  • warn: tests only; the warning severity threshold was exceeded
  • runtime error: reported by source freshness checks (sources.json) when the freshness query itself could not run (connection issue, etc.)
  • partial success: microbatch models only; some batches succeeded, others failed

Different resource types have different status semantics:

# Models, snapshots, seeds:
# 'success' or 'error' or 'skipped' ('partial success' is also possible for microbatch models)

# Tests:
# 'pass' or 'fail' or 'warn' or 'error' or 'skipped'

# Operations:
# 'success' or 'error'
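For reporting, it helps to collapse these statuses into a few buckets per resource type. A sketch, assuming the grouping below (ok / warning / failure / skipped) suits your alerting; the resource type is read from the `unique_id` prefix, and 'runtime error' is mapped defensively because source freshness results use it:

```python
from collections import Counter

# Assumed grouping of statuses into report buckets
CATEGORY = {
    "success": "ok",
    "pass": "ok",
    "warn": "warning",
    "error": "failure",
    "fail": "failure",
    "partial success": "failure",  # some microbatch batches failed
    "runtime error": "failure",    # used by source freshness results
    "skipped": "skipped",
}


def summarize(results: list) -> Counter:
    """Count results per (resource_type, category)."""
    counts = Counter()
    for r in results:
        # unique_id looks like "model.jaffle_shop.fct_orders" -> "model"
        resource_type = r["unique_id"].split(".", 1)[0]
        # Unknown statuses are surfaced rather than silently dropped
        category = CATEGORY.get(r["status"], "unknown")
        counts[(resource_type, category)] += 1
    return counts
```

Usage: `summarize(run_results['results'])` returns counts such as `('test', 'failure')`, which map directly onto dashboard tiles or alert rules.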

timing — execution phases

"timing": [
  {
    "name": "compile",
    "started_at": "2026-05-19T11:23:01.123456Z",
    "completed_at": "2026-05-19T11:23:01.234567Z"
  },
  {
    "name": "execute",
    "started_at": "2026-05-19T11:23:01.234567Z",
    "completed_at": "2026-05-19T11:23:13.580123Z"
  }
]

An array in which each element is one phase of execution.

Common phases:

  • compile — rendering Jinja templates -> SQL
  • execute — running SQL against warehouse

Snapshots, seeds, and complex materializations may add more phases.

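Computing phase durations:

import json
from datetime import datetime

with open('target/run_results.json') as fh:
    run_results = json.load(fh)

for result in run_results['results']:
    print(result['unique_id'])
    for phase in result['timing']:
        # Timestamps are UTC with a trailing 'Z'; strip it for fromisoformat
        start = datetime.fromisoformat(phase['started_at'].rstrip('Z'))
        end = datetime.fromisoformat(phase['completed_at'].rstrip('Z'))
        duration = (end - start).total_seconds()
        print(f"  {phase['name']}: {duration:.3f}s")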
NOTE

The sum of phase durations may differ slightly from `execution_time` (overhead, I/O). Use `execution_time` for the overall duration and timing for the per-phase breakdown.
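To see how large that gap is for a given node, subtract the summed phase durations from `execution_time`. A sketch, assuming every timing entry has ISO-8601 `started_at`/`completed_at` values ending in `Z`; values close to zero are normal, and rounding can even make them slightly negative:

```python
from datetime import datetime


def _ts(value: str) -> datetime:
    # Replace the trailing "Z" so fromisoformat also works before Python 3.11
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def phase_overhead(result: dict) -> float:
    """execution_time minus the summed duration of all timing phases (seconds)."""
    phase_total = sum(
        (_ts(p["completed_at"]) - _ts(p["started_at"])).total_seconds()
        for p in result["timing"]
    )
    return result["execution_time"] - phase_total
```

Skipped nodes have an empty `timing` array and `execution_time` 0.0, so they report zero overhead.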


thread_id

"thread_id": "Thread-2 (worker)"

dbt executes nodes in parallel on worker threads (the threads setting in profiles.yml). thread_id identifies the thread that executed this node.

Useful for:

  • Concurrency analysis (which threads handled which models)
  • Identifying serialization bottlenecks
  • Multi-thread observability
from collections import defaultdict

by_thread = defaultdict(list)
for result in run_results['results']:
    by_thread[result['thread_id']].append(result['execution_time'])

for thread, times in by_thread.items():
    total = sum(times)
    print(f'{thread}: {len(times)} tasks, total {total:.1f}s')
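Per-thread totals become easier to compare when divided by the run's `elapsed_time`. A sketch of that utilization ratio; reading it (one busy thread while the others idle hints at a serialization bottleneck in the DAG) is a heuristic, and because `elapsed_time` also covers work outside node execution, even a perfectly parallel run stays below 1.0:

```python
from collections import defaultdict


def thread_utilization(run_results: dict) -> dict:
    """Share of elapsed_time each worker thread spent executing nodes (0..1)."""
    elapsed = run_results["elapsed_time"]
    busy = defaultdict(float)
    for r in run_results["results"]:
        busy[r["thread_id"]] += r["execution_time"]
    if elapsed <= 0:
        return {thread: 0.0 for thread in busy}
    return {thread: seconds / elapsed for thread, seconds in busy.items()}
```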

execution_time

"execution_time": 12.456

Wall-clock seconds for the node's full execution (compile + execute + any overhead).

With threads=1, the sum across all results approximates the total wall-clock time; with parallelism, the actual elapsed_time is shorter.


adapter_response — warehouse details

{
  "_message": "MERGE 1500 0",
  "code": "MERGE",
  "rows_affected": 1500,
  "bytes_processed": 124567890,
  "query_id": "01abc-2345-..."
}

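The structure is adapter-specific: dbt-core defines a minimal set of fields, and adapters can extend it. The examples below are illustrative; the exact keys depend on the adapter and its version, so check what your adapter actually writes before relying on a field.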

Common fields

  • _message — human-readable status (legacy field)
  • code — SQL command type (CREATE, INSERT, UPDATE, MERGE, COPY, etc.)
  • rows_affected — rows changed by DML
  • query_id — warehouse-specific query identifier
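Because skipped nodes carry `adapter_response = {}` and some statements report `rows_affected` as -1 (as in the failed-test example later in this lesson), it is convenient to normalize these fields once. A sketch using a small dataclass; the `AdapterStats` name and the choice to map -1 to `None` are assumptions of this example:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AdapterStats:
    code: Optional[str]
    rows_affected: Optional[int]
    query_id: Optional[str]
    bytes_processed: Optional[int]  # only some adapters report this


def adapter_stats(result: dict) -> AdapterStats:
    # Missing or empty adapter_response (e.g. skipped nodes) -> all None
    resp = result.get("adapter_response") or {}
    rows = resp.get("rows_affected")
    return AdapterStats(
        code=resp.get("code"),
        # Negative counts mean "not applicable"; treat them as unknown
        rows_affected=rows if isinstance(rows, int) and rows >= 0 else None,
        query_id=resp.get("query_id") or None,
        bytes_processed=resp.get("bytes_processed"),
    )
```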

Adapter-specific extensions

Snowflake:

{
  "code": "MERGE",
  "rows_affected": 1500,
  "bytes_processed": 124567890,
  "query_id": "01abc-...",
  "credits_used": 0.0042,
  "execution_time_ms": 12456,
  "warehouse": "PROD_WH"
}

BigQuery:

{
  "code": "DML",
  "rows_affected": 1500,
  "bytes_processed": 124567890,
  "query_id": "job_abc123",
  "slot_ms": 23456789,
  "total_slot_ms": 23456789,
  "total_bytes_billed": 100000000
}

Redshift:

{
  "code": "INSERT",
  "rows_affected": 1500,
  "query_id": "abc123"
}

DuckDB:

{
  "_message": "OK",
  "code": "CREATE TABLE AS",
  "rows_affected": 1500,
  "query_id": null
}

DuckDB runs locally, so there is no query_id for tracking.


message and failures

"message": null,
"failures": null
  • message: human-readable outcome; for errors, it contains the error message
  • failures: for tests, the integer count of failing rows

Error example:

{
  "status": "error",
  "unique_id": "model.jaffle_shop.fct_orders",
  "message": "Database Error in model fct_orders (models/marts/fct_orders.sql)\n  Column \"order_idd\" does not exist...",
  "failures": null
}

Test failure:

{
  "status": "fail",
  "unique_id": "test.jaffle_shop.not_null_fct_orders_id",
  "message": "Got 5 rows where expected 0",
  "failures": 5
}

compiled_code and compiled

"compiled": true,
"compiled_code": "select ..."

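The compiled SQL of the node, with Jinja already rendered. For models this is the model's SELECT statement (the same SQL as in target/compiled/); the DDL/DML that the materialization wraps around it (CREATE TABLE AS, MERGE, etc.) is not included. To see the statement that actually ran, look at target/run/, logs/dbt.log, or the warehouse query history via query_id.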

Note: for some node types (operations, snapshots), compiled_code may contain multiple statements.


relation_name

"relation_name": "\"jaffle_shop\".\"main\".\"fct_orders\""

Fully qualified warehouse identifier, quoted according to the adapter's syntax.

Useful for:

  • Joining to warehouse metadata (INFORMATION_SCHEMA)
  • Generating cleanup scripts
  • Identifying the actual tables/views in the warehouse
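For the INFORMATION_SCHEMA use case, the quoted name first has to be split into database, schema, and identifier. A sketch with a small regex, assuming double-quoted identifiers (as in the DuckDB-style examples here, with `""` as an escaped quote) or BigQuery-style backticks; unquoted parts are returned as-is, so warehouse case-folding rules (e.g. Snowflake uppercasing unquoted names) are not applied:

```python
import re

# One identifier: "double quoted" (with "" as an escaped quote),
# `backticked`, or a bare run of characters without dots or quotes.
_PART = re.compile(r'"((?:[^"]|"")*)"|`([^`]*)`|([^.`"]+)')


def split_relation_name(relation_name: str) -> list:
    """Split '"db"."schema"."table"' into ['db', 'schema', 'table']."""
    parts = []
    for quoted, backticked, bare in _PART.findall(relation_name):
        if bare:
            parts.append(bare)
        elif backticked:
            parts.append(backticked)
        else:
            parts.append(quoted.replace('""', '"'))
    return parts
```

The resulting parts can be bound as parameters in a query against `information_schema.tables` instead of being pasted into SQL strings.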

elapsed_time — total wall-clock

"elapsed_time": 234.567

Total seconds from the start of the dbt invocation to its finish. Includes:

  • Parsing time
  • Compilation
  • Execution
  • Final teardown

For parallel runs with threads > 1, elapsed_time is typically less than sum(execution_time).


args — invocation context

"args": {
  "which": "build",
  "select": ["state:modified+"],
  "exclude": null,
  "selector": null,
  "vars": {"my_var": "foo"},
  "target": "prod",
  "full_refresh": false,
  "fail_fast": false,
  "threads": 8,
  "log_format": "json",
  "log_level": "info",
  "warn_error": false
}

The full set of command-line args and config; useful for reproducing a run.
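A sketch that turns `args` back into a command line, assuming the key names shown above (`which`, `select`, `exclude`, `selector`, `target`, `threads`, `full_refresh`, `fail_fast`, `vars`). Only those flags are reproduced: state-based selection such as `state:modified+` would still need the `--state` path, and profile or project directory flags are not covered:

```python
import json
import shlex


def rebuild_command(args: dict) -> str:
    """Approximate the dbt CLI call recorded in run_results['args']."""
    cmd = ["dbt", args["which"]]
    for key, flag in (("select", "--select"), ("exclude", "--exclude")):
        values = args.get(key)
        if isinstance(values, str):
            values = [values]
        if values:
            cmd += [flag, *values]
    for key, flag in (("selector", "--selector"), ("target", "--target")):
        if args.get(key):
            cmd += [flag, str(args[key])]
    if args.get("threads"):
        cmd += ["--threads", str(args["threads"])]
    # Boolean flags are only emitted when set
    if args.get("full_refresh"):
        cmd.append("--full-refresh")
    if args.get("fail_fast"):
        cmd.append("--fail-fast")
    if args.get("vars"):
        cmd += ["--vars", json.dumps(args["vars"])]
    # shlex.join quotes arguments such as the JSON vars for a POSIX shell
    return shlex.join(cmd)
```

For the args above this yields `dbt build --select state:modified+ --target prod --threads 8 --vars '{"my_var": "foo"}'`.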


Example fragment: a successful incremental run

{
  "status": "success",
  "timing": [
    {
      "name": "compile",
      "started_at": "2026-05-19T11:23:01.123Z",
      "completed_at": "2026-05-19T11:23:01.234Z"
    },
    {
      "name": "execute",
      "started_at": "2026-05-19T11:23:01.234Z",
      "completed_at": "2026-05-19T11:23:13.580Z"
    }
  ],
  "thread_id": "Thread-2 (worker)",
  "execution_time": 12.456,
  "adapter_response": {
    "_message": "MERGE 1500 0",
    "code": "MERGE",
    "rows_affected": 1500,
    "bytes_processed": 124567890,
    "query_id": "01abc..."
  },
  "message": null,
  "failures": null,
  "unique_id": "model.jaffle_shop.fct_orders",
  "compiled": true,
  "compiled_code": "select ...",
  "relation_name": "\"jaffle_shop\".\"main\".\"fct_orders\""
}

The incremental MERGE affected 1,500 rows in 12.5 seconds and processed about 125 MB.


Failed test example

{
  "status": "fail",
  "timing": [
    {"name": "compile", "started_at": "...", "completed_at": "..."},
    {"name": "execute", "started_at": "...", "completed_at": "..."}
  ],
  "thread_id": "Thread-1 (worker)",
  "execution_time": 2.34,
  "adapter_response": {
    "_message": "SELECT 5",
    "code": "SELECT",
    "rows_affected": -1
  },
  "message": "Got 5 rows where expected 0",
  "failures": 5,
  "unique_id": "test.jaffle_shop.not_null_fct_orders_id.abc12",
  "compiled": true,
  "compiled_code": "SELECT * FROM \"jaffle_shop\".\"main\".\"fct_orders\" WHERE id IS NULL",
  "relation_name": null
}

5 rows violated the not_null test.


Skipped node example

{
  "status": "skipped",
  "timing": [],
  "thread_id": "Thread-3 (worker)",
  "execution_time": 0.0,
  "adapter_response": {},
  "message": "Skipped due to upstream failure",
  "failures": null,
  "unique_id": "model.jaffle_shop.dim_customers",
  "compiled": false,
  "compiled_code": null,
  "relation_name": null
}

dbt did not execute the node because an upstream model failed.


Parsing run_results in Python

import json
from collections import Counter
from datetime import datetime

run_results = json.load(open('target/run_results.json'))

# Status breakdown
statuses = Counter(r['status'] for r in run_results['results'])
print(f'Status: {statuses}')

# Total duration
total = sum(r['execution_time'] for r in run_results['results'])
print(f'Total execution time: {total:.1f}s')
print(f'Wall-clock elapsed: {run_results["elapsed_time"]:.1f}s')
print(f'Parallelism speedup: {total / run_results["elapsed_time"]:.1f}x')

# Failed
failed = [r for r in run_results['results'] if r['status'] in ('error', 'fail')]
for f in failed:
    print(f"\nFAILED: {f['unique_id']}")
    print(f"  Status: {f['status']}")
    print(f"  Message: {f['message'][:200] if f['message'] else 'N/A'}")
    if f.get('failures'):
        print(f"  Failures: {f['failures']}")

# Slowest nodes
slowest = sorted(run_results['results'], key=lambda r: -r['execution_time'])[:10]
print('\nTop 10 slowest:')
for r in slowest:
    print(f"  {r['unique_id']}: {r['execution_time']:.1f}s")

Output:

Status: Counter({'success': 47, 'pass': 28, 'skipped': 2, 'fail': 1})
Total execution time: 145.6s
Wall-clock elapsed: 24.3s
Parallelism speedup: 6.0x

FAILED: test.jaffle_shop.not_null_fct_orders_id.abc12
  Status: fail
  Message: Got 5 rows where expected 0
  Failures: 5

Top 10 slowest:
  model.jaffle_shop.fct_orders: 45.2s
  model.jaffle_shop.fct_revenue: 32.1s
  ...

run_results use cases

1. Performance dashboard

import pandas as pd

df = pd.DataFrame([
    {
        'unique_id': r['unique_id'],
        'status': r['status'],
        'execution_time': r['execution_time'],
        'thread_id': r['thread_id'],
        'rows_affected': r.get('adapter_response', {}).get('rows_affected'),
        'bytes_processed': r.get('adapter_response', {}).get('bytes_processed')
    }
    for r in run_results['results']
])

# Aggregate
print(df.groupby('status')['execution_time'].sum())
print(df.nlargest(10, 'execution_time'))

2. Cost tracking

# bytes_processed is reported only by some adapters (e.g. BigQuery).
# Snowflake credits are not derived from bytes: they depend on warehouse size and uptime.
total_bytes = sum(
    (r.get('adapter_response') or {}).get('bytes_processed') or 0
    for r in run_results['results']
)
print(f'Total bytes: {total_bytes / 1e9:.1f} GB')
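Beyond a single total, a per-node ranking shows which models drive the volume. A sketch, assuming the field name is passed in because it differs by adapter (`bytes_processed` in the examples above); converting the numbers into money is deliberately left out, since pricing depends on the warehouse and contract:

```python
def top_consumers(run_results: dict, field: str, n: int = 5) -> list:
    """Top-n (unique_id, value) pairs for one numeric adapter_response field."""
    rows = []
    for r in run_results["results"]:
        value = (r.get("adapter_response") or {}).get(field)
        # Skip missing/null values; bool is excluded because it subclasses int
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            rows.append((r["unique_id"], float(value)))
    rows.sort(key=lambda item: item[1], reverse=True)
    return rows[:n]
```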

3. Anomaly detection

# Compare runs over time:
# if fct_orders typically takes 12s but ran 60s today -> alert
# load_historical_runs is a hypothetical helper returning past results for one node
historical = load_historical_runs(unique_id='model.proj.fct_orders')
current = next(r for r in run_results['results'] if r['unique_id'] == 'model.proj.fct_orders')

avg = sum(h['execution_time'] for h in historical) / len(historical)
if current['execution_time'] > avg * 3:
    print(f"ANOMALY: {current['execution_time']:.1f}s vs {avg:.1f}s average (>3x)")
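The same check as a self-contained function, with history passed in as a plain list of durations instead of the hypothetical loader. This sketch swaps the mean for the median (less sensitive to one-off outliers) and refuses to flag anything until there are enough samples; `factor=3.0` and `min_samples=7` are illustrative defaults:

```python
from statistics import median


def is_slow_anomaly(current_s: float, history_s: list,
                    factor: float = 3.0, min_samples: int = 7) -> bool:
    """True if the current duration exceeds factor x the historical median."""
    if len(history_s) < min_samples:
        return False  # not enough history for a baseline yet
    return current_s > factor * median(history_s)
```

Usage: `is_slow_anomaly(current['execution_time'], [h['execution_time'] for h in historical])`.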

4. Slack notifications

# send_slack is a hypothetical helper wrapping your Slack client
failed_tests = [
    r for r in run_results['results']
    if r['status'] == 'fail'
]

for failed in failed_tests:
    send_slack({
        'channel': '#data-quality',
        'message': f'Test failed: {failed["unique_id"]}\n'
                   f'Failures: {failed["failures"]}\n'
                   f'Compiled SQL: {(failed["compiled_code"] or "")[:500]}'
    })

Combining manifest + run_results

A critical pattern: enrich run_results with manifest metadata:

import json

manifest = json.load(open('target/manifest.json'))
run_results = json.load(open('target/run_results.json'))

# Verify invocation_id matches
assert manifest['metadata']['invocation_id'] == run_results['metadata']['invocation_id'], \
    'Manifest and run_results from different runs!'

for r in run_results['results']:
    uid = r['unique_id']
    node = manifest['nodes'].get(uid) or manifest.get('sources', {}).get(uid)
    if not node:
        continue
    
    # Enriched record
    enriched = {
        # From run_results
        'status': r['status'],
        'duration': r['execution_time'],
        'rows_affected': r.get('adapter_response', {}).get('rows_affected'),
        # From manifest
        'resource_type': node['resource_type'],
        'name': node['name'],
        'materialized': node.get('config', {}).get('materialized'),
        'tags': node.get('tags', []),
        'owner': node.get('meta', {}).get('owner'),
        'access': node.get('config', {}).get('access')
    }
    
    store_to_warehouse(enriched)  # hypothetical sink, e.g. an INSERT into an observability table

This pattern is the foundation of most dbt observability tools.


Schema v6 detail

These fields are specific to v6 (dbt 1.6+). Earlier versions have a subset:

  • v3 (dbt 0.20): basic
  • v4 (dbt 1.0): added some fields
  • v5 (dbt 1.2)
  • v6 (dbt 1.6+): current

v7 is expected for dbt 1.12+ (semantic layer updates).
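Because the schema version is encoded in `metadata.dbt_schema_version`, tooling can fail loudly instead of misreading a newer artifact. A sketch, assuming the URL pattern shown earlier (`.../run-results/v6.json`); `SUPPORTED` is a hypothetical set of versions your parser was tested against:

```python
import re

SUPPORTED = {6}  # assumption: versions this parser was written and tested for


def run_results_schema_version(run_results: dict) -> int:
    url = run_results["metadata"]["dbt_schema_version"]
    match = re.search(r"/run-results/v(\d+)\.json$", url)
    if match is None:
        raise ValueError(f"unexpected dbt_schema_version: {url!r}")
    return int(match.group(1))


def check_supported(run_results: dict) -> int:
    version = run_results_schema_version(run_results)
    if version not in SUPPORTED:
        raise RuntimeError(f"run_results schema v{version} not supported; update the parser")
    return version
```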


Antipatterns

1. Not validating invocation_id match

# BAD — using run_results from different run
run_results = json.load(open('target/run_results.json'))
manifest = json.load(open('artifacts/old_manifest.json'))
# Different invocations — fields don't correlate

Always check invocation_id consistency.

2. Assuming all results have adapter_response

# BAD
rows = r['adapter_response']['rows_affected']
# Skipped nodes: adapter_response = {}, KeyError

Use .get():

rows = r.get('adapter_response', {}).get('rows_affected')

3. Treating only 'error' as a failure

# BAD
if r['status'] == 'error':
    handle_failure(r)
# Misses 'fail' status (tests)

Check every failing status:

if r['status'] in ('error', 'fail', 'runtime error', 'partial success'):
    handle_failure(r)

4. Trusting timing for cost

# BAD — assumes execution_time × hourly rate
cost = r['execution_time'] / 3600 * hourly_rate
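# Snowflake bills warehouse size x uptime, shared by concurrent queries,
# so per-query duration x rate misattributes cost.
# Prefer warehouse-reported metrics: adapter_response fields where available,
# or billing data joined via query_id.

Use adapter_response fields (and query_id lookups) for more accurate cost attribution.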


Key takeaways

  1. run_results.json is the execution log of a dbt invocation. It is overwritten by each invocation.
  2. invocation_id matches the manifest; this is critical for observability.
  3. status enum: success/error/skipped (models), pass/fail/warn (tests), partial success (microbatch).
  4. timing array: phases (compile, execute) with timestamps.
  5. execution_time: wall-clock seconds for a node's full execution.
  6. adapter_response: warehouse details such as rows_affected, bytes_processed, query_id; adapters add their own extensions.
  7. message + failures: error details (models) or the test failure count.
  8. compiled_code: the node's compiled SQL (for models, the SELECT without the materialization DDL/DML).
  9. elapsed_time: total wall-clock time; parallelism makes it < sum(execution_time).
  10. args: full invocation context.
  11. Combine manifest + run_results via invocation_id for full observability.
  12. Schema v6 for dbt 1.6+; v7 is expected with MetricFlow updates.
Knowledge check
A dbt run has finished. A CI tool compares it with the previous run to detect performance regressions. Which run_results fields are critical? Which edge cases must it handle?
Answer
**Critical fields for regression detection** (helpers such as `get_historical_avg` and `flag_regression` are hypothetical placeholders):

**1. Per-node execution_time**:

```python
for r in run_results['results']:
    uid = r['unique_id']
    current_duration = r['execution_time']

    # Compare against the baseline
    historical_avg = get_historical_avg(uid, days=30)

    if current_duration > historical_avg * 2:
        flag_regression(uid, current_duration, historical_avg)
```

**2. adapter_response.bytes_processed**:

More meaningful than execution_time on cloud warehouses that report it:

```python
bytes_current = (r.get('adapter_response') or {}).get('bytes_processed') or 0
# Bytes scanned drive cost on bytes-billed warehouses (e.g. BigQuery on-demand)
```

**3. adapter_response.rows_affected**:

A massive change in rows affected may indicate a bug:

```python
rows_current = r['adapter_response'].get('rows_affected', 0)
rows_historical = get_historical_rows(uid)

if rows_historical and abs(rows_current - rows_historical) / rows_historical > 0.5:
    flag_volume_change(uid, rows_current, rows_historical)
```

**4. status**:

New errors / unexpected skips:

```python
if r['status'] in ('error', 'fail', 'runtime error'):
    flag_failure(r)
if r['status'] == 'skipped':
    if uid not in expected_skipped:
        flag_unexpected_skip(uid)
```

**5. timing phases**:

Identify which phase regressed:

```python
from dateutil.parser import parse  # or datetime.fromisoformat

for phase in r['timing']:
    start = parse(phase['started_at'])
    end = parse(phase['completed_at'])
    phase_duration = (end - start).total_seconds()

    historical_phase_avg = get_phase_avg(uid, phase['name'])
    if phase_duration > historical_phase_avg * 2:
        flag_phase_regression(uid, phase['name'], phase_duration, historical_phase_avg)
```

For example, if the 'execute' phase regressed but 'compile' did not, the problem is in the warehouse.

**6. adapter_response.query_id**:

Link to the warehouse query history for deeper analysis:

```python
query_id = r['adapter_response'].get('query_id')
if query_id and is_regression:
    # Query Snowflake QUERY_HISTORY for the query plan, scan size, etc.
    plan = get_snowflake_query_plan(query_id)
    log_to_dashboard(uid, plan)
```

**7. compiled_code**:

If the SQL changed between runs:

```python
prev_sql = previous_run.get('compiled_code')
curr_sql = r.get('compiled_code')

if prev_sql != curr_sql:
    # SQL changed: the regression may be caused by the change
    log_sql_changes(uid, prev_sql, curr_sql)
```

**Edge cases**:

**1. Skipped nodes**:

```python
if r['status'] == 'skipped':
    # Not actually run: duration = 0, no adapter_response
    # Exclude from regression analysis
    continue
```

**2. First run** (no baseline):

```python
historical = get_historical(uid)
if len(historical) < 7:
    # Not enough data: establish a baseline first
    save_to_history(uid, current)
    continue
```

**3. Model code changes**:

If the node changed between runs (compare manifest checksums):

```python
if manifest['nodes'][uid]['checksum'] != previous_manifest['nodes'][uid]['checksum']:
    # Body changed: performance is expected to differ
    flag_code_change(uid)
```

Don't false-flag intentional changes.

**4. Thread parallelism**:

```python
# elapsed_time reflects parallelism:
# don't compare elapsed_time when the thread count changed
if run_results['args'].get('threads') != previous_run_args.get('threads'):
    log_warning('Thread count changed: elapsed_time is not comparable')
```

**5. Microbatch partial success**:

```python
if r['status'] == 'partial success':
    # Some batches succeeded, others failed.
    # Assumption: batch_results holds 'successful' and 'failed' lists of
    # batch time windows (dbt 1.9+); verify against your dbt version.
    batch_results = r.get('batch_results') or {}
    for batch in batch_results.get('failed', []):
        flag_batch_failure(uid, batch)
```

**6. Warehouse capacity issues**:

```python
# If many models are slow simultaneously -> warehouse contention
# baseline_avg: hypothetical dict of unique_id -> historical average seconds
slow_count = sum(
    1 for r in results
    if r['execution_time'] > 2 * baseline_avg.get(r['unique_id'], float('inf'))
)
if slow_count > len(results) * 0.3:
    flag_warehouse_contention()
```

**7. Cold cache effects**:

The first run after a warehouse restart is slower (empty cache):

```python
hour = parse(run_results['metadata']['generated_at']).hour
if hour == 0 and warehouse_typically_restarts:
    apply_lenient_thresholds()
```

**8. Data volume changes**:

Legitimate growth is not a regression:

```python
rows_growth = current_rows / historical_rows
if rows_growth > 1.5:
    # Data grew 50%+, so duration is expected to grow too;
    # this heuristic assumes sublinear growth
    expected_duration = historical_duration * (rows_growth ** 0.5)
    is_regression = current_duration > expected_duration * 1.5
```

**Production regression dashboard structure**:

```sql
CREATE TABLE dbt_performance_history (
    invocation_id VARCHAR,
    unique_id VARCHAR,
    detected_at TIMESTAMP,
    execution_time FLOAT,
    rows_affected BIGINT,
    bytes_processed BIGINT,
    status VARCHAR
    -- ...
);

-- Query: regression alerts
WITH baseline AS (
    SELECT
        unique_id,
        AVG(execution_time) AS baseline_avg,
        STDDEV(execution_time) AS baseline_stddev
    FROM dbt_performance_history
    WHERE detected_at BETWEEN CURRENT_DATE - 30 AND CURRENT_DATE - 1
    GROUP BY 1
    HAVING COUNT(*) >= 7  -- need enough samples
),
today AS (
    SELECT unique_id, execution_time
    FROM dbt_performance_history
    WHERE CAST(detected_at AS DATE) = CURRENT_DATE
)
SELECT
    t.unique_id,
    t.execution_time,
    b.baseline_avg,
    (t.execution_time - b.baseline_avg) / b.baseline_avg AS pct_increase
FROM today t
JOIN baseline b USING (unique_id)
WHERE t.execution_time > b.baseline_avg + 3 * b.baseline_stddev  -- 3-sigma
ORDER BY pct_increase DESC
```

**Alerting strategy**:

- Slack for 2x regressions
- PagerDuty for 5x regressions on critical models
- Daily report: top regressions and weekly trends
- Auto-create a Linear ticket for regressions that need investigation

**Production-grade observability**: multi-metric regression detection, edge-case handling, severity tiers, action-oriented alerts.

This is the foundation of a healthy data platform: issues are caught immediately, not discovered weeks later.
Knowledge check
The team wants to track data freshness: for each source, when its data was last updated, with an alert if it is stale. Which artifacts should they use, and how would they implement it?
Answer
**Source freshness has its own artifact.**

dbt's source freshness command:

```bash
dbt source freshness
# Writes target/sources.json
```

**sources.json structure**:

```json
{
  "metadata": {
    "dbt_schema_version": "https://schemas.getdbt.com/dbt/sources/v3.json",
    "dbt_version": "1.11.0",
    "generated_at": "..."
  },
  "results": [
    {
      "unique_id": "source.proj.raw.orders",
      "max_loaded_at": "2026-05-19T10:23:14.812345Z",
      "snapshotted_at": "2026-05-19T11:00:00.000Z",
      "max_loaded_at_time_ago_in_s": 2205.19,
      "status": "pass",
      "criteria": {
        "warn_after": {"count": 12, "period": "hour"},
        "error_after": {"count": 24, "period": "hour"},
        "filter": null
      },
      "adapter_response": {},
      "execution_time": 0.234,
      "timing": [...],
      "thread_id": "Thread-1 (worker)"
    }
  ],
  "elapsed_time": 1.234
}
```

**Freshness is configured on the source in YAML**:

```yaml
# models/_sources/raw.yml
sources:
  - name: raw
    tables:
      - name: orders
        loaded_at_field: _synced_at
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}
          filter: _synced_at > current_date - interval '7 day'
```

**Setup**:

**1. Set loaded_at_field on every source**:

```yaml
# models/_sources/stripe.yml
sources:
  - name: stripe
    tables:
      - name: orders
        description: "Order events from Stripe"
        loaded_at_field: created_at  # column tracking sync time
        freshness:
          warn_after: {count: 6, period: hour}
          error_after: {count: 24, period: hour}

      - name: customers
        loaded_at_field: last_updated_at
        freshness:
          warn_after: {count: 1, period: day}
          error_after: {count: 3, period: day}
```

**2. Run dbt source freshness in CI**:

```yaml
# .github/workflows/dbt-freshness.yml
on:
  schedule:
    - cron: '0 * * * *'  # hourly
  workflow_dispatch:

jobs:
  freshness:
    runs-on: ubuntu-latest
    steps:
      # dbt source freshness exits non-zero when a source is in error,
      # so the following steps use if: always() to still run
      - run: dbt source freshness
      - if: always()
        run: python scripts/process_freshness.py target/sources.json
      - if: always()
        run: aws s3 cp target/sources.json s3://org/dbt/freshness/sources_$(date +%Y%m%d_%H%M%S).json
```

**3. Processing script**:

```python
# scripts/process_freshness.py
# send_slack and send_pagerduty are hypothetical alerting helpers
import json
import sys
from collections import Counter

with open(sys.argv[1]) as fh:
    sources = json.load(fh)

results_by_status = Counter(r['status'] for r in sources['results'])
print(f'Freshness check: {results_by_status}')

# Process each result
for r in sources['results']:
    status = r['status']

    if status == 'pass':
        continue

    # Build the alert
    uid = r['unique_id']
    max_loaded_at = r.get('max_loaded_at')
    age_seconds = r.get('max_loaded_at_time_ago_in_s') or 0
    age_hours = age_seconds / 3600
    criteria = r.get('criteria', {})

    if status == 'warn':
        send_slack({
            'channel': '#data-quality',
            'message': f'WARNING: source freshness warning: {uid}\n'
                       f'Last loaded: {max_loaded_at}\n'
                       f'Age: {age_hours:.1f}h (warn threshold: {criteria.get("warn_after")})'
        })
    elif status == 'error':
        send_slack({
            'channel': '#data-platform',
            'message': f'Source freshness ERROR: {uid}\n'
                       f'Last loaded: {max_loaded_at}\n'
                       f'Age: {age_hours:.1f}h (error threshold: {criteria.get("error_after")})'
        })
        # Trigger PagerDuty
        send_pagerduty({
            'title': f'Source data stale: {uid}',
            'severity': 'error',
            'details': r
        })
    elif status == 'runtime error':
        send_slack({
            'channel': '#data-platform',
            'message': f'Failed checking freshness: {uid}\n{r.get("message")}'
        })

    print(f'{status}: {uid} (age {age_hours:.1f}h)')

# Exit code
if any(r['status'] == 'error' for r in sources['results']):
    sys.exit(1)
```

**4. Historical tracking**:

```sql
-- Store freshness history
CREATE TABLE source_freshness_history (
    unique_id VARCHAR,
    detected_at TIMESTAMP,
    max_loaded_at TIMESTAMP,
    age_seconds BIGINT,
    status VARCHAR,
    warn_threshold_seconds BIGINT,
    error_threshold_seconds BIGINT
);

-- Load after each freshness run (DuckDB): sources.json is one JSON object,
-- so unnest the results array and map the fields explicitly
INSERT INTO source_freshness_history
SELECT
    r.unique_id,
    CAST(r.snapshotted_at AS TIMESTAMP),
    CAST(r.max_loaded_at AS TIMESTAMP),
    CAST(r.max_loaded_at_time_ago_in_s AS BIGINT),
    r.status,
    NULL,  -- derive from r.criteria if thresholds are needed
    NULL
FROM (SELECT unnest(results) AS r FROM read_json_auto('target/sources.json'));
```

**5. Dashboard** in Metabase:

**Card 1: current status**:

```sql
SELECT
    unique_id,
    max_loaded_at,
    age_seconds / 3600 AS age_hours,
    status
FROM source_freshness_history
WHERE detected_at = (SELECT MAX(detected_at) FROM source_freshness_history)
ORDER BY age_hours DESC
```

**Card 2: trend**:

```sql
SELECT
    DATE_TRUNC('day', detected_at) AS day,
    unique_id,
    AVG(age_seconds / 3600.0) AS avg_age_hours
FROM source_freshness_history
WHERE detected_at >= CURRENT_DATE - 30
GROUP BY 1, 2
```

**Card 3: recurring offenders**:

```sql
SELECT
    unique_id,
    COUNT(*) FILTER (WHERE status = 'error') AS error_count_30d,
    COUNT(*) FILTER (WHERE status = 'warn') AS warn_count_30d
FROM source_freshness_history
WHERE detected_at >= CURRENT_DATE - 30
GROUP BY 1
HAVING error_count_30d > 0 OR warn_count_30d > 5
ORDER BY error_count_30d DESC
```

This identifies chronically late sources.

**6. Lineage-aware alerts**:

When a source is stale, alert downstream owners:

```python
# load_manifest, walk_downstream and send_email are hypothetical helpers
manifest = load_manifest('target/manifest.json')
child_map = manifest['child_map']

for r in sources['results']:
    if r['status'] != 'error':
        continue

    source_uid = r['unique_id']

    # Find downstream consumers
    downstream = walk_downstream(source_uid, child_map)

    # Keep only exposures
    affected_exposures = [
        manifest['exposures'][d]
        for d in downstream
        if d.startswith('exposure.')
    ]

    for exp in affected_exposures:
        send_email({
            'to': exp['owner']['email'],
            'subject': f'Upstream data stale: source {source_uid}',
            'body': f'Your dashboard {exp["name"]} depends on {source_uid}, '
                    f'which is {r["max_loaded_at_time_ago_in_s"] / 3600:.1f}h stale. '
                    f'Dashboard data may be outdated.'
        })
```

**7. Frequency tuning**:

```yaml
# Hourly checks for critical sources
- name: critical_orders
  freshness:
    warn_after: {count: 1, period: hour}
    error_after: {count: 3, period: hour}

# Daily for less critical sources
- name: marketing_events
  freshness:
    warn_after: {count: 12, period: hour}
    error_after: {count: 36, period: hour}

# Weekly for slow-changing data
- name: country_codes
  freshness:
    warn_after: {count: 7, period: day}
    error_after: {count: 30, period: day}
```

**8. Filter for partitioned tables**:

```yaml
sources:
  - name: events
    tables:
      - name: page_views
        loaded_at_field: event_date
        freshness:
          warn_after: {count: 2, period: hour}
          error_after: {count: 6, period: hour}
          # Only check today's partition (skip historical ones)
          filter: event_date >= current_date
```

For huge tables, this scans only recent data.

**9. Integration with Elementary**:

Elementary ships freshness anomaly detection as a dbt test:

```yaml
models:
  - name: orders
    tests:
      - elementary.freshness_anomalies:
          timestamp_column: created_at
```

Elementary keeps historical freshness data and trends.

**Production lessons**:

1. **dbt source freshness is the primary tool**
2. **Run it frequently** (hourly for critical sources)
3. **Store sources.json** for history
4. **Tiered alerting** (warn -> error -> PagerDuty)
5. **Lineage-aware notifications** to consumer owners
6. **Filter** for partitioned tables
7. **Dashboard** showing trends
8. **Recurring offenders** tracking

**Artifacts used**:

- `target/sources.json`: primary source data
- `target/manifest.json`: downstream lineage for notifications
- `target/run_results.json` (separate, for model run analytics)

Freshness monitoring is fundamental to data platform reliability; mature data teams invest here.

Check your understanding

A tool parses run_results.json. The status field has several possible values. When a model has status='skipped', what happened?
