Pre-loaded Manifest: оптимизация через избегание re-parsing
dbt parse takes 1-10+ seconds depending on project size. Если вызываете dbtRunner многократно, каждый invocation re-parses (partial parse helps but still has overhead). Pre-loaded Manifest pattern — load once, pass к multiple invocations. Critical для performance в orchestrators и long-running services.
В этом уроке — pre-loaded Manifest API, integration patterns, when to use, и gotchas.
PythonOperator, TaskFlow и venv-варианты
Проблема
from dbt.cli.main import dbtRunner
runner = dbtRunner()
# Each invocation re-parses
result1 = runner.invoke(['run', '--select', 'fct_orders'])
# Parse: 3s, run: 5s = 8s total
result2 = runner.invoke(['run', '--select', 'fct_revenue'])
# Parse: 3s (again!), run: 4s = 7s total
result3 = runner.invoke(['run', '--select', 'dim_users'])
# Parse: 3s, run: 3s = 6s total
# Total: 21 seconds (9s wasted re-parsing)
Partial parse helps если files unchanged, но still has overhead.
Pre-loaded Manifest API
from dbt.cli.main import dbtRunner
import json
# Step 1 — parse once, capture manifest
runner = dbtRunner()
parse_result = runner.invoke(['parse'])
if not parse_result.success:
raise Exception('Initial parse failed')
# Step 2 — extract manifest object
manifest = parse_result.result
# Step 3 — invoke subsequent commands с pre-loaded manifest
runner_with_manifest = dbtRunner(manifest=manifest)
result1 = runner_with_manifest.invoke(['run', '--select', 'fct_orders'])
# No re-parsing! Just run logic.
result2 = runner_with_manifest.invoke(['run', '--select', 'fct_revenue'])
# Still no re-parse.
result3 = runner_with_manifest.invoke(['run', '--select', 'dim_users'])
# Same.
Now:
- Parse: 3s once
- run: 5+4+3 = 12s
- Total: 15s (vs 21s previously)
29% speedup за skipping re-parses.
Detailed pattern
from dbt.cli.main import dbtRunner
from dbt.contracts.graph.manifest import Manifest
class CachedDbtRunner:
    """Wrap ``dbtRunner`` so the project is parsed once and the manifest reused.

    The first ``invoke`` runs ``dbt parse`` and stores the returned manifest;
    every call then builds a ``dbtRunner`` around that stored manifest.
    Call ``invalidate`` after project files change to force a new parse.
    """

    def __init__(self):
        self._runner = dbtRunner()
        self._manifest: Manifest | None = None
        # ``time.time()`` value recorded when the manifest was last parsed.
        self._last_parsed_at: float | None = None

    def _ensure_manifest(self):
        """Parse the project if no manifest is cached.

        Raises:
            RuntimeError: if ``dbt parse`` reports failure. The exception
                carried by the dbt result (if any) is chained as the cause.
        """
        if self._manifest is not None:
            return

        # Kept local because the snippet's import block does not import time.
        import time

        parse_result = self._runner.invoke(['parse'])
        if not parse_result.success:
            raise RuntimeError(
                f'Parse failed: {parse_result.exception}'
            ) from parse_result.exception
        self._manifest = parse_result.result
        self._last_parsed_at = time.time()

    def invoke(self, args):
        """Run a dbt command (``args`` is the CLI argument list) with the cached manifest."""
        self._ensure_manifest()
        runner_with_manifest = dbtRunner(manifest=self._manifest)
        return runner_with_manifest.invoke(args)

    def invalidate(self):
        """Force re-parse on next invoke."""
        self._manifest = None
        self._last_parsed_at = None
# Usage
cached_runner = CachedDbtRunner()
result1 = cached_runner.invoke(['run', '--select', 'fct_orders'])
result2 = cached_runner.invoke(['run', '--select', 'fct_revenue'])
# Manifest cached, no re-parsing
# After file changes:
cached_runner.invalidate()
result3 = cached_runner.invoke(['run', '--select', 'new_model'])
# Re-parses
Cache invalidation
Pre-loaded manifest becomes stale if dbt files change:
- Model SQL edited
- YAML configs changed
- Macros modified
- packages.yml changed
- profiles.yml changed
- dbt_project.yml changed
Tool must detect changes:
import os
from pathlib import Path
from dbt.cli.main import dbtRunner
class FileWatcherRunner:
    """Reuse a parsed manifest and re-parse only when project files change.

    Changes are detected by comparing modification times of the project's
    ``.sql`` / ``.yml`` / ``.yaml`` / ``.py`` files between calls. Only files
    under ``project_path`` are watched, so a ``profiles.yml`` stored elsewhere
    is not covered.
    """

    _PATTERNS = ('*.sql', '*.yml', '*.yaml', '*.py')
    # Directories inside the project whose contents are ignored.
    _SKIPPED_DIRS = frozenset({'target', 'logs'})

    def __init__(self, project_path='.'):
        self._project_path = Path(project_path)
        self._runner = dbtRunner()
        self._manifest = None
        self._file_mtimes = {}

    def _scan_files(self) -> dict[str, float]:
        """Return ``{path: mtime}`` for every watched file in the project.

        Root-level ``dbt_project.yml`` and ``packages.yml`` are matched by the
        ``*.yml`` pattern, so they need no separate handling.
        """
        mtimes = {}
        for pattern in self._PATTERNS:
            for f in self._project_path.rglob(pattern):
                # Look only at directories *inside* the project: a project
                # that itself lives under ".../target/..." must still be
                # scanned.
                inner_dirs = f.relative_to(self._project_path).parts[:-1]
                if self._SKIPPED_DIRS.intersection(inner_dirs):
                    continue
                try:
                    mtimes[str(f)] = f.stat().st_mtime
                except FileNotFoundError:
                    # Removed between directory listing and stat(); the
                    # missing key is what signals the change.
                    continue
        return mtimes

    def _files_changed(self) -> bool:
        """Return True if the current scan differs from the last recorded one."""
        return self._scan_files() != self._file_mtimes

    def _refresh_manifest(self):
        """Re-parse the project and record the file state it corresponds to.

        Raises:
            RuntimeError: if ``dbt parse`` reports failure.
        """
        # Snapshot before parsing: an edit made while the parse is running
        # then differs from the snapshot and triggers a re-parse next time.
        snapshot = self._scan_files()
        parse_result = self._runner.invoke(['parse'])
        if not parse_result.success:
            raise RuntimeError('Parse failed')
        self._manifest = parse_result.result
        self._file_mtimes = snapshot

    def invoke(self, args):
        """Run a dbt command, re-parsing first if files changed since the last parse."""
        if self._manifest is None or self._files_changed():
            self._refresh_manifest()
        return dbtRunner(manifest=self._manifest).invoke(args)
# Usage
runner = FileWatcherRunner('/path/to/dbt')
result1 = runner.invoke(['run']) # parses
result2 = runner.invoke(['run']) # uses cache
# After file edit:
result3 = runner.invoke(['run']) # detects change, re-parses
Use cases
Use case 1 — REST API server
from fastapi import FastAPI
from dbt.cli.main import dbtRunner
app = FastAPI()
# Pre-load manifest at startup
runner = dbtRunner()
parse_result = runner.invoke(['parse'])
if not parse_result.success:
    # Fail fast instead of serving requests with ``manifest = None``.
    raise RuntimeError(f'dbt parse failed at startup: {parse_result.exception}')
manifest = parse_result.result


@app.post('/run')
async def run_models(selector: str):
    """Run the models matched by ``selector`` with the pre-loaded manifest.

    Returns a dict with ``success`` and the summed ``execution_time`` of the
    individual node results (0 when dbt produced no results).

    NOTE(review): ``invoke`` is a blocking call inside an ``async`` handler,
    so requests are presumably handled one at a time while dbt runs. That
    matches dbt's lack of support for concurrent in-process invocations, but
    confirm it is the intended trade-off.
    """
    # Reuse pre-loaded manifest
    runner_with_manifest = dbtRunner(manifest=manifest)
    result = runner_with_manifest.invoke(['run', '--select', selector])
    # ``result.result`` is None when dbt aborts with an exception before any
    # node runs, so fall back to an empty list instead of crashing.
    node_results = getattr(result.result, 'results', None) or []
    return {
        'success': result.success,
        'duration': sum(r.execution_time for r in node_results),
    }
Subsequent requests skip parse. 10-15s vs 13-18s per request.
Use case 2 — Batch processing
selectors = ['marts.finance', 'marts.sales', 'marts.marketing']
# Single parse
runner = dbtRunner()
parse_result = runner.invoke(['parse'])
manifest = parse_result.result
# Run each batch с pre-loaded manifest
results = []
for selector in selectors:
runner_cached = dbtRunner(manifest=manifest)
result = runner_cached.invoke(['run', '--select', selector])
results.append(result)
# Total: 1 parse + 3 runs = much faster than 3× (parse + run)
Use case 3 — Testing dbt projects
# tests/conftest.py
import pytest
from dbt.cli.main import dbtRunner
@pytest.fixture(scope='session')
def dbt_manifest():
    """Parse the dbt project once per test session and return the manifest."""
    parse_result = dbtRunner().invoke(['parse'])
    if not parse_result.success:
        # Report the real cause once instead of handing ``None`` to every
        # test that depends on this fixture.
        pytest.fail(f'dbt parse failed: {parse_result.exception}')
    return parse_result.result


# Each test uses cached manifest
def test_critical_models_pass(dbt_manifest):
    """Tests tagged ``critical`` must pass."""
    runner = dbtRunner(manifest=dbt_manifest)
    result = runner.invoke(['test', '--select', 'tag:critical'])
    assert result.success


def test_marts_build(dbt_manifest):
    """The ``marts`` selection must build successfully."""
    runner = dbtRunner(manifest=dbt_manifest)
    result = runner.invoke(['build', '--select', 'marts'])
    assert result.success
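Test suite без кэша: 3 tests × (parse + execution) — parse оплачивается в каждом тесте. With session-scoped fixture: 1 parse + 3 × execution, то есть экономия ≈ 2 × parse time (в общем случае (N − 1) × parse time для N тестов).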
Use case 4 — Stateful applications
# data_app.py
class DataApp:
    """Application object that parses the project once and serves runs from it."""

    def __init__(self):
        self.runner = dbtRunner()
        parse_result = self.runner.invoke(['parse'])
        # Stored as returned; the parse outcome is not checked here, and
        # ``reload`` can be called later to try again.
        self.manifest = parse_result.result

    def run_user_request(self, user_id, selector):
        """Run the models matched by ``selector``.

        ``user_id`` is accepted for the caller's bookkeeping and is not used
        by this method.
        """
        runner = dbtRunner(manifest=self.manifest)
        return runner.invoke(['run', '--select', selector])

    def reload(self):
        """Reload the manifest after files changed.

        Returns:
            True if the parse succeeded and the manifest was replaced, False
            if it failed and the previous manifest was kept.
        """
        parse_result = self.runner.invoke(['parse'])
        if not parse_result.success:
            return False
        self.manifest = parse_result.result
        return True
Multiple runners share manifest
# Single shared manifest
parse_result = dbtRunner().invoke(['parse'])
shared_manifest = parse_result.result
# Multiple workers
def worker(work_id):
runner = dbtRunner(manifest=shared_manifest)
return runner.invoke(['run', '--select', f'tag:work_{work_id}'])
# Each worker reuses manifest
results = [worker(i) for i in range(5)]
Note: still single-threaded sequential due к dbtRunner not thread-safe. For parallelism — separate processes (lesson 03).
Performance benchmark
import time
from dbt.cli.main import dbtRunner
# Test setup — typical project
NUM_INVOCATIONS = 10

# time.perf_counter() is used for the intervals: unlike time.time(), it is a
# clock meant for measuring durations and is unaffected by system clock
# adjustments.

# Method 1 — fresh runner every time
t0 = time.perf_counter()
for _ in range(NUM_INVOCATIONS):
    runner = dbtRunner()
    result = runner.invoke(['compile', '--select', 'fct_orders'])
method1_time = time.perf_counter() - t0

# Method 2 — same runner (uses internal cache)
t0 = time.perf_counter()
runner = dbtRunner()
for _ in range(NUM_INVOCATIONS):
    result = runner.invoke(['compile', '--select', 'fct_orders'])
method2_time = time.perf_counter() - t0

# Method 3 — pre-loaded manifest (the one-off parse is outside the timed span)
parse_result = dbtRunner().invoke(['parse'])
manifest = parse_result.result
t0 = time.perf_counter()
for _ in range(NUM_INVOCATIONS):
    runner = dbtRunner(manifest=manifest)
    result = runner.invoke(['compile', '--select', 'fct_orders'])
method3_time = time.perf_counter() - t0

print(f'Method 1 (fresh runner): {method1_time:.1f}s')
print(f'Method 2 (cached runner): {method2_time:.1f}s')
print(f'Method 3 (pre-loaded manifest): {method3_time:.1f}s')
Real numbers (project с 500 models):
Method 1 (fresh runner): 35.2s # 3.5s/invocation
Method 2 (cached runner): 12.5s # 1.3s/invocation
Method 3 (pre-loaded manifest): 8.7s # 0.9s/invocation
Method 3 fastest, especially для repeated invocations.
Combined с file watching
import time
import threading
from pathlib import Path
class WatchedRunner:
    """Keep a parsed manifest fresh with a background polling thread.

    A daemon thread polls file modification times and re-parses the project
    when they change. ``invoke`` runs commands against the latest manifest.
    """

    # Same file types dbt reads project definitions from: SQL and Python
    # models plus YAML property/config files.
    _PATTERNS = ('*.sql', '*.yml', '*.yaml', '*.py')
    _POLL_SECONDS = 2

    def __init__(self, project_path):
        self.project_path = Path(project_path)
        self.manifest = None
        # Serializes every dbt invocation (background parse and user
        # commands): dbt does not support concurrent invocations within one
        # process, so the watcher must not parse while a command is running.
        self.lock = threading.Lock()
        self._reload()
        # Start watcher thread
        self.watcher = threading.Thread(target=self._watch_loop, daemon=True)
        self.watcher.start()

    def _reload(self):
        """Re-parse the project; keep the previous manifest if parsing fails."""
        with self.lock:
            result = dbtRunner().invoke(['parse'])
            if result.success:
                self.manifest = result.result

    def _watch_loop(self):
        """Poll for file changes forever and reload when any are seen."""
        last_check_mtimes = self._get_mtimes()
        while True:
            time.sleep(self._POLL_SECONDS)
            current_mtimes = self._get_mtimes()
            if current_mtimes != last_check_mtimes:
                print('Files changed, reloading manifest...')
                self._reload()
                # Recorded even if the parse failed, so a broken edit is not
                # re-parsed every poll; the next file change retries.
                last_check_mtimes = current_mtimes

    def _get_mtimes(self):
        """Return ``{path: mtime}`` for watched files outside ``target/``."""
        mtimes = {}
        for pattern in self._PATTERNS:
            for f in self.project_path.rglob(pattern):
                # Check only directories inside the project, so a project
                # located under a directory named "target" is still watched.
                if 'target' in f.relative_to(self.project_path).parts[:-1]:
                    continue
                try:
                    mtimes[str(f)] = f.stat().st_mtime
                except FileNotFoundError:
                    # Deleted between listing and stat(); without this the
                    # exception would end the watcher thread.
                    continue
        return mtimes

    def invoke(self, args):
        """Run a dbt command with the current manifest.

        Blocks while a background reload is in progress, and holds the lock
        for the duration of the command.
        """
        with self.lock:
            return dbtRunner(manifest=self.manifest).invoke(args)
# Usage
runner = WatchedRunner('/path/to/dbt')
# Background thread keeps manifest fresh
# User code uses without worry о cache
while True:
user_input = input('Select: ')
result = runner.invoke(['run', '--select', user_input])
When pre-loaded manifest doesn’t help
1. Single invocation
# Just one command — pre-loading doesn't help
runner = dbtRunner()
result = runner.invoke(['run'])
# Parse happens anyway. No reuse.
2. Long delays между invocations
# Files change между invocations
runner_with_manifest = dbtRunner(manifest=manifest)
result1 = runner_with_manifest.invoke(['run'])
# 1 hour later — code likely changed
result2 = runner_with_manifest.invoke(['run'])
# Manifest stale, errors или wrong behavior
Use cache invalidation или skip pre-loading.
3. Different projects
# Pre-loaded manifest is project-specific
# If multiple projects, separate manifests
4. Different targets
# Manifest target-specific (configs resolve per target)
# Pre-loaded для dev target — wrong для prod runs
parse_result = runner.invoke(['parse', '--target', 'dev'])
# Don't use this for prod runs
result = runner.invoke(['run', '--target', 'prod']) # may produce wrong configs
If multi-target, separate manifests per target.
Gotchas
1. Manifest schema migration
parse_result = dbtRunner().invoke(['parse'])
manifest = parse_result.result
# Upgrade dbt-core
# pip install --upgrade dbt-core
# Try к use cached manifest
runner = dbtRunner(manifest=manifest)
# May fail — internal schema differs
If dbt-core upgraded, reload manifest.
2. Cross-process serialization
import pickle
# Serialize manifest для passing к subprocess
with open('manifest.pkl', 'wb') as f:
pickle.dump(manifest, f)
# Other process
with open('manifest.pkl', 'rb') as f:
manifest = pickle.load(f)
runner = dbtRunner(manifest=manifest)
# Pickle uses Python internals — may break across dbt versions
Pickle works для same dbt-core version. Cross-version risky.
3. Memory usage
# Large manifest takes memory
# 5000 models -> ~500MB-1GB manifest object
Process running pre-loaded manifest needs memory headroom.
4. Implicit re-parsing
# Some commands trigger re-parse anyway
result = runner.invoke(['parse', '--no-partial-parse']) # forces re-parse
result = runner.invoke(['clean']) # may invalidate manifest
Production patterns
Pattern 1 — Application initialization
# app.py
from fastapi import FastAPI
from dbt.cli.main import dbtRunner
app = FastAPI()
manifest = None
runner = None
@app.on_event('startup')
async def startup():
global manifest, runner
runner = dbtRunner()
parse_result = runner.invoke(['parse'])
if not parse_result.success:
raise Exception('Cannot start: dbt parse failed')
manifest = parse_result.result
@app.post('/run')
async def run(selector: str):
runner_with_manifest = dbtRunner(manifest=manifest)
result = runner_with_manifest.invoke(['run', '--select', selector])
return {'success': result.success}
Pattern 2 — Long-running daemon
# daemon.py
import time
from dbt.cli.main import dbtRunner
class DbtDaemon:
    """Hourly runner for models tagged ``scheduled`` that reuses a parsed manifest."""

    def __init__(self):
        self.runner = dbtRunner()
        self._refresh_manifest()

    def _refresh_manifest(self):
        """Re-parse the project; ``self.manifest`` becomes None if parsing fails."""
        result = self.runner.invoke(['parse'])
        self.manifest = result.result if result.success else None

    def run_periodic_task(self):
        """Run the ``tag:scheduled`` selection once an hour, forever.

        The manifest is re-parsed when it is missing, when dbt reports an
        exception on the result, or when handling the result raises.
        """
        while True:
            if self.manifest is None:
                self._refresh_manifest()
            try:
                runner = dbtRunner(manifest=self.manifest)
                result = runner.invoke(['run', '--select', 'tag:scheduled'])
                process_result(result)
                # invoke() reports dbt errors on the result object instead of
                # raising, so a stale manifest has to be detected here; the
                # except branch alone would never see it.
                needs_refresh = result.exception is not None
            except Exception:
                # Best-effort daemon: keep looping and start the next cycle
                # from a fresh parse.
                needs_refresh = True
            if needs_refresh:
                # Manifest may be stale
                self._refresh_manifest()
            time.sleep(3600)  # 1 hour
daemon = DbtDaemon()
daemon.run_periodic_task()
Pattern 3 — Test harness
# pytest fixtures
@pytest.fixture(scope='session')
def dbt_manifest():
    """Parse the dbt project once per test session and return the manifest."""
    parse_result = dbtRunner().invoke(['parse'])
    if not parse_result.success:
        # Surface the parse error once instead of passing ``None`` to every
        # dependent fixture and test.
        pytest.fail(f'dbt parse failed: {parse_result.exception}')
    return parse_result.result
@pytest.fixture
def dbt_runner(dbt_manifest):
return dbtRunner(manifest=dbt_manifest)
# Tests
def test_a(dbt_runner):
result = dbt_runner.invoke(['build', '--select', 'a'])
assert result.success
def test_b(dbt_runner):
result = dbt_runner.invoke(['build', '--select', 'b'])
assert result.success
100 tests benefit if each uses pre-loaded manifest.
Ключевые выводы
- Pre-loaded Manifest pattern — load manifest once, pass к subsequent dbtRunner invocations.
- Performance gain: 20-40% faster для repeated invocations.
- API: dbtRunner(manifest=manifest_object).invoke([args]).
- Cache invalidation needed — detect file changes, re-parse when needed.
- Use cases: REST API servers, batch processing, test harnesses, long-running daemons.
- When doesn’t help: single invocations, long gaps между calls, different projects/targets.
- Gotchas: schema migration на dbt upgrade, cross-process serialization, memory usage.
- Production patterns: app initialization at startup, file watcher background thread, test session fixtures.
- Combine с invalidation: file watcher for staleness detection.
- Trade-off: complexity vs performance — only worth it для frequent invocations.