Orchestrator integration: Dagster, Prefect, Airflow Cosmos, headless dbt
dbt runs fine standalone, but in production it is usually integrated with an orchestrator: Dagster, Prefect, or Airflow. Each has a dbt integration package that exposes dbt models as orchestrator-native assets or tasks. This provides scheduling, lineage, alerts, and multi-pipeline coordination.
This lesson covers each integration, a comparison, real config examples, and decision criteria.
Why orchestrator integration
dbt alone is limited:
- Manual triggering or simple cron
- No cross-pipeline coordination
- No automatic retry logic (only the manual `dbt retry` command)
- Limited observability
- Hard to combine with non-dbt steps (Python, ML, API calls)
Orchestrator adds:
- Scheduled runs
- Failure recovery (retries, alerts)
- Lineage across systems
- Multi-step workflows
- UI / monitoring
- Multi-team coordination
Dagster + dagster-dbt
Best fit: production data platforms, asset-based model, want comprehensive observability.
Architecture
Dagster represents data as assets. dagster-dbt auto-generates Dagster assets from dbt manifest. Each dbt model = Dagster asset.
from dagster import Definitions
from dagster_dbt import DbtCliResource, dbt_assets
from pathlib import Path

DBT_PROJECT = Path('/path/to/dbt')
DBT_MANIFEST = DBT_PROJECT / 'target' / 'manifest.json'

@dbt_assets(manifest=DBT_MANIFEST)
def my_dbt_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(['build'], context=context).stream()

defs = Definitions(
    assets=[my_dbt_assets],
    resources={
        'dbt': DbtCliResource(
            project_dir=str(DBT_PROJECT),
            target='prod'
        )
    }
)
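To make "each dbt model = Dagster asset" more concrete, here is a minimal standard-library sketch of the kind of information an integration reads from manifest.json: the `nodes` mapping, each node's `resource_type`, and its `depends_on.nodes` list. The function name `model_dependencies` and the trimmed sample manifest are hypothetical; this is not how dagster-dbt is implemented internally.

```python
# Minimal sketch: derive model-to-model dependencies from a dbt manifest dict.
# The sample manifest below is hypothetical and heavily trimmed.

def model_dependencies(manifest: dict) -> dict[str, list[str]]:
    """Map each model's unique_id to the model unique_ids it depends on."""
    nodes = manifest.get("nodes", {})
    models = {uid for uid, node in nodes.items() if node.get("resource_type") == "model"}
    deps = {}
    for uid in sorted(models):
        upstream = nodes[uid].get("depends_on", {}).get("nodes", [])
        # Keep only model-to-model edges; sources, seeds and tests are ignored here
        deps[uid] = [u for u in upstream if u in models]
    return deps


sample_manifest = {
    "nodes": {
        "model.proj.stg_orders": {
            "resource_type": "model",
            "depends_on": {"nodes": ["source.proj.raw.orders"]},
        },
        "model.proj.fct_orders": {
            "resource_type": "model",
            "depends_on": {"nodes": ["model.proj.stg_orders"]},
        },
        "test.proj.not_null_fct_orders_id": {
            "resource_type": "test",
            "depends_on": {"nodes": ["model.proj.fct_orders"]},
        },
    }
}

asset_graph = model_dependencies(sample_manifest)
print(asset_graph)
```

Each key in the resulting mapping corresponds to one asset, and each list holds the upstream assets that must be materialized first.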
Features
- Asset-based UI (the Dagster UI, formerly called Dagit)
- Auto-generated assets from dbt manifest
- Native lineage visualization
- Asset materialization tracking
- Retries, alerts
- Sensors (event-driven)
- Schedules
- Branch deployments (Dagster Cloud)
Selective runs
from dagster_dbt import build_dbt_asset_selection

# Select specific dbt models using dbt selection syntax
fct_orders_selection = build_dbt_asset_selection(
    [my_dbt_assets],
    dbt_select='fct_orders'
)
Or via dbt selector:
@dbt_assets(manifest=DBT_MANIFEST, select='tag:critical')
def critical_dbt_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(['build'], context=context).stream()
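As a rough illustration of what a `tag:` selector does, the sketch below filters a manifest's nodes by tag using only the standard library. It is a simplification: it looks at models only, whereas dbt's real selector syntax also covers other resource types, graph operators, and more. The helper name `select_by_tag` and the sample data are hypothetical.

```python
# Minimal sketch: pick models that carry a given tag from a manifest dict.

def select_by_tag(manifest: dict, tag: str) -> list[str]:
    """Return unique_ids of models carrying the given tag, sorted for stable output."""
    return sorted(
        uid
        for uid, node in manifest.get("nodes", {}).items()
        if node.get("resource_type") == "model" and tag in node.get("tags", [])
    )


# Hypothetical, trimmed manifest content
sample_manifest = {
    "nodes": {
        "model.proj.fct_orders": {"resource_type": "model", "tags": ["critical", "nightly"]},
        "model.proj.stg_orders": {"resource_type": "model", "tags": ["staging"]},
        "seed.proj.countries": {"resource_type": "seed", "tags": ["critical"]},
    }
}

critical = select_by_tag(sample_manifest, "critical")
print(critical)
```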
Custom logic per asset
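from dagster import asset
from dagster_dbt import get_asset_key_for_model

# deps cannot take a multi-asset definition directly,
# so depend on the asset key of a specific dbt model
@asset(deps=[get_asset_key_for_model([my_dbt_assets], 'fct_orders')])
def custom_post_processing(context):
    # Runs after the fct_orders dbt asset
    do_custom_work()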
Schedules
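from dagster import AssetSelection, RunRequest, define_asset_job, schedule

# A schedule needs a job to launch; this one materializes all dbt assets
daily_dbt_job = define_asset_job(
    'daily_dbt_build',
    selection=AssetSelection.assets(my_dbt_assets)
)

@schedule(cron_schedule='0 6 * * *', job=daily_dbt_job)
def daily_build(context):
    return RunRequest(
        run_key=context.scheduled_execution_time.isoformat()
    )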
Observability
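from dagster import asset
from dagster_dbt import DbtCliResource

@asset
def model_with_metadata(context, dbt: DbtCliResource):
    # Run dbt, wait for it to finish, then read run_results.json
    invocation = dbt.cli(['build', '--select', 'fct_orders']).wait()
    run_results = invocation.get_artifact('run_results.json')
    metadata = {}
    for r in run_results['results']:
        metadata[f"{r['unique_id']}: duration"] = r['execution_time']
        rows = r['adapter_response'].get('rows_affected')
        if rows is not None:
            metadata[f"{r['unique_id']}: rows_affected"] = rows
    context.add_output_metadata(metadata)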
Dagster UI shows metadata per materialization.
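The metadata above comes from dbt's run_results.json artifact. The following standard-library sketch shows one way to reduce that artifact to a per-node summary before attaching it to a materialization. The function name `summarize_run_results` and the sample payload are hypothetical; real artifacts contain many more fields, and `rows_affected` depends on the adapter.

```python
# Minimal sketch: summarize a run_results.json payload per node.

def summarize_run_results(run_results: dict) -> dict[str, dict]:
    """Return status, duration and rows_affected keyed by node unique_id."""
    summary = {}
    for result in run_results.get("results", []):
        adapter_response = result.get("adapter_response") or {}
        summary[result["unique_id"]] = {
            "status": result.get("status"),
            "duration_seconds": result.get("execution_time"),
            # Not every adapter or node type reports rows_affected
            "rows_affected": adapter_response.get("rows_affected"),
        }
    return summary


# Hypothetical, trimmed run_results content
sample_run_results = {
    "results": [
        {
            "unique_id": "model.proj.fct_orders",
            "status": "success",
            "execution_time": 2.5,
            "adapter_response": {"rows_affected": 120},
        },
        {
            "unique_id": "test.proj.not_null_fct_orders_id",
            "status": "pass",
            "execution_time": 0.4,
            "adapter_response": {},
        },
    ]
}

metadata = summarize_run_results(sample_run_results)
print(metadata["model.proj.fct_orders"])
```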
Production
- Dagster Cloud (managed)
- Self-hosted Dagster Daemon + workspace
- Postgres backend for metadata
- S3 for the I/O manager
When to choose Dagster
- Production data platform
- 5+ engineers
- Want asset-based mental model
- Multiple data systems integrated
- Lineage critical
- Want vendor option (Cloud)
Prefect + prefect-dbt
Best fit: easier learning curve, hybrid orchestration, modern API.
Architecture
Prefect uses flows and tasks. prefect-dbt integrates dbt as Prefect tasks.
from prefect import flow
from prefect_dbt.cli.commands import DbtCoreOperation

@flow
def dbt_pipeline():
    # Build all models
    build_result = DbtCoreOperation(
        commands=['dbt build'],
        project_dir='/path/to/dbt',
        profiles_dir='~/.dbt',
        overwrite_profiles=False
    ).run()
    return build_result

# Run
dbt_pipeline()
Or with specific commands:
@flow
def staged_dbt():
    # Stage 1
    DbtCoreOperation(commands=['dbt run --select tag:staging']).run()
    # Stage 2: depends on stage 1
    DbtCoreOperation(commands=['dbt run --select tag:marts']).run()
    # Stage 3
    DbtCoreOperation(commands=['dbt test']).run()
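The staged flow relies on each stage running only if the previous one succeeded. The sketch below isolates that fail-fast idea in plain Python, independent of Prefect. `run_stages` and the injected `run_command` callable are hypothetical names; in a real flow the orchestrator or the dbt task provides this behavior.

```python
# Minimal sketch: run dbt commands in order and stop at the first failure.
from typing import Callable


def run_stages(stages: list[str], run_command: Callable[[str], bool]) -> list[str]:
    """Run each command in order; return the commands that completed successfully."""
    completed = []
    for command in stages:
        if not run_command(command):
            # Later stages depend on this one, so do not continue
            break
        completed.append(command)
    return completed


stages = [
    "dbt run --select tag:staging",
    "dbt run --select tag:marts",
    "dbt test",
]

# Fake runner for illustration: pretend the marts stage fails
completed = run_stages(stages, lambda command: "tag:marts" not in command)
print(completed)
```

Injecting the runner as a callable keeps the ordering logic testable without a dbt project or warehouse.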
Features
- Cloud UI (Prefect Cloud)
- Decorator-based API (familiar Python)
- Flow versioning
- Retries built-in
- Cache results
- Notifications
- Deploy via Cloud or self-host
Cloud vs OSS
- Prefect OSS: free, self-host
- Prefect Cloud: hosted, free tier for small projects, paid for larger
When to choose Prefect
- New projects (modern API)
- Smaller team
- Want easier learning curve
- Like decorator-based syntax
- Want hosted option without enterprise pricing
Airflow + Cosmos
Best fit: existing Airflow infrastructure, mature enterprise teams, large-scale coordination.
Architecture
Airflow uses DAGs of operators. Cosmos library generates Airflow DAGs from dbt manifest.
from datetime import datetime
from cosmos import DbtDag, ProjectConfig, ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

# DbtDag is itself an Airflow DAG, so DAG arguments are passed to it directly
dbt_dag = DbtDag(
    dag_id='dbt_dag',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),  # placeholder start date
    project_config=ProjectConfig('/path/to/dbt'),
    profile_config=ProfileConfig(
        profile_name='analytics',
        target_name='prod',
        profile_mapping=SnowflakeUserPasswordProfileMapping(...)
    )
)
Cosmos automatically:
- Parses manifest
- Creates Airflow tasks per dbt model
- Resolves dependencies (DAG)
- Runs in parallel based on deps
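To illustrate "resolves dependencies" and "runs in parallel based on deps", here is a small sketch using the standard library's `graphlib`. It groups nodes into batches where every node's upstreams sit in earlier batches. The model names and the `parallel_batches` helper are hypothetical, and a real scheduler such as Airflow starts each task as soon as its own upstreams finish rather than waiting for whole batches.

```python
# Minimal sketch: compute which models could run in parallel from a dependency map.
from graphlib import TopologicalSorter


def parallel_batches(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group nodes into batches; each node's upstreams are all in earlier batches."""
    sorter = TopologicalSorter(deps)  # maps node -> list of predecessors
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


# Hypothetical model dependency map: model -> upstream models
deps = {
    "stg_orders": [],
    "stg_customers": [],
    "fct_orders": ["stg_orders"],
    "dim_customers": ["stg_customers", "fct_orders"],
}

batches = parallel_batches(deps)
print(batches)
```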
Features
- Airflow’s mature scheduling
- Web UI
- Connection management
- Operator library (BashOperator, KubernetesPodOperator, etc.)
- LDAP / SSO support
- Multi-tenancy
Variants
Cosmos can load the dbt project and render its models as Airflow tasks in several ways:
from cosmos import DbtDag, LoadMode, ProjectConfig, RenderConfig

# (dag_id, profile_config and other arguments are omitted for brevity)
# 1. DBT_LS: runs dbt ls for discovery
dbt_dag = DbtDag(
    project_config=ProjectConfig('/path/to/dbt'),
    render_config=RenderConfig(load_method=LoadMode.DBT_LS)
)
# 2. DBT_MANIFEST: reads manifest.json
dbt_dag = DbtDag(
    project_config=ProjectConfig('/path/to/dbt', manifest_path='target/manifest.json'),
    render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST)
)
# 3. CUSTOM: parsed by Cosmos's own (limited) project parser
Each method has trade-offs. The manifest method is typically the fastest because it does not run dbt while Airflow parses the DAG.
Execution modes
from cosmos import ExecutionMode
# Local — each task subprocess (default)
ExecutionMode.LOCAL
# Virtual env per task
ExecutionMode.VIRTUALENV
# Docker container per task
ExecutionMode.DOCKER
# Kubernetes pod per task
ExecutionMode.KUBERNETES
Choose based on isolation needs.
When to choose Airflow + Cosmos
- Existing Airflow infrastructure
- Enterprise environment
- Multi-team / multi-pipeline
- Need K8s execution
- Want mature ecosystem
- Many non-dbt steps
Headless dbt (custom)
Best fit: maximum control, custom orchestration, embedded apps.
Architecture
Skip the orchestrator. Run dbt programmatically with dbtRunner and build custom logic around it.
from dbt.cli.main import dbtRunner
import schedule
import time

runner = dbtRunner()

def daily_build():
    result = runner.invoke(['build', '--target', 'prod'])
    if not result.success:
        send_alert('dbt failed')

schedule.every().day.at('06:00').do(daily_build)

while True:
    schedule.run_pending()
    time.sleep(60)
Or via cron:
# crontab
0 6 * * * cd /path/to/dbt && dbt build --target prod
Or GitHub Actions:
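on:
  schedule:
    - cron: '0 6 * * *'
jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # Install dbt and your adapter here before running the build
      - run: dbt build --target prod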
Features
- Maximum simplicity
- No orchestrator dependency
- Lightweight
- Custom logic anywhere
Limitations
- No UI
- Manual error handling
- No lineage tracking
- No multi-pipeline coordination
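"Manual error handling" in practice usually means writing your own retry loop. Below is a minimal sketch of retries with exponential backoff around any build callable. The names `run_with_retries`, `run_build`, and the default delays are assumptions for illustration; the `sleep` parameter is injectable so the logic can be exercised without actually waiting.

```python
# Minimal sketch: retry a dbt build with exponential backoff.
import time
from typing import Callable


def run_with_retries(
    run_build: Callable[[], bool],
    max_attempts: int = 3,
    base_delay_seconds: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call run_build until it returns True; return the number of attempts used.

    Raises RuntimeError when every attempt fails.
    """
    for attempt in range(1, max_attempts + 1):
        if run_build():
            return attempt
        if attempt < max_attempts:
            # Delay doubles after each failed attempt: base, 2x base, 4x base, ...
            sleep(base_delay_seconds * 2 ** (attempt - 1))
    raise RuntimeError(f"dbt build failed after {max_attempts} attempts")


# Fake build for illustration: fails twice, then succeeds
outcomes = iter([False, False, True])
delays = []
attempts_used = run_with_retries(lambda: next(outcomes), sleep=delays.append)
print(attempts_used, delays)
```

With the dbtRunner shown earlier, the callable could be something like `lambda: runner.invoke(['build', '--target', 'prod']).success`.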
When to choose headless
- Small team (1-3)
- Simple workflows
- Cost-sensitive
- Have existing scheduler
- Custom logic dominant
Comparison matrix
Feature | Dagster + dbt | Prefect + dbt | Airflow + Cosmos | Headless
---------------------|---------------|----------------|------------------|----------
Maturity | High | Medium | Very high | N/A
UI | Excellent | Good | Good | None
Asset-based | Yes | Flow-based | DAG-based | No
Auto-gen (manifest)  | Yes           | No             | Yes (Cosmos)     | No
Lineage | Native | Limited | Via plugins | None
Schedule | Built-in | Built-in | Built-in | Cron
Retry | Built-in | Built-in | Built-in | Manual
K8s execution | Yes | Yes | Yes | Manual
Learning curve | Medium | Low-Medium | Medium-High | Trivial
Cloud option | Yes ($) | Yes ($) | Astronomer ($) | N/A
Self-host complexity | Medium | Low | High | None
Multi-pipeline | Strong | Decent | Strong | Manual
Best for | Production | Modern stacks | Enterprise | Simple
Decision framework
Question 1: Existing orchestrator?
├── Airflow -> Add Cosmos
├── Dagster -> Add dagster-dbt
├── Prefect -> Add prefect-dbt
└── None -> Question 2
Question 2: Team size?
├── 1-3 -> Headless (cron / GitHub Actions)
├── 4-10 -> Dagster or Prefect
└── 10+ -> Dagster, Airflow, or custom
Question 3: Compliance / enterprise?
├── Yes -> Airflow (mature, SSO support)
└── No -> Dagster or Prefect
Question 4: Asset-based mental model?
├── Yes -> Dagster
└── No -> Prefect or Airflow
Question 5: Want managed cloud?
├── Yes -> Dagster Cloud, Prefect Cloud, Astronomer (Airflow)
└── No -> Self-hosted OSS
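The first two questions of the framework can be written down as a small function. This is only a sketch of Questions 1 and 2; Questions 3 to 5 would narrow the result further. The function name `recommend` is hypothetical, and a team of exactly 10 is treated as part of the 4-10 bracket, which is an assumption because the brackets above overlap at that value.

```python
# Minimal sketch: encode Questions 1 and 2 of the decision framework.
from typing import Optional

INTEGRATIONS = {
    "airflow": "Airflow + Cosmos",
    "dagster": "Dagster + dagster-dbt",
    "prefect": "Prefect + prefect-dbt",
}


def recommend(existing_orchestrator: Optional[str], team_size: int) -> list[str]:
    """Return candidate setups based on existing orchestrator, then team size."""
    # Question 1: reuse the orchestrator you already run
    if existing_orchestrator is not None:
        return [INTEGRATIONS[existing_orchestrator.lower()]]
    # Question 2: otherwise decide by team size
    if team_size <= 3:
        return ["Headless (cron / GitHub Actions)"]
    if team_size <= 10:
        return ["Dagster", "Prefect"]
    return ["Dagster", "Airflow", "custom"]


print(recommend("Airflow", 50))
print(recommend(None, 2))
```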
Hybrid approaches
Custom orchestrator on top of dbtRunner
from dbt.cli.main import dbtRunner

class DataPipeline:
    def __init__(self):
        self.runner = dbtRunner()
        self.steps = []

    def add_dbt_step(self, name, selector):
        self.steps.append({
            'type': 'dbt',
            'name': name,
            'selector': selector
        })

    def add_python_step(self, name, callable):
        self.steps.append({
            'type': 'python',
            'name': name,
            'callable': callable
        })

    def run(self):
        for step in self.steps:
            print(f'Running: {step["name"]}')
            if step['type'] == 'dbt':
                result = self.runner.invoke(['run', '--select', step['selector']])
                if not result.success:
                    # Stop so later steps never run on incomplete data
                    raise RuntimeError(f'dbt step failed: {step["name"]}')
            elif step['type'] == 'python':
                step['callable']()

# validate_data and export_to_bi are placeholders for your own functions
pipeline = DataPipeline()
pipeline.add_dbt_step('staging', 'tag:staging')
pipeline.add_python_step('validate', lambda: validate_data())
pipeline.add_dbt_step('marts', 'tag:marts')
pipeline.add_python_step('export', lambda: export_to_bi())
pipeline.run()
Simple custom orchestration. It works for basic needs.
Dagster + custom logic
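from dagster import asset
from dagster_dbt import get_asset_key_for_model

# Depend on a specific dbt model's asset key (fct_orders is used as an example)
@asset(deps=[get_asset_key_for_model([my_dbt_assets], 'fct_orders')])
def ml_pipeline(context):
    # Runs after the upstream dbt asset completes
    # Pull data
    data = fetch_data_from_dbt_output()
    # ML training
    model = train_ml(data)
    # Save
    save_model(model)
    return model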
Dagster orchestrates; custom Python handles the ML steps.
Airflow + dbt run-operation
from airflow import DAG
from airflow.operators.bash import BashOperator
from cosmos import DbtTaskGroup

with DAG('hybrid') as dag:
    # dbt models as a task group, so they can be chained with other tasks
    dbt_models = DbtTaskGroup(...)
    # Custom Python
    extract = BashOperator(
        task_id='extract_from_api',
        bash_command='python scripts/extract.py'
    )
    # dbt seeds custom data
    seed = BashOperator(
        task_id='seed_custom',
        bash_command='dbt seed --select custom_seeds'
    )
    extract >> seed >> dbt_models
Airflow handles the overall flow; dbt handles the transformations.
Production-grade example: Dagster
# definitions.py
from dagster import Definitions, AssetSelection, define_asset_job, ScheduleDefinition
from dagster_dbt import DbtCliResource, dbt_assets
from pathlib import Path
import os

DBT_PROJECT = Path('/dbt/jaffle_shop')

@dbt_assets(
    manifest=DBT_PROJECT / 'target' / 'manifest.json',
    select='tag:nightly'
)
def nightly_dbt_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(['build'], context=context).stream()

@dbt_assets(
    manifest=DBT_PROJECT / 'target' / 'manifest.json',
    select='tag:hourly'
)
def hourly_dbt_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(['build'], context=context).stream()

# Jobs
nightly_job = define_asset_job(
    'nightly',
    selection=AssetSelection.assets(nightly_dbt_assets)
)
hourly_job = define_asset_job(
    'hourly',
    selection=AssetSelection.assets(hourly_dbt_assets)
)

# Schedules
nightly_schedule = ScheduleDefinition(
    job=nightly_job,
    cron_schedule='0 6 * * *',
    execution_timezone='UTC'
)
hourly_schedule = ScheduleDefinition(
    job=hourly_job,
    cron_schedule='0 * * * *',
    execution_timezone='UTC'
)

# Definitions
defs = Definitions(
    assets=[nightly_dbt_assets, hourly_dbt_assets],
    schedules=[nightly_schedule, hourly_schedule],
    jobs=[nightly_job, hourly_job],
    resources={
        'dbt': DbtCliResource(
            project_dir=str(DBT_PROJECT),
            target=os.environ.get('DBT_TARGET', 'prod'),
            profiles_dir='/secrets/'
        )
    }
)
Production deployment:
# Helm chart for Dagster
# - Dagster Daemon (scheduler)
# - Dagster webserver (web UI, formerly Dagit)
# - User code workspace
# - Postgres metadata DB
# - S3 for I/O
Production-grade example: Airflow + Cosmos
# dags/dbt_dag.py
from datetime import datetime
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.constants import ExecutionMode, InvocationMode
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

profile_config = ProfileConfig(
    profile_name='analytics',
    target_name='prod',
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id='snowflake_default',
        profile_args={
            'database': 'ANALYTICS',
            'schema': 'main',
            'warehouse': 'PROD_WH'
        }
    )
)

execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.LOCAL,
    invocation_mode=InvocationMode.DBT_RUNNER  # use dbtRunner instead of subprocess
)

# DbtDag is itself an Airflow DAG, so DAG arguments are passed to it directly
dbt_dag = DbtDag(
    dag_id='analytics_dbt',
    schedule='0 6 * * *',
    start_date=datetime(2024, 1, 1),  # placeholder start date
    catchup=False,
    max_active_runs=1,
    project_config=ProjectConfig('/dbt/jaffle_shop'),
    profile_config=profile_config,
    execution_config=execution_config,
    operator_args={
        'install_deps': True,
        'append_env': True
    }
)
Cosmos auto-generates:
extract_orders -> stg_orders -> fct_orders -> dim_customers
                      ↓             ↓
               test_stg_orders  test_fct_orders
Each dbt model becomes an Airflow task that is visible in the native Airflow UI.
Common pitfalls
1. Manifest schema changes on dbt upgrade
# The manifest schema changes when dbt-core is upgraded
# The orchestrator integration may break
# Mitigation: pin versions (the versions below are illustrative; pin a pair you have tested together)
pip install 'dbt-core==1.11.5' 'dagster-dbt==0.20.0'
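Alongside pinning, a deployment can check the manifest's schema version before loading it. The sketch below reads `metadata.dbt_schema_version` from a manifest dict and compares it with the versions your integration is known to handle. The helper names, the supported set, and the sample manifest are hypothetical.

```python
# Minimal sketch: fail early when the manifest schema version is unexpected.
import re


def manifest_schema_version(manifest: dict) -> int:
    """Extract the integer schema version from metadata.dbt_schema_version."""
    url = manifest["metadata"]["dbt_schema_version"]
    match = re.search(r"/manifest/v(\d+)\.json$", url)
    if match is None:
        raise ValueError(f"Unrecognized manifest schema URL: {url}")
    return int(match.group(1))


def assert_supported(manifest: dict, supported: set[int]) -> None:
    """Raise RuntimeError if the manifest schema version is not in the supported set."""
    version = manifest_schema_version(manifest)
    if version not in supported:
        raise RuntimeError(
            f"Manifest schema v{version} is not in the supported set {sorted(supported)}"
        )


# Hypothetical, trimmed manifest content
sample_manifest = {
    "metadata": {"dbt_schema_version": "https://schemas.getdbt.com/dbt/manifest/v12.json"}
}

schema_version = manifest_schema_version(sample_manifest)
assert_supported(sample_manifest, {12})
print(schema_version)
```

Running a check like this in CI surfaces an incompatible dbt upgrade before the orchestrator tries to load the manifest.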
2. Connection management
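# Orchestrator + dbt = many connections
# The Snowflake connection limit may be exceeded
# Limit concurrency in dbt itself and in the orchestrator
@dbt_assets(manifest=...)
def limited_dbt_assets(context, dbt: DbtCliResource):
    # --threads caps how many models dbt runs (and connections it opens) at once
    yield from dbt.cli(['build', '--threads', '4'], context=context).stream()
# Also cap run-level parallelism in the orchestrator's executor settings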
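A quick back-of-the-envelope calculation helps when sizing these limits. The sketch assumes a worst case in which every dbt thread holds one warehouse connection and all concurrent dbt invocations are busy at the same time; both function names and the numbers are illustrative.

```python
# Minimal sketch: estimate peak warehouse connections and a safe concurrency cap.

def peak_connections(concurrent_invocations: int, threads_per_invocation: int) -> int:
    """Worst-case connections: every thread of every invocation is active."""
    return concurrent_invocations * threads_per_invocation


def max_concurrent_invocations(connection_budget: int, threads_per_invocation: int) -> int:
    """Largest number of concurrent dbt invocations that stays within the budget."""
    return connection_budget // threads_per_invocation


# Illustrative numbers: 8 concurrent dbt tasks with 4 threads each, budget of 20 connections
estimated_peak = peak_connections(8, 4)
safe_cap = max_concurrent_invocations(20, 4)
print(estimated_peak, safe_cap)
```

Here the estimated peak of 32 exceeds the budget of 20, so the orchestrator would need to cap dbt concurrency at 5 invocations or reduce the thread count.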
3. Lineage gaps
# Non-dbt steps don't appear in the dbt manifest
# Lineage is incomplete
# Document downstream consumers via dbt exposures
exposures:
  - name: ml_model
    type: ml
    owner:
      name: Data Science Team  # placeholder; dbt requires an owner
    depends_on:
      - ref('fct_orders')
4. Cost — multiple managed services
Dagster Cloud: $X/month
Managed Airflow (for example, Astronomer): $Y/month
Plus warehouse cost
Budget accordingly
Key takeaways
- Dagster + dagster-dbt: asset-based, native lineage, rich UI. Best for production data platforms.
- Prefect + prefect-dbt: flow-based, modern API, gentler learning curve. Good for new projects.
- Airflow + Cosmos: mature, tasks auto-generated from the dbt project, K8s support. Best for enterprise.
- Headless dbt: minimal, cron-based. Good for simple cases.
- Decision factors: existing infra, team size, asset model preference, enterprise needs.
- Hybrid approaches are common: orchestrator + custom Python + dbt.
- Production examples: Dagster with schedules, Cosmos with execution modes.
- Pitfalls: manifest schema changes, connection management, lineage gaps, cost.
- Pick one early, because the switching cost is high.
- All are viable; choose based on team preference and existing infrastructure.