Lesson 07.02 · 22 min
Advanced
Tags: manifest, performance, dbtRunner

Pre-loaded Manifest: optimization by avoiding re-parsing

`dbt parse` takes roughly 1 to 10+ seconds depending on project size. If you call dbtRunner repeatedly, every invocation parses the project again (partial parsing helps, but still carries overhead). The pre-loaded Manifest pattern parses once and passes the resulting manifest to multiple invocations. This matters for performance in orchestrators and long-running services.

This lesson covers the pre-loaded Manifest API, integration patterns, when to use it, and gotchas.



The problem

from dbt.cli.main import dbtRunner

runner = dbtRunner()

# Each invocation re-parses
result1 = runner.invoke(['run', '--select', 'fct_orders'])
# Parse: 3s, run: 5s = 8s total

result2 = runner.invoke(['run', '--select', 'fct_revenue'])
# Parse: 3s (again!), run: 4s = 7s total

result3 = runner.invoke(['run', '--select', 'dim_users'])
# Parse: 3s, run: 3s = 6s total

# Total: 21 seconds (9s wasted re-parsing)

Partial parsing helps when files are unchanged, but it still has overhead.


Pre-loaded Manifest API

from dbt.cli.main import dbtRunner

# Step 1: parse once and capture the manifest
runner = dbtRunner()
parse_result = runner.invoke(['parse'])

if not parse_result.success:
    raise RuntimeError(f'Initial parse failed: {parse_result.exception}')

# Step 2: extract the Manifest object
manifest = parse_result.result

# Step 3: invoke subsequent commands with the pre-loaded manifest
runner_with_manifest = dbtRunner(manifest=manifest)

result1 = runner_with_manifest.invoke(['run', '--select', 'fct_orders'])
# No re-parsing! Just run logic.

result2 = runner_with_manifest.invoke(['run', '--select', 'fct_revenue'])
# Still no re-parse.

result3 = runner_with_manifest.invoke(['run', '--select', 'dim_users'])
# Same.

Now:

  • Parse: 3s once
  • run: 5+4+3 = 12s
  • Total: 15s (vs 21s previously)

Skipping the repeated parses cuts total time by about 29% (6 s out of 21 s).
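The same arithmetic applies to any number of invocations. A minimal sketch, assuming the parse cost is the same on every call; `estimate_savings` is an illustrative helper, not part of dbt:

```python
def estimate_savings(parse_s: float, run_times: list[float]) -> dict[str, float]:
    """Compare re-parsing on every call with parsing once up front."""
    run_total = sum(run_times)
    reparse_total = parse_s * len(run_times) + run_total  # parse before every run
    preload_total = parse_s + run_total                   # parse only once
    saved = reparse_total - preload_total
    return {
        "reparse_total": reparse_total,
        "preload_total": preload_total,
        "saved": saved,
        "saved_fraction": saved / reparse_total if reparse_total else 0.0,
    }


# Numbers from the example above: 3 s parse, runs of 5 s, 4 s and 3 s.
stats = estimate_savings(3.0, [5.0, 4.0, 3.0])
print(stats)
```

The saved fraction grows with the number of invocations and with the share of each call that is spent parsing.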


Detailed pattern

import time

from dbt.cli.main import dbtRunner
from dbt.contracts.graph.manifest import Manifest

class CachedDbtRunner:
    def __init__(self):
        self._runner = dbtRunner()
        self._manifest: Manifest | None = None
        self._last_parsed_at: float | None = None
    
    def _ensure_manifest(self):
        """Parse the manifest if it is not cached yet."""
        if self._manifest is None:
            parse_result = self._runner.invoke(['parse'])
            if not parse_result.success:
                raise RuntimeError(f'Parse failed: {parse_result.exception}')
            self._manifest = parse_result.result
            self._last_parsed_at = time.time()
    
    def invoke(self, args):
        self._ensure_manifest()
        runner_with_manifest = dbtRunner(manifest=self._manifest)
        return runner_with_manifest.invoke(args)
    
    def invalidate(self):
        """Force a re-parse on the next invoke."""
        self._manifest = None
        self._last_parsed_at = None

# Usage
cached_runner = CachedDbtRunner()

result1 = cached_runner.invoke(['run', '--select', 'fct_orders'])
result2 = cached_runner.invoke(['run', '--select', 'fct_revenue'])

# Manifest cached, no re-parsing

# After file changes:
cached_runner.invalidate()
result3 = cached_runner.invoke(['run', '--select', 'new_model'])
# Re-parses

Cache invalidation

Pre-loaded manifest becomes stale if dbt files change:

  • Model SQL edited
  • YAML configs changed
  • Macros modified
  • packages.yml changed
  • profiles.yml changed
  • dbt_project.yml changed

The calling tool must detect such changes:

from pathlib import Path
from dbt.cli.main import dbtRunner

class FileWatcherRunner:
    def __init__(self, project_path='.'):
        self._project_path = Path(project_path)
        self._runner = dbtRunner()
        self._manifest = None
        self._file_mtimes = {}
    
    def _scan_files(self) -> dict[str, float]:
        """Get current mtimes of all dbt files under the project directory."""
        mtimes = {}
        for ext in ['*.sql', '*.yml', '*.yaml', '*.py']:
            for f in self._project_path.rglob(ext):
                # Skip generated artifacts; compare path parts relative to the
                # project root so a parent directory named 'target' is harmless
                parts = f.relative_to(self._project_path).parts
                if 'target' in parts or 'logs' in parts:
                    continue
                mtimes[str(f)] = f.stat().st_mtime
        
        # dbt_project.yml and packages.yml are already matched by '*.yml' above.
        # profiles.yml is covered only if it lives inside the project directory
        # (dbt can also read it from ~/.dbt/).
        return mtimes
    
    def _files_changed(self) -> bool:
        current = self._scan_files()
        return current != self._file_mtimes
    
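    def _refresh_manifest(self):
        # Parse the same directory that is being watched, not the current working directory
        parse_result = self._runner.invoke(['parse', '--project-dir', str(self._project_path)])
        if parse_result.success:
            self._manifest = parse_result.result
            self._file_mtimes = self._scan_files()
        else:
            raise RuntimeError(f'Parse failed: {parse_result.exception}')
    
    def invoke(self, args):
        if self._manifest is None or self._files_changed():
            self._refresh_manifest()
        
        runner = dbtRunner(manifest=self._manifest)
        return runner.invoke([*args, '--project-dir', str(self._project_path)])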

# Usage
runner = FileWatcherRunner('/path/to/dbt')

result1 = runner.invoke(['run'])  # parses
result2 = runner.invoke(['run'])  # uses cache

# After file edit:
result3 = runner.invoke(['run'])  # detects change, re-parses
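Modification times can change without any change in content (for example after a fresh checkout), which forces an unnecessary re-parse. One alternative is to compare a content hash of the project files. The sketch below uses only the standard library; the `project_fingerprint` name, the pattern list and the skipped directories are assumptions to adapt to your project layout:

```python
import hashlib
import tempfile
from pathlib import Path

# Assumed set of file types that influence parsing; extend as needed.
PATTERNS = ("*.sql", "*.yml", "*.yaml", "*.py", "*.csv", "*.md")
SKIP_DIRS = {"target", "logs"}


def project_fingerprint(root: Path) -> str:
    """Hash relative paths and contents of project files into one digest."""
    digest = hashlib.sha256()
    files = sorted(
        f
        for pattern in PATTERNS
        for f in root.rglob(pattern)
        if f.is_file() and not SKIP_DIRS.intersection(f.relative_to(root).parts)
    )
    for f in files:
        digest.update(str(f.relative_to(root)).encode())
        digest.update(b"\0")
        digest.update(f.read_bytes())
        digest.update(b"\0")
    return digest.hexdigest()


# Demonstration on a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "models").mkdir()
    model = root / "models" / "fct_orders.sql"
    model.write_text("select 1")
    before = project_fingerprint(root)
    model.write_text("select 1")   # rewritten with identical content
    unchanged = project_fingerprint(root)
    model.write_text("select 2")   # real edit
    after = project_fingerprint(root)

print(before == unchanged, before != after)
```

Hashing reads every file, so it costs more than a `stat()` scan; for large projects a common compromise is to hash only the files whose mtime changed.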

Use cases

Use case 1 — REST API server

from fastapi import FastAPI
from dbt.cli.main import dbtRunner

app = FastAPI()

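# Pre-load the manifest at startup
runner = dbtRunner()
parse_result = runner.invoke(['parse'])
if not parse_result.success:
    raise RuntimeError(f'dbt parse failed: {parse_result.exception}')
manifest = parse_result.result

@app.post('/run')
async def run_models(selector: str):
    # Reuse the pre-loaded manifest.
    # invoke() is blocking: inside an async handler it holds the event loop
    # for the whole run, so dbt calls in this process never overlap.
    runner_with_manifest = dbtRunner(manifest=manifest)
    result = runner_with_manifest.invoke([
        'run',
        '--select', selector
    ])
    # result.result is None when the invocation failed before producing results
    node_results = result.result.results if result.result is not None else []
    return {
        'success': result.success,
        'duration': sum(r.execution_time for r in node_results)
    }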

Subsequent requests skip the parse step: a request takes 10-15 s instead of 13-18 s, i.e. about 3 s less per request.

Use case 2 — Batch processing

selectors = ['marts.finance', 'marts.sales', 'marts.marketing']

# Single parse
runner = dbtRunner()
parse_result = runner.invoke(['parse'])
manifest = parse_result.result

# Run each batch with the pre-loaded manifest
results = []
for selector in selectors:
    runner_cached = dbtRunner(manifest=manifest)
    result = runner_cached.invoke(['run', '--select', selector])
    results.append(result)

# Total: 1 parse + 3 runs = much faster than 3× (parse + run)

Use case 3 — Testing dbt projects

# tests/conftest.py
import pytest
from dbt.cli.main import dbtRunner

@pytest.fixture(scope='session')
def dbt_manifest():
    """Pre-parse the manifest once for all tests."""
    runner = dbtRunner()
    parse_result = runner.invoke(['parse'])
    return parse_result.result

# Each test uses cached manifest
def test_critical_models_pass(dbt_manifest):
    runner = dbtRunner(manifest=dbt_manifest)
    result = runner.invoke(['test', '--select', 'tag:critical'])
    assert result.success

def test_marts_build(dbt_manifest):
    runner = dbtRunner(manifest=dbt_manifest)
    result = runner.invoke(['build', '--select', 'marts'])
    assert result.success

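Without the fixture, every test pays the parse cost again (for example, 3 tests × 1 minute = 3 minutes). With the session-scoped fixture, the suite parses once and the tests themselves run without re-parsing (about 1 minute in total in the same example).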

Use case 4 — Stateful applications

# data_app.py
from dbt.cli.main import dbtRunner

class DataApp:
    def __init__(self):
        self.runner = dbtRunner()
        parse_result = self.runner.invoke(['parse'])
        self.manifest = parse_result.result
    
    def run_user_request(self, user_id, selector):
        runner = dbtRunner(manifest=self.manifest)
        return runner.invoke(['run', '--select', selector])
    
    def reload(self):
        """Reload the manifest when files have changed."""
        parse_result = self.runner.invoke(['parse'])
        if parse_result.success:
            self.manifest = parse_result.result

Multiple runners share manifest

# Single shared manifest
parse_result = dbtRunner().invoke(['parse'])
shared_manifest = parse_result.result

# Multiple workers
def worker(work_id):
    runner = dbtRunner(manifest=shared_manifest)
    return runner.invoke(['run', '--select', f'tag:work_{work_id}'])

# Each worker reuses manifest
results = [worker(i) for i in range(5)]

Note: execution is still sequential in a single thread, because dbtRunner does not support safe parallel invocations within one process. For parallelism, use separate processes (lesson 03).
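When a shared runner is reachable from several threads (a web framework thread pool, a background watcher), one simple guard is a lock around every call. The sketch below is an assumption-level illustration: `SerializedRunner` is a hypothetical wrapper, and `FakeRunner` stands in for `dbtRunner` so the example runs without dbt installed.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class SerializedRunner:
    """Wrap any object with an invoke(args) method so calls never overlap."""

    def __init__(self, runner):
        self._runner = runner
        self._lock = threading.Lock()

    def invoke(self, args):
        with self._lock:
            return self._runner.invoke(args)


class FakeRunner:
    """Stand-in for dbtRunner that records the peak number of overlapping calls."""

    def __init__(self):
        self.active = 0
        self.max_active = 0
        self._guard = threading.Lock()

    def invoke(self, args):
        with self._guard:
            self.active += 1
            self.max_active = max(self.max_active, self.active)
        time.sleep(0.01)  # simulate work
        with self._guard:
            self.active -= 1
        return args


fake = FakeRunner()
safe = SerializedRunner(fake)
jobs = [["run", "--select", f"tag:work_{i}"] for i in range(5)]

# Four threads compete, but the lock lets only one invoke() run at a time.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(safe.invoke, jobs))

print(fake.max_active)
```

The lock gives safety, not speed: throughput is the same as a sequential loop. Real parallelism still requires separate processes.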


Performance benchmark

import time
from dbt.cli.main import dbtRunner

# Test setup — typical project
NUM_INVOCATIONS = 10

# Method 1: fresh runner every time
t0 = time.time()
for _ in range(NUM_INVOCATIONS):
    runner = dbtRunner()
    result = runner.invoke(['compile', '--select', 'fct_orders'])
method1_time = time.time() - t0

# Method 2 — same runner (uses internal cache)
t0 = time.time()
runner = dbtRunner()
for _ in range(NUM_INVOCATIONS):
    result = runner.invoke(['compile', '--select', 'fct_orders'])
method2_time = time.time() - t0

# Method 3 — pre-loaded manifest
parse_result = dbtRunner().invoke(['parse'])
manifest = parse_result.result

t0 = time.time()
for _ in range(NUM_INVOCATIONS):
    runner = dbtRunner(manifest=manifest)
    result = runner.invoke(['compile', '--select', 'fct_orders'])
method3_time = time.time() - t0

print(f'Method 1 (fresh runner): {method1_time:.1f}s')
print(f'Method 2 (cached runner): {method2_time:.1f}s')
print(f'Method 3 (pre-loaded manifest): {method3_time:.1f}s')

Measured numbers (project with 500 models):

Method 1 (fresh runner): 35.2s  # 3.5s/invocation
Method 2 (cached runner): 12.5s  # 1.3s/invocation
Method 3 (pre-loaded manifest): 8.7s  # 0.9s/invocation

Method 3 is the fastest, especially for repeated invocations.
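A single total can hide a slow first call, which often includes one-off costs such as imports. A small timing helper that keeps per-call samples makes the comparison easier to read. This is a sketch using only the standard library; `time_calls` is an illustrative name, and the `time.sleep` call stands in for a dbt invocation:

```python
import statistics
import time
from typing import Callable


def time_calls(fn: Callable[[], object], repeats: int = 10) -> dict[str, float]:
    """Call fn() `repeats` times and summarize the per-call durations."""
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()  # monotonic, high-resolution timer
        fn()
        samples.append(time.perf_counter() - start)
    return {
        "total": sum(samples),
        "first": samples[0],
        "median": statistics.median(samples),
        "max": max(samples),
    }


# Stand-in workload; with dbt installed this could be a lambda that calls
# dbtRunner(manifest=manifest).invoke([...]) instead.
stats = time_calls(lambda: time.sleep(0.01), repeats=5)
print(stats)
```

Comparing `first` with `median` shows how much of the total is warm-up rather than steady-state cost.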


Combined with file watching

import time
import threading
from pathlib import Path

from dbt.cli.main import dbtRunner

class WatchedRunner:
    def __init__(self, project_path):
        self.project_path = Path(project_path)
        self.manifest = None
        self.lock = threading.Lock()
        self._reload()
        
        # Start watcher thread
        self.watcher = threading.Thread(target=self._watch_loop, daemon=True)
        self.watcher.start()
    
    def _reload(self):
        # Parse under the lock: dbt does not support overlapping invocations
        # in one process, so a reload must not run while invoke() is active.
        with self.lock:
            result = dbtRunner().invoke(['parse', '--project-dir', str(self.project_path)])
            if result.success:
                self.manifest = result.result
    
    def _watch_loop(self):
        last_check_mtimes = self._get_mtimes()
        while True:
            time.sleep(2)  # check every 2 seconds
            current_mtimes = self._get_mtimes()
            if current_mtimes != last_check_mtimes:
                print('Files changed, reloading manifest...')
                self._reload()
                last_check_mtimes = current_mtimes
    
    def _get_mtimes(self):
        mtimes = {}
        for ext in ['*.sql', '*.yml']:
            for f in self.project_path.rglob(ext):
                if 'target' not in f.relative_to(self.project_path).parts:
                    mtimes[str(f)] = f.stat().st_mtime
        return mtimes
    
    def invoke(self, args):
        # Hold the lock for the whole call so the watcher cannot re-parse mid-run
        with self.lock:
            runner = dbtRunner(manifest=self.manifest)
            return runner.invoke([*args, '--project-dir', str(self.project_path)])

# Usage
runner = WatchedRunner('/path/to/dbt')
# Background thread keeps manifest fresh

# User code does not need to manage the cache
while True:
    user_input = input('Select: ')
    result = runner.invoke(['run', '--select', user_input])

When pre-loaded manifest doesn’t help

1. Single invocation

# Just one command — pre-loading doesn't help
runner = dbtRunner()
result = runner.invoke(['run'])
# Parse happens anyway. No reuse.

2. Long delays between invocations

# Files change between invocations
runner_with_manifest = dbtRunner(manifest=manifest)
result1 = runner_with_manifest.invoke(['run'])

# 1 hour later: the code has likely changed
result2 = runner_with_manifest.invoke(['run'])
# The manifest is stale: errors or wrong behavior

Use cache invalidation, or skip pre-loading.
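When change detection is not available, a simple fallback is to cap the age of the cached manifest. The sketch below is a generic max-age cache; `MaxAgeCache` and the injected `load` and `clock` callables are illustrative assumptions, with a fake parse function in place of dbt:

```python
import time
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class MaxAgeCache(Generic[T]):
    """Reload a value through load() once it is older than max_age_s."""

    def __init__(
        self,
        load: Callable[[], T],
        max_age_s: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._load = load
        self._max_age_s = max_age_s
        self._clock = clock
        self._value: Optional[T] = None
        self._loaded_at: Optional[float] = None

    def get(self) -> T:
        now = self._clock()
        expired = self._loaded_at is None or now - self._loaded_at > self._max_age_s
        if expired:
            self._value = self._load()
            self._loaded_at = now
        return self._value


# Demonstration with a fake clock and a fake parse step.
now = [0.0]
calls = []


def fake_parse():
    calls.append(now[0])
    return f"manifest@{now[0]}"


cache = MaxAgeCache(fake_parse, max_age_s=600, clock=lambda: now[0])
first = cache.get()    # parses
now[0] = 300.0
second = cache.get()   # 5 minutes later: still cached
now[0] = 3600.0
third = cache.get()    # 1 hour later: re-parses
print(calls)
```

A max age bounds how stale the manifest can get, but it does not make edits visible immediately; combine it with explicit invalidation where that matters.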

3. Different projects

# Pre-loaded manifest is project-specific
# If multiple projects, separate manifests

4. Different targets

# The manifest is target-specific (configs resolve per target)
# A manifest pre-loaded for the dev target is wrong for prod runs
parse_result = dbtRunner().invoke(['parse', '--target', 'dev'])
dev_manifest = parse_result.result

# Don't reuse it for prod runs
result = dbtRunner(manifest=dev_manifest).invoke(['run', '--target', 'prod'])  # may produce wrong configs

With multiple targets, keep a separate manifest per target.
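Points 3 and 4 together suggest keying the cache by project and target. A minimal sketch, assuming a `parse` callable is injected (the class name `PerTargetManifests` is hypothetical, and the demo uses a fake parse function so it runs without dbt):

```python
from typing import Callable, Dict, Tuple


class PerTargetManifests:
    """Cache one parsed manifest per (project_dir, target) pair."""

    def __init__(self, parse: Callable[[str, str], object]):
        self._parse = parse
        self._cache: Dict[Tuple[str, str], object] = {}

    def get(self, project_dir: str, target: str) -> object:
        key = (project_dir, target)
        if key not in self._cache:
            self._cache[key] = self._parse(project_dir, target)
        return self._cache[key]

    def invalidate(self, project_dir: str, target: str) -> None:
        self._cache.pop((project_dir, target), None)


# With dbt installed, `parse` could be, for example:
#   lambda project_dir, target: dbtRunner().invoke(
#       ["parse", "--project-dir", project_dir, "--target", target]).result
parse_calls = []


def fake_parse(project_dir: str, target: str) -> object:
    parse_calls.append((project_dir, target))
    return {"project": project_dir, "target": target}


manifests = PerTargetManifests(fake_parse)
dev = manifests.get("/path/to/dbt", "dev")
dev_again = manifests.get("/path/to/dbt", "dev")   # cache hit
prod = manifests.get("/path/to/dbt", "prod")       # separate entry
print(parse_calls)
```

Each entry holds a full manifest, so memory use scales with the number of project and target combinations kept alive.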


Gotchas

1. Manifest schema migration

parse_result = dbtRunner().invoke(['parse'])
manifest = parse_result.result

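# dbt-core is upgraded (pip install --upgrade dbt-core) and the manifest
# outlives the old version, e.g. because it was persisted to disk

# Try to use the cached manifest
runner = dbtRunner(manifest=manifest)
# May fail: the internal schema differs between versions

After a dbt-core upgrade, restart the process and re-parse the manifest.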

2. Cross-process serialization

import pickle

# Serialize the manifest to pass it to a subprocess
with open('manifest.pkl', 'wb') as f:
    pickle.dump(manifest, f)

# Other process
with open('manifest.pkl', 'rb') as f:
    manifest = pickle.load(f)
runner = dbtRunner(manifest=manifest)
# Pickle relies on Python internals: it may break across dbt versions

Pickle works within the same dbt-core version; across versions it is risky.
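One way to guard against cross-version loads is to write a version stamp in front of the payload and check it before unpickling the payload itself. The helpers below are a sketch with hypothetical names (`save_stamped`, `load_stamped`); in real use the version string would come from `dbt.version.__version__`, while the demo uses placeholder strings and a plain dict:

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Optional


def save_stamped(path: Path, obj: Any, version: str) -> None:
    """Write a small version header followed by the pickled payload."""
    with open(path, "wb") as f:
        pickle.dump(version, f)  # header: read first on load
        pickle.dump(obj, f)      # payload: read only if the header matches


def load_stamped(path: Path, version: str) -> Optional[Any]:
    """Return the payload if it was written under `version`, else None."""
    try:
        with open(path, "rb") as f:
            if pickle.load(f) != version:
                return None      # written by another version: caller re-parses
            return pickle.load(f)
    except (OSError, EOFError, pickle.UnpicklingError):
        return None              # missing or corrupt file: caller re-parses


# Demonstration with placeholder versions and a stand-in payload.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "manifest.pkl"
    save_stamped(path, {"nodes": ["model.shop.fct_orders"]}, version="1.0.0")
    same = load_stamped(path, version="1.0.0")
    other = load_stamped(path, version="2.0.0")
    missing = load_stamped(Path(tmp) / "absent.pkl", version="1.0.0")

print(same, other, missing)
```

Only unpickle files your own process wrote: `pickle.load` can execute arbitrary code from an untrusted file.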

3. Memory usage

# Large manifest takes memory
# 5000 models -> ~500MB-1GB manifest object

A process that holds a pre-loaded manifest needs memory headroom for it.
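Figures like the one above vary by project, so it is worth measuring. A rough sketch with the standard-library `tracemalloc` module; `measure_allocation` is an illustrative helper, and the list of dicts stands in for a parsed manifest. Note that `tracemalloc` counts Python-level allocations only, so process RSS will be higher:

```python
import tracemalloc
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


def measure_allocation(build: Callable[[], T]) -> Tuple[T, int, int]:
    """Run build() and return (result, retained_bytes, peak_bytes)."""
    tracemalloc.start()
    try:
        before, _ = tracemalloc.get_traced_memory()
        result = build()
        after, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, after - before, peak


# Stand-in for parsing; with dbt installed, `build` could instead call
# dbtRunner().invoke(["parse"]).result
def build_fake_manifest():
    return [{"unique_id": f"model.shop.m{i}", "depends_on": []} for i in range(100_000)]


fake_manifest, retained, peak = measure_allocation(build_fake_manifest)
print(f"retained: {retained / 1e6:.1f} MB, peak: {peak / 1e6:.1f} MB")
```

Tracing slows allocation noticeably, so run such a measurement in a one-off script rather than in the serving process.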

4. Implicit re-parsing

# Some commands trigger re-parse anyway
result = runner.invoke(['parse', '--no-partial-parse'])  # forces re-parse
result = runner.invoke(['clean'])  # may invalidate manifest

Production patterns

Pattern 1 — Application initialization

# app.py
from fastapi import FastAPI
from dbt.cli.main import dbtRunner

app = FastAPI()

manifest = None
runner = None

@app.on_event('startup')
async def startup():
    global manifest, runner
    runner = dbtRunner()
    parse_result = runner.invoke(['parse'])
    if not parse_result.success:
        raise Exception('Cannot start: dbt parse failed')
    manifest = parse_result.result

@app.post('/run')
async def run(selector: str):
    runner_with_manifest = dbtRunner(manifest=manifest)
    result = runner_with_manifest.invoke(['run', '--select', selector])
    return {'success': result.success}

Pattern 2 — Long-running daemon

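# daemon.py
import time
from dbt.cli.main import dbtRunner

def process_result(result):
    """Placeholder hook (hypothetical): replace with your own result handling."""
    print(f'success={result.success}')

class DbtDaemon:
    def __init__(self):
        self.runner = dbtRunner()
        self.manifest = None
        self._refresh_manifest()
    
    def _refresh_manifest(self):
        result = self.runner.invoke(['parse'])
        self.manifest = result.result if result.success else None
    
    def run_periodic_task(self):
        while True:
            if not self.manifest:
                self._refresh_manifest()
            
            if self.manifest:
                runner = dbtRunner(manifest=self.manifest)
                result = runner.invoke(['run', '--select', 'tag:scheduled'])
                process_result(result)
                # invoke() reports errors on the result object instead of raising,
                # so check result.exception rather than wrapping the call in try/except
                if result.exception is not None:
                    # The manifest may be stale
                    self._refresh_manifest()
            
            time.sleep(3600)  # 1 hour

daemon = DbtDaemon()
daemon.run_periodic_task()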

Pattern 3 — Test harness

# pytest fixtures
import pytest
from dbt.cli.main import dbtRunner

@pytest.fixture(scope='session')
def dbt_manifest():
    runner = dbtRunner()
    return runner.invoke(['parse']).result

@pytest.fixture
def dbt_runner(dbt_manifest):
    return dbtRunner(manifest=dbt_manifest)

# Tests
def test_a(dbt_runner):
    result = dbt_runner.invoke(['build', '--select', 'a'])
    assert result.success

def test_b(dbt_runner):
    result = dbt_runner.invoke(['build', '--select', 'b'])
    assert result.success

With 100 tests, every test benefits: the suite parses once instead of 100 times.


Key takeaways

  1. Pre-loaded Manifest pattern: load the manifest once and pass it to subsequent dbtRunner invocations.
  2. Performance gain: typically 20-40% less time for repeated invocations, depending on how much of each call is parse time.
  3. API: dbtRunner(manifest=manifest_object).invoke([args]).
  4. Cache invalidation is required: detect file changes and re-parse when needed.
  5. Use cases: REST API servers, batch processing, test harnesses, long-running daemons.
  6. When it does not help: single invocations, long gaps between calls, different projects or targets.
  7. Gotchas: schema migration on dbt upgrade, cross-process serialization, memory usage.
  8. Production patterns: app initialization at startup, a file-watcher background thread, test session fixtures.
  9. Combine with invalidation: a file watcher for staleness detection.
  10. Trade-off: complexity versus performance; only worth it for frequent invocations.
Knowledge check
A REST API server uses dbtRunner, and each request parses the manifest (1 s). After a pre-loaded manifest is introduced, latency drops. What are the pitfalls and safety considerations?
Answer
**Pre-loaded manifest in an API server**:

**Setup**:

```python
from fastapi import FastAPI
from dbt.cli.main import dbtRunner

app = FastAPI()

manifest = None
runner = None

@app.on_event('startup')
async def startup():
    global manifest, runner
    runner = dbtRunner()
    parse_result = runner.invoke(['parse'])
    if not parse_result.success:
        raise Exception('Cannot start')
    manifest = parse_result.result

@app.post('/run')
async def run(selector: str):
    runner_with_manifest = dbtRunner(manifest=manifest)
    result = runner_with_manifest.invoke(['run', '--select', selector])
    return {'success': result.success}
```

Latency reduction:
- Before: 1s parse + 5s run = 6s per request
- After: 5s run = 5s per request
- About 17% faster here; the relative gain grows as parse time dominates the request

**Pitfalls**:

**1. Stale manifest**:

If an engineer updates the dbt code (git pull) while the server is running, the server keeps using the old manifest.

Consequences:
- New models are not visible
- Deleted models cause errors
- Config changes are ignored
- Old SQL is used despite changes

Fix: a file watcher:

```python
import threading
from pathlib import Path

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class ManifestReloader(FileSystemEventHandler):
    def __init__(self, app):
        self.app = app

    def on_any_event(self, event):
        path = str(event.src_path)
        # Ignore dbt's own output: runs write compiled .sql files under target/
        if event.is_directory or 'target' in Path(path).parts:
            return
        if path.endswith(('.sql', '.yml')):
            print(f'File changed: {path}')
            self.app.reload_manifest()

class DbtServer:
    def __init__(self, project_path):
        self.project_path = project_path
        self.runner = dbtRunner()
        self.manifest = None
        self.lock = threading.Lock()
        self.reload_manifest()

        # Watch
        self.observer = Observer()
        self.observer.schedule(
            ManifestReloader(self),
            path=project_path,
            recursive=True
        )
        self.observer.start()

    def reload_manifest(self):
        try:
            with self.lock:
                result = self.runner.invoke(['parse'])
                if result.success:
                    self.manifest = result.result
                    print('Manifest reloaded')
        except Exception as e:
            print(f'Reload failed: {e}')
```

**2. Concurrent reload during invocation**:

Files change while a request is being processed:

```python
@app.post('/run')
async def run(selector: str):
    # Snapshot the current manifest
    with server.lock:
        current_manifest = server.manifest

    # Use the snapshot regardless of subsequent changes
    runner = dbtRunner(manifest=current_manifest)
    result = runner.invoke(['run', '--select', selector])
    return {'success': result.success}
```

The snapshot prevents inconsistency within a request.

**3. dbt-core upgrades**:

The server is running with dbt-core 1.10. An engineer upgrades to 1.12.
- The server still uses the cached manifest
- Schema migration may invalidate the manifest
- A server restart is required

Deployment process:

```yaml
# CI/CD
- name: Upgrade dbt
  run: pip install --upgrade dbt-core

- name: Restart server
  run: systemctl restart dbt-server
```

Document it: 'After a dbt upgrade, restart the server.'

**4. Multi-target conflicts**:

```python
# Server pre-loaded for the prod target
parse_result = runner.invoke(['parse', '--target', 'prod'])

@app.post('/run-dev')
async def run_dev(selector: str):
    # Using the prod manifest for the dev target!
    runner = dbtRunner(manifest=manifest)
    result = runner.invoke(['run', '--select', selector, '--target', 'dev'])
    # Manifest configs are prod, but the run uses the dev target
    # Inconsistent
    return {'success': result.success}
```

Fix: separate manifests per target:

```python
from fastapi import HTTPException

manifests_by_target = {}

@app.on_event('startup')
async def startup():
    for target in ['dev', 'ci', 'prod']:
        result = dbtRunner().invoke(['parse', '--target', target])
        if result.success:
            manifests_by_target[target] = result.result

@app.post('/run')
async def run(target: str, selector: str):
    manifest = manifests_by_target.get(target)
    if not manifest:
        raise HTTPException(404, 'Unknown target')
    runner = dbtRunner(manifest=manifest)
    result = runner.invoke(['run', '--select', selector, '--target', target])
    return {'success': result.success}
```

**5. Memory growth**:

A large cached manifest plus many concurrent requests:

```python
import psutil

@app.get('/health')
async def health():
    process = psutil.Process()
    memory_mb = process.memory_info().rss / 1024 / 1024

    if memory_mb > 8000:  # 8GB threshold
        # Restart?
        return {'memory_warning': True}

    return {'memory_mb': memory_mb}
```

Monitor RAM and restart if needed.

**6. Lock contention**:

Multiple concurrent requests block each other:

```python
import asyncio

lock = asyncio.Lock()

@app.post('/run')
async def run(selector: str):
    async with lock:  # serialize
        runner = dbtRunner(manifest=manifest)
        result = runner.invoke(['run', '--select', selector])
        return {'success': result.success}
```

This reduces throughput. An alternative is a process pool:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

executor = ProcessPoolExecutor(max_workers=4)

async def run(selector: str):
    loop = asyncio.get_running_loop()
    # Each process parses on its own
    return await loop.run_in_executor(executor, run_dbt, selector)

def run_dbt(selector):
    runner = dbtRunner()
    return runner.invoke(['run', '--select', selector]).success
```

A process pool gives true parallelism, but each call parses again, so the pre-loaded manifest benefit is lost.

**7. Connection management**:

dbt runs hold warehouse connections. Concurrent requests can exhaust the pool:

```python
import asyncio

# Limit concurrent dbt operations
semaphore = asyncio.Semaphore(5)

async def run(selector: str):
    async with semaphore:  # max 5 concurrent
        runner = dbtRunner(manifest=manifest)
        result = runner.invoke(['run', '--select', selector])
        return {'success': result.success}
```

**8. Failure recovery**:

If the manifest is somehow corrupted:

```python
@app.post('/run')
async def run(selector: str):
    runner = dbtRunner(manifest=manifest)
    result = runner.invoke(['run', '--select', selector])
    if result.exception is not None:
        # invoke() returns errors on the result instead of raising;
        # refresh the manifest and retry once
        server.reload_manifest()
        runner = dbtRunner(manifest=server.manifest)
        result = runner.invoke(['run', '--select', selector])
    return {'success': result.success}
```

Graceful degradation.

**Safety checklist**:

1. File watcher -> auto-reload
2. Lock for thread safety
3. Snapshot before invocation
4. Per-target manifests
5. Memory monitoring
6. Concurrent request limits
7. Graceful failure handling
8. dbt-core upgrade -> server restart
9. Comprehensive logging
10. Health endpoint exposing status

**Production architecture**:

```
┌──────────────────┐
│  Load balancer   │
└────────┬─────────┘
         │
    ┌────┴────┐
    ▼         ▼
┌────────┐ ┌────────┐
│Server 1│ │Server 2│  (replicas)
│Cached  │ │Cached  │
│manifest│ │manifest│
└────────┘ └────────┘
    │         │
    ▼         ▼
┌──────────────────┐
│   File watcher   │  (shared via SHM or per-server)
└──────────────────┘
```

Multiple replicas give high availability. Each replica has its own cached manifest and its own file watcher.

**Pre-loaded manifest** is a performance optimization. The implementation requires care, especially for concurrent or production use. It is worth the complexity for high-traffic services.
Knowledge check
A test suite has 50 tests, and each takes 5 s because of dbt parse. With a pre-loaded manifest, the time drops to 3 s per test. What is the implementation approach?
Answer
**Pytest fixture for a pre-loaded manifest**:

```python
# tests/conftest.py
import pytest
from dbt.cli.main import dbtRunner

@pytest.fixture(scope='session')
def dbt_manifest():
    """Parse manifest once for entire test session."""
    runner = dbtRunner()
    result = runner.invoke(['parse'])
    if not result.success:
        pytest.exit(f'Parse failed: {result.exception}')
    return result.result

@pytest.fixture
def dbt_runner(dbt_manifest):
    """Fresh runner per test, but with the cached manifest."""
    return dbtRunner(manifest=dbt_manifest)
```

**Test patterns**:

```python
# tests/test_dbt_models.py
import pytest

def test_critical_models_build(dbt_runner):
    result = dbt_runner.invoke([
        'build',
        '--select', 'tag:critical',
        '--target', 'ci'
    ])
    assert result.success

def test_finance_models_pass_tests(dbt_runner):
    result = dbt_runner.invoke([
        'test',
        '--select', 'tag:finance',
        '--target', 'ci'
    ])
    assert result.success

def test_no_failing_tests(dbt_runner):
    result = dbt_runner.invoke(['test', '--target', 'ci'])
    failures = [r for r in result.result.results if r.status == 'fail']
    assert len(failures) == 0, f'Failing tests: {failures}'

def test_models_have_descriptions(dbt_manifest):
    """Static check: no execution needed."""
    for uid, node in dbt_manifest.nodes.items():
        if node.resource_type != 'model':
            continue
        if not node.description:
            pytest.fail(f'Model {uid} missing description')

def test_public_models_have_contracts(dbt_manifest):
    """Static check."""
    for uid, node in dbt_manifest.nodes.items():
        if (
            node.resource_type == 'model'
            and node.config.access == 'public'
            and not node.config.contract.enforced
        ):
            pytest.fail(f'Public model {uid} missing contract')
```

**Pytest scope semantics**:

- `scope='session'`: the fixture lives for the entire test session (one parse)
- `scope='module'`: one per test file
- `scope='function'`: one per test (the default, expensive!)

**Session scope** is the right choice.

**Performance benchmark**:

Before:

```python
# Each test parses
def test_a():
    runner = dbtRunner()
    result = runner.invoke(['test', '--select', 'a'])
    # 5s (parse) + 1s (test) = 6s

# 50 tests: 50 × 6 = 300s = 5 minutes
```

After:

```python
# Session fixture parses once
@pytest.fixture(scope='session')
def manifest():
    return dbtRunner().invoke(['parse']).result

def test_a(manifest):
    runner = dbtRunner(manifest=manifest)
    result = runner.invoke(['test', '--select', 'a'])
    # 0s (parse) + 1s (test) = 1s (after first)

# 50 tests: 1 × 5 (first parse) + 50 × 1 = 55s
```

Roughly a 5x speedup: from 5 minutes to about 1 minute.

**Edge cases**:

**1. Tests that modify state**:

```python
def test_seed_loading(dbt_runner):
    result = dbt_runner.invoke(['seed'])
    # Modifies warehouse
    # Other tests may see this state
```

Isolate using:
- Separate test schemas
- Database transactions
- Cleanup fixtures

**2. Tests that modify manifest**:

```python
def test_with_vars(dbt_runner):
    # vars do not modify the cached manifest object
    result = dbt_runner.invoke([
        'run',
        '--select', 'fct_orders',
        '--vars', '{"my_var": "foo"}'
    ])
```

Vars are passed per invocation. The manifest cache is shared and runtime resolution happens per call, but anything that was resolved from vars at parse time (for example, configs) keeps the value it had when the manifest was parsed.

**3. Tests modifying dbt files**:

```python
def test_after_file_change(tmp_path):
    # Modify dbt file
    model_file = tmp_path / 'models/new_model.sql'
    model_file.parent.mkdir(parents=True, exist_ok=True)
    model_file.write_text('SELECT 1')

    # Session-cached manifest doesn't include new_model
    # Need fresh manifest
    runner = dbtRunner()
    runner.invoke(['parse'])
```

Use 'function' scope for such tests:

```python
@pytest.fixture
def fresh_manifest():
    return dbtRunner().invoke(['parse']).result
```

**4. Parallel test execution**:

Pytest with 'pytest-xdist':

```bash
pytest -n 4  # 4 parallel workers
```

With a session fixture, each worker has its own manifest (4 parses).

Not ideal, but better than 50 parses.

For true sharing, use a file fixture:

```python
import pickle

@pytest.fixture(scope='session')
def shared_manifest_path(tmp_path_factory, worker_id):
    """Pre-generate manifest, store in tmp."""
    # worker_id is a pytest-xdist fixture; it is 'master' when tests are not distributed
    if worker_id == 'master':
        runner = dbtRunner()
        result = runner.invoke(['parse'])
        # Save
        path = tmp_path_factory.mktemp('manifest') / 'manifest.pkl'
        with open(path, 'wb') as f:
            pickle.dump(result.result, f)
        return path

    # Other workers wait and read (coordination elided)
    path = ...
    return path

@pytest.fixture
def dbt_runner(shared_manifest_path):
    with open(shared_manifest_path, 'rb') as f:
        manifest = pickle.load(f)
    return dbtRunner(manifest=manifest)
```

With that coordination in place, all workers reuse a single parse.

**5. Schema migration concerns**:

If dbt-core is upgraded between test runs, a manifest cached on disk may be stale.

Fix:

```python
import os
import shutil

@pytest.fixture(scope='session', autouse=True)
def clear_cache_if_dbt_upgraded():
    """Clear cache if dbt-core version changed."""
    from dbt.version import __version__
    cache_file = '/tmp/dbt_test_cache_version'

    last_version = None
    if os.path.exists(cache_file):
        with open(cache_file) as f:
            last_version = f.read().strip()

    if last_version != __version__:
        # Clear all caches
        shutil.rmtree('target/', ignore_errors=True)
        with open(cache_file, 'w') as f:
            f.write(__version__)
```

**Production test pattern**:

```python
# conftest.py
import pytest
import pickle
from pathlib import Path
from dbt.cli.main import dbtRunner

@pytest.fixture(scope='session')
def dbt_manifest(tmp_path_factory):
    """Parse manifest once per test session."""
    runner = dbtRunner()

    # Try the on-disk cache first.
    # Note: mktemp() returns a new, empty directory on every call, so this
    # lookup only hits if cache_path points at a stable location instead.
    cache_path = tmp_path_factory.mktemp('cache') / 'manifest.pkl'
    if cache_path.exists():
        try:
            with open(cache_path, 'rb') as f:
                return pickle.load(f)
        except Exception:
            pass  # Cache invalid, re-parse

    # Parse
    result = runner.invoke(['parse'])
    if not result.success:
        pytest.exit(f'Parse failed: {result.exception}')

    # Cache
    with open(cache_path, 'wb') as f:
        pickle.dump(result.result, f)

    return result.result

@pytest.fixture
def dbt_runner(dbt_manifest):
    return dbtRunner(manifest=dbt_manifest)
```

**Test categorization**:

```python
# Fast static tests (no DB)
@pytest.mark.fast
def test_static_check(dbt_manifest):
    ...

# Slow execution tests (DB queries)
@pytest.mark.slow
def test_actual_run(dbt_runner):
    ...
```

Run only fast tests during development:

```bash
pytest -m fast  # 5 seconds
pytest  # all tests, 1 minute with cache
```

**CI optimization**:

```yaml
# .github/workflows/test.yml
- name: Cache dbt parse
  uses: actions/cache@v3
  with:
    path: target/partial_parse.msgpack
    key: dbt-parse-${{ hashFiles('models/**', 'dbt_project.yml') }}

- name: Run tests
  run: pytest --tb=short
```

This caches the partial-parse file between CI runs.

**Production test suite speed**:

- 50 tests, 5s each (with parse): 250s, about 4 minutes
- With session cache: 1 parse (5s) + 50 × 0.5s = 30s
- About an 8x speedup

Worth the implementation for test suites.

**Production-grade**: a session fixture for a shared parse gives a significant speedup for test suites and is a foundation for fast CI feedback loops.

Check your understanding

Question 1 of 3 (applied). An API server uses dbtRunner with a pre-loaded manifest. After 3 days of uptime, engineers report that changes to the dbt code are not visible. What is the diagnosis?
