Learning Platform
Глоссарий Troubleshooting
Урок 22.02 · 25 мин
Средний
CapstoneStagingIntermediateMartsTests

В этом уроке мы пройдём по слоям проекта: raw -> staging -> intermediate -> marts. Каждый слой даст работающие SQL-модели и базовые тесты. К концу урока — dbt build зелёный на 13 моделях.

Шаг 0: Инициализация проекта

Создай новый проект:

mkdir my_jaffle_shop && cd my_jaffle_shop
dbt init . --skip-profile-setup

Это создаст структуру:

my_jaffle_shop/
├── dbt_project.yml
├── models/example/
├── seeds/
├── tests/
├── snapshots/
└── macros/

Удали models/example/ — она с шаблонами, нам не нужна.

В ~/.dbt/profiles.yml добавь:

my_jaffle_shop:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: './my_jaffle_shop.duckdb'
      schema: main
      threads: 4

Обнови dbt_project.yml:

name: 'my_jaffle_shop'
version: '1.0.0'
config-version: 2

profile: 'my_jaffle_shop'

model-paths: ["models"]
seed-paths: ["seeds"]
snapshot-paths: ["snapshots"]
test-paths: ["tests"]
macro-paths: ["macros"]

models:
  my_jaffle_shop:
    staging:
      +materialized: view
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table

vars:
  gold_threshold_cents: 50000  # 500$ in cents
  silver_threshold_cents: 10000  # 100$

dbt debug — проверь, что соединение работает.

Шаг 1: Seeds (raw CSV)

Создай 6 CSV в seeds/. Структура:

seeds/raw_customers.csv:

id,name,email,country_code,signup_date
1,Anna Petrova,[email protected],RU,2024-01-15
2,Boris Ivanov,[email protected],RU,2024-02-03
3,Carla Smith,[email protected],US,2024-01-20
...

(Для реального проекта используй ~50-100 строк. Для теста — 10-20.)

seeds/raw_orders.csv:

id,customer_id,placed_at,status
1001,1,2024-03-10 14:23:00,delivered
1002,1,2024-03-15 09:11:00,delivered
1003,2,2024-04-01 12:00:00,returned
...

seeds/raw_order_items.csv:

order_id,product_sku,quantity,unit_price_cents
1001,SKU-A,2,1500
1001,SKU-B,1,2500
1002,SKU-A,1,1500
...

seeds/raw_payments.csv:

id,order_id,paid_at,amount_cents,method
5001,1001,2024-03-10 14:25:00,5500,card
5002,1002,2024-03-15 09:15:00,1500,card
...

seeds/products_catalog.csv:

sku,name,category
SKU-A,Coffee Mug,kitchen
SKU-B,Travel Bottle,kitchen
SKU-C,Notebook,office
...

seeds/country_codes.csv:

code,name
RU,Russia
US,United States
DE,Germany
...

Сидируй:

dbt seed

Должен пройти зелёный — 6 seeds load.

Шаг 2: Sources

Хотя мы используем seeds, удобно объявить их как sources для единообразия. В реальном проекте seeds декларируют через _seeds.yml, не _sources.yml, но для capstone мы выберем простой путь — referenciуем seeds напрямую в staging через ref().

seeds/_seeds.yml — описания:

version: 2

seeds:
  - name: raw_customers
    description: "Synthetic customer data: 50 customers across 5 countries"
    columns:
      - name: id
        description: "Customer primary key"
        data_tests:
          - not_null
          - unique
  
  - name: raw_orders
    description: "Orders placed by customers"
    columns:
      - name: id
        description: "Order primary key"
        data_tests:
          - not_null
          - unique
      - name: customer_id
        data_tests:
          - relationships:
              to: ref('raw_customers')
              field: id
  
  - name: raw_order_items
    description: "Items in each order, joined to products by SKU"
  
  - name: raw_payments
    description: "Payments processed for orders"
  
  - name: products_catalog
    description: "Product reference: SKU, name, category"
  
  - name: country_codes
    description: "ISO-2 country codes mapping"

Проверь:

dbt test --select source:my_jaffle_shop

Шаг 3: Staging слой (4 модели)

Staging — это типизированная очистка raw. Минимум бизнес-логики, максимум consistency: переименовываем колонки, кастуем типы, фильтруем edge cases.

models/staging/stg_jaffle__customers.sql

with source as (
    select * from {{ ref('raw_customers') }}
),

renamed as (
    select
        id as customer_id,
        name as customer_name,
        email,
        country_code,
        signup_date::date as signup_date
    from source
)

select * from renamed

models/staging/stg_jaffle__orders.sql

with source as (
    select * from {{ ref('raw_orders') }}
),

renamed as (
    select
        id as order_id,
        customer_id,
        placed_at::timestamp as order_placed_at,
        status
    from source
)

select * from renamed

models/staging/stg_jaffle__order_items.sql

with source as (
    select * from {{ ref('raw_order_items') }}
),

renamed as (
    select
        order_id,
        product_sku,
        quantity,
        unit_price_cents,
        (quantity * unit_price_cents) as line_total_cents
    from source
)

