Thread unsafety: subprocess wrapper для parallel dbt
dbtRunner is not thread-safe. Concurrent invocations в same Python process cause race conditions and undefined behavior. Yet parallel dbt runs are frequently needed: orchestrators, batch processing, multi-team builds.
Solution: subprocess-based parallelism. Each worker = separate Python process with own dbtRunner. В этом уроке — patterns, performance trade-offs, integration с orchestrators.
GIL + threading vs multiprocessing
Why dbtRunner not thread-safe
dbt-core internals:
- Global state (parsed manifest, connection pool)
- Mutable singletons (config, profiles)
- Non-reentrant Jinja environment
- Internal caching shared across calls
# DON'T DO THIS -- anti-pattern kept for illustration only.
import threading
from dbt.cli.main import dbtRunner

# A single in-process runner that every thread below shares.
runner = dbtRunner()


def parallel_run(selector):
    """Invoke ``dbt run --select <selector>`` on the shared module-level runner."""
    cli_args = ['run', '--select', selector]
    invocation = runner.invoke(cli_args)
    return invocation


# Race conditions: all threads drive the same dbtRunner at the same time.
# (`selectors` is expected to be defined before this snippet.)
threads = [threading.Thread(target=parallel_run, args=(sel,)) for sel in selectors]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Undefined behavior: crashes, wrong results, partial output
Don’t call dbtRunner from multiple threads.
Subprocess solution
Each parallel run в separate process:
import subprocess
import json
def run_dbt(args):
    """Run ``dbt <args>`` in a child process and capture its output.

    Args:
        args: dbt CLI arguments, e.g. ``['run', '--select', 'tag:x']``.

    Returns:
        dict with ``returncode`` (int), ``stdout`` and ``stderr`` (str).
    """
    # Every call starts a fresh `dbt` process, so concurrent calls share no
    # in-process dbt state.
    completed = subprocess.run(['dbt', *args], capture_output=True, text=True)
    return {
        'returncode': completed.returncode,
        'stdout': completed.stdout,
        'stderr': completed.stderr,
    }


# Parallel execution
from concurrent.futures import ProcessPoolExecutor

selectors = [
    'tag:marts_finance',
    'tag:marts_marketing',
    'tag:marts_sales',
]

# The guard is required: with the "spawn" start method (the default on Windows
# and macOS) every worker re-imports this module, and unguarded top-level code
# would try to start a new pool inside each worker.
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(run_dbt, ['run', '--select', s]) for s in selectors]
        results = [f.result() for f in futures]

    for selector, result in zip(selectors, results):
        print(f'{selector}: rc={result["returncode"]}')
Each process — isolated dbt invocation. Safe to parallelize.
Performance trade-offs
import subprocess
import time
from concurrent.futures import ProcessPoolExecutor

selectors = ['tag:a', 'tag:b', 'tag:c', 'tag:d']


def run_selector(selector):
    """Run ``dbt run --select <selector>`` and wait for it to finish.

    Returns:
        The subprocess.CompletedProcess of the dbt invocation.
    """
    # A named module-level function rather than a lambda: ProcessPoolExecutor
    # pickles the callable it sends to workers, and lambdas cannot be pickled.
    return subprocess.run(['dbt', 'run', '--select', selector])


# Guarded so that "spawn"-started workers re-importing this module do not
# re-run the benchmark.
if __name__ == '__main__':
    # Sequential (perf_counter is the right clock for measuring intervals)
    t0 = time.perf_counter()
    for s in selectors:
        run_selector(s)
    sequential = time.perf_counter() - t0

    # Parallel via subprocess
    t0 = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as ex:
        list(ex.map(run_selector, selectors))
    parallel = time.perf_counter() - t0

    print(f'Sequential: {sequential:.1f}s')
    print(f'Parallel (4 procs): {parallel:.1f}s')
    print(f'Speedup: {sequential / parallel:.1f}x')
Typical:
Sequential: 60s (4 × 15s)
Parallel: 18s (slowest chunk plus process overhead)
Speedup: 3.3x
Limited by:
- Process startup time
- Warehouse connection limits
- Disk IO contention
Subprocess wrapper pattern
import subprocess
import json
import os
from dataclasses import dataclass
from typing import Optional
@dataclass
class DbtSubprocessResult:
    """Outcome of a single ``dbt`` subprocess invocation."""

    success: bool  # True when the process exited with return code 0
    returncode: int  # process exit code; -1 when the run timed out
    stdout: str  # captured standard output, as text
    stderr: str  # captured standard error, as text
    duration: float  # elapsed seconds, measured with time.perf_counter()


def _decode_partial(data) -> str:
    """Turn output captured before a timeout into text.

    ``subprocess.TimeoutExpired.stdout`` / ``.stderr`` are bytes even when
    ``text=True`` was requested, and may be None when nothing was read.
    """
    if data is None:
        return ''
    if isinstance(data, bytes):
        return data.decode(errors='replace')
    return data


def run_dbt_subprocess(
    args: list[str],
    project_dir: Optional[str] = None,
    profiles_dir: Optional[str] = None,
    env: Optional[dict] = None,
    timeout: Optional[float] = None
) -> DbtSubprocessResult:
    """Run ``dbt <args>`` in a child process with full options.

    Args:
        args: dbt CLI arguments, e.g. ``['build', '--select', 'tag:x']``.
        project_dir: working directory for the child process (the dbt
            project root); None keeps the current directory.
        profiles_dir: directory containing ``profiles.yml``; exported to the
            child as ``DBT_PROFILES_DIR``. An explicit ``--profiles-dir`` in
            *args* still takes precedence inside dbt.
        env: extra environment variables layered over ``os.environ``.
        timeout: seconds before the child is killed; None waits forever.

    Returns:
        A DbtSubprocessResult. On timeout, ``returncode`` is -1, ``success``
        is False, ``stderr`` starts with ``Timeout after <timeout>s`` followed
        by any stderr captured before the kill, and ``stdout`` holds any
        stdout captured before the kill.

    Raises:
        FileNotFoundError: if no ``dbt`` executable is found (propagated
            from subprocess.run).
    """
    import time

    cmd = ['dbt', *args]
    run_env = os.environ.copy()
    if env:
        run_env.update(env)
    if profiles_dir is not None:
        # dbt reads DBT_PROFILES_DIR when --profiles-dir is not passed.
        run_env['DBT_PROFILES_DIR'] = profiles_dir

    t0 = time.perf_counter()
    try:
        result = subprocess.run(
            cmd,
            cwd=project_dir,
            env=run_env,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        partial_err = _decode_partial(exc.stderr)
        message = f'Timeout after {timeout}s'
        return DbtSubprocessResult(
            success=False,
            returncode=-1,
            stdout=_decode_partial(exc.stdout),
            stderr=f'{message}\n{partial_err}' if partial_err else message,
            duration=time.perf_counter() - t0,
        )

    return DbtSubprocessResult(
        success=result.returncode == 0,
        returncode=result.returncode,
        stdout=result.stdout,
        stderr=result.stderr,
        duration=time.perf_counter() - t0,
    )
Parallel orchestration
from concurrent.futures import ProcessPoolExecutor, as_completed
def parallel_dbt_builds(work_items, max_workers=4):
    """Run several dbt builds concurrently, one subprocess per work item.

    Args:
        work_items: iterable of dicts with ``'name'`` (label and result key)
            and ``'args'`` (dbt CLI arguments for run_dbt_subprocess).
        max_workers: size of the process pool.

    Returns:
        dict mapping each item's name to its DbtSubprocessResult, or to the
        exception raised while obtaining that result.
    """
    outcomes = {}
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        # Queue every build first.
        pending = {}
        for work in work_items:
            pending[pool.submit(run_dbt_subprocess, work['args'])] = work

        # Then report each build as soon as it finishes, not in submit order.
        for done in as_completed(pending):
            label = pending[done]['name']
            try:
                outcome = done.result()
                outcomes[label] = outcome
                status = "[x]" if outcome.success else "[ ]"
                print(f'{label}: {status} ({outcome.duration:.1f}s)')
            except Exception as err:
                outcomes[label] = err
                print(f'{label}: Exception {err}')
    return outcomes
# Usage: one work item per team; `name` labels the result, `args` go to dbt.
work_items = [
    {'name': 'finance', 'args': ['build', '--select', 'tag:finance']},
    {'name': 'marketing', 'args': ['build', '--select', 'tag:marketing']},
    {'name': 'sales', 'args': ['build', '--select', 'tag:sales']},
    {'name': 'operations', 'args': ['build', '--select', 'tag:operations']},
]

# Guarded: process-pool workers started with "spawn" re-import this module and
# must not launch builds themselves.
if __name__ == '__main__':
    results = parallel_dbt_builds(work_items, max_workers=4)
Output:
finance: [x] (45.2s)
marketing: [x] (32.1s)
sales: [x] (28.5s)
operations: [ ] (18.3s)
Handling failures
def parallel_dbt_with_retries(work_items, max_workers=4, max_retries=2):
    """Run dbt work items in parallel, re-running the ones that fail.

    Each item is attempted at most ``max_retries + 1`` times. An item fails
    when its result has ``success == False`` or when fetching the result
    raises (as ``parallel_dbt_builds`` also treats exceptions).

    Args:
        work_items: iterable of dicts with ``'name'`` and ``'args'`` keys.
        max_workers: size of the process pool used for each attempt.
        max_retries: extra attempts for failed items; negative values are
            treated as 0 (a single attempt).

    Returns:
        dict mapping name to the DbtSubprocessResult of the successful
        attempt, or, for items that failed every attempt, to the last
        failing result or exception.
    """
    results = {}
    pending = list(work_items)
    total_attempts = max(max_retries, 0) + 1

    for attempt in range(total_attempts):
        if not pending:
            break
        # A fresh pool per attempt, so a pool broken by a crashed worker
        # (BrokenProcessPool) cannot poison the retries.
        with ProcessPoolExecutor(max_workers=max_workers) as ex:
            futures = {
                ex.submit(run_dbt_subprocess, item['args']): item
                for item in pending
            }
            current_failures = []
            for future in as_completed(futures):
                item = futures[future]
                try:
                    outcome = future.result()
                except Exception as exc:  # e.g. dbt missing from PATH, worker crash
                    outcome, succeeded = exc, False
                else:
                    succeeded = outcome.success
                # Keep the latest outcome; a later successful retry overwrites it.
                results[item['name']] = outcome
                if not succeeded:
                    current_failures.append(item)

        pending = current_failures
        # Only announce a retry when another attempt will actually run.
        if pending and attempt < total_attempts - 1:
            print(f'Retry attempt {attempt + 1}: {len(pending)} failures')
    return results
Resource management
Connection limits
# Snowflake has a max number of concurrent connections.
# Don't overwhelm the warehouse: limit dbt threads in each subprocess.
def run_dbt_limited(args, threads=4):
    """Run ``dbt <args> --threads <threads>`` through run_dbt_subprocess.

    run_dbt_subprocess prepends the ``dbt`` executable itself, so only the
    dbt arguments are passed on; prepending ``'dbt'`` here as well would
    produce ``dbt dbt ...``.

    Args:
        args: dbt CLI arguments, e.g. ``['run', '--select', 'tag:x']``.
        threads: value for dbt's ``--threads`` flag in this subprocess.

    Returns:
        The DbtSubprocessResult from run_dbt_subprocess.
    """
    return run_dbt_subprocess([*args, '--threads', str(threads)])
Combined: 4 processes × 4 threads = 16 connections. Stay below limit.
Memory
import psutil
def check_memory_before_spawn(threshold_pct=80):
    """Report whether system memory usage is at or below *threshold_pct* percent."""
    used_pct = psutil.virtual_memory().percent
    return not used_pct > threshold_pct


# Only hand new work to the pool while there is memory headroom.
if check_memory_before_spawn():
    executor.submit(run_dbt_subprocess, args)
Skip starting processes if memory pressure.
Disk IO
import subprocess
from concurrent.futures import ProcessPoolExecutor

# Parsing is disk-heavy, so parse sequentially and run in parallel.


def run_project(project):
    """Run ``dbt run`` for one project directory; return the CompletedProcess."""
    # A named module-level function: ProcessPoolExecutor must pickle the
    # callable it sends to workers, and lambdas cannot be pickled.
    return subprocess.run(['dbt', 'run', '--project-dir', project])


# Guarded so "spawn"-started workers re-importing this module do nothing.
# `projects` (a list of project directories) is expected to be defined by the caller.
if __name__ == '__main__':
    # Phase 1 — sequential parse
    for project in projects:
        subprocess.run(['dbt', 'parse', '--project-dir', project])
    # Phase 2 — parallel run
    with ProcessPoolExecutor(max_workers=4) as ex:
        list(ex.map(run_project, projects))
Inter-process communication
import json
import tempfile
def run_dbt_with_artifacts(args, output_dir):
    """Run dbt with its artifacts redirected to *output_dir*, then load them.

    Args:
        args: dbt CLI arguments.
        output_dir: directory passed as ``--target-path``; give each
            concurrent process its own directory so artifacts don't collide.

    Returns:
        dict with ``returncode`` (int) and ``artifacts``: a mapping from
        artifact file name (``manifest.json`` / ``run_results.json``) to its
        parsed JSON, containing only the files that exist after the run.
    """
    full_args = [*args, '--target-path', output_dir]
    result = subprocess.run(['dbt', *full_args], capture_output=True, text=True)

    artifacts = {}
    for artifact in ('manifest.json', 'run_results.json'):
        path = os.path.join(output_dir, artifact)
        if os.path.exists(path):
            # Close each handle deterministically instead of leaving it to GC.
            with open(path, encoding='utf-8') as fh:
                artifacts[artifact] = json.load(fh)

    return {
        'returncode': result.returncode,
        'artifacts': artifacts,
    }
import shutil
import tempfile
from concurrent.futures import ProcessPoolExecutor

# Parallel with artifacts collection: one private target directory per process.
# Guarded so "spawn"-started workers re-importing this module do nothing.
if __name__ == '__main__':
    output_dirs = [tempfile.mkdtemp() for _ in work_items]
    try:
        with ProcessPoolExecutor(max_workers=4) as ex:
            futures = [
                ex.submit(run_dbt_with_artifacts, work['args'], out_dir)
                for work, out_dir in zip(work_items, output_dirs)
            ]
            results = [f.result() for f in futures]
    finally:
        # The artifacts are already loaded into `results`; remove the temp dirs
        # so repeated runs don't accumulate them.
        for out_dir in output_dirs:
            shutil.rmtree(out_dir, ignore_errors=True)
Each process writes к own target_path. Aggregate the results afterwards.
Async vs subprocess
Async approach (limited)
import asyncio
import subprocess
async def run_dbt_async(args):
    """Run ``dbt <args>`` as an asyncio subprocess and wait for it.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams decoded as UTF-8;
        undecodable bytes are replaced instead of raising.
    """
    proc = await asyncio.create_subprocess_exec(
        'dbt', *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    return proc.returncode, stdout.decode(errors='replace'), stderr.decode(errors='replace')


# Each dbt run is its own OS process, so the runs execute in parallel; the GIL
# does not matter because the event loop only waits on the child processes.
async def main(max_concurrency=None):
    """Run ``dbt run --select <s>`` for every entry of the module-level ``selectors``.

    Args:
        max_concurrency: optional cap on simultaneously running dbt processes
            (comparable to ``max_workers`` on a pool); None means no cap.

    Returns:
        list of ``(returncode, stdout, stderr)`` tuples in ``selectors`` order.
    """
    if max_concurrency is None:
        tasks = [run_dbt_async(['run', '--select', s]) for s in selectors]
        return await asyncio.gather(*tasks)

    gate = asyncio.Semaphore(max_concurrency)

    async def bounded(selector):
        async with gate:
            return await run_dbt_async(['run', '--select', selector])

    return await asyncio.gather(*(bounded(s) for s in selectors))


if __name__ == '__main__':
    asyncio.run(main())
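Async still uses subprocesses internally: each dbt run is a separate OS process, so the runs execute in parallel and the GIL is not a factor. Same parallelism, different syntax.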
True parallel — ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor

# Guarded: with the "spawn" start method each worker re-imports this module
# and must not start a pool of its own.
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as ex:
        # run_dbt_subprocess is a module-level function, so it pickles fine.
        results = list(ex.map(run_dbt_subprocess, [['run', '--select', s] for s in selectors]))
Both work. Choose based on architecture preference.
Orchestrator integration
Dagster (per-asset subprocess)
from dagster import asset, AssetExecutionContext


def _dagster_dbt_build(selector, tail_lines=50):
    """Run ``dbt build --select <selector>``; raise with the output tail on failure.

    dbt prints its console log, including model errors, to stdout, so stderr
    alone is often empty; both streams go into the error message.
    """
    result = run_dbt_subprocess(['build', '--select', selector])
    if not result.success:
        output = '\n'.join(part for part in (result.stdout, result.stderr) if part)
        tail = '\n'.join(output.splitlines()[-tail_lines:])
        raise RuntimeError(
            f'dbt build --select {selector} failed (rc={result.returncode}):\n{tail}'
        )
    return result


@asset
def marts_finance(context: AssetExecutionContext):
    """Materialize marts.finance via ``dbt build`` in a subprocess."""
    return _dagster_dbt_build('marts.finance')


@asset
def marts_marketing(context: AssetExecutionContext):
    """Materialize marts.marketing via ``dbt build`` in a subprocess."""
    return _dagster_dbt_build('marts.marketing')

# Dagster runs assets в parallel based on dependencies
Each asset = subprocess. Dagster handles parallelism.
Prefect (parallel tasks)
from prefect import flow, task


@task
def run_dbt(args):
    """Prefect task: run ``dbt <args>`` in a subprocess.

    Raises RuntimeError when dbt exits non-zero so that Prefect records the
    task run as failed; a normally returned value counts as success.
    """
    result = run_dbt_subprocess(args)
    if not result.success:
        # dbt writes most of its console log to stdout, so include both streams.
        raise RuntimeError(
            f'dbt {" ".join(args)} failed (rc={result.returncode}):\n'
            f'{result.stdout[-5000:]}{result.stderr[-5000:]}'
        )
    return result


@flow
def parallel_pipeline():
    """Submit one dbt build per tag concurrently and wait for all of them."""
    futures = [
        run_dbt.submit(['build', '--select', f'tag:{tag}'])
        for tag in ['finance', 'marketing', 'sales']
    ]
    # .result() re-raises a task's failure, failing the flow as well.
    results = [f.result() for f in futures]
    return results
Prefect manages task execution.
Airflow (parallel operators)
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def _airflow_dbt_build(selector):
    """Airflow callable: run ``dbt build --select <selector>``; fail the task on error."""
    result = run_dbt_subprocess(['build', '--select', selector])
    if not result.success:
        # PythonOperator only marks the task failed when the callable raises.
        # dbt prints most of its console log to stdout, so include both streams.
        raise RuntimeError(
            f'dbt build --select {selector} failed (rc={result.returncode}):\n'
            f'{result.stdout[-5000:]}{result.stderr[-5000:]}'
        )
    # Return None so the result dataclass is not pushed to XCom.


with DAG(
    'parallel_dbt',
    schedule='@daily',  # presets start with '@'; `schedule` needs Airflow >= 2.4
    start_date=datetime(2024, 1, 1),  # a scheduled DAG needs a start date
    catchup=False,
) as dag:
    finance = PythonOperator(
        task_id='finance',
        python_callable=_airflow_dbt_build,
        op_args=['marts.finance'],
    )
    marketing = PythonOperator(
        task_id='marketing',
        python_callable=_airflow_dbt_build,
        op_args=['marts.marketing'],
    )
    # No dependency is declared between the tasks, so the executor may run
    # them in parallel (not under SequentialExecutor).
Performance optimization
Faster event loop via uvloop (it does not reduce dbt process startup time)
import asyncio

import uvloop

# Make asyncio.run() and new event loops use uvloop's loop implementation.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Faster async event loop для many subprocess calls: this trims event-loop
# overhead only; each dbt process still pays its own startup and parse cost.
Process pool reuse
class DbtPool:
    """A reusable process pool for dbt subprocess runs.

    Keeps worker processes alive across batches instead of creating a new
    ProcessPoolExecutor per batch. Can be used as a context manager, which
    calls shutdown() on exit.
    """

    def __init__(self, max_workers=4):
        self.executor = ProcessPoolExecutor(max_workers=max_workers)

    def submit(self, args):
        """Schedule ``run_dbt_subprocess(args)`` and return its Future."""
        return self.executor.submit(run_dbt_subprocess, args)

    def run_batch(self, batch):
        """Run every argument list in *batch* concurrently; return results in order."""
        # Submit everything before waiting on any future so the runs overlap.
        futures = [self.submit(args) for args in batch]
        return [future.result() for future in futures]

    def shutdown(self):
        """Wait for pending runs to finish, then stop the worker processes."""
        self.executor.shutdown(wait=True)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.shutdown()
        return False  # never swallow exceptions from the with-body
# Reuse pool across calls. Submit a whole batch before collecting its results:
# calling .result() right after each submit waits for that run to finish, so
# the batch would execute one item at a time.
# Guarded because DbtPool wraps a ProcessPoolExecutor, and "spawn"-started
# workers re-import this module.
if __name__ == '__main__':
    pool = DbtPool(max_workers=4)
    try:
        batch1_futures = [pool.submit(args) for args in batch1]
        results1 = [f.result() for f in batch1_futures]
        batch2_futures = [pool.submit(args) for args in batch2]
        results2 = [f.result() for f in batch2_futures]
    finally:
        pool.shutdown()
Caching artifacts
# If running the same dbt operations repeatedly:
# - Parse once
# - Let the parallel subprocesses reuse the partial_parse cache from that parse
def parallel_runs_with_cache(selector_list=None):
    """Parse once, then run each selector in parallel, reusing the parse cache.

    Args:
        selector_list: selectors to run; defaults to the module-level
            ``selectors`` list.

    Returns:
        list of DbtSubprocessResult, in the same order as the selectors.

    Raises:
        subprocess.CalledProcessError: if the initial ``dbt parse`` fails;
            every run would fail the same way, so no workers are started.
    """
    if selector_list is None:
        selector_list = selectors

    # Initial parse writes the partial_parse cache into the default target dir.
    subprocess.run(['dbt', 'parse'], check=True)

    # NOTE: all workers share the default target directory, so per-run
    # artifacts such as run_results.json overwrite each other.
    with ProcessPoolExecutor(max_workers=4) as ex:
        futures = [
            ex.submit(run_dbt_subprocess, ['run', '--select', s])
            for s in selector_list
        ]
        return [f.result() for f in futures]


# Backward-compatible alias: the original name contains a Cyrillic "с".
parallel_runs_с_cache = parallel_runs_with_cache
partial_parse.msgpack shared across subprocesses via filesystem (workers only find it if they use the same target path as the initial parse).
Common pitfalls
1. Database connection exhaustion
# 4 processes × 4 threads = up to 16 simultaneous queries
# Example warehouse limit: 8 concurrent connections
# -> the excess queries are queued or fail with an error
# Fix: reduce threads per process
['dbt', 'run', '--threads', '2']  # 4 × 2 = 8 total
2. Target directory conflicts
# Each subprocess writes its artifacts to target/
# Multiple processes sharing one target/ -> write conflicts
# Fix: use a separate target dir per process
['dbt', 'run', '--target-path', f'target_{i}/']  # `i` is a per-process index supplied by the caller
3. State sharing
# Subprocesses don't share an in-memory manifest
# Each process parses the project again
# Passing a pre-loaded manifest object only works inside one process, not across processes
Pre-parsing single time помогает (via shared partial_parse.msgpack).
4. Error propagation
# A subprocess failure is not obvious from the call alone: check the result.
import json
import os

result = run_dbt_subprocess(['run'])
if result.returncode != 0:
    # dbt prints its console log, including model errors, to stdout;
    # stderr is often empty, so show both streams.
    print(result.stdout)
    print(result.stderr)
    # For structured errors, read run_results.json from the target directory
    # (target/ relative to the working directory unless --target-path is set).
    run_results_path = os.path.join('target', 'run_results.json')
    if os.path.exists(run_results_path):
        with open(run_results_path, encoding='utf-8') as fh:
            run_results = json.load(fh)
        for node in run_results.get('results', []):
            if node.get('status') in ('error', 'fail'):
                print(f"{node.get('unique_id')}: {node.get('status')} - {node.get('message')}")
Always check returncode и parse artifacts.
Ключевые выводы
- dbtRunner not thread-safe — race conditions, undefined behavior. Don’t call dbtRunner from multiple threads.
- Subprocess solution — each parallel run in separate process. Safe to parallelize.
- ProcessPoolExecutor — pythonic API для process pool.
- Performance: 3-4x speedup typical с 4 workers.
- Resource limits: warehouse connections, memory, disk IO — manage carefully.
- Async via asyncio.create_subprocess_exec — alternative syntax, same parallelism.
- Orchestrators: Dagster, Prefect, Airflow manage parallelism naturally. Each asset/task = subprocess.
- Optimization: process pool reuse, pre-parsing, target-path separation.
- Pitfalls: connection exhaustion, disk conflicts, no manifest sharing, error propagation.
- Use cases: batch builds, multi-team pipelines, CI parallel testing.