Learning Platform
Глоссарий Troubleshooting
Урок 07.03 · 22 мин
Продвинутый
concurrencysubprocessparallelism

Thread unsafety: subprocess wrapper для parallel dbt

dbtRunner is not thread-safe. Concurrent invocations в same Python process cause race conditions, undefined behavior. Yet парallel dbt runs frequently needed: orchestrators, batch processing, multi-team builds.

Solution: subprocess-based parallelism. Each worker = separate Python process with own dbtRunner. В этом уроке — patterns, performance trade-offs, integration с orchestrators.


GIL + threading vs multiprocessing

Why dbtRunner not thread-safe

dbt-core internals:

  • Global state (parsed manifest, connection pool)
  • Mutable singletons (config, profiles)
  • Non-reentrant Jinja environment
  • Internal caching shared across calls
# DON'T DO THIS
import threading
from dbt.cli.main import dbtRunner

runner = dbtRunner()

def parallel_run(selector):
    result = runner.invoke(['run', '--select', selector])
    return result

# Race conditions
threads = [threading.Thread(target=parallel_run, args=(s,)) for s in selectors]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Undefined behavior: crashes, wrong results, partial output

Don’t use threads.


Subprocess solution

Each parallel run в separate process:

import subprocess
import json

def run_dbt(args):
    \"\"\"Run dbt в subprocess.\"\"\"
    result = subprocess.run(
        ['dbt'] + args,
        capture_output=True,
        text=True
    )
    return {
        'returncode': result.returncode,
        'stdout': result.stdout,
        'stderr': result.stderr
    }

# Parallel execution
from concurrent.futures import ProcessPoolExecutor

selectors = [
    'tag:marts_finance',
    'tag:marts_marketing',
    'tag:marts_sales'
]

with ProcessPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(run_dbt, ['run', '--select', s]) for s in selectors]
    results = [f.result() for f in futures]

for selector, result in zip(selectors, results):
    print(f'{selector}: rc={result["returncode"]}')

Each process — isolated dbt invocation. Safe to parallelize.


Performance trade-offs

import time

selectors = ['tag:a', 'tag:b', 'tag:c', 'tag:d']

# Sequential
t0 = time.time()
for s in selectors:
    subprocess.run(['dbt', 'run', '--select', s])
sequential = time.time() - t0

# Parallel via subprocess
t0 = time.time()
with ProcessPoolExecutor(max_workers=4) as ex:
    list(ex.map(lambda s: subprocess.run(['dbt', 'run', '--select', s]), selectors))
parallel = time.time() - t0

print(f'Sequential: {sequential:.1f}s')
print(f'Parallel (4 procs): {parallel:.1f}s')
print(f'Speedup: {sequential / parallel:.1f}x')

Typical:

Sequential: 60s  (4 × 15s)
Parallel: 18s   (max chunk time)
Speedup: 3.3x

Limited by:

  • Process startup time
  • Warehouse connection limits
  • Disk IO contention

Subprocess wrapper pattern

import subprocess
import json
import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class DbtSubprocessResult:
    success: bool
    returncode: int
    stdout: str
    stderr: str
    duration: float

def run_dbt_subprocess(
    args: list[str],
    project_dir: Optional[str] = None,
    profiles_dir: Optional[str] = None,
    env: Optional[dict] = None,
    timeout: Optional[int] = None
) -> DbtSubprocessResult:
    \"\"\"Run dbt в subprocess с full options.\"\"\"
    import time
    
    cmd = ['dbt'] + args
    
    run_env = os.environ.copy()
    if env:
        run_env.update(env)
    
    t0 = time.time()
    
    try:
        result = subprocess.run(
            cmd,
            cwd=project_dir,
            env=run_env,
            capture_output=True,
            text=True,
            timeout=timeout
        )
        duration = time.time() - t0
        
        return DbtSubprocessResult(
            success=result.returncode == 0,
            returncode=result.returncode,
            stdout=result.stdout,
            stderr=result.stderr,
            duration=duration
        )
    except subprocess.TimeoutExpired:
        return DbtSubprocessResult(
            success=False,
            returncode=-1,
            stdout='',
            stderr=f'Timeout after {timeout}s',
            duration=timeout or 0
        )

