Lesson 07.01 · 24 min
Advanced
Tags: dbtRunner, programmatic API, embedded

dbtRunner API: embedding dbt in Python with dbt-core 1.5+

dbt-core 1.5 introduced an official programmatic invocation API, dbtRunner. Before that, running dbt from Python meant subprocess hacks. Now you can invoke dbt directly as a Python function, get structured results back, integrate dbt into orchestrators (Dagster, Prefect, Airflow), and build embedded data tools.

This lesson covers the basics of the dbtRunner API, typical patterns, result handling, and when to use it instead of subprocess.


Why a programmatic API

Before 1.5, dbt was launched from Python through subprocess:

import subprocess

result = subprocess.run(
    ['dbt', 'run', '--select', 'tag:nightly'],
    capture_output=True,
    text=True
)
print(result.stdout)

Problems:

  • Long startup time (importing dbt-core takes ~3 seconds)
  • Parsing plain-text output is fragile
  • No type-safe error handling
  • dbt restarts on every call (no caching)
  • Passing complex arguments is tedious

dbtRunner addresses all of these.


Basic usage

from dbt.cli.main import dbtRunner, dbtRunnerResult

runner = dbtRunner()

result: dbtRunnerResult = runner.invoke(['run', '--select', 'tag:nightly'])

if result.success:
    print('Run succeeded')
    for r in result.result.results:
        print(f'  {r.node.unique_id}: {r.status}')
else:
    # exception is None when only individual nodes failed
    print(f'Run failed: {result.exception}')

runner.invoke() is the single method for all dbt commands.


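dbtRunnerResult structure

# Simplified sketch of the dataclass, not the exact dbt-core source
class dbtRunnerResult:
    success: bool                       # overall success
    exception: BaseException | None     # set if the invocation itself crashed
    result: RunExecutionResult | None   # execution data; the type depends on the command

The type of result depends on the command: run, build, test, seed, snapshot and compile return a RunExecutionResult, parse returns a Manifest, and ls returns a list of strings. dbtRunnerResult has no stdout or stderr attributes; log events are delivered through callbacks instead.

A RunExecutionResult looks like this: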

class RunExecutionResult:
    results: list[NodeResult]
    elapsed_time: float
    generated_at: datetime
    args: dict

And each NodeResult:

class NodeResult:
    node: ModelNode | TestNode | ...  # parsed node object
    status: NodeStatus                # RunStatus, TestStatus
    message: str | None
    failures: int | None
    execution_time: float
    adapter_response: dict
    thread_id: str
    timing: list[TimingInfo]

This gives type-safe access to result data, with no JSON parsing.
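A common first step with these objects is a per-status summary. The sketch below is a minimal illustration, not part of dbt: `summarize` and the stand-in data are hypothetical, and the function relies only on the `status` and `node.unique_id` attributes described above. Statuses are converted with `str()` on the assumption that they behave like strings, as dbt's status enums do.

```python
from collections import Counter
from types import SimpleNamespace


def summarize(results):
    """Count node results by status and list the ids of failed or errored nodes."""
    counts = Counter(str(r.status) for r in results)
    bad = [r.node.unique_id for r in results if str(r.status) in ("error", "fail")]
    return dict(counts), bad


# Stand-in objects shaped like NodeResult (hypothetical data, no dbt required)
fake_results = [
    SimpleNamespace(node=SimpleNamespace(unique_id="model.proj.a"), status="success"),
    SimpleNamespace(node=SimpleNamespace(unique_id="model.proj.b"), status="success"),
    SimpleNamespace(node=SimpleNamespace(unique_id="model.proj.c"), status="error"),
]

counts, bad = summarize(fake_results)
print(counts, bad)
```

With a real invocation the same function would be called as `summarize(result.result.results)`.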

The sketches above use modern type-hint syntax: PEP 585 built-in generics (list[...]) and PEP 604 unions (X | None).

Commands available

# Run models
result = runner.invoke(['run'])

# With selection
result = runner.invoke(['run', '--select', 'tag:nightly', '+model.proj.fct_orders'])

# Tests
result = runner.invoke(['test'])

# Build (run + test + seed + snapshot)
result = runner.invoke(['build'])

# Parse only
result = runner.invoke(['parse'])

# Compile only
result = runner.invoke(['compile'])

# Generate docs
result = runner.invoke(['docs', 'generate'])

# Source freshness
result = runner.invoke(['source', 'freshness'])

# Seeds
result = runner.invoke(['seed'])

# Snapshots
result = runner.invoke(['snapshot'])

# List
result = runner.invoke(['ls', '--select', 'tag:critical'])

All standard CLI commands available.


Passing arguments

Vars

result = runner.invoke([
    'run',
    '--vars', '{"my_var": "foo", "other_var": 123}'
])

Or from a dict encoded as JSON:

import json

vars_dict = {'date_filter': '2026-05-19', 'threshold': 100}

result = runner.invoke([
    'run',
    '--vars', json.dumps(vars_dict)
])

Target

result = runner.invoke(['run', '--target', 'prod'])

Profile dir

result = runner.invoke(['run', '--profiles-dir', '/custom/path/'])

Project dir

result = runner.invoke(['run', '--project-dir', '/path/to/project'])

Full refresh

result = runner.invoke(['run', '--full-refresh'])

Threads

result = runner.invoke(['run', '--threads', '16'])
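Since every option above ends up as strings in one list, a small builder keeps call sites readable. This is a sketch under assumptions: `build_args` and its parameter names are hypothetical helpers, not part of dbt, and only the flags shown in this section are covered.

```python
import json


def build_args(command, select=None, target=None, variables=None,
               threads=None, full_refresh=False):
    """Build a CLI-style argument list suitable for runner.invoke()."""
    args = list(command) if isinstance(command, (list, tuple)) else [command]
    if select:
        args += ["--select", *select]
    if target:
        args += ["--target", target]
    if variables:
        # --vars expects a YAML/JSON string, so the dict is serialized here
        args += ["--vars", json.dumps(variables)]
    if threads is not None:
        args += ["--threads", str(threads)]  # CLI values must be strings
    if full_refresh:
        args.append("--full-refresh")
    return args


args = build_args("run", select=["tag:nightly"], target="prod",
                  variables={"threshold": 100}, threads=8)
print(args)
```

The resulting list is passed unchanged: `runner.invoke(args)`.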

Iterating results

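import sys

result = runner.invoke(['build'])

# An exception means the invocation itself crashed and there are no node results.
# Node-level failures leave exception as None and are reported in the loop below.
if result.exception is not None:
    print(f'Exception: {result.exception}')
    sys.exit(1)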

for node_result in result.result.results:
    node = node_result.node
    status = node_result.status
    duration = node_result.execution_time
    
    print(f'{node.unique_id}: {status} ({duration:.1f}s)')
    
    if status == 'fail':
        # Test failure
        print(f'  Failures: {node_result.failures}')
        print(f'  Message: {node_result.message}')
    elif status == 'error':
        # Execution error
        print(f'  Error: {node_result.message}')

Error handling

from dbt.exceptions import DbtRuntimeError

try:
    result = runner.invoke(['run'])
    
    if not result.success:
        # Command-level failure
        if result.exception:
            # Exception during invocation
            print(f'Exception: {result.exception}')
        else:
            # Some nodes failed
            failures = [r for r in result.result.results if r.status in ('error', 'fail')]
            print(f'{len(failures)} failures')
            for f in failures:
                print(f'  {f.node.unique_id}: {f.message}')
    
except DbtRuntimeError as e:
    # Defensive only: invoke() normally returns errors in result.exception instead of raising
    print(f'dbt runtime error: {e}')
except Exception as e:
    print(f'Unexpected error: {e}')

dbtRunnerResult.success is the primary indicator. If it is False, check exception first, then iterate over result.results for node-level failures.
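The three possible outcomes can be reduced to one small function. The sketch below is illustrative: `classify` and the outcome labels are hypothetical names, and the stand-in objects only mimic the `success` and `exception` attributes of dbtRunnerResult.

```python
from types import SimpleNamespace


def classify(result):
    """Map an invocation result to 'crashed', 'ok' or 'node_failures'."""
    if result.exception is not None:
        return "crashed"          # the invocation itself raised
    if result.success:
        return "ok"               # every node succeeded
    return "node_failures"        # dbt ran, but some nodes failed or errored


# Stand-ins for the three cases (hypothetical data, no dbt required)
crashed = SimpleNamespace(success=False, exception=RuntimeError("bad profile"))
ok = SimpleNamespace(success=True, exception=None)
partial = SimpleNamespace(success=False, exception=None)

print(classify(crashed), classify(ok), classify(partial))
```

Checking `exception` before `success` matters: a crashed invocation has no node results to iterate.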


Use cases

Use case 1 — Orchestrator integration

# Dagster asset
from dagster import asset
from dbt.cli.main import dbtRunner

@asset
def dbt_marts(context):
    runner = dbtRunner()
    result = runner.invoke([
        'build',
        '--select', 'marts',
        '--target', 'prod'
    ])
    
    if not result.success:
        raise Exception(f'dbt failed: {result.exception}')
    
    return {
        'rows_built': sum(
            r.adapter_response.get('rows_affected', 0)
            for r in result.result.results
            if r.status == 'success'
        )
    }

Use case 2 — Custom CLI tool

# my_data_cli.py
import click
from dbt.cli.main import dbtRunner

runner = dbtRunner()

@click.group()
def cli():
    pass

@cli.command()
@click.option('--team', required=True)
def build_team_models(team):
    """Build models for a specific team."""
    result = runner.invoke([
        'build',
        '--select', f'tag:{team}',
        '--target', 'prod'
    ])
    if not result.success:
        raise click.Abort()
    click.echo(f'Built {len(result.result.results)} nodes for {team}')

if __name__ == '__main__':
    cli()

Use case 3: Embedded dbt in a data app

# data_app.py
from flask import Flask, request, jsonify
from dbt.cli.main import dbtRunner

app = Flask(__name__)
runner = dbtRunner()

@app.route('/dbt/run', methods=['POST'])
def trigger_run():
    selector = request.json.get('selector', 'tag:nightly')
    target = request.json.get('target', 'prod')
    
    result = runner.invoke([
        'run',
        '--select', selector,
        '--target', target
    ])
    
    # result.result is None when the invocation itself raised
    results = result.result.results if result.result is not None else []

    return jsonify({
        'success': result.success,
        'models_run': len([r for r in results if r.status == 'success']),
        'failures': len([r for r in results if r.status != 'success'])
    })

Use case 4 — Testing dbt projects

# tests/test_dbt_models.py
from dbt.cli.main import dbtRunner

def test_dbt_project_parses():
    runner = dbtRunner()
    result = runner.invoke(['parse'])
    assert result.success, f'Parse failed: {result.exception}'

def test_critical_models_build():
    runner = dbtRunner()
    result = runner.invoke([
        'build',
        '--select', 'tag:critical',
        '--target', 'ci'
    ])
    assert result.success
    for r in result.result.results:
        if r.node.resource_type == 'test':
            assert r.status == 'pass', f'Test failed: {r.node.unique_id}'

Use case 5 — Custom selectors / DSL

# Build runs based on business logic
def build_with_priority():
    runner = dbtRunner()
    
    # First: critical priority
    result_critical = runner.invoke([
        'build',
        '--select', 'tag:priority_critical',
        '--target', 'prod',
        '--threads', '8'
    ])
    
    if not result_critical.success:
        send_alert('Critical models failed!')
        return False
    
    # Then: high priority
    result_high = runner.invoke([
        'build',
        '--select', 'tag:priority_high',
        '--target', 'prod',
        '--threads', '16'
    ])
    
    # Finally: everything else, with more threads
    result_rest = runner.invoke([
        'build',
        '--exclude', 'tag:priority_critical', 'tag:priority_high',
        '--target', 'prod',
        '--threads', '32'
    ])
    
    return all([result_critical.success, result_high.success, result_rest.success])

Workflow logic in Python; dbt does data work.
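The staged pattern generalizes to a loop over named stages. The sketch below is a minimal illustration: `run_stages` is a hypothetical helper, and `invoke` stands for any callable with the shape of `runner.invoke` that returns an object with a `success` attribute.

```python
from types import SimpleNamespace


def run_stages(invoke, stages, stop_on_failure=True):
    """Run (name, args) stages in order and return (name, success) pairs."""
    outcomes = []
    for name, args in stages:
        result = invoke(args)
        outcomes.append((name, result.success))
        if stop_on_failure and not result.success:
            break  # later stages depend on earlier ones, so stop here
    return outcomes


# Stand-in for runner.invoke: fails whenever the 'high' tag is selected
def fake_invoke(args):
    return SimpleNamespace(success="tag:priority_high" not in args)


stages = [
    ("critical", ["build", "--select", "tag:priority_critical"]),
    ("high", ["build", "--select", "tag:priority_high"]),
    ("rest", ["build", "--exclude", "tag:priority_critical", "tag:priority_high"]),
]

outcomes = run_stages(fake_invoke, stages)
print(outcomes)
```

In this demo the 'rest' stage also mentions the high-priority tag, so the stand-in would fail it too; with `stop_on_failure=True` the loop never reaches it.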


Performance characteristics

import subprocess
import time

from dbt.cli.main import dbtRunner

# Subprocess approach (legacy)
t0 = time.time()
subprocess.run(['dbt', 'run'], capture_output=True)
subprocess_time = time.time() - t0

# dbtRunner approach
runner = dbtRunner()
t0 = time.time()
result = runner.invoke(['run'])
runner_time = time.time() - t0

print(f'Subprocess: {subprocess_time:.1f}s')
print(f'dbtRunner: {runner_time:.1f}s')

# Subsequent invocation with the same runner (same command, for a fair comparison)
t0 = time.time()
result = runner.invoke(['run'])
runner_second_time = time.time() - t0

print(f'Second runner.invoke: {runner_second_time:.1f}s')
# Often faster: the process and imported modules are already warm

Illustrative results (they vary with project size, adapter, and warehouse latency):

Subprocess: 12.5s  (includes Python startup + dbt import)
dbtRunner: 9.2s   (just dbt logic, no Python startup)
Second dbtRunner: 7.8s  (warm process, partial parsing)

In this example dbtRunner is roughly 25% faster, and the gap widens with multiple invocations.
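Single timings like these are noisy, so it helps to repeat each measurement and compare medians. The helper below is a sketch under assumptions: `median_seconds` is a hypothetical name, and the demo times a trivial function rather than dbt.

```python
import statistics
import time


def median_seconds(fn, repeats=3):
    """Call fn several times and return the median wall-clock duration in seconds."""
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()  # monotonic, high-resolution timer
        fn()
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)


calls = []
elapsed = median_seconds(lambda: calls.append(1), repeats=5)
print(f"{len(calls)} calls, median {elapsed:.6f}s")
```

For a real comparison, `fn` would be something like `lambda: runner.invoke(['run'])` on one side and the subprocess call on the other.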


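Logs and structured events

result = runner.invoke(['run'])

# dbtRunnerResult carries success, exception and result only.
# Log output still goes to stdout and to logs/dbt.log, as it does on the CLI;
# it is not captured on the result object.

For custom logging, register callbacks that receive structured events:

from dbt.cli.main import dbtRunner
from dbt_common.events.base_types import EventMsg  # dbt-core 1.8+; dbt.events.base_types in 1.5-1.7

events = []

def collect_event(event: EventMsg):
    # Each event exposes its metadata under event.info (name, level, msg, ...)
    events.append((event.info.level, event.info.name, event.info.msg))

# Callbacks are registered when the runner is created
runner = dbtRunner(callbacks=[collect_event])

result = runner.invoke([
    'run',
    '--log-level', 'info'
])

# Process the collected events instead of parsing text output
for level, name, msg in events:
    print(f'[{level}] {name}: {msg}')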

Limitations

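1. Invocations are blocking

# Each .invoke() blocks until the command finishes
result1 = runner.invoke(['run', '--select', 'foo'])
result2 = runner.invoke(['run', '--select', 'bar'])
# Sequential, not parallel

Within one invocation dbt still runs nodes on its own worker threads (--threads). For parallelism across invocations, use multiple subprocesses (see lesson 03).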

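2. State sharing between invocations

# A runner does not keep the manifest in memory between invocations by itself
runner = dbtRunner()
runner.invoke(['parse'])  # parses the project and returns a Manifest
runner.invoke(['run'])    # parses again; partial parsing on disk keeps this cheap

# Files changed between invocations are picked up by the re-parse

This is mostly internal: dbt-core handles invalidation. To skip parsing entirely, pass a previously loaded Manifest to dbtRunner(manifest=...).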

3. Thread safety

dbtRunner is not thread-safe:

# DON'T DO THIS
import threading

def parallel_run(selector):
    result = runner.invoke(['run', '--select', selector])
    return result

threads = [threading.Thread(target=parallel_run, args=(s,)) for s in selectors]
# Race conditions, undefined behavior

For parallel work, use separate processes (see lesson 03).
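When threads cannot be avoided (for example inside a web framework), one defensive option is to serialize calls behind a lock. The sketch below is an assumption-laden illustration: `SerializedInvoker` is a hypothetical wrapper, and the demo uses a stand-in for `runner.invoke`. It prevents overlapping invocations; it does not make them parallel.

```python
import threading
import time


class SerializedInvoker:
    """Wrap an invoke callable so that only one call runs at a time."""

    def __init__(self, invoke):
        self._invoke = invoke
        self._lock = threading.Lock()

    def invoke(self, args):
        with self._lock:  # other threads wait here until the call finishes
            return self._invoke(args)


# Stand-in for runner.invoke that records how many calls overlap
state = {"active": 0, "max_active": 0, "calls": 0}


def fake_invoke(args):
    state["active"] += 1
    state["max_active"] = max(state["max_active"], state["active"])
    time.sleep(0.01)  # pretend to do work
    state["active"] -= 1
    state["calls"] += 1
    return args


safe = SerializedInvoker(fake_invoke)
threads = [
    threading.Thread(target=safe.invoke, args=(["run", "--select", s],))
    for s in ("a", "b", "c", "d")
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(state)  # max_active stays at 1 when calls never overlap
```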

4. Error handling complexity

# Multiple failure modes
result = runner.invoke(['run'])

if result.exception:
    # dbtRunner-level exception
    handle_critical(result.exception)
elif not result.success:
    # Some failures
    failures = [r for r in result.result.results if r.status != 'success']
    handle_failures(failures)
else:
    # All succeeded
    pass

Pre-loaded manifest

Performance optimization: load the manifest once and reuse it across invocations:

# Load once
parse_result = dbtRunner().invoke(['parse'])
manifest = parse_result.result  # for parse, result is the Manifest

# Pass the manifest to a new runner so later invocations skip parsing
runner = dbtRunner(manifest=manifest)
result1 = runner.invoke(['run', '--select', 'fct_orders'])

# Detailed pattern in lesson 02

Args dictionary

A programmatic way to pass args:

from dbt.cli.main import dbtRunner

runner = dbtRunner()

# CLI-style
result = runner.invoke(['run', '--select', 'fct_orders'])

# Pythonic alternative (varies by dbt version)
result = runner.invoke(
    args=['run'],
    select=['fct_orders'],
    target='prod',
    threads=16
)

The CLI-style form is the best documented and the most stable.


When NOT to use dbtRunner

1. Simple scripts

# Don't need Python — just use CLI
dbt build --select tag:nightly --target prod

If just running dbt commands, CLI simpler.

2. Parallelism critical

# subprocess-based approach for parallelism
from concurrent.futures import ProcessPoolExecutor

def run_chunk(selector):
    import subprocess
    return subprocess.run(['dbt', 'run', '--select', selector])

with ProcessPoolExecutor(max_workers=4) as ex:
    results = ex.map(run_chunk, selectors)

Each subprocess is a separate process, which gives true parallelism.

3. Cross-version compatibility

# If you support multiple dbt versions, subprocess is version-agnostic
# The dbtRunner API may change between major versions

Pin a specific dbt-core version if you use dbtRunner.

4. Isolation requirements

# If dbt crash shouldn't crash parent process
# subprocess isolates

dbtRunner shares the parent process, so crashes propagate.
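Isolation through a child process can be sketched as follows. This is a minimal illustration: `run_isolated` is a hypothetical helper, and the demo launches the Python interpreter as a stand-in for the `dbt` executable so that it runs anywhere.

```python
import subprocess
import sys


def run_isolated(cmd, timeout=None):
    """Run a command in a child process and return (returncode, stdout, stderr)."""
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        # The child is killed on timeout; the parent process keeps running
        return None, "", "timed out"
    return proc.returncode, proc.stdout, proc.stderr


# Stand-in command: a child that prints a line and exits with code 3
code, out, err = run_isolated(
    [sys.executable, "-c", "import sys; print('hi'); sys.exit(3)"],
    timeout=30,
)
print(code, out.strip())
```

With dbt the call would look like `run_isolated(['dbt', 'build', '--select', 'marts'], timeout=3600)`; a non-zero return code or a timeout is then handled by the parent without taking it down.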


Production patterns

Pattern 1 — Health check service

# health_service.py
from fastapi import FastAPI
from dbt.cli.main import dbtRunner

app = FastAPI()
runner = dbtRunner()

@app.get('/health/dbt')
async def dbt_health():
    # Quick parse check
    result = runner.invoke(['parse'])
    return {
        'healthy': result.success,
        # For parse, result.result is the Manifest itself
        'manifest_nodes': len(result.result.nodes) if result.success else 0
    }
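A parse on every probe is expensive, so health endpoints usually cache the last answer for a short time. The sketch below is one way to do that under stated assumptions: `CachedCheck` is a hypothetical helper, the clock is injectable for testing, and the demo check is a stand-in for `runner.invoke(['parse']).success`.

```python
import time


class CachedCheck:
    """Cache the result of an expensive check for ttl_seconds."""

    def __init__(self, check, ttl_seconds=60.0, clock=time.monotonic):
        self._check = check
        self._ttl = ttl_seconds
        self._clock = clock
        self._value = None
        self._expires_at = None

    def __call__(self):
        now = self._clock()
        if self._expires_at is None or now >= self._expires_at:
            # Cache is empty or stale: run the real check and store the result
            self._value = self._check()
            self._expires_at = now + self._ttl
        return self._value


# Demo with a fake clock and a counter instead of a real dbt parse
fake_now = [0.0]
runs = []


def fake_parse_check():
    runs.append(1)
    return {"healthy": True}


cached = CachedCheck(fake_parse_check, ttl_seconds=60, clock=lambda: fake_now[0])
first = cached()
fake_now[0] = 30.0
second = cached()   # still within the TTL, served from cache
fake_now[0] = 61.0
third = cached()    # TTL expired, check runs again
print(len(runs))
```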

Pattern 2 — Conditional builds

def should_rebuild_marts():
    # Custom logic
    if is_business_hours():
        return False  # don't run during business hours
    if is_low_traffic():
        return True
    return False

if should_rebuild_marts():
    runner = dbtRunner()
    result = runner.invoke([
        'build',
        '--select', 'marts',
        '--target', 'prod'
    ])

Pattern 3 — Custom orchestration

class DbtOrchestrator:
    def __init__(self):
        self.runner = dbtRunner()
    
    def build_layer(self, layer, **kwargs):
        result = self.runner.invoke([
            'build',
            '--select', f'tag:{layer}',
            '--target', kwargs.get('target', 'prod')
        ])
        return result
    
    def cleanup(self):
        self.runner.invoke(['run-operation', 'cleanup_temp'])

orch = DbtOrchestrator()
orch.build_layer('staging')
orch.build_layer('marts', target='prod')
orch.cleanup()

Key takeaways

  1. The dbtRunner API is the official programmatic invocation interface since dbt-core 1.5.
  2. runner.invoke([args]) is the single method for all commands. It returns a dbtRunnerResult.
  3. dbtRunnerResult has a success boolean, an exception, and a result whose type depends on the command (for run and build, an object with an iterable results list).
  4. NodeResult gives type-safe per-node data (node, status, execution_time, adapter_response).
  5. All commands are available: run, test, build, parse, compile, docs, source freshness, and so on.
  6. Performance: roughly 25% faster than subprocess in the example above, and the gain grows with multiple invocations.
  7. Limitations: not thread-safe, single-process, error handling complexity.
  8. Use cases: orchestrator integration (Dagster, Prefect), embedded data apps, custom CLI tools, testing.
  9. When NOT to use it: simple scripts, parallelism-critical workloads, cross-version compatibility, isolation needs.
  10. Production patterns: health checks, conditional builds, custom orchestration.
Knowledge check
A senior engineer asks for "Dagster integration with dbt" built around the dbtRunner API. What architecture would you propose?
Answer
**dbtRunner + Dagster pattern**:

**Setup**:

```bash
pip install dagster dagster-dbt
```

dagster-dbt handles the dbt invocation for you through `DbtCliResource`, which launches the dbt CLI; in this setup you do not call dbtRunner directly.

**Asset definitions**:

```python
# assets.py
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

DBT_PROJECT_PATH = Path('/path/to/dbt')
DBT_MANIFEST = DBT_PROJECT_PATH / 'target' / 'manifest.json'

@dbt_assets(manifest=DBT_MANIFEST)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(['build'], context=context).stream()
```

This generates one asset per dbt model, and Dagster materializes them.

**Definitions**:

```python
# __init__.py
from dagster import Definitions
from dagster_dbt import DbtCliResource

from .assets import DBT_PROJECT_PATH, my_dbt_assets

defs = Definitions(
    assets=[my_dbt_assets],
    resources={
        'dbt': DbtCliResource(
            project_dir=str(DBT_PROJECT_PATH),
            target='prod'
        )
    }
)
```

**Run in the Dagster UI**:

```bash
dagster dev
# Opens the Dagster UI (formerly Dagit)
# Click 'Materialize' on the assets
```

Dagster generates the DAG from the dbt manifest and materializes assets in order.

**Custom invocation patterns**:

**Pattern 1: Selective builds**:

```python
@asset
def build_marts(context, dbt: DbtCliResource):
    # raise_on_error=False lets us inspect the outcome ourselves
    invocation = dbt.cli(
        ['build', '--select', 'marts'],
        raise_on_error=False,
        context=context
    ).wait()

    if not invocation.is_successful():
        raise Exception('Marts failed')
```

**Pattern 2: Multi-stage**:

```python
@asset
def foundational(context, dbt: DbtCliResource):
    dbt.cli(['build', '--select', 'tag:foundational'], context=context).wait()

@asset(deps=[foundational])
def marts(context, dbt: DbtCliResource):
    dbt.cli(['build', '--select', 'marts'], context=context).wait()

@asset(deps=[marts])
def metrics(context, dbt: DbtCliResource):
    dbt.cli(['build', '--select', 'tag:metrics'], context=context).wait()
```

This gives a staged pipeline.

**Pattern 3: Custom logic**:

```python
@asset
def conditional_run(context, dbt: DbtCliResource):
    # Custom business logic (is_business_hours, decide_tags and send_pagerduty
    # are your own helpers)
    if is_business_hours():
        context.log.info('Skipping during business hours')
        return

    # Run only a certain tag
    target_tag = decide_tags(context)

    invocation = dbt.cli(
        ['build', '--select', f'tag:{target_tag}', '--target', 'prod'],
        raise_on_error=False
    ).wait()

    if not invocation.is_successful():
        # Custom alerting
        send_pagerduty('dbt build failed')
        raise Exception('Failed')
```

**Scheduling**:

```python
from dagster import ScheduleDefinition, define_asset_job

daily_build_job = define_asset_job('daily_build_job', selection=[my_dbt_assets])

daily_build = ScheduleDefinition(
    job=daily_build_job,
    cron_schedule='0 6 * * *'  # 6am daily
)
```

**Observability**:

Dagster automatically:
- Tracks asset materializations
- Records execution times
- Shows lineage
- Alerts on failures

```python
# Inside an asset function: extra log lines are optional,
# failures are recorded automatically
context.log.info('Materialized marts')
```

**Cost integration**:

```python
@asset
def tracked_build(context, dbt: DbtCliResource):
    invocation = dbt.cli(['build', '--select', 'fct_orders']).wait()

    # run_results.json holds per-node status, timing and adapter_response
    run_results = invocation.get_artifact('run_results.json')
    succeeded = [r for r in run_results['results'] if r['status'] == 'success']

    context.add_output_metadata({
        'nodes_succeeded': len(succeeded),
        'execution_time': sum(r['execution_time'] for r in succeeded),
        # bytes_processed is adapter-specific (for example BigQuery)
        'bytes_processed': sum(
            r['adapter_response'].get('bytes_processed', 0) for r in succeeded
        )
    })
```

Observability is built in.

**Comparison with alternatives**:

**Airflow + Cosmos**:

```python
# airflow_dag.py
from datetime import datetime

from cosmos import DbtDag, ProfileConfig, ProjectConfig

# DbtDag is itself a DAG; alert_pagerduty is your own callback
dbt_dag = DbtDag(
    dag_id='dbt_pipeline',
    project_config=ProjectConfig('/path/to/dbt'),
    profile_config=ProfileConfig(
        profile_name='my_profile',  # hypothetical profile name
        target_name='prod',
        profiles_yml_filepath='/path/to/dbt/profiles.yml'
    ),
    schedule_interval='0 6 * * *',
    start_date=datetime(2024, 1, 1),
    default_args={'on_failure_callback': alert_pagerduty}
)
```

Cosmos auto-generates Airflow tasks from the dbt project.

**Prefect**:

```python
from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command

@flow
def dbt_pipeline():
    # Raises if the dbt command fails; API names vary between prefect-dbt versions
    trigger_dbt_cli_command(
        command='dbt build --target prod',
        project_dir='/path/to/dbt'
    )
```

**Comparison**:

| | Dagster + dbt | Airflow + Cosmos | Prefect + dbt |
|---|---|---|---|
| Maturity | High | High | Medium |
| UI | Asset-based | Task-based | Flow-based |
| Lineage | Native | Via plugins | Limited |
| Observability | Strong | Strong | Decent |
| Learning curve | Medium | Medium-High | Low |
| Cloud option | Dagster Cloud | Astronomer | Prefect Cloud |

All are viable. Choose based on team preference.

**Production reality** (tendencies, not rules):

- Mid-size teams often choose Dagster (clear asset model)
- Large enterprises often use Airflow (mature ecosystem)
- New projects often prefer Prefect (modern API)

All three build on dbt's invocation interface (the CLI or dbtRunner, depending on the integration and its version). Each tool adds:

- Scheduling
- Lineage visualization
- Alerting
- Multi-pipeline orchestration
- Cross-system integration

**Production-grade**: orchestrator + dbt = a proper data pipeline. A stable invocation API such as dbtRunner enables clean integration.
Knowledge check
A health check service for a dbt project: the endpoint /health/dbt returns the status. What are the implementation considerations?
Answer
**Health check service architecture**:

```python
# health_service.py
import time
from datetime import datetime

from fastapi import FastAPI
from dbt.cli.main import dbtRunner

app = FastAPI()

# Global runner, initialized once
runner = dbtRunner()

# Cache of the last check
_last_check = {
    'timestamp': None,
    'status': None
}

@app.get('/health/dbt')
async def dbt_health():
    """Quick health check: parse only."""
    # Throttle expensive checks
    if _last_check['timestamp']:
        age = (datetime.now() - _last_check['timestamp']).total_seconds()
        if age < 60:  # cached for 1 minute
            return _last_check['status']

    # Run quick check
    t0 = time.time()
    try:
        result = runner.invoke(['parse'])
        duration = time.time() - t0

        if result.success:
            manifest = result.result  # for parse, result is the Manifest
            status = {
                'healthy': True,
                'check_duration_seconds': duration,
                'manifest_nodes': len(manifest.nodes),
                'manifest_sources': len(manifest.sources),
                'manifest_macros': len(manifest.macros),
                'dbt_version': manifest.metadata.dbt_version,
                'schema_version': manifest.metadata.dbt_schema_version,
                'generated_at': manifest.metadata.generated_at.isoformat()
            }
        else:
            status = {
                'healthy': False,
                'error': str(result.exception),
                'check_duration_seconds': duration
            }
    except Exception as e:
        status = {
            'healthy': False,
            'error': str(e)
        }

    # Update cache
    _last_check['timestamp'] = datetime.now()
    _last_check['status'] = status

    return status

@app.get('/health/dbt/deep')
async def dbt_health_deep():
    """Deeper check: parse + compile."""
    result = runner.invoke(['compile'])

    return {
        'healthy': result.success,
        'errors': str(result.exception) if not result.success else None
    }
```

**Considerations**:

**1. Throttling**:

Parse takes ~1-3s. If the health endpoint is called 100 times per minute, dbt is overwhelmed.

```python
# Cache results
_last_check = {'timestamp': None, 'status': None}

if age < 60:  # cache for 1 minute
    return cached
```

**2. Concurrency safety**:

dbtRunner is **not thread-safe**. FastAPI is async, so concurrent requests can race.

```python
import asyncio

_lock = asyncio.Lock()

@app.get('/health/dbt')
async def dbt_health():
    async with _lock:
        result = runner.invoke(['parse'])
        # ...
```

The lock prevents simultaneous invocations. Note that invoke() is blocking, so it still stalls the event loop while it runs.

Alternatively, use a separate process for each request (subprocess), which is slower.

**3. Service initialization**:

```python
@app.on_event('startup')
async def startup():
    # Pre-warm dbt
    global runner
    runner = dbtRunner()
    # Initial parse writes the partial-parse cache
    runner.invoke(['parse'])
```

The first request after startup uses a warm runner.

**4. Failure modes**:

Different issues can break dbt:

- File deleted/modified -> parse fails
- Database down -> run fails (but parse is OK)
- profiles.yml invalid -> all commands fail
- Adapter not installed -> connection fails

Tier the health checks:

```python
@app.get('/health/parse')    # just parses
@app.get('/health/compile')  # parses + compiles (needs DB connection)
@app.get('/health/test')     # runs a no-op test
```

The caller hits the appropriate level.

**5. Resource isolation**:

If the health endpoint blocks the main process because of dbt:

```python
# Bad: health blocks everything
async def dbt_health():
    result = runner.invoke(['parse'])
    return result.success

# Better: separate process pool
from concurrent.futures import ProcessPoolExecutor
import asyncio

executor = ProcessPoolExecutor(max_workers=2)

async def dbt_health():
    loop = asyncio.get_running_loop()
    healthy = await loop.run_in_executor(executor, run_dbt_parse)
    return {'healthy': healthy}

def run_dbt_parse():
    runner = dbtRunner()  # new instance per process
    # Return a plain bool: the full result holds a large Manifest
    return runner.invoke(['parse']).success
```

**6. Kubernetes readiness/liveness**:

```yaml
# k8s deployment
livenessProbe:
  httpGet:
    path: /health/dbt
    port: 8000
  initialDelaySeconds: 30
  periodSeconds: 60
  timeoutSeconds: 10
  failureThreshold: 3
```

With these settings the pod is restarted after about 3 minutes of consecutive failures (3 probes, 60 s apart).

**7. Monitoring**:

```python
import time

@app.get('/health/dbt')
async def dbt_health():
    t0 = time.time()
    result = runner.invoke(['parse'])
    duration = time.time() - t0

    # Send metrics to Datadog/Prometheus (metrics is your own client)
    metrics.gauge('dbt.health.duration_seconds', duration)
    metrics.increment('dbt.health.checks', tags={
        'status': 'success' if result.success else 'failed'
    })

    return ...
```

Track the health check duration trend: an increase means a bigger project or a slowdown.

**8. Alerting**:

```python
if duration > 5:
    send_slack('Health check slow: manifest may be growing')

if not result.success:
    send_pagerduty('dbt parse failing')
```

**9. Detailed diagnostics**:

```python
@app.get('/health/dbt/detailed')
async def detailed():
    return {
        'parse': await dbt_health(),
        'connection': await test_connection(),
        'recent_runs': get_recent_run_status(),
        'failed_tests_last_24h': count_failures(hours=24),
        'source_freshness': await check_freshness()
    }
```

This gives the full picture for an investigation.

**10. Connection management**:

If dbt holds DB connections, it may exhaust the connection pool:

```python
# Limit dbt's threads to prevent connection storms
@app.on_event('startup')
async def startup():
    runner = dbtRunner()
    # Override the default thread count
    result = runner.invoke(['parse', '--threads', '4'])
```

**Production-grade health check**:

- Multiple endpoint tiers (parse, compile, test)
- Throttled (caching)
- Async-safe (lock or separate process)
- Metric collection
- Alerting on degradation
- Resource isolated
- K8s integrated

This is a foundation for reliable dbt-powered services. Use it sparingly: most apps don't need this level.
