dbtRunner API: embedded dbt в Python с 1.5+
dbt-core 1.5+ ввёл официальный programmatic invocation API — dbtRunner. Раньше run dbt из Python требовал subprocess hacks. Сейчас можно invoke dbt directly как Python function, получить structured results, integrate dbt в orchestrators (Dagster, Prefect, Airflow), build embedded data tools.
В этом уроке — основы dbtRunner API, типичные patterns, result handling, и когда использовать vs subprocess.
Зачем programmatic API
До 1.5 запускали dbt из Python через subprocess:
import subprocess
result = subprocess.run(
['dbt', 'run', '--select', 'tag:nightly'],
capture_output=True,
text=True
)
print(result.stdout)
Проблемы:
- Long startup time (dbt-core imports ~3 seconds)
- Plain text parsing — fragile
- No type-safe error handling
- Restart dbt каждый call (no caching)
- Tedious к pass complex args
dbtRunner solves все это.
Базовое использование
from dbt.cli.main import dbtRunner, dbtRunnerResult
# invoke() takes the same arguments as the dbt CLI, as a list of strings.
runner = dbtRunner()
result: dbtRunnerResult = runner.invoke(['run', '--select', 'tag:nightly'])

if result.exception is not None:
    # dbt could not complete the command, so there are no node results to show.
    print(f'Run failed: {result.exception}')
else:
    # The command ran to completion. success is False when some node failed,
    # and the per-node statuses below show which one.
    print('Run succeeded' if result.success else 'Run finished with failures')
    for r in result.result.results:
        print(f' {r.node.unique_id}: {r.status}')
runner.invoke() — single method для всех dbt commands.
dbtRunnerResult структура
class dbtRunnerResult:
    """Simplified sketch of the object returned by dbtRunner.invoke()."""

    # True only when the command completed and nothing failed.
    success: bool
    # Set when dbt could not complete the command (unhandled error, bad usage).
    exception: BaseException | None = None
    # Command-specific payload: RunExecutionResult for run/build/test,
    # Manifest for parse, list[str] for ls; None when the invocation raised.
    result: RunExecutionResult | Manifest | list[str] | bool | None = None
    # NOTE: there are no stdout/stderr attributes on this object. Log output
    # is delivered through the callbacks passed to dbtRunner(callbacks=[...]).
Where RunExecutionResult (or similar per command):
class RunExecutionResult:
results: list[NodeResult]
elapsed_time: float
generated_at: datetime
args: dict
И каждый NodeResult:
class NodeResult:
node: ModelNode | TestNode | ... # parsed node object
status: NodeStatus # RunStatus, TestStatus
message: str | None
failures: int | None
execution_time: float
adapter_response: dict
thread_id: str
timing: list[TimingInfo]
Type-safe access к result data — без JSON parsing.
Modern type hints: PEP 585 / PEP 604
Commands available
# Run models
result = runner.invoke(['run'])
# With selection
result = runner.invoke(['run', '--select', 'tag:nightly', '+model.proj.fct_orders'])
# Tests
result = runner.invoke(['test'])
# Build (run + test + seed + snapshot)
result = runner.invoke(['build'])
# Parse only
result = runner.invoke(['parse'])
# Compile only
result = runner.invoke(['compile'])
# Generate docs
result = runner.invoke(['docs', 'generate'])
# Source freshness
result = runner.invoke(['source', 'freshness'])
# Seeds
result = runner.invoke(['seed'])
# Snapshots
result = runner.invoke(['snapshot'])
# List
result = runner.invoke(['ls', '--select', 'tag:critical'])
All standard CLI commands available.
Passing arguments
Vars
result = runner.invoke([
'run',
'--vars', '{"my_var": "foo", "other_var": 123}'
])
Or через dict JSON-encoded:
import json
vars_dict = {'date_filter': '2026-05-19', 'threshold': 100}
result = runner.invoke([
'run',
'--vars', json.dumps(vars_dict)
])
Target
result = runner.invoke(['run', '--target', 'prod'])
Profile dir
result = runner.invoke(['run', '--profiles-dir', '/custom/path/'])
Project dir
result = runner.invoke(['run', '--project-dir', '/path/to/project'])
Full refresh
result = runner.invoke(['run', '--full-refresh'])
Threads
result = runner.invoke(['run', '--threads', '16'])
Iterating results
import sys