Parallel orchestration

from concurrent.futures import ProcessPoolExecutor, as_completed

def parallel_dbt_builds(work_items, max_workers=4):
    \"\"\"Run multiple dbt builds в parallel.\"\"\"
    results = {}
    
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit all
        future_to_item = {
            executor.submit(run_dbt_subprocess, item['args']): item
            for item in work_items
        }
        
        # Collect as they complete
        for future in as_completed(future_to_item):
            item = future_to_item[future]
            try:
                result = future.result()
                results[item['name']] = result
                print(f'{item["name"]}: {"[x]" if result.success else "[ ]"} ({result.duration:.1f}s)')
            except Exception as e:
                results[item['name']] = e
                print(f'{item["name"]}: Exception {e}')
    
    return results

# Usage
work_items = [
    {'name': 'finance', 'args': ['build', '--select', 'tag:finance']},
    {'name': 'marketing', 'args': ['build', '--select', 'tag:marketing']},
    {'name': 'sales', 'args': ['build', '--select', 'tag:sales']},
    {'name': 'operations', 'args': ['build', '--select', 'tag:operations']}
]

results = parallel_dbt_builds(work_items, max_workers=4)

Output:

finance: [x] (45.2s)
marketing: [x] (32.1s)
sales: [x] (28.5s)
operations: [ ] (18.3s)

Handling failures

def parallel_dbt_with_retries(work_items, max_workers=4, max_retries=2):
    \"\"\"Parallel с retry for failed items.\"\"\"
    results = {}
    
    pending = list(work_items)
    
    for attempt in range(max_retries + 1):
        if not pending:
            break
        
        with ProcessPoolExecutor(max_workers=max_workers) as ex:
            futures = {
                ex.submit(run_dbt_subprocess, item['args']): item
                for item in pending
            }
            
            current_failures = []
            for future in as_completed(futures):
                item = futures[future]
                result = future.result()
                
                if result.success:
                    results[item['name']] = result
                else:
                    current_failures.append(item)
                    if attempt == max_retries:
                        results[item['name']] = result
        
        pending = current_failures
        if pending:
            print(f'Retry attempt {attempt + 1}: {len(pending)} failures')
    
    return results

Resource management

Connection limits

# Snowflake has max concurrent connections
# Don't overwhelm warehouse

# Limit dbt threads in each subprocess
def run_dbt_limited(args):
    full_args = ['dbt'] + args + ['--threads', '4']
    return run_dbt_subprocess(full_args)

Combined: 4 processes × 4 threads = 16 connections. Stay below limit.

Memory

import psutil

def check_memory_before_spawn(threshold_pct=80):
    memory = psutil.virtual_memory()
    if memory.percent > threshold_pct:
        return False
    return True

if check_memory_before_spawn():
    executor.submit(run_dbt_subprocess, args)

Skip starting processes if memory pressure.

Disk IO

# Heavy disk activity при parsing
# Sequential parse, parallel run?

# Phase 1 — sequential parse
for project in projects:
    subprocess.run(['dbt', 'parse', '--project-dir', project])

# Phase 2 — parallel run
with ProcessPoolExecutor(max_workers=4) as ex:
    list(ex.map(lambda p: subprocess.run(['dbt', 'run', '--project-dir', p]), projects))

Inter-process communication

import json
import tempfile

def run_dbt_with_artifacts(args, output_dir):
    \"\"\"Run dbt и return artifacts.\"\"\"
    # Each process writes к different target dir
    full_args = args + ['--target-path', output_dir]
    
    result = subprocess.run(['dbt'] + full_args, capture_output=True, text=True)
    
    # Read artifacts
    artifacts = {}
    for artifact in ['manifest.json', 'run_results.json']:
        path = f'{output_dir}/{artifact}'
        if os.path.exists(path):
            artifacts[artifact] = json.load(open(path))
    
    return {
        'returncode': result.returncode,
        'artifacts': artifacts
    }

