Lesson 07.04 · 24 min
Advanced
Tags: orchestrators, dagster, prefect, airflow, cosmos

Orchestrator integration: Dagster, Prefect, Airflow Cosmos, headless dbt

dbt runs perfectly well standalone, but in production it is usually integrated with an orchestrator such as Dagster, Prefect, or Airflow. Each has a dbt integration package that exposes dbt models as orchestrator-native assets or tasks. This adds scheduling, lineage, alerting, and coordination across pipelines.

This lesson gives an overview of each integration, a comparison, real config examples, and decision criteria.


Why orchestrator integration

dbt alone is limited:

  • Manual triggering or simple cron
  • No cross-pipeline coordination
  • No retry logic
  • Limited observability
  • Hard to combine with non-dbt steps (Python, ML, API calls)

Orchestrator adds:

  • Scheduled runs
  • Failure recovery (retries, alerts)
  • Lineage across systems
  • Multi-step workflows
  • UI / monitoring
  • Multi-team coordination
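Retries are the easiest of these to picture. Below is a minimal plain-Python sketch of the retry-with-exponential-backoff policy an orchestrator wraps around a failing dbt step. `run_with_retries`, the 30-second base delay and the injectable `sleep` are illustrative assumptions, not any orchestrator's API.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    task: Callable[[], T],
    max_attempts: int = 3,
    base_delay_s: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `task`; on failure wait base_delay_s * 2**(attempt - 1) and retry.

    The exception from the final attempt is re-raised: that is the point
    where an orchestrator would mark the step failed and send an alert.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return task()
        except Exception:
            if attempt == max_attempts:
                raise
            sleep(base_delay_s * 2 ** (attempt - 1))
    raise RuntimeError("unreachable")


# Example: a step that fails twice (e.g. a warehouse timeout), then succeeds
attempts = []


def flaky_dbt_step() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("warehouse timeout")
    return "ok"


waits = []
print(run_with_retries(flaky_dbt_step, sleep=waits.append), waits)  # ok [30.0, 60.0]
```

Injecting `sleep` keeps the policy testable without real waiting; an orchestrator applies the same idea per task.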

Dagster + dagster-dbt

Best fit: production data platforms, asset-based model, want comprehensive observability.

Architecture

Dagster represents data as assets. dagster-dbt auto-generates Dagster assets from the dbt manifest: each dbt model becomes one Dagster asset.

from dagster import Definitions
from dagster_dbt import DbtCliResource, dbt_assets
from pathlib import Path

DBT_PROJECT = Path('/path/to/dbt')
DBT_MANIFEST = DBT_PROJECT / 'target' / 'manifest.json'

@dbt_assets(manifest=DBT_MANIFEST)
def my_dbt_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(['build'], context=context).stream()

defs = Definitions(
    assets=[my_dbt_assets],
    resources={
        'dbt': DbtCliResource(
            project_dir=str(DBT_PROJECT),
            target='prod'
        )
    }
)
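Conceptually, the manifest-to-asset mapping walks `manifest.json`: every model node becomes one asset, and its `depends_on.nodes` entries become upstream edges. The following is a simplified sketch over a hand-written manifest fragment. `assets_from_manifest` is a hypothetical helper, not dagster-dbt's actual translator, which also handles seeds, snapshots, sources and custom asset keys.

```python
def assets_from_manifest(manifest: dict) -> dict:
    """Map every dbt model to the upstream models it depends on (simplified)."""
    nodes = manifest["nodes"]
    graph = {}
    for node in nodes.values():
        if node["resource_type"] != "model":
            continue  # tests, seeds, etc. are skipped in this sketch
        graph[node["name"]] = [
            nodes[dep]["name"]
            for dep in node["depends_on"]["nodes"]
            # sources live under manifest["sources"], not "nodes"
            if dep in nodes and nodes[dep]["resource_type"] == "model"
        ]
    return graph


# Hand-written manifest fragment; a real file is written by `dbt parse` or
# `dbt compile` and loaded with json.load(open("target/manifest.json")).
manifest = {
    "nodes": {
        "model.jaffle_shop.stg_orders": {
            "resource_type": "model", "name": "stg_orders",
            "depends_on": {"nodes": ["source.jaffle_shop.raw.orders"]},
        },
        "model.jaffle_shop.fct_orders": {
            "resource_type": "model", "name": "fct_orders",
            "depends_on": {"nodes": ["model.jaffle_shop.stg_orders"]},
        },
        "test.jaffle_shop.not_null_fct_orders_id": {
            "resource_type": "test", "name": "not_null_fct_orders_id",
            "depends_on": {"nodes": ["model.jaffle_shop.fct_orders"]},
        },
    }
}
print(assets_from_manifest(manifest))  # {'stg_orders': [], 'fct_orders': ['stg_orders']}
```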

Features

  • Asset-based UI (Dagster UI, formerly Dagit)
  • Auto-generated assets from dbt manifest
  • Native lineage visualization
  • Asset materialization tracking
  • Retries, alerts
  • Sensors (event-driven)
  • Schedules
  • Branch deployments (Dagster Cloud)

Selective runs

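from dagster import define_asset_job
from dagster_dbt import build_dbt_asset_selection

# Run specific assets: pick a subset of the dbt assets with dbt selection syntax
fct_orders_selection = build_dbt_asset_selection(
    [my_dbt_assets], dbt_select='fct_orders'
)
fct_orders_job = define_asset_job('fct_orders_job', selection=fct_orders_selection)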

Or via dbt selector:

@dbt_assets(manifest=DBT_MANIFEST, select='tag:critical')
def critical_dbt_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(['build'], context=context).stream()
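At its core, a `tag:` selector is a filter over the manifest's `tags` field. Below is a simplified sketch of that filter for models only. `select_by_tag` is hypothetical; real dbt selection also supports graph operators (`+`, `@`), set operations and many other methods, and `dbt build` would also pick up tests carrying the tag.

```python
def select_by_tag(manifest: dict, tag: str) -> list:
    """unique_ids of models carrying `tag`, mimicking dbt's `tag:<name>` method."""
    return sorted(
        uid
        for uid, node in manifest["nodes"].items()
        if node["resource_type"] == "model" and tag in node.get("tags", [])
    )


# Trimmed example manifest: only the fields the filter reads
manifest = {
    "nodes": {
        "model.proj.fct_orders": {"resource_type": "model", "tags": ["critical", "nightly"]},
        "model.proj.dim_customers": {"resource_type": "model", "tags": ["nightly"]},
        "test.proj.unique_fct_orders_id": {"resource_type": "test", "tags": ["critical"]},
    }
}
print(select_by_tag(manifest, "critical"))  # ['model.proj.fct_orders']
```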

Custom logic per asset

from dagster import asset

@asset(deps=[my_dbt_assets])
def custom_post_processing(context):
    # Runs after the dbt assets have been materialized
    do_custom_work()  # hypothetical helper

Schedules

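from dagster import AssetSelection, RunRequest, define_asset_job, schedule

# A schedule needs a job to launch
dbt_job = define_asset_job('dbt_build_job', selection=AssetSelection.assets(my_dbt_assets))

@schedule(cron_schedule='0 6 * * *', job=dbt_job)
def daily_build(context):
    # run_key = scheduled tick time, so the same tick is not launched twice
    return RunRequest(
        run_key=context.scheduled_execution_time.isoformat()
    )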
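Why derive `run_key` from the scheduled time: the scheduler may evaluate the same tick more than once (for example after a daemon restart), and Dagster uses `run_key` to avoid launching a duplicate run for a key it has already seen. Below is a small plain-Python sketch of that idea. `next_daily_tick` and `Launcher` are hypothetical stand-ins for the scheduler, not Dagster classes.

```python
from datetime import datetime, timedelta, timezone


def next_daily_tick(now: datetime, hour: int = 6) -> datetime:
    """Next fire time strictly after `now` for the cron '0 <hour> * * *'."""
    tick = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    return tick if tick > now else tick + timedelta(days=1)


class Launcher:
    """Launches at most one run per run_key, like a scheduler de-duplicating ticks."""

    def __init__(self) -> None:
        self.seen = set()

    def submit(self, run_key: str) -> bool:
        if run_key in self.seen:
            return False  # same tick evaluated again: no second run
        self.seen.add(run_key)
        return True


now = datetime(2024, 5, 1, 7, 30, tzinfo=timezone.utc)
tick = next_daily_tick(now)               # 2024-05-02 06:00 UTC
launcher = Launcher()
print(launcher.submit(tick.isoformat()))  # True: first evaluation
print(launcher.submit(tick.isoformat()))  # False: duplicate suppressed
```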

Observability

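from dagster import AssetExecutionContext, asset
from dagster_dbt import DbtCliResource

@asset
def model_with_metadata(context: AssetExecutionContext, dbt: DbtCliResource):
    # Plain dbt invocation (outside @dbt_assets); wait until it finishes
    invocation = dbt.cli(['build', '--select', 'fct_orders']).wait()
    run_results = invocation.get_artifact('run_results.json')

    for r in run_results['results']:
        if r['unique_id'].startswith('model.'):  # skip test results
            context.add_output_metadata({
                'rows_affected': r['adapter_response'].get('rows_affected'),
                'duration': r['execution_time']
            })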

Dagster UI shows metadata per materialization.
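The metadata comes from dbt's `run_results.json`, where each entry carries `unique_id`, `status`, `execution_time` and an adapter-specific `adapter_response`. Below is a sketch that extracts per-model metadata from an already-parsed file. The trimmed payload is made up for illustration, and `rows_affected` is only reported by some adapters, hence the `.get()`.

```python
def metadata_from_run_results(run_results: dict) -> dict:
    """Per-model metadata from a parsed dbt run_results.json."""
    out = {}
    for r in run_results["results"]:
        if not r["unique_id"].startswith("model."):
            continue  # skip tests, seeds and snapshots
        out[r["unique_id"]] = {
            "status": r["status"],
            "rows_affected": r.get("adapter_response", {}).get("rows_affected"),
            "duration_s": round(r["execution_time"], 2),
        }
    return out


# Trimmed example payload (real files carry more fields per result)
run_results = {
    "results": [
        {"unique_id": "model.proj.fct_orders", "status": "success",
         "execution_time": 12.3456, "adapter_response": {"rows_affected": 1042}},
        {"unique_id": "test.proj.not_null_fct_orders_id", "status": "pass",
         "execution_time": 0.8, "adapter_response": {}},
    ]
}
print(metadata_from_run_results(run_results))
```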

Production

  • Dagster Cloud (managed)
  • Self-hosted Dagster Daemon + workspace
  • Postgres backend for metadata
  • S3 for the I/O manager

When to choose Dagster

  • Production data platform
  • 5+ engineers
  • Want asset-based mental model
  • Multiple data systems integrated
  • Lineage critical
  • Want a managed vendor option (Dagster Cloud)

Prefect + prefect-dbt

Best fit: easier learning curve, hybrid orchestration, modern API.

Architecture

Prefect uses flows and tasks. prefect-dbt runs dbt commands as Prefect tasks.

from prefect import flow
from prefect_dbt.cli.commands import DbtCoreOperation

@flow
def dbt_pipeline():
    # Build all models
    build_result = DbtCoreOperation(
        commands=['dbt build'],
        project_dir='/path/to/dbt',
        profiles_dir='~/.dbt',
        overwrite_profiles=False
    ).run()
    
    return build_result

# Run
dbt_pipeline()

Or with specific commands:

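@flow
def staged_dbt():
    # Same placeholder project path as above
    project = '/path/to/dbt'

    # Stage 1
    DbtCoreOperation(commands=['dbt run --select tag:staging'], project_dir=project).run()

    # Stage 2: starts only after stage 1 returns (calls are sequential)
    DbtCoreOperation(commands=['dbt run --select tag:marts'], project_dir=project).run()

    # Stage 3
    DbtCoreOperation(commands=['dbt test'], project_dir=project).run()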

Features

  • Web UI (self-hosted Prefect server or Prefect Cloud)
  • Decorator-based API (familiar Python)
  • Flow versioning
  • Retries built-in
  • Cache results
  • Notifications
  • Deploy via Cloud or self-host
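Result caching rests on one idea: hash a task's inputs and reuse the stored result when the same hash comes back. Below is a minimal in-memory sketch. `cache_by_inputs` is a hypothetical decorator; Prefect's own cache policies are richer (they can also key on the task's source code and persist results between runs).

```python
import functools
import hashlib
import json
from typing import Any, Callable


def cache_by_inputs(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Return a wrapper that reuses results for identical (function, inputs)."""
    store: dict = {}

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Stable key: function name + JSON-serialised arguments
        raw = json.dumps([fn.__name__, args, kwargs], sort_keys=True, default=str)
        key = hashlib.sha256(raw.encode()).hexdigest()
        if key not in store:
            store[key] = fn(*args, **kwargs)
        return store[key]

    return wrapper


calls = []


@cache_by_inputs
def dbt_build(selector: str) -> str:
    calls.append(selector)  # stands in for an expensive dbt invocation
    return f"built {selector}"


dbt_build("tag:staging")
dbt_build("tag:staging")   # served from cache, no second invocation
dbt_build("tag:marts")
print(calls)               # ['tag:staging', 'tag:marts']
```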

Cloud vs OSS

  • Prefect OSS: free, self-host
  • Prefect Cloud: hosted, free tier for small projects, paid for larger

When to choose Prefect

  • New projects (modern API)
  • Smaller team
  • Want easier learning curve
  • Like decorator-based syntax
  • Want hosted option without enterprise pricing

Airflow + Cosmos

Best fit: existing Airflow infrastructure, mature enterprise teams, large-scale multi-pipeline coordination.

Architecture

Airflow uses DAGs of operators. Cosmos library generates Airflow DAGs from dbt manifest.

from datetime import datetime
from cosmos import DbtDag, ProjectConfig, ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

# DbtDag is itself an Airflow DAG, so DAG arguments are passed to it directly
# (fixed start_date: days_ago() is deprecated)
dbt_dag = DbtDag(
    dag_id='dbt_dag',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    project_config=ProjectConfig('/path/to/dbt'),
    profile_config=ProfileConfig(
        profile_name='analytics',
        target_name='prod',
        profile_mapping=SnowflakeUserPasswordProfileMapping(...)
    )
)

Cosmos automatically:

  • Parses manifest
  • Creates Airflow tasks per dbt model
  • Resolves dependencies (DAG)
  • Runs in parallel based on deps
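The parallelism comes straight from the dependency graph: a model can start once all of its upstream models have finished. The following sketch groups models into "waves" with a level-by-level topological sort (Kahn's algorithm); `parallel_waves` and the model names are illustrative. This is a simplification: Airflow starts each task as soon as its own upstream tasks succeed, which can be earlier than a wave boundary.

```python
def parallel_waves(deps: dict) -> list:
    """Group models into waves; all upstream models of a model sit in earlier waves.

    deps maps model -> list of upstream names. Names that are not keys
    (e.g. sources) are ignored. Raises ValueError on a cycle.
    """
    remaining = {m: {u for u in ups if u in deps} for m, ups in deps.items()}
    waves = []
    while remaining:
        ready = sorted(m for m, ups in remaining.items() if not ups)
        if not ready:
            raise ValueError("dependency cycle detected")
        waves.append(ready)
        for m in ready:
            del remaining[m]
        for ups in remaining.values():
            ups.difference_update(ready)
    return waves


deps = {
    "stg_orders": ["raw_orders"],        # raw_orders: a source, not a model
    "stg_customers": [],
    "fct_orders": ["stg_orders", "stg_customers"],
    "dim_customers": ["stg_customers", "fct_orders"],
}
print(parallel_waves(deps))
# [['stg_customers', 'stg_orders'], ['fct_orders'], ['dim_customers']]
```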

Features

  • Airflow’s mature scheduling
  • Web UI
  • Connection management
  • Operator library (BashOperator, KubernetesOperator, etc.)
  • LDAP / SSO support
  • Multi-tenancy

Variants

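Cosmos can discover the dbt project, and therefore render models as Airflow tasks, in several ways:

from cosmos import DbtDag, LoadMode, ProjectConfig, RenderConfig

# (DAG, profile and execution arguments omitted for brevity)

# 1. DBT_LS: runs `dbt ls` during DAG parsing to discover nodes
dbt_dag = DbtDag(
    project_config=ProjectConfig('/path/to/dbt'),
    render_config=RenderConfig(load_method=LoadMode.DBT_LS)
)

# 2. DBT_MANIFEST: reads a precompiled manifest.json
dbt_dag = DbtDag(
    project_config=ProjectConfig('/path/to/dbt', manifest_path='/path/to/dbt/target/manifest.json'),
    render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST)
)

# 3. CUSTOM: Cosmos's own parser of the project files (no dbt invocation)

Each method has trade-offs. DBT_MANIFEST is the fastest at DAG-parse time, but the manifest must be regenerated whenever the project changes; DBT_LS invokes dbt while parsing the DAG, which can be slower; CUSTOM needs no dbt call but supports fewer dbt features.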

Execution modes

from cosmos import ExecutionMode

# Local — each task subprocess (default)
ExecutionMode.LOCAL

# Virtual env per task
ExecutionMode.VIRTUALENV

# Docker container per task
ExecutionMode.DOCKER

# Kubernetes pod per task
ExecutionMode.KUBERNETES

Choose based on isolation needs.

When to choose Airflow + Cosmos

  • Existing Airflow infrastructure
  • Enterprise environment
  • Multi-team / multi-pipeline
  • Need K8s execution
  • Want mature ecosystem
  • Many non-dbt steps

Headless dbt (custom)

Best fit: maximum control, custom orchestration, embedded apps.

Architecture

Skip the orchestrator. Run dbt programmatically with dbtRunner and build custom logic around it.

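from dbt.cli.main import dbtRunner
import schedule  # third-party `schedule` package (pip install schedule)
import time

runner = dbtRunner()

def daily_build():
    result = runner.invoke(['build', '--target', 'prod'])
    if not result.success:
        # send_alert is a hypothetical helper (Slack, email, ...)
        send_alert(f'dbt failed: {result.exception or "see run_results.json"}')

schedule.every().day.at('06:00').do(daily_build)

# Long-running process: check for due jobs once a minute
while True:
    schedule.run_pending()
    time.sleep(60)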

Or via cron:

# crontab
0 6 * * * cd /path/to/dbt && dbt build --target prod

Or GitHub Actions:

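on:
  schedule:
    - cron: '0 6 * * *'   # 06:00 UTC

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install dbt-core dbt-snowflake   # adapter is an example
      # profiles.yml and credentials (e.g. from repository secrets) omitted
      - run: dbt build --target prod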

Features

  • Maximum simplicity
  • No orchestrator dependency
  • Lightweight
  • Custom logic anywhere

Limitations

  • No UI
  • Manual error handling
  • No lineage tracking
  • No multi-pipeline coordination
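Manual error handling for headless runs usually starts with the exit code. dbt documents three: 0 for success, 1 when the run completed with handled errors (such as a failing model or test), and 2 for an unhandled error (such as a crash or network interruption). Below is a sketch of a wrapper that tells these apart. `run_and_classify` is hypothetical, and the demo uses stand-in commands that exit with a chosen code so it runs without dbt installed.

```python
import subprocess
import sys


def run_and_classify(cmd: list) -> str:
    """Run e.g. ['dbt', 'build', '--target', 'prod'] and classify its exit code."""
    code = subprocess.run(cmd).returncode
    if code == 0:
        return "success"
    if code == 1:
        return "failed"    # data problem: alert the model owners
    return "crashed"       # code 2 or anything else: infrastructure problem


# Stand-in commands that only exit with a given code
for code in (0, 1, 2):
    print(code, run_and_classify([sys.executable, "-c", f"import sys; sys.exit({code})"]))
```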

When to choose Headless

  • Small team (1-3)
  • Simple workflows
  • Cost-sensitive
  • Have existing scheduler
  • Custom logic dominant

Comparison matrix

Feature                | Dagster + dbt | Prefect + dbt | Airflow + Cosmos | Headless
-----------------------|---------------|---------------|------------------|----------
Maturity               | High          | Medium        | Very high        | N/A
UI                     | Excellent     | Good          | Good             | None
Core model             | Asset-based   | Flow-based    | DAG-based        | None
Auto-gen from manifest | Yes           | No            | Yes (Cosmos)     | No
Lineage                | Native        | Limited       | Via plugins      | None
Schedule               | Built-in      | Built-in      | Built-in         | Cron
Retry                  | Built-in      | Built-in      | Built-in         | Manual
K8s execution          | Yes           | Yes           | Yes              | Manual
Learning curve         | Medium        | Low-Medium    | Medium-High      | Trivial
Cloud option           | Yes ($)       | Yes ($)       | Astronomer ($)   | N/A
Self-host complexity   | Medium        | Low           | High             | None
Multi-pipeline         | Strong        | Decent        | Strong           | Manual
Best for               | Production    | Modern stacks | Enterprise       | Simple

Decision framework

Question 1: Existing orchestrator?
├── Airflow -> Add Cosmos
├── Dagster -> Add dagster-dbt
├── Prefect -> Add prefect-dbt
└── None -> Question 2

Question 2: Team size?
├── 1-3 -> Headless (cron / GitHub Actions)
├── 4-10 -> Dagster or Prefect
└── 10+ -> Dagster, Airflow, or custom

Question 3: Compliance / enterprise?
├── Yes -> Airflow (mature, SSO support)
└── No -> Dagster or Prefect

Question 4: Asset-based mental model?
├── Yes -> Dagster
└── No -> Prefect or Airflow

Question 5: Want managed cloud?
├── Yes -> Dagster Cloud, Prefect Cloud, Astronomer (Airflow)
└── No -> Self-host the OSS version
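The same framework can be written as a small function, which makes the order of the questions explicit. This is one possible reading of Questions 1-4 (Question 5, managed vs self-hosted, is a deployment choice for whichever tool is picked); `recommend` is purely illustrative.

```python
from typing import Optional

INTEGRATIONS = {
    "airflow": "Airflow + Cosmos",
    "dagster": "Dagster + dagster-dbt",
    "prefect": "Prefect + prefect-dbt",
}


def recommend(existing: Optional[str], team_size: int,
              enterprise: bool, asset_model: bool) -> str:
    """Walk Questions 1-4 in order and return the first match."""
    if existing:                  # Q1: extend the orchestrator you already run
        return INTEGRATIONS[existing.lower()]
    if team_size <= 3:            # Q2: small team
        return "Headless dbt (cron / GitHub Actions)"
    if enterprise:                # Q3: compliance, SSO
        return "Airflow + Cosmos"
    if asset_model:               # Q4: asset-based mental model
        return "Dagster + dagster-dbt"
    return "Prefect + prefect-dbt"


print(recommend(None, team_size=5, enterprise=False, asset_model=True))
# Dagster + dagster-dbt
```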

Hybrid approaches

Custom orchestrator on top of dbtRunner

from dbt.cli.main import dbtRunner

class DataPipeline:
    """Runs dbt and Python steps strictly in the order they were added."""

    def __init__(self):
        self.runner = dbtRunner()
        self.steps = []

    def add_dbt_step(self, name, selector):
        self.steps.append({
            'type': 'dbt',
            'name': name,
            'selector': selector
        })

    def add_python_step(self, name, fn):
        self.steps.append({
            'type': 'python',
            'name': name,
            'fn': fn
        })

    def run(self):
        for step in self.steps:
            print(f'Running: {step["name"]}')
            if step['type'] == 'dbt':
                result = self.runner.invoke(['run', '--select', step['selector']])
                # Stop at the first failed dbt step instead of running later steps
                if not result.success:
                    raise RuntimeError(f'dbt step {step["name"]!r} failed')
            elif step['type'] == 'python':
                step['fn']()

# validate_data and export_to_bi are hypothetical helpers
pipeline = DataPipeline()
pipeline.add_dbt_step('staging', 'tag:staging')
pipeline.add_python_step('validate', validate_data)
pipeline.add_dbt_step('marts', 'tag:marts')
pipeline.add_python_step('export', export_to_bi)
pipeline.run()

Simple custom orchestration; it works for basic needs.

Dagster + custom logic

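from dagster import asset

@asset(deps=[my_dbt_assets])
def ml_pipeline(context):
    # Runs after the dbt assets complete (declared via deps)

    # Pull data (fetch_data_from_dbt_output, train_ml, save_model are hypothetical helpers)
    data = fetch_data_from_dbt_output()

    # ML training
    model = train_ml(data)

    # Save
    save_model(model)

    return model

Dagster orchestrates; custom Python handles the ML steps.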

Airflow + Cosmos + custom tasks

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from cosmos import DbtTaskGroup

with DAG('hybrid', schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False) as dag:
    # Custom Python extraction step
    extract = BashOperator(
        task_id='extract_from_api',
        bash_command='python scripts/extract.py'
    )

    # dbt seeds custom data
    seed = BashOperator(
        task_id='seed_custom',
        bash_command='dbt seed --select custom_seeds'
    )

    # dbt models: inside an existing DAG use DbtTaskGroup, not DbtDag
    dbt_models = DbtTaskGroup(...)

    extract >> seed >> dbt_models

Airflow handles the overall flow; dbt handles the transformations.


Production-grade example: Dagster

# definitions.py
from dagster import Definitions, AssetSelection, define_asset_job, ScheduleDefinition
from dagster_dbt import DbtCliResource, dbt_assets
from pathlib import Path
import os

DBT_PROJECT = Path('/dbt/jaffle_shop')

@dbt_assets(
    manifest=DBT_PROJECT / 'target' / 'manifest.json',
    select='tag:nightly'
)
def nightly_dbt_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(['build'], context=context).stream()

@dbt_assets(
    manifest=DBT_PROJECT / 'target' / 'manifest.json',
    select='tag:hourly'
)
def hourly_dbt_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(['build'], context=context).stream()

# Jobs
nightly_job = define_asset_job(
    'nightly',
    selection=AssetSelection.assets(nightly_dbt_assets)
)

hourly_job = define_asset_job(
    'hourly',
    selection=AssetSelection.assets(hourly_dbt_assets)
)

# Schedules
nightly_schedule = ScheduleDefinition(
    job=nightly_job,
    cron_schedule='0 6 * * *',
    execution_timezone='UTC'
)

hourly_schedule = ScheduleDefinition(
    job=hourly_job,
    cron_schedule='0 * * * *',
    execution_timezone='UTC'
)

# Definitions
defs = Definitions(
    assets=[nightly_dbt_assets, hourly_dbt_assets],
    schedules=[nightly_schedule, hourly_schedule],
    jobs=[nightly_job, hourly_job],
    resources={
        'dbt': DbtCliResource(
            project_dir=str(DBT_PROJECT),
            target=os.environ.get('DBT_TARGET', 'prod'),
            profiles_dir='/secrets/'
        )
    }
)
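One pitfall with splitting by tags: a model tagged both `nightly` and `hourly` lands in both `@dbt_assets` definitions, and Dagster refuses to load definitions that declare the same asset key twice. Below is a sketch of a pre-deploy check over the manifest; `overlapping_models` and the example models are hypothetical.

```python
def overlapping_models(manifest: dict, tag_a: str, tag_b: str) -> list:
    """Models carrying both tags, i.e. picked by both @dbt_assets selections."""
    return sorted(
        node["name"]
        for node in manifest["nodes"].values()
        if node["resource_type"] == "model"
        and tag_a in node.get("tags", [])
        and tag_b in node.get("tags", [])
    )


manifest = {
    "nodes": {
        "model.jaffle_shop.fct_orders": {
            "resource_type": "model", "name": "fct_orders", "tags": ["nightly"]},
        "model.jaffle_shop.orders_live": {
            "resource_type": "model", "name": "orders_live", "tags": ["hourly", "nightly"]},
    }
}
clash = overlapping_models(manifest, "nightly", "hourly")
if clash:
    # In CI this would fail the job instead of printing
    print(f"Models in both selections: {clash}")  # Models in both selections: ['orders_live']
```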

Production deployment:

# Helm chart for Dagster
# - Dagster daemon (schedules, sensors, run queue)
# - Dagster webserver (web UI, formerly Dagit)
# - User code deployment
# - Postgres metadata DB
# - S3 for I/O manager storage

Production-grade example: Airflow + Cosmos

# dags/dbt_dag.py
from datetime import datetime
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, ExecutionMode
from cosmos.constants import InvocationMode
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

profile_config = ProfileConfig(
    profile_name='analytics',
    target_name='prod',
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id='snowflake_default',
        profile_args={
            'database': 'ANALYTICS',
            'schema': 'main',
            'warehouse': 'PROD_WH'
        }
    )
)

execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.LOCAL,
    invocation_mode=InvocationMode.DBT_RUNNER  # use dbtRunner instead of a subprocess
)

# DbtDag is itself an Airflow DAG: pass DAG arguments to it directly
# (fixed start_date: days_ago() is deprecated)
dbt_dag = DbtDag(
    dag_id='analytics_dbt',
    schedule='0 6 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    project_config=ProjectConfig('/dbt/jaffle_shop'),
    profile_config=profile_config,
    execution_config=execution_config,
    operator_args={
        'install_deps': True,
        'append_env': True
    }
)

Cosmos auto-generates:

extract_orders -> stg_orders -> fct_orders -> dim_customers
                  ↓             ↓
              test_stg_orders   test_fct_orders

Each dbt model becomes an Airflow task visible in the native Airflow UI; by default Cosmos groups each model's run with its tests.


Common pitfalls

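1. Manifest schema changes on dbt upgrades

# The manifest schema changes when dbt-core is upgraded,
# which can break the orchestrator integration that parses it

# Mitigation: pin versions together (illustrative pins; use a pair
# that the integration's release notes list as compatible)
pip install 'dbt-core==1.11.5' 'dagster-dbt==0.20.0'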
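Pinning can be paired with a guard that fails fast when the manifest's schema version changes. The manifest records it under `metadata.dbt_schema_version` as a URL ending in `manifest/vN.json`. Below is a sketch, assuming a hypothetical `TESTED_SCHEMA_VERSIONS` set that you maintain alongside your pins; the example metadata is made up.

```python
import re

# Hypothetical: manifest schema versions your pinned integration was tested with
TESTED_SCHEMA_VERSIONS = {12}


def manifest_schema_version(manifest: dict) -> int:
    """Extract N from metadata.dbt_schema_version, e.g. '.../manifest/v12.json' -> 12."""
    url = manifest["metadata"]["dbt_schema_version"]
    match = re.search(r"/manifest/v(\d+)\.json$", url)
    if match is None:
        raise ValueError(f"unrecognised schema URL: {url}")
    return int(match.group(1))


def check_manifest(manifest: dict) -> None:
    """Raise before deploy if the manifest schema is outside the tested set."""
    version = manifest_schema_version(manifest)
    if version not in TESTED_SCHEMA_VERSIONS:
        raise RuntimeError(
            f"manifest schema v{version} (dbt {manifest['metadata'].get('dbt_version')}) "
            f"is not in the tested set {sorted(TESTED_SCHEMA_VERSIONS)}"
        )


check_manifest({"metadata": {
    "dbt_schema_version": "https://schemas.getdbt.com/dbt/manifest/v12.json",
}})  # passes silently
```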

2. Connection management

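# Orchestrator + dbt = many warehouse connections:
# roughly (concurrent runs) × (dbt threads per run)
# This can exceed Snowflake warehouse concurrency limits

# 1. Cap dbt parallelism inside each run
@dbt_assets(manifest=DBT_MANIFEST)
def capped_dbt_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(['build', '--threads', '4'], context=context).stream()

# 2. Cap concurrent runs in the orchestrator
#    (for Dagster: the run queue / concurrency settings in dagster.yaml)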
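The arithmetic behind this pitfall is simple: in the worst case every concurrent run executes `threads` models at once. Below is a rough sketch with made-up numbers. Real headroom depends on the warehouse; on Snowflake, queries beyond a warehouse's `MAX_CONCURRENCY_LEVEL` typically queue rather than fail, which shows up as slow runs.

```python
def peak_queries(concurrent_runs: int, dbt_threads: int) -> int:
    """Rough worst case: every run executes `dbt_threads` models at once."""
    return concurrent_runs * dbt_threads


def threads_within(limit: int, concurrent_runs: int) -> int:
    """Largest per-run thread count that keeps the worst case within `limit`."""
    return max(1, limit // concurrent_runs)


print(peak_queries(concurrent_runs=3, dbt_threads=8))   # 24
print(threads_within(limit=16, concurrent_runs=3))      # 5, since 3 * 5 = 15 <= 16
```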

3. Lineage gaps

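# Non-dbt steps don't appear in the dbt manifest,
# so lineage is incomplete

# Document them with dbt exposures (in any properties .yml file)
exposures:
  - name: ml_model
    type: ml
    owner:
      email: data-science@example.com   # owner is required; example value
    depends_on:
      - ref('fct_orders')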
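Exposures only help if they are kept complete. Below is a sketch of a CI check that lists models which are supposed to feed downstream systems but have no exposure depending on them. The `consumed` tag convention and `uncovered_models` are hypothetical.

```python
def uncovered_models(manifest: dict, required_tag: str) -> list:
    """Models tagged `required_tag` that no exposure depends on directly."""
    covered = {
        uid
        for exposure in manifest.get("exposures", {}).values()
        for uid in exposure["depends_on"]["nodes"]
    }
    return sorted(
        uid
        for uid, node in manifest["nodes"].items()
        if node["resource_type"] == "model"
        and required_tag in node.get("tags", [])
        and uid not in covered
    )


manifest = {
    "nodes": {
        "model.proj.fct_orders": {"resource_type": "model", "tags": ["consumed"]},
        "model.proj.fct_payments": {"resource_type": "model", "tags": ["consumed"]},
    },
    "exposures": {
        "exposure.proj.ml_model": {"depends_on": {"nodes": ["model.proj.fct_orders"]}},
    },
}
print(uncovered_models(manifest, "consumed"))  # ['model.proj.fct_payments']
```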

4. Cost — multiple managed services

Dagster Cloud: $X/month
Managed Airflow (e.g. Astronomer, MWAA, Cloud Composer): $Y/month
Plus warehouse cost

Budget accordingly

Key takeaways

  1. Dagster + dagster-dbt: asset-based, native lineage, Dagster UI. Best for production data platforms.
  2. Prefect + prefect-dbt: flow-based, modern API, gentler learning curve. Good for new projects.
  3. Airflow + Cosmos: mature, auto-generates tasks from the manifest, Kubernetes support. Best for enterprise.
  4. Headless dbt: minimal, cron-based. Good for simple cases.
  5. Decision factors: existing infrastructure, team size, preference for an asset model, enterprise needs.
  6. Hybrid approaches are common: orchestrator + custom Python + dbt.
  7. Production examples: Dagster with schedules; Airflow + Cosmos with execution modes.
  8. Pitfalls: manifest schema changes, connection management, lineage gaps, cost.
  9. Pick one early: switching costs are high.
  10. All are viable; choose based on team preference and existing infrastructure.
Knowledge check
Choose an orchestrator for a new 200-model dbt project with a team of 5 engineers and no existing infrastructure. What are the decision factors?
Answer
**Decision analysis for a greenfield project**:

**Team profile**:
- 5 engineers (small-medium)
- New project (no legacy)
- 200 models (medium size)

**Recommended**: Dagster or Prefect.

**Detailed comparison**:

**Dagster**:

Pros for this team:
- Asset-based model, natural for data
- Excellent UI showing lineage
- Production-ready features built in
- Active development
- Good Python integration
- The free OSS version is sufficient

Cons:
- Steeper learning curve than Prefect
- More complex deployment
- Heavier infrastructure

Setup time: 1-2 weeks
Ongoing maintenance: 5-10 hrs/week

**Prefect**:

Pros for this team:
- Familiar Python API
- Easier learning curve
- Good for hybrid workloads (dbt + custom Python)
- Modern, growing ecosystem
- Easy cloud option
- Quick to prototype

Cons:
- Less mature than Airflow / Dagster
- prefect-dbt is simpler than dagster-dbt
- Smaller community
- Less lineage visualization

Setup time: 3-5 days
Ongoing maintenance: 3-5 hrs/week

**Why NOT the others**:

**Airflow + Cosmos**:
- Mature, but heavy for a greenfield project
- 200 models is not enterprise scale
- Cosmos works, but is overkill here
- High setup complexity
- Steep learning curve for a new team
- Better when Airflow infrastructure already exists

**Headless (cron + dbt)**:
- 5 engineers and 200 models are too much for cron
- No multi-pipeline coordination
- Limited observability
- Hard to scale beyond simple use cases

**Decision matrix**:

```
Criteria             | Dagster    | Prefect    | Airflow+Cosmos | Headless
---------------------|------------|------------|----------------|----------
Team size (5 eng)    | Good       | Good       | Heavy          | Light
Model count (200)    | Good       | Good       | Overkill       | Limited
No legacy infra      | OK setup   | Easy setup | Hard setup     | Trivial
Lineage importance   | Strong     | Decent     | Strong         | None
Learning curve       | Medium     | Low        | High           | Trivial
Production-readiness | High       | Medium     | Very high      | Low
ROI 1 year           | High       | High       | Medium         | Limited
ROI 3 years          | Very high  | High       | High           | Limited
```

**Recommendation: Dagster**

Reasoning:

1. **Asset-based model fits dbt naturally**: each model is an asset.
2. **Excellent lineage**: critical for understanding 200 models.
3. **Production-grade**: will scale with the team.
4. **Active development**: feature improvements are ongoing.
5. **dagster-dbt is mature**: a well-integrated package.
6. **5 engineers can handle the complexity**: small enough to learn together.

**Implementation plan**:

**Week 1: Learn**

- Engineers complete the Dagster tutorial
- Read the dagster-dbt docs
- Set up Dagster locally

**Week 2: Setup**

```bash
# Install (dagster-webserver provides the UI)
pip install dagster dagster-webserver dagster-dbt

# Project structure
mkdir -p dagster_project
cd dagster_project

# Create base
dagster project scaffold --name analytics_dagster
```

Configure:

```python
# definitions.py
from dagster import Definitions
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(manifest='dbt_project/target/manifest.json')
def all_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(['build'], context=context).stream()

defs = Definitions(
    assets=[all_assets],
    resources={'dbt': DbtCliResource(project_dir='dbt_project')}
)
```

**Week 3: Production deploy**

- Choose Dagster Cloud (managed, $X/month) or self-host
- Set up the Postgres metadata DB
- Configure secrets management
- Set up branch deployments

Kubernetes self-host example:

```bash
# Add the chart repository, then install
helm repo add dagster https://dagster-io.github.io/helm
helm install dagster dagster/dagster

# Components:
# - Daemon (scheduler)
# - Webserver (formerly Dagit)
# - User code (your code)
# - Postgres
```

**Week 4: Schedules + alerts**

```python
import os

from dagster import AssetSelection, RunRequest, define_asset_job, schedule
from dagster_slack import make_slack_on_run_failure_sensor

daily_job = define_asset_job('daily_build_job', selection=AssetSelection.all())

@schedule(cron_schedule='0 6 * * *', job=daily_job)
def daily_build(context):
    return RunRequest()

# Slack alert for every failed run (channel name is an example)
slack_on_run_failure = make_slack_on_run_failure_sensor(
    channel='#data-alerts',
    slack_token=os.environ['SLACK_BOT_TOKEN'],
)

# Register daily_job, daily_build and slack_on_run_failure in Definitions(...)
```

**Weeks 5-6: Migrate existing jobs**

Migrate any existing manual cron jobs to Dagster.

**Long-term plan**:

```
Q1: Setup, learning, basic schedules
Q2: Custom integrations (ML, BI, etc.)
Q3: Multi-team coordination, branch deployments
Q4: Optimization, deeper observability
```

**Cost estimation**:

- Dagster OSS: $0/month
- Self-hosting (AWS/GCP): ~$200-500/month
- Dagster Cloud Starter: ~$100/month per organization
- Engineering time: ~1-2 months full-time for setup

Total first year: $5-15k if self-hosted, or ~$1.2k for Cloud + $X engineering.

ROI: saves 10+ hours/week of engineering time compared with manual cron, plus production reliability.

**Alternative: start with Prefect**

If the team is unsure about committing to Dagster:

**Phase 1 (months 1-3)**: Use Prefect, which is easier to learn.

**Phase 2 (months 4-6)**: Evaluate.

If Prefect is sufficient -> stay.
If you need more (lineage, asset model) -> migrate to Dagster.

**Migration is easier early**, before a lot of state has accumulated.

**Mistake to avoid**: choosing Airflow because it is the "industry standard". Airflow is excellent, but overkill for small/medium teams: heavier setup, steeper learning curve, distributed complexity.

**Production lessons**:

1. Choose based on team fit, not just popularity
2. Greenfield -> modern orchestrator (Dagster/Prefect)
3. An asset-based mental model works well for data
4. Plan for growth (5 -> 20 engineers, 200 -> 2000 models)
5. Don't underestimate setup time
6. Invest in team learning
7. A cloud option is valuable initially (less ops)

**Final recommendation**: Dagster for long-term value; Prefect if the team needs the fastest path to production. Both are excellent choices. Avoid Airflow unless the infrastructure already exists.
Knowledge check
Production scenario: a 50-model dbt project, a team of 2 engineers, a $0 budget, and a need for production-grade scheduling and alerts. What architecture fits?
Answer
**Pragmatic architecture for a small team with a $0 budget**:

**Recommended option**: GitHub Actions + headless dbt + custom Slack/PagerDuty integration.

**Why not an orchestrator**:

- Dagster Cloud: $100+/month
- Self-hosted Dagster: complexity
- Prefect Cloud: limited free tier
- Airflow: heavy for 50 models and 2 engineers

**Why GitHub Actions**:

- Free within the included minutes (enough for daily builds)
- Built-in schedules
- Integrated with the repo
- No infrastructure to maintain
- Native email notifications on failure

**Architecture**:

```
┌──────────────────────┐
│ GitHub repo          │
│ (dbt project)        │
└──────────┬───────────┘
           │
           ▼
┌──────────────────────┐
│ GitHub Actions cron  │
│ - 0 6 * * * (daily)  │
│ - 0 18 * * * (PM)    │
└──────────┬───────────┘
           │
           ▼
┌──────────────────────┐
│ Run dbt build        │
│ (subprocess)         │
└──────────┬───────────┘
           │
           ▼
┌──────────────────────┐
│ Process run_results  │
│ Send Slack alerts    │
│ Send to PagerDuty    │
│ (on failure)         │
└──────────────────────┘
```

**Implementation**:

**Step 1: GitHub Actions workflow**

```yaml
# .github/workflows/dbt-build.yml
name: dbt Daily Build

on:
  schedule:
    - cron: '0 6 * * *'
    - cron: '0 18 * * *'
  workflow_dispatch:

jobs:
  build:
    runs-on: ubuntu-latest
    timeout-minutes: 60

    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dbt
        run: pip install dbt-core==1.11 dbt-snowflake==1.11

      - name: Restore partial parse cache
        uses: actions/cache@v4
        with:
          path: target/partial_parse.msgpack
          key: dbt-parse-${{ hashFiles('models/**', 'dbt_project.yml') }}

      - name: dbt build
        env:
          SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
          SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
        run: |
          dbt build --target prod

      - name: Process results
        if: always()
        run: |
          python scripts/notify_failures.py
        env:
          SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
          PAGERDUTY_TOKEN: ${{ secrets.PAGERDUTY_TOKEN }}

      - name: Upload artifacts
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: dbt-artifacts
          path: target/
```

**Step 2: Notification script**

```python
# scripts/notify_failures.py
import json
import os

import requests


def load_json(path):
    with open(path) as f:
        return json.load(f)


def main():
    # Read run_results
    if not os.path.exists('target/run_results.json'):
        print('No run_results: nothing to process')
        return

    rr = load_json('target/run_results.json')
    manifest = load_json('target/manifest.json')

    # Find failures
    failures = [
        r for r in rr['results']
        if r['status'] in ('error', 'fail')
    ]

    if not failures:
        # Success: optional notification
        send_slack({
            'text': f'dbt daily build succeeded ({len(rr["results"])} nodes)'
        })
        return

    # Categorize
    errors = [f for f in failures if f['status'] == 'error']
    test_failures = [f for f in failures if f['status'] == 'fail']

    # Build alert (Slack mrkdwn: *bold*, `code`)
    message = 'WARNING: dbt build failures:\n\n'

    if errors:
        message += f'*{len(errors)} errors* (nodes failed to run):\n'
        for err in errors[:5]:
            node = manifest['nodes'].get(err['unique_id'])
            name = node['name'] if node else err['unique_id']
            message += f"- `{name}`: {(err.get('message') or '')[:200]}\n"
        if len(errors) > 5:
            message += f'  ... and {len(errors) - 5} more\n'
        message += '\n'

    if test_failures:
        message += f'*{len(test_failures)} test failures*:\n'
        for tf in test_failures[:5]:
            message += f"- `{tf['unique_id']}`: {tf.get('failures', '?')} failed rows\n"

    send_slack({'text': message})

    # PagerDuty only for errors in critical models
    if errors:
        critical_errors = [
            e for e in errors
            if is_critical(e['unique_id'], manifest)
        ]
        if critical_errors:
            send_pagerduty(critical_errors)


def is_critical(uid, manifest):
    node = manifest['nodes'].get(uid)
    if not node:
        return False
    return (
        node.get('access') == 'public' or
        'critical' in node.get('tags', [])
    )


def send_slack(payload):
    webhook = os.environ.get('SLACK_WEBHOOK')
    if not webhook:
        return
    requests.post(webhook, json=payload, timeout=10)


def send_pagerduty(errors):
    token = os.environ.get('PAGERDUTY_TOKEN')
    if not token:
        return

    # PagerDuty Events API v2 (the token is the integration routing key)
    payload = {
        'routing_key': token,
        'event_action': 'trigger',
        'payload': {
            'summary': f'{len(errors)} critical dbt models failed',
            'severity': 'error',
            'source': 'dbt-build'
        }
    }
    requests.post('https://events.pagerduty.com/v2/enqueue', json=payload, timeout=10)


if __name__ == '__main__':
    main()
```

**Step 3: Multiple schedules**

```yaml
# .github/workflows/dbt-hourly.yml: for critical models
on:
  schedule:
    - cron: '0 * * * *'  # hourly

jobs:
  hourly:
    runs-on: ubuntu-latest
    steps:
      # checkout, Python and dbt setup as in the daily workflow (omitted)
      - run: dbt run --select tag:hourly --target prod
```

**Step 4: Source freshness**

```yaml
# .github/workflows/freshness.yml
on:
  schedule:
    - cron: '*/15 * * * *'  # every 15 min

jobs:
  freshness:
    runs-on: ubuntu-latest
    steps:
      # checkout, Python and dbt setup as in the daily workflow (omitted)
      - run: dbt source freshness
      - run: python scripts/check_freshness.py
```

**Step 5: Slim CI for PRs**

```yaml
# .github/workflows/dbt-pr.yml
on:
  pull_request:

jobs:
  ci:
    runs-on: ubuntu-latest
    steps:
      # checkout, Python and dbt setup as in the daily workflow (omitted)
      # download-artifact only sees the current workflow run by default;
      # fetching the prod run's manifest needs `run-id` + `github-token` (v4)
      # or a copy of the manifest kept in object storage
      - name: Download baseline
        uses: actions/download-artifact@v4
        with:
          name: dbt-artifacts
          path: baseline/

      - name: Slim CI
        run: |
          dbt build --select state:modified+ --defer --state baseline/
```

**Step 6: Documentation site**

```yaml
# .github/workflows/dbt-docs.yml
on:
  push:
    branches: [main]

jobs:
  docs:
    runs-on: ubuntu-latest
    steps:
      # checkout, Python and dbt setup as in the daily workflow (omitted)
      - run: dbt docs generate
      - run: aws s3 sync target/ s3://docs-bucket/
      # Or GitHub Pages
```

**Cost estimate**:

- GitHub Actions: free within the 2,000 min/month included for private repos on the Free plan
- Daily builds for 50 models: ~30 min/run × 2 runs/day = 60 min/day × 30 days = 1,800 min/month, just under the free tier
- The hourly and 15-minute freshness jobs add more: each job is billed at least 1 minute per run, so roughly 720 + 2,880 min/month, which exceeds the free tier for a private repo; reduce their frequency to stay within it
- Snowflake: billed separately (query costs)
- Slack/PagerDuty: free tiers

**Operational cost: $0/month**, as long as usage stays within the free tier.

**Trade-offs vs an orchestrator**:

**Limitations**:

- No UI (use Slack alerts)
- Limited multi-pipeline coordination
- Manual lineage (via dbt docs)
- No advanced features (sensors, branch deployments)
- Hard to scale beyond 200-300 models

**When it is fine**:

- Small team
- Predictable schedules
- Limited cross-system coordination
- dbt docs are enough for lineage

**When to upgrade to an orchestrator**:

- Team grows to 5+ engineers
- 200+ models
- Need cross-system orchestration
- Want a lineage UI
- Have a budget

**Comparison**:

```
GitHub Actions + dbt | Dagster Cloud Starter | Self-host Airflow
---------------------|-----------------------|-------------------
$0/month             | $100/month            | $300-500/month
Slack alerts         | Built-in              | Built-in
No UI                | Excellent UI          | Good UI
Lineage via dbt docs | Native                | Via plugins
Limited scaling      | Production-ready      | Enterprise-ready
Quick setup          | Medium setup          | Hard setup
```

**Production usage**:

- Many small/medium teams use exactly this stack
- Sufficient for fewer than 200 models
- Easy to maintain
- Migrate to Dagster/Prefect when the team grows

**Roadmap**:

```
Month 1-12: GitHub Actions + headless dbt
Month 12+:  Evaluate an orchestrator if
            - Team size 5+
            - Model count 200+
            - Need advanced features
            - Have budget
```

This is a pragmatic, production-grade setup for a small team. Don't over-engineer; scale up when warranted.