result = runner.invoke(['build'])

# An exception means dbt could not run the command at all; there are no node
# results to iterate in that case.
if result.exception is not None:
    print(f'Exception: {result.exception}')
    sys.exit(1)

# Node-level failures leave exception unset and only flip success to False,
# so the per-node report must run before the exit code is decided.
for node_result in result.result.results:
    node = node_result.node
    status = node_result.status
    duration = node_result.execution_time
    print(f'{node.unique_id}: {status} ({duration:.1f}s)')
    if status == 'fail':
        # Test failure
        print(f' Failures: {node_result.failures}')
        print(f' Message: {node_result.message}')
    elif status == 'error':
        # Execution error
        print(f' Error: {node_result.message}')

if not result.success:
    sys.exit(1)
Error handling
from dbt.exceptions import DbtRuntimeError, ParsingError
try:
result = runner.invoke(['run'])
if not result.success:
# Command-level failure
if result.exception:
# Exception during invocation
print(f'Exception: {result.exception}')
else:
# Some nodes failed
failures = [r for r in result.result.results if r.status in ('error', 'fail')]
print(f'{len(failures)} failures')
for f in failures:
print(f' {f.node.unique_id}: {f.message}')
except DbtRuntimeError as e:
# Catastrophic dbt error
print(f'dbt runtime error: {e}')
except Exception as e:
print(f'Unexpected error: {e}')
dbtRunnerResult.success is the primary indicator. If False, check exception или iterate result.results для node-level failures.
Use cases
Use case 1 — Orchestrator integration
# Dagster asset
from dagster import asset
from dbt.cli.main import dbtRunner
@asset
def dbt_marts(context):
    """Build the ``marts`` selection against the ``prod`` target.

    Returns:
        A dict with ``rows_built``: the sum of ``rows_affected`` reported in
        ``adapter_response`` for every node whose status is ``success``.

    Raises:
        Exception: if the invocation is not successful. The message carries
            the invocation exception when there is one, otherwise the ids of
            the nodes whose status is ``error`` or ``fail``.
    """
    runner = dbtRunner()
    result = runner.invoke([
        'build',
        '--select', 'marts',
        '--target', 'prod',
    ])

    if not result.success:
        if result.exception is not None:
            raise Exception(f'dbt failed: {result.exception}') from result.exception
        # No exception: the command ran but some nodes failed. Name them
        # instead of reporting "dbt failed: None".
        failed_ids = [
            r.node.unique_id
            for r in result.result.results
            if r.status in ('error', 'fail')
        ]
        raise Exception(f'dbt failed: {", ".join(failed_ids)}')

    return {
        'rows_built': sum(
            # `or 0` also covers a rows_affected key that is present but None.
            r.adapter_response.get('rows_affected') or 0
            for r in result.result.results
            if r.status == 'success'
        )
    }