# Parallel с artifacts collection
with ProcessPoolExecutor(max_workers=4) as ex:
    futures = []
    for i, work in enumerate(work_items):
        output_dir = tempfile.mkdtemp()
        futures.append(ex.submit(run_dbt_with_artifacts, work['args'], output_dir))
    
    results = [f.result() for f in futures]

Each process writes к own target_path. Аggregate results.


Async vs subprocess

Async approach (limited)

import asyncio
import subprocess

async def run_dbt_async(args):
    proc = await asyncio.create_subprocess_exec(
        'dbt', *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    return proc.returncode, stdout.decode(), stderr.decode()

# Concurrent (not parallel, due to GIL)
async def main():
    tasks = [run_dbt_async(['run', '--select', s]) for s in selectors]
    results = await asyncio.gather(*tasks)
    return results

asyncio.run(main())

Async still uses subprocess internally. Same parallelism. Different syntax.

True parallel — ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor(max_workers=4) as ex:
    results = list(ex.map(run_dbt_subprocess, [['run', '--select', s] for s in selectors]))

Both work. Choose based on architecture preference.


Orchestrator integration

Dagster (per-asset subprocess)

from dagster import asset, AssetExecutionContext

@asset
def marts_finance(context: AssetExecutionContext):
    result = run_dbt_subprocess(['build', '--select', 'marts.finance'])
    if not result.success:
        raise Exception(result.stderr)
    return result

@asset  
def marts_marketing(context: AssetExecutionContext):
    result = run_dbt_subprocess(['build', '--select', 'marts.marketing'])
    if not result.success:
        raise Exception(result.stderr)
    return result

# Dagster runs assets в parallel based on dependencies

Each asset = subprocess. Dagster handles parallelism.

Prefect (parallel tasks)

from prefect import flow, task

@task
def run_dbt(args):
    return run_dbt_subprocess(args)

@flow
def parallel_pipeline():
    futures = [
        run_dbt.submit(['build', '--select', f'tag:{tag}'])
        for tag in ['finance', 'marketing', 'sales']
    ]
    results = [f.result() for f in futures]
    return results

Prefect manages task execution.

Airflow (parallel operators)

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG('parallel_dbt', schedule_interval='daily') as dag:
    finance = PythonOperator(
        task_id='finance',
        python_callable=lambda: run_dbt_subprocess(['build', '--select', 'marts.finance'])
    )
    marketing = PythonOperator(
        task_id='marketing',
        python_callable=lambda: run_dbt_subprocess(['build', '--select', 'marts.marketing'])
    )
    # Tasks run в parallel by default

Performance optimization

Sub-second startup via uvloop

import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicyType())

# Faster async event loop для many subprocess calls

Process pool reuse

class DbtPool:
    def __init__(self, max_workers=4):
        self.executor = ProcessPoolExecutor(max_workers=max_workers)
    
    def submit(self, args):
        return self.executor.submit(run_dbt_subprocess, args)
    
    def shutdown(self):
        self.executor.shutdown(wait=True)

# Reuse pool across calls
pool = DbtPool(max_workers=4)

results1 = [pool.submit(args).result() for args in batch1]
results2 = [pool.submit(args).result() for args in batch2]

pool.shutdown()

Caching artifacts

# If running same dbt operations repeatedly:
# - Parse once
# - Pass partial_parse cache к subprocesses
# - Reuse

def parallel_runs_с_cache():
    # Initial parse
    subprocess.run(['dbt', 'parse'])
    
    # All parallel workers read partial_parse cache
    with ProcessPoolExecutor(max_workers=4) as ex:
        futures = [ex.submit(run_dbt_subprocess, ['run', '--select', s]) for s in selectors]
        results = [f.result() for f in futures]
    
    return results