select * from renamed

models/staging/stg_jaffle__payments.sql

with source as (
    select * from {{ ref('raw_payments') }}
),

renamed as (
    select
        id as payment_id,
        order_id,
        paid_at::timestamp as paid_at,
        amount_cents,
        method as payment_method
    from source
)

select * from renamed

Тесты на staging

models/staging/_models.yml:

version: 2

models:
  - name: stg_jaffle__customers
    description: "Cleaned customer data: typed columns, renamed for consistency"
    columns:
      - name: customer_id
        description: "Customer primary key (renamed from raw.id)"
        data_tests:
          - not_null
          - unique
      - name: email
        data_tests:
          - not_null
  
  - name: stg_jaffle__orders
    description: "Cleaned orders data"
    columns:
      - name: order_id
        data_tests:
          - not_null
          - unique
      - name: customer_id
        data_tests:
          - relationships:
              to: ref('stg_jaffle__customers')
              field: customer_id
      - name: status
        data_tests:
          - accepted_values:
              values: ['delivered', 'returned', 'pending']
  
  - name: stg_jaffle__order_items
    description: "Order line items"
    columns:
      - name: order_id
        data_tests:
          - not_null
          - relationships:
              to: ref('stg_jaffle__orders')
              field: order_id
      - name: quantity
        data_tests:
          - not_null
      - name: line_total_cents
        data_tests:
          - not_null
  
  - name: stg_jaffle__payments
    description: "Cleaned payments data"
    columns:
      - name: payment_id
        data_tests:
          - not_null
          - unique
      - name: order_id
        data_tests:
          - not_null
          - relationships:
              to: ref('stg_jaffle__orders')
              field: order_id
      - name: payment_method
        data_tests:
          - accepted_values:
              values: ['card', 'cash', 'bank_transfer']

Запусти:

dbt build --select stg_jaffle__customers stg_jaffle__orders stg_jaffle__order_items stg_jaffle__payments

Должно быть зелёным. Если падает — открой target/compiled/, посмотри SQL, исправь.

Шаг 4: Intermediate (2 модели)

Intermediate — это переходные модели, которые делают неудобную часть бизнес-логики: pivot, join, агрегации, окончательная подготовка перед mart-сборкой.

models/intermediate/int_order_items_pivoted.sql

Это будет one-row-per-order агрегация order_items:

with order_items as (
    select * from {{ ref('stg_jaffle__order_items') }}
),

aggregated as (
    select
        order_id,
        sum(quantity) as total_items_count,
        sum(line_total_cents) as order_total_cents,
        count(distinct product_sku) as distinct_products_count
    from order_items
    group by order_id
)

select * from aggregated

models/intermediate/int_payments_joined.sql

Это join orders + payments:

with orders as (
    select * from {{ ref('stg_jaffle__orders') }}
),

payments as (
    select * from {{ ref('stg_jaffle__payments') }}
),

joined as (
    select
        orders.order_id,
        orders.customer_id,
        orders.order_placed_at,
        orders.status as order_status,
        sum(payments.amount_cents) as total_paid_cents,
        count(payments.payment_id) as payment_count
    from orders
    left join payments on orders.order_id = payments.order_id
    group by orders.order_id, orders.customer_id, orders.order_placed_at, orders.status
)

select * from joined

YAML для intermediate

models/intermediate/_models.yml:

version: 2

models:
  - name: int_order_items_pivoted
    description: "One row per order with item-level aggregates"
    columns:
      - name: order_id
        data_tests:
          - not_null
          - unique
  
  - name: int_payments_joined
    description: "Orders with payment aggregates joined"
    columns:
      - name: order_id
        data_tests:
          - not_null
          - unique

Билд:

dbt build --select intermediate

Note: intermediate материализуется как ephemeral (по конфигу из dbt_project.yml), то есть встраивается в downstream-CTE, не создаётся таблица в warehouse. Это норма для intermediate.

Шаг 5: Marts (3 модели)

Marts — финальные бизнес-сущности. Каждая — это read-ready table для BI или потребителей.

models/marts/customers.sql

Wide customer dimension с агрегатами:

{{ config(materialized='table') }}

with customers as (
    select * from {{ ref('stg_jaffle__customers') }}
),

orders_per_customer as (
    select
        customer_id,
        count(order_id) as orders_count,
        sum(total_paid_cents) as lifetime_spend_cents,
        min(order_placed_at) as first_order_at,
        max(order_placed_at) as latest_order_at
    from {{ ref('int_payments_joined') }}
    group by customer_id
),

countries as (
    select * from {{ ref('country_codes') }}
),

final as (
    select
        c.customer_id,
        c.customer_name,
        c.email,
        c.country_code,
        countries.name as country_name,
        c.signup_date,
        coalesce(opc.orders_count, 0) as orders_count,
        coalesce(opc.lifetime_spend_cents, 0) as lifetime_spend_cents,
        opc.first_order_at,
        opc.latest_order_at,
        case
            when coalesce(opc.lifetime_spend_cents, 0) > {{ var('gold_threshold_cents') }} then 'gold'
            when coalesce(opc.lifetime_spend_cents, 0) > {{ var('silver_threshold_cents') }} then 'silver'
            else 'bronze'
        end as customer_tier
    from customers c
    left join orders_per_customer opc on c.customer_id = opc.customer_id
    left join countries on c.country_code = countries.code
)