Use case 2 — Custom CLI tool
# my_data_cli.py
import click
from dbt.cli.main import dbtRunner
runner = dbtRunner()
@click.group()
def cli():
pass
@cli.command()
@click.option('--team', required=True)
def build_team_models(team):
"""Build models для specific team."""
result = runner.invoke([
'build',
'--select', f'tag:{team}',
'--target', 'prod'
])
if not result.success:
raise click.Abort()
click.echo(f'Built {len(result.result.results)} models для {team}')
if __name__ == '__main__':
cli()
Use case 3 — Embedded dbt в data app
# data_app.py
from flask import Flask, request, jsonify
from dbt.cli.main import dbtRunner
app = Flask(__name__)
runner = dbtRunner()
@app.route('/dbt/run', methods=['POST'])
def trigger_run():
    """Run ``dbt run`` for the selector and target given in the JSON body.

    Request body (JSON object): optional ``selector`` (default
    ``'tag:nightly'``) and ``target`` (default ``'prod'``).

    Returns:
        JSON with ``success``, ``models_run`` and ``failures``. Responds with
        400 for a malformed body and 500 (plus an ``error`` field) when the
        dbt invocation itself raised.
    """
    payload = request.json
    if not isinstance(payload, dict):
        return jsonify({'success': False, 'error': 'JSON object expected'}), 400

    selector = payload.get('selector', 'tag:nightly')
    target = payload.get('target', 'prod')

    # These values come from the network and end up in the dbt argument list.
    # Reject anything that is not a plain, non-empty string, and anything that
    # looks like a CLI option, so a caller cannot smuggle extra flags in.
    for value in (selector, target):
        if not isinstance(value, str) or not value or value.startswith('-'):
            return jsonify({'success': False, 'error': 'invalid selector or target'}), 400

    result = runner.invoke([
        'run',
        '--select', selector,
        '--target', target,
    ])

    # When the invocation raised there is no result payload to count.
    if result.exception is not None or result.result is None:
        return jsonify({
            'success': False,
            'error': str(result.exception),
            'models_run': 0,
            'failures': 0,
        }), 500

    node_results = result.result.results
    models_run = sum(1 for r in node_results if r.status == 'success')
    return jsonify({
        'success': result.success,
        'models_run': models_run,
        'failures': len(node_results) - models_run,
    })
Use case 4 — Testing dbt projects
# tests/test_dbt_models.py
from dbt.cli.main import dbtRunner
def test_dbt_project_parses():
runner = dbtRunner()
result = runner.invoke(['parse'])
assert result.success, f'Parse failed: {result.exception}'
def test_critical_models_build():
runner = dbtRunner()
result = runner.invoke([
'build',
'--select', 'tag:critical',
'--target', 'ci'
])
assert result.success
for r in result.result.results:
if r.node.resource_type == 'test':
assert r.status == 'pass', f'Test failed: {r.node.unique_id}'
Use case 5 — Custom selectors / DSL
# Build runs based on business logic
def build_with_priority():
runner = dbtRunner()
# First: critical priority
result_critical = runner.invoke([
'build',
'--select', 'tag:priority_critical',
'--target', 'prod',
'--threads', '8'
])
if not result_critical.success:
send_alert('Critical models failed!')
return False
# Then: high priority
result_high = runner.invoke([
'build',
'--select', 'tag:priority_high',
'--target', 'prod',
'--threads', '16'
])
# Finally: rest in parallel
result_rest = runner.invoke([
'build',
'--exclude', 'tag:priority_critical', 'tag:priority_high',
'--target', 'prod',
'--threads', '32'
])
return all([result_critical.success, result_high.success, result_rest.success])
Workflow logic in Python; dbt does data work.
PythonOperator, TaskFlow и venv-варианты
Performance characteristics
import subprocess
import time

# Subprocess approach (legacy). The import stays outside the timed region so
# that only the dbt run is measured. perf_counter() is a monotonic clock
# meant for measuring intervals.
t0 = time.perf_counter()
subprocess.run(['dbt', 'run'], capture_output=True)
subprocess_time = time.perf_counter() - t0

# dbtRunner approach
runner = dbtRunner()
t0 = time.perf_counter()
result = runner.invoke(['run'])
runner_time = time.perf_counter() - t0

print(f'Subprocess: {subprocess_time:.1f}s')
print(f'dbtRunner: {runner_time:.1f}s')

# Subsequent invocation with the same runner
t0 = time.perf_counter()
result = runner.invoke(['run', '--select', 'state:modified+'])
runner_second_time = time.perf_counter() - t0
print(f'Second runner.invoke: {runner_second_time:.1f}s')
# Faster — the process is already initialized
Typical results:
Subprocess: 12.5s (includes Python startup + dbt import)
dbtRunner: 9.2s (just dbt logic, no Python startup)
Second dbtRunner: 7.8s (manifest cached)
dbtRunner ~25% faster, более если multiple invocations.
Stdout / stderr
# dbtRunnerResult has no stdout/stderr attributes. To capture log output,
# register a callback: dbt calls it with each event it fires.
captured = []