partial_parse.msgpack shared across subprocesses via filesystem.


Common pitfalls

1. Database connection exhaustion

# 4 processes × 4 threads = 16 simultaneous queries
# Warehouse limits: 8 connections concurrent
# -> some queries queued или error

# Reduce threads per process
['dbt', 'run', '--threads', '2']  # 4 × 2 = 8 total

2. Disk space

# Each subprocess writes к target/
# Multiple processes -> write conflicts

# Use separate target dirs
['dbt', 'run', '--target-path', f'target_{i}/']

3. State sharing

# Subprocesses don't share manifest cache
# Each re-parses
# Pre-loaded manifest pattern not available cross-process

Pre-parsing single time помогает (via shared partial_parse.msgpack).

4. Error propagation

# Subprocess error не obvious
result = run_dbt_subprocess(['run'])
if result.returncode != 0:
    # Parse stderr для details
    print(result.stderr)
    # Or — parse run_results.json для structured errors

Always check returncode и parse artifacts.


Ключевые выводы

  1. dbtRunner not thread-safe — race conditions, undefined behavior. Don’t use threads.
  2. Subprocess solution — each parallel run in separate process. Safe to parallelize.
  3. ProcessPoolExecutor — pythonic API для process pool.
  4. Performance: 3-4x speedup typical с 4 workers.
  5. Resource limits: warehouse connections, memory, disk IO — manage carefully.
  6. Async via asyncio.create_subprocess_exec — alternative syntax, same parallelism.
  7. Orchestrators: Dagster, Prefect, Airflow manage parallelism naturally. Each asset/task = subprocess.
  8. Optimization: process pool reuse, pre-parsing, target-path separation.
  9. Pitfalls: connection exhaustion, disk conflicts, no manifest sharing, error propagation.
  10. Use cases: batch builds, multi-team pipelines, CI parallel testing.