select * from final

models/marts/orders.sql

Fact-таблица заказов с обогащением:

{{ config(materialized='table') }}

with orders as (
    select * from {{ ref('int_payments_joined') }}
),

items as (
    select * from {{ ref('int_order_items_pivoted') }}
),

final as (
    select
        o.order_id,
        o.customer_id,
        o.order_placed_at,
        o.order_status,
        i.total_items_count,
        i.order_total_cents,
        i.distinct_products_count,
        o.total_paid_cents,
        o.payment_count,
        case
            when o.total_paid_cents = i.order_total_cents then 'paid_full'
            when o.total_paid_cents > 0 then 'paid_partial'
            else 'unpaid'
        end as payment_status
    from orders o
    left join items i on o.order_id = i.order_id
)

select * from final

models/marts/revenue_daily.sql

Daily aggregate выручки:

{{ config(materialized='table') }}

with orders as (
    select * from {{ ref('orders') }}
)

select
    date(order_placed_at) as order_date,
    count(distinct order_id) as orders_count,
    count(distinct customer_id) as unique_customers,
    sum(order_total_cents) as gross_revenue_cents,
    sum(total_paid_cents) as paid_revenue_cents,
    sum(order_total_cents) - sum(total_paid_cents) as unpaid_revenue_cents
from orders
group by date(order_placed_at)
order by order_date

YAML для marts

models/marts/_models.yml:

version: 2

models:
  - name: customers
    description: "Customer dimension with lifetime metrics and tier"
    columns:
      - name: customer_id
        data_tests:
          - not_null
          - unique
      - name: customer_tier
        data_tests:
          - accepted_values:
              values: ['bronze', 'silver', 'gold']
      - name: lifetime_spend_cents
        data_tests:
          - not_null
  
  - name: orders
    description: "Order fact with item and payment aggregates"
    columns:
      - name: order_id
        data_tests:
          - not_null
          - unique
      - name: customer_id
        data_tests:
          - relationships:
              to: ref('customers')
              field: customer_id
      - name: payment_status
        data_tests:
          - accepted_values:
              values: ['paid_full', 'paid_partial', 'unpaid']
  
  - name: revenue_daily
    description: "Daily revenue aggregates: orders, customers, gross/paid/unpaid"
    columns:
      - name: order_date
        data_tests:
          - not_null
          - unique
      - name: gross_revenue_cents
        data_tests:
          - not_null

Singular тесты

tests/orders_total_equals_items_sum.sql — проверяем, что order_total_cents равен сумме line_totals:

-- Test fails if any order has total != sum of its items
with orders as (
    select order_id, order_total_cents
    from {{ ref('orders') }}
),

items_sum as (
    select order_id, sum(line_total_cents) as items_total
    from {{ ref('stg_jaffle__order_items') }}
    group by order_id
)

select
    o.order_id,
    o.order_total_cents,
    i.items_total
from orders o
join items_sum i on o.order_id = i.order_id
where o.order_total_cents != i.items_total

tests/revenue_daily_matches_orders.sql — daily revenue должно равняться сумме orders по дню:

-- Test fails if revenue_daily doesn't match orders aggregation
with daily as (
    select order_date, gross_revenue_cents
    from {{ ref('revenue_daily') }}
),

from_orders as (
    select date(order_placed_at) as order_date, sum(order_total_cents) as orders_sum
    from {{ ref('orders') }}
    group by date(order_placed_at)
)

select
    d.order_date,
    d.gross_revenue_cents,
    o.orders_sum
from daily d
join from_orders o on d.order_date = o.order_date
where d.gross_revenue_cents != o.orders_sum

Финальный build всего

dbt build

Должен пройти зелёный. Должно быть:

  • 6 seeds loaded
  • 4 staging-моделей материализованы как view
  • 2 intermediate как ephemeral (не показывается отдельно, встроены в downstream)
  • 3 marts как table
  • 15+ тестов проходят

Если что-то падает — открывай compiled SQL, дебажь, фикси, retry.


Чек-лист этого урока

  • dbt_project.yml настроен: model-paths, seed-paths, materializations.
  • 6 raw CSV в seeds/, sourced через _seeds.yml.
  • 4 staging-моделей: rename, type-cast, базовая чистка. Тесты: not_null + unique на PK, relationships, accepted_values.
  • 2 intermediate (ephemeral): pivot order_items, join payments к orders.
  • 3 mart-модели: customers (dim wide), orders (fact), revenue_daily (aggregate).
  • Singular tests: business invariants (sum of items = order_total).
  • Финальный dbt build зелёный.
Star schema: теория mart-слоя

В следующем уроке — snapshot, descriptions, doc blocks, exposures.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 6. В capstone staging-моделях должна быть минимальная бизнес-логика. Что конкретно должна делать staging-модель?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4