def collect_event(event):
    """Store the level and message of one dbt log event."""
    captured.append((event.info.level, event.info.msg))


runner = dbtRunner(callbacks=[collect_event])
result = runner.invoke(['run'])

# All log output, errors included, in the order dbt emitted it
for level, message in captured:
    print(f'[{level}] {message}')
For custom logging:
import json
import logging

# Setup before invoke
logging.basicConfig(level=logging.INFO)


def print_structured(event):
    """Print one dbt event as a JSON object with its name, level and message."""
    # NOTE(review): callbacks appear to receive events of every level,
    # independent of --log-level, so debug events are dropped here — confirm
    # against the pinned dbt-core version.
    if event.info.level == 'debug':
        return
    log = {
        'name': event.info.name,
        'level': event.info.level,
        'msg': event.info.msg,
    }
    print(json.dumps(log, ensure_ascii=False))


# Structured records come from the callback. There is no result.stdout to
# parse; the flags below only shape dbt's own console output.
runner = dbtRunner(callbacks=[print_structured])
result = runner.invoke([
    'run',
    '--log-format', 'json',  # structured logs
    '--log-level', 'info',
])
Limitations
1. Single-threaded by default
# Each .invoke() blocks
result1 = runner.invoke(['run', '--select', 'foo'])
result2 = runner.invoke(['run', '--select', 'bar'])
# Sequential, не parallel
For parallelism — multiple subprocesses (see lesson 03).
2. State sharing between invocations
# Same runner — may cache the manifest
# NOTE(review): the reuse presumably comes from dbt's own parse caching rather
# than from state kept on this runner object; dbtRunner also accepts a
# manifest= argument for explicit reuse — confirm for the pinned dbt-core version.
runner = dbtRunner()
runner.invoke(['parse']) # builds manifest
runner.invoke(['run']) # may reuse parse result
# But changes to files between invocations trigger a re-parse
Mostly internal — dbt-core handles invalidation.
3. Thread safety
dbtRunner not thread-safe:
# DON'T DO THIS
import threading
def parallel_run(selector):
result = runner.invoke(['run', '--select', selector])
return result
threads = [threading.Thread(target=parallel_run, args=(s,)) for s in selectors]
# Race conditions, undefined behavior
For parallel — separate processes (see lesson 03).
4. Error handling complexity
# Multiple failure modes
result = runner.invoke(['run'])
if result.exception:
# dbtRunner-level exception
handle_critical(result.exception)
elif not result.success:
# Some failures
failures = [r for r in result.result.results if r.status != 'success']
handle_failures(failures)
else:
# All succeeded
pass
Pre-loaded manifest
Performance optimization — pre-load manifest, reuse across invocations:
# Load once
# Load once: for the parse command, result.result is the Manifest
parse_result = dbtRunner().invoke(['parse'])
if not parse_result.success:
    raise RuntimeError(f'dbt parse failed: {parse_result.exception}')
manifest = parse_result.result

# Hand the manifest to the runner explicitly. A runner created without
# manifest= does not pick it up from an earlier invoke() on its own.
runner = dbtRunner(manifest=manifest)

# Subsequent invocations reuse the pre-loaded manifest instead of re-parsing
result1 = runner.invoke(['run', '--select', 'fct_orders'])
# Detailed pattern in lesson 02
Args dictionary
Programmatic way к pass args:
from dbt.cli.main import dbtRunner
runner = dbtRunner()
# CLI-style
result = runner.invoke(['run', '--select', 'fct_orders'])
# Pythonic alternative (varies by dbt version)
result = runner.invoke(
args=['run'],
select=['fct_orders'],
target='prod',
threads=16
)
CLI-style most documented и stable.
When NOT to use dbtRunner
1. Simple scripts
# Don't need Python — just use CLI
dbt build --select tag:nightly --target prod
If just running dbt commands, CLI simpler.
2. Parallelism critical
# subprocess-based для parallelism
from concurrent.futures import ProcessPoolExecutor
def run_chunk(selector):
import subprocess
return subprocess.run(['dbt', 'run', '--select', selector])
with ProcessPoolExecutor(max_workers=4) as ex:
results = ex.map(run_chunk, selectors)
subprocess separated processes — true parallelism.
3. Cross-version compatibility
# If supporting multiple dbt versions, subprocess agnostic
# dbtRunner API may change между major versions
Pin к specific dbt-core если using dbtRunner.
4. Isolation requirements
# If dbt crash shouldn't crash parent process
# subprocess isolates
dbtRunner shares process — crashes propagate.
Production patterns
Pattern 1 — Health check service
# health_service.py
from fastapi import FastAPI
from dbt.cli.main import dbtRunner
app = FastAPI()
runner = dbtRunner()
@app.get('/health/dbt')
async def dbt_health():
    """Report whether the dbt project parses and how many nodes it contains.

    Returns:
        A dict with ``healthy`` (the invocation's success flag) and
        ``manifest_nodes`` (number of nodes in the parsed manifest, 0 when
        parsing failed).
    """
    # Quick parse check. invoke() is a blocking call, so this coroutine does
    # not yield to the event loop while dbt parses.
    result = runner.invoke(['parse'])

    # For `parse`, result.result is the Manifest itself, so its nodes mapping
    # is read directly (there is no intermediate parsed_data attribute).
    manifest = result.result if result.success else None
    return {
        'healthy': result.success,
        'manifest_nodes': len(manifest.nodes) if manifest is not None else 0,
    }
Pattern 2 — Conditional builds
def should_rebuild_marts():
    """Decide whether the marts may be rebuilt right now.

    Returns False during business hours; otherwise True only when traffic
    is low.
    """
    # Custom logic: is_low_traffic() is consulted only outside business hours,
    # because `and` short-circuits on the first operand.
    return not is_business_hours() and bool(is_low_traffic())
if should_rebuild_marts():
runner = dbtRunner()
result = runner.invoke([
'build',
'--select', 'marts',
'--target', 'prod'
])
Pattern 3 — Custom orchestration
class DbtOrchestrator:
    """Thin wrapper that sends dbt commands through one shared dbtRunner."""

    def __init__(self):
        self.runner = dbtRunner()

    def build_layer(self, layer, **kwargs):
        """Run ``dbt build`` for the models tagged with *layer*.

        Args:
            layer: tag name; the selection is ``tag:<layer>``.
            **kwargs: only ``target`` is read (default ``'prod'``); any other
                keyword is ignored.

        Returns:
            Whatever ``self.runner.invoke`` returns for the build command.
        """
        target = kwargs.get('target', 'prod')
        return self.runner.invoke([
            'build',
            '--select', f'tag:{layer}',
            '--target', target,
        ])

    def cleanup(self):
        """Run the ``cleanup_temp`` operation and return the invocation result.

        The result is returned (instead of being dropped) so that callers can
        detect a failed cleanup.
        """
        return self.runner.invoke(['run-operation', 'cleanup_temp'])
orch = DbtOrchestrator()
orch.build_layer('staging')
orch.build_layer('marts', target='prod')
orch.cleanup()
Ключевые выводы
- dbtRunner API — official programmatic invocation с dbt-core 1.5+.
- runner.invoke([args]) — single method для всех commands. Returns dbtRunnerResult.
- dbtRunnerResult — success boolean, exception, result с iterable results array.
- NodeResult — type-safe per-node data (node, status, execution_time, adapter_response).
- All commands available: run, test, build, parse, compile, docs, source freshness, etc.
- Performance: ~25% faster than subprocess; faster для multiple invocations.
- Limitations: not thread-safe, single-process, error handling complexity.
- Use cases: orchestrator integration (Dagster, Prefect), embedded data apps, custom CLI tools, testing.
- When NOT to use: simple scripts, parallelism critical, cross-version compat, isolation needs.
- Production patterns: health checks, conditional builds, custom orchestration.