Проверка знанийKnowledge check
Production scenario: build 50 model batches в parallel via subprocess. Snowflake warehouse has 16 concurrent query limit. Configuration?
ОтветAnswer
**Tuning parallelism для warehouse limits**:\n\n**Math**:\n\nSnowflake limit: 16 concurrent queries\nWant max throughput\n\nFormula:\n`processes × threads_per_process ≤ 16`\n\nOptions:\n- 4 processes × 4 threads = 16 [x]\n- 8 processes × 2 threads = 16 [x]\n- 2 processes × 8 threads = 16 [x]\n- 16 processes × 1 thread = 16 [x]\n\n**Trade-offs**:\n\n**More processes, fewer threads**:\n- Higher process overhead\n- More OS resources\n- Better isolation\n- Different batches truly independent\n\n**Fewer processes, more threads**:\n- Lower overhead\n- Better thread efficiency within process\n- Less isolation\n- One process bottleneck affects more work\n\n**Recommendation**: 4 processes × 4 threads (balanced).\n\n**Implementation**:\n\n```python\nimport subprocess\nfrom concurrent.futures import ProcessPoolExecutor\nimport os\n\nWAREHOUSE_LIMIT = 16\nPROCESSES = 4\nTHREADS = WAREHOUSE_LIMIT // PROCESSES\n\ndef run_batch(batch_args):\n \"\"\"Run dbt batch с thread limit.\"\"\"\n cmd = ['dbt', 'run', '--threads', str(THREADS)] + batch_args\n \n result = subprocess.run(\n cmd,\n capture_output=True,\n text=True,\n env={**os.environ, 'DBT_TARGET_PATH': f'target_{os.getpid()}'}\n )\n \n return {\n 'returncode': result.returncode,\n 'pid': os.getpid(),\n 'stdout': result.stdout[:500],\n 'stderr': result.stderr[:500] if result.returncode != 0 else None\n }\n\nbatches = [\n ['--select', 'tag:finance'],\n ['--select', 'tag:marketing'],\n ['--select', 'tag:sales'],\n ['--select', 'tag:operations'],\n # ... 50 batches total\n]\n\nwith ProcessPoolExecutor(max_workers=PROCESSES) as ex:\n results = list(ex.map(run_batch, batches))\n\nfor batch, result in zip(batches, results):\n status = '[x]' if result['returncode'] == 0 else '[ ]'\n print(f'{status} {batch[1]}: pid={result["pid"]}')\n'''\n\n**Issues и mitigations**:\n\n**1. Connection storms на startup**:\n\n4 processes × 4 threads = 16 connections opening simultaneously can overwhelm warehouse.\n\nFix — stagger startup:\n\n```python\nimport time\n\ndef staggered_run(batch_args, delay):\n time.sleep(delay)\n return run_batch(batch_args)\n\nwith ProcessPoolExecutor(max_workers=PROCESSES) as ex:\n futures = []\n for i, batch in enumerate(batches):\n delay = i * 2 # 2-second stagger\n futures.append(ex.submit(staggered_run, batch, delay))\n results = [f.result() for f in futures]\n'''\n\n**2. Target path conflicts**:\n\n4 processes writing к single target/ -> race conditions.\n\nFix — separate target dirs per process:\n\n```python\nimport tempfile\n\ndef run_batch_isolated(batch_args):\n target_dir = tempfile.mkdtemp(prefix='dbt_target_')\n cmd = [\n 'dbt', 'run',\n '--threads', str(THREADS),\n '--target-path', target_dir\n ] + batch_args\n \n try:\n result = subprocess.run(cmd, capture_output=True, text=True)\n \n # Read artifacts\n artifacts = {}\n for art in ['manifest.json', 'run_results.json']:\n path = os.path.join(target_dir, art)\n if os.path.exists(path):\n with open(path) as f:\n artifacts[art] = json.load(f)\n \n return {\n 'returncode': result.returncode,\n 'artifacts': artifacts\n }\n finally:\n shutil.rmtree(target_dir, ignore_errors=True)\n'''\n\n**3. Adaptive throttling**:\n\nMonitor warehouse load, adjust dynamically:\n\n```python\nimport snowflake.connector\n\ndef warehouse_concurrent_count():\n conn = snowflake.connector.connect(...)\n cur = conn.cursor()\n cur.execute(\"\"\"\n SELECT COUNT(*)\n FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY\n WHERE EXECUTION_STATUS = 'RUNNING'\n AND WAREHOUSE_NAME = 'DBT_WH'\n \"\"\")\n return cur.fetchone()[0]\n\ndef adaptive_parallel():\n while batches:\n load = warehouse_concurrent_count()\n \n if load менее 10:\n available_slots = 4\n elif load менее 14:\n available_slots = 2\n else:\n available_slots = 0\n time.sleep(30)\n continue\n \n # Submit available batches\n batch_chunk = batches[:available_slots]\n batches = batches[available_slots:]\n \n with ProcessPoolExecutor(max_workers=len(batch_chunk)) as ex:\n for batch in batch_chunk:\n ex.submit(run_batch, batch)\n'''\n\n**4. Logging и observability**:\n\n```python\nimport logging\n\nlogger = logging.getLogger(__name__)\n\ndef run_batch_logged(batch_args):\n batch_name = batch_args[1] if len(batch_args) > 1 else 'unknown'\n \n logger.info(f'Starting batch: {batch_name}')\n t0 = time.time()\n \n result = run_batch(batch_args)\n \n duration = time.time() - t0\n if result['returncode'] == 0:\n logger.info(f'Completed batch: {batch_name} ({duration:.1f}s)')\n else:\n logger.error(f'Failed batch: {batch_name} ({duration:.1f}s)')\n \n return result\n'''\n\n**5. Resource monitoring**:\n\n```python\nimport psutil\n\ndef check_resources():\n mem = psutil.virtual_memory().percent\n cpu = psutil.cpu_percent(interval=1)\n \n if mem > 80 or cpu > 90:\n return False\n return True\n\ndef run_batch_safely(batch):\n while not check_resources():\n time.sleep(10)\n return run_batch(batch)\n'''\n\n**6. Failure handling**:\n\n```python\ndef parallel_with_retries(batches, max_retries=2):\n pending = list(batches)\n final_results = {}\n \n for attempt in range(max_retries + 1):\n if not pending:\n break\n \n with ProcessPoolExecutor(max_workers=PROCESSES) as ex:\n futures = {ex.submit(run_batch, b): b for b in pending}\n failures = []\n \n for future in as_completed(futures):\n batch = futures[future]\n result = future.result()\n \n if result['returncode'] == 0:\n final_results[batch] = result\n else:\n failures.append(batch)\n if attempt == max_retries:\n final_results[batch] = result\n \n pending = failures\n if pending:\n logger.warning(f'Retrying {len(pending)} failures')\n time.sleep(30)\n \n return final_results\n'''\n\n**Production architecture**:\n\n'''\n┌──────────────────────────────────┐\n│ Coordinator process │\n│ - Manages batch queue │\n│ - Monitors warehouse load │\n│ - Adaptive scheduling │\n│ - Retry coordination │\n└────────┬─────────────────────────┘\n │\n ▼ submit batches\n┌──────────────────────────────────┐\n│ ProcessPoolExecutor │\n│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐\n│ │Proc 1│ │Proc 2│ │Proc 3│ │Proc 4│\n│ │ 4 │ │ 4 │ │ 4 │ │ 4 │\n│ │threads│threads│threads│threads│\n│ └──────┘ └──────┘ └──────┘ └──────┘\n└──────────┼──────────┼──────────┼──────────┼─────┘\n │ │ │ │\n └──────────┴──────────┴──────────┴──── 16 connections\n │\n ▼\n ┌──────────────────┐\n │ Snowflake WH │\n │ (limit: 16) │\n └──────────────────┘\n'''\n\n**Total time estimation**:\n\n50 batches × avg 30s each = 1500s sequential\n4 parallel processes = 1500s / 4 = 375s ≈ 6.25 minutes\n\nReal: 8-10 minutes (overhead, stragglers)\n\nVs sequential: ~25 minutes\n\n**Production lessons**:\n\n1. Calculate connection budget\n2. Stagger startup\n3. Isolate target paths\n4. Adaptive throttling\n5. Logging per process\n6. Resource monitoring\n7. Retry failed batches\n8. Coordinate via parent process\n\nThis is production-grade parallel dbt orchestration.
Проверка знанийKnowledge check
Compare ProcessPoolExecutor vs Dagster для parallel dbt. Use cases для each?
ОтветAnswer
**ProcessPoolExecutor**:\n\nPython stdlib, simple parallelism via subprocess.\n\n**Pros**:\n- No external dependencies\n- Simple Python\n- Custom logic friendly\n- Lightweight\n\n**Cons**:\n- No UI / monitoring\n- Manual error handling\n- No scheduling\n- No lineage tracking\n- No persistent state\n\n**Use cases**:\n- CI parallel testing\n- Ad-hoc batch processing\n- Custom orchestration logic\n- Standalone scripts\n\n**Example**:\n\n```python\n# CI test parallelization\nfrom concurrent.futures import ProcessPoolExecutor\n\ndef run_test_chunk(chunk):\n return subprocess.run(['dbt', 'test', '--select'] + chunk)\n\nchunks = split_tests_into_chunks(tests, n=4)\nwith ProcessPoolExecutor(max_workers=4) as ex:\n results = list(ex.map(run_test_chunk, chunks))\n'''\n\n**Dagster**:\n\nProduction orchestrator с UI, lineage, scheduling.\n\n**Pros**:\n- Web UI (Dagit)\n- Asset-based model — clear dependencies\n- Lineage visualization\n- Scheduling (cron, sensors)\n- Built-in retry\n- Observability (logs, metrics)\n- Alerting integrations\n- Multi-pipeline coordination\n- Persistent runs database\n\n**Cons**:\n- More complex setup\n- Requires hosting (Dagster Daemon)\n- Learning curve\n- Overhead для simple cases\n\n**Use cases**:\n- Production pipelines\n- Multi-team coordination\n- Long-running services\n- Mission-critical data flows\n- Need observability / SLAs\n\n**Example**:\n\n```python\n# Dagster — production pipeline\nfrom dagster import asset, AssetKey\nfrom dagster_dbt import DbtCliResource, dbt_assets\n\n@dbt_assets(manifest='target/manifest.json')\ndef my_dbt_assets(context, dbt: DbtCliResource):\n yield from dbt.cli(['build'], context=context).stream()\n'''\n\nDagster auto-generates parallel asset execution based on DAG.\n\n**Comparison table**:\n\n'''\nFeature | ProcessPool | Dagster\n---------------------|-------------|------------\nSetup complexity | Low | Medium-High\nExternal deps | None | Dagster, daemon, DB\nUI | None | Strong (Dagit)\nLineage | None | Native\nScheduling | Manual cron | Built-in\nRetry logic | Manual | Built-in\nObservability | Logs only | Metrics, traces\nMulti-project | Manual | Native\nState persistence | None | DB-backed\nLearning curve | Low | Medium\nProduction-ready | Yes (basic) | Yes (enterprise)\n'''\n\n**Decision framework**:\n\n**Choose ProcessPoolExecutor когда**:\n- Quick scripts / ad-hoc work\n- Limited team size\n- Existing scheduler (cron, GitHub Actions)\n- Want minimal dependencies\n- Custom logic in Python (хочется flexibility)\n\n**Choose Dagster когда**:\n- Production data pipelines\n- Multi-team coordination needed\n- Want observability / monitoring\n- Multiple data systems integrated\n- SLA requirements\n- Lineage tracking critical\n\n**Hybrid approach**:\n\n'''python\n# Dagster orchestrates, ProcessPoolExecutor для parallelism inside\n@asset\ndef parallel_test_assets(context):\n from concurrent.futures import ProcessPoolExecutor\n \n with ProcessPoolExecutor(max_workers=4) as ex:\n results = list(ex.map(run_test_chunk, chunks))\n \n return results\n'''\n\nDagster для high-level workflow, ProcessPool для granular parallelism.\n\n**Alternatives к consider**:\n\n**Prefect**:\n- Similar к Dagster\n- Easier learning curve\n- Better for ad-hoc / interactive\n- prefect-dbt integration available\n\n**Airflow + Cosmos**:\n- Industry standard\n- Cosmos auto-generates от dbt manifest\n- Mature ecosystem\n- Heavy для simple cases\n\n**Custom solution**:\n- Build on ProcessPoolExecutor\n- Add database tracking\n- Add Slack notifications\n- May reach 30% of Dagster features for 10% effort\n\n**Production reality**:\n\n- Small team (1-3): ProcessPoolExecutor + cron\n- Medium team (4-10): Dagster или Prefect\n- Large team (10+): Dagster, Airflow, или custom orchestrator\n- Enterprise: Dagster Cloud или managed Airflow\n\n**Common pattern**:\n\n'''\nDevelopment: ProcessPoolExecutor scripts\nStaging: Dagster (testing in mature env)\nProduction: Dagster + monitoring stack\n'''\n\n**ROI calculation**:\n\nDagster setup: ~1-2 weeks engineering\nMaintenance: ~5-10 hrs/week\n\nBreaks even когда:\n- Multiple parallel pipelines\n- Multiple teams\n- Need lineage / observability\n- SLA requirements\n\nOtherwise — ProcessPoolExecutor sufficient.\n\n**Lesson**: don't over-engineer. ProcessPoolExecutor often enough. Adopt Dagster когда complexity demands it.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 3. dbtRunner not thread-safe. Production server needs parallel handling. Choose architecture?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4