Learning Platform
Глоссарий Troubleshooting
Урок 17.02 · 35 мин
Средний
capstoneincrementalsnapshotsscd2unit-testsduckdb-workarounds

Capstone: incremental и snapshots

Это самый “плотный” урок capstone’а. Здесь мы реализуем три критичных production-фичи: incremental materializations для больших fact-таблиц, snapshots с SCD2 для slowly-changing dimensions, и unit tests на бизнес-логику. Это то, что отличает учебный проект от реального.

Предполагается, что у вас уже есть базовый DAG из урока 1: 8 staging моделей загружены, основные intermediate работают, marts существуют как materialized='table'.

SCD Type 2: теоретическое основание для customers_snapshot

План урока

  1. Incremental для mart_orders — стратегия merge с unique_key + incremental_predicates
  2. Incremental для mart_revenue_daily — стратегия insert_overwrite для daily aggregations
  3. Microbatch для int_events_session — где DuckDB позволяет
  4. Snapshot customers_snapshot — timestamp strategy с invalidate_hard_deletes (workaround на DuckDB)
  5. Snapshot products_snapshot — check strategy
  6. Unit tests — 3 unit’а на бизнес-логику

Incremental: mart_orders

Бизнес-контекст

mart_orders — fact table заказов с обогащением: одна строка на заказ, ~10k новых записей в день, история — 3 года, объём ~10M строк.

Каждый run pipeline должен брать только новые / обновлённые заказы (status может меняться: pending -> paid -> shipped -> delivered -> returned). Стратегия merge подходит: dbt будет искать существующие записи по order_id и обновлять их.

Реализация

-- models/marts/core/mart_orders.sql
{{
  config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge',
    incremental_predicates=[
      "DBT_INTERNAL_DEST.order_updated_at >= (CURRENT_DATE - INTERVAL '7 days')"
    ],
    on_schema_change='append_new_columns'
  )
}}

WITH orders AS (
  SELECT * FROM {{ ref('int_orders_enriched') }}
  {% if is_incremental() %}
    WHERE order_updated_at >= (
      SELECT COALESCE(MAX(order_updated_at), '1900-01-01'::TIMESTAMP)
      FROM {{ this }}
    ) - INTERVAL '1 day'
  {% endif %}
),

users AS (
  SELECT * FROM {{ ref('int_users_with_first_order') }}
)

SELECT
  o.order_id,
  o.user_id,
  u.user_email,
  u.first_order_date,
  o.order_status,
  o.order_total_usd,
  o.order_items_count,
  o.shipping_address_country,
  o.order_created_at,
  o.order_updated_at,
  CURRENT_TIMESTAMP AS dbt_loaded_at
FROM orders AS o
LEFT JOIN users AS u USING (user_id)

Разбор ключевых элементов

unique_key='order_id' — это primary key для merge. Если запись с таким order_id уже есть в target — она UPDATE’тся, если нет — INSERT.

incremental_strategy='merge' — стратегия. На DuckDB поддерживается append, merge, delete+insert. merge — лучший выбор для fact’ов с updateable status.

incremental_predicates — оптимизация. Без него merge сканирует всю target таблицу для поиска updates. С predicate сканирует только последние 7 дней. На таблице 10M строк это разница “60 секунд” vs “0.5 секунды”.

WARNING

incremental_predicates требуют префикс DBT_INTERNAL_DEST. Это alias, которым dbt именует target в merge-операции. Без префикса DuckDB / Snowflake поймут predicate как условие на source, не target — и предикат тихо проигнорируется (или вообще исключит данные). Это subtle bug, который проявляется как “merge работает, но иногда дубли”.

on_schema_change='append_new_columns' — что делать, если в новой версии модели появилась колонка, а в target её нет. append_new_columns добавит nullable колонку в target. Альтернативы: ignore (новая колонка молча игнорируется), fail (errors), sync_all_columns (полная sync, может быть дорого).

is_incremental() блок — условие “только при инкрементальном run, не при full-refresh / первом run”. Здесь фильтруем upstream на новые/обновлённые записи. Минус 1 день backward — это lookback window для late-arriving data (события, пришедшие с задержкой).

Тестирование

После реализации:

# Первый build — полный
dbt build --select mart_orders --full-refresh

# Имитируем новые данные (изменим seeds или intermediate)
dbt build --select mart_orders

# Должно работать incremental: dbt run --debug покажет MERGE statement

В dbt run --debug вы увидите примерный SQL:

MERGE INTO mart_orders AS DBT_INTERNAL_DEST
USING mart_orders__dbt_tmp AS DBT_INTERNAL_SOURCE
  ON DBT_INTERNAL_DEST.order_updated_at >= (CURRENT_DATE - INTERVAL '7 days')
  AND DBT_INTERNAL_DEST.order_id = DBT_INTERNAL_SOURCE.order_id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...

Видно DBT_INTERNAL_DEST префикс — значит predicate работает корректно.


Incremental: mart_revenue_daily

Бизнес-контекст

mart_revenue_daily — daily aggregation: одна строка на (day, country). Объём — 365 дней × 30 стран = ~10k строк за год. Каждый run — пересобираем последние 7 дней (на случай late-arriving orders).

Подходящая стратегия — insert_overwrite: dbt удалит данные за дни, которые мы пересобираем, и вставит свежие.

Реализация

-- models/marts/core/mart_revenue_daily.sql
{{
  config(
    materialized='incremental',
    incremental_strategy='delete+insert',
    unique_key=['order_date', 'shipping_address_country'],
    on_schema_change='append_new_columns'
  )
}}

WITH base AS (
  SELECT * FROM {{ ref('mart_orders') }}
  {% if is_incremental() %}
    WHERE order_created_at::DATE >= (CURRENT_DATE - INTERVAL '7 days')
  {% endif %}
)

SELECT
  order_created_at::DATE AS order_date,
  shipping_address_country,
  COUNT(DISTINCT order_id) AS order_count,
  COUNT(DISTINCT user_id) AS unique_customers,
  SUM(order_total_usd) AS total_revenue_usd,
  AVG(order_total_usd) AS avg_order_value_usd
FROM base
WHERE order_status NOT IN ('cancelled', 'fraudulent')
GROUP BY 1, 2
NOTE

На DuckDB insert_overwrite не реализован напрямую — используем delete+insert как эквивалент с тем же поведением. dbt-duckdb 1.10 поддерживает append, merge, delete+insert. На Snowflake / BigQuery insert_overwrite — нативная стратегия с partition-level overwrite, что эффективнее на partitioned tables.

Composite unique_key

Для delete+insert указываем composite unique_key — список колонок. dbt сгенерирует DELETE WHERE по этому списку:

DELETE FROM mart_revenue_daily
WHERE (order_date, shipping_address_country) IN (
  SELECT order_date, shipping_address_country FROM mart_revenue_daily__dbt_tmp
);
INSERT INTO mart_revenue_daily SELECT * FROM mart_revenue_daily__dbt_tmp;

Это делает rebuild только последних 7 дней — старые данные не трогаются.


Microbatch для int_events_session

Бизнес-контекст

int_events_session — sessionization page-views: группируем events одного пользователя в сессии (30 min gap). Объём — десятки миллионов events. Каждый run обрабатывает только новые события.

В dbt 1.9+ для таких случаев есть microbatch incremental strategy — рассчитан на event-time data с batch-обработкой.

Реализация

-- models/intermediate/events/int_events_session.sql
{{
  config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='event_timestamp',
    batch_size='day',
    begin='2023-01-01',
    lookback=1,
    full_refresh=false
  )
}}

WITH events_with_lag AS (
  SELECT
    event_id,
    user_id,
    event_type,
    event_timestamp,
    page_url,
    LAG(event_timestamp) OVER (
      PARTITION BY user_id
      ORDER BY event_timestamp
    ) AS prev_event_timestamp
  FROM {{ ref('stg_events__page_views') }}
),

sessionized AS (
  SELECT
    *,
    CASE
      WHEN prev_event_timestamp IS NULL
        OR EXTRACT(EPOCH FROM (event_timestamp - prev_event_timestamp)) > 1800
      THEN 1
      ELSE 0
    END AS is_new_session
  FROM events_with_lag
)

SELECT
  event_id,
  user_id,
  event_type,
  event_timestamp,
  page_url,
  SUM(is_new_session) OVER (
    PARTITION BY user_id
    ORDER BY event_timestamp
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  ) AS session_number
FROM sessionized

Разбор microbatch конфигурации

incremental_strategy='microbatch' — стратегия (GA в 1.9).

event_time='event_timestamp' — колонка, по которой определяется batching.

batch_size='day' — каждый batch = 1 день данных. Альтернативы: hour, month, year.

begin='2023-01-01' — начало истории. Microbatch начинает с этой даты при full-refresh.

lookback=1 — пересобираем последний 1 batch (день) на каждый run. Это покрывает late-arriving events до 1 дня. Если ожидаются late-arrivals до 7 дней — установите lookback=7.

full_refresh=false — защита от случайного --full-refresh. Microbatch может быть очень дорогим на full refresh для длинной истории.

WARNING

Microbatch на DuckDB — частичная поддержка в 1.10. Базовые случаи (batch_size=‘day’ с UTC timestamps) работают. Но: (1) Concurrent batches (concurrent_batches=true) не поддержано на DuckDB. (2) --event-time-start/end для backfills работает не на всех версиях dbt-duckdb. (3) Если ожидается late-arrivals больше lookback — нужны custom incremental_predicates.

В production на Snowflake / BigQuery microbatch более зрелый. Для capstone — простой случай (lookback=1, без concurrent) — работает.


Snapshot: customers_snapshot

Бизнес-контекст

customers — slowly-changing dimension: email, имя, address могут меняться. Нам нужна история через SCD2.

Реализация

В dbt 1.9+ snapshots декларируются в YAML:

# snapshots/customers_snapshot.yml
snapshots:
  - name: customers_snapshot
    relation: source('users_db', 'users')
    config:
      schema: snapshots
      database: capstone
      unique_key: user_id
      strategy: timestamp
      updated_at: updated_at
      hard_deletes: invalidate
      dbt_valid_to_current: "TIMESTAMP '9999-12-31 23:59:59'"
      snapshot_meta_column_names:
        dbt_valid_from: dbt_valid_from
        dbt_valid_to: dbt_valid_to
        dbt_scd_id: dbt_scd_id
        dbt_updated_at: dbt_updated_at
        dbt_is_deleted: dbt_is_deleted

Разбор ключевых элементов

strategy: timestamp — использовать колонку updated_at для определения изменений. Если значение в source изменилось, snapshot закрывает старую версию и создаёт новую.

updated_at: updated_at — колонка с timestamp последнего изменения. Должна обновляться в source при каждом изменении строки.

hard_deletes: invalidate — что делать, если строка пропала из source. invalidate ставит dbt_valid_to = now() для последней версии. Альтернативы:

  • ignore (default) — пропавшая строка остаётся “открытой” forever (dangerous)
  • invalidate — закрыть строку текущим timestamp
  • new_record — добавить новую строку с dbt_is_deleted=true

Для customers invalidate — хороший выбор: знаем дату “удаления” аккаунта.

dbt_valid_to_current — какое значение проставлять dbt_valid_to для текущих (открытых) записей. Default — NULL. Это часто ломает date-range queries: WHERE date BETWEEN dbt_valid_from AND dbt_valid_to будет фильтровать NULL’ы. Альтернатива — '9999-12-31 23:59:59', что делает запросы простыми.

snapshot_meta_column_names (1.9+) — кастомизация имён технических колонок. Например, можно переименовать dbt_scd_id в customer_scd_id для конвенции команды.

DuckDB workaround для hard_deletes

На dbt-duckdb 1.10.0 hard_deletes: invalidate имеет известный bug — не закрывает строки правильно в некоторых сценариях. Workaround — post-hook:

config:
  unique_key: user_id
  strategy: timestamp
  updated_at: updated_at
  post-hook: |
    UPDATE {{ this }}
    SET dbt_valid_to = CURRENT_TIMESTAMP
    WHERE dbt_valid_to IS NULL
      AND user_id NOT IN (SELECT user_id FROM {{ source('users_db', 'users') }})

Это закрывает строки для users, пропавших из source. Не идеальное решение, но рабочее на DuckDB.

TIP

В реальном production-проекте на Snowflake / BigQuery hard_deletes: invalidate работает нативно без workaround. Если вы используете DuckDB только для capstone, а в работе будете на Snowflake — выучите конфигурацию правильно, workaround DuckDB запомните как “так бывает на community-адаптерах”.


Snapshot: products_snapshot — check strategy

Бизнес-контекст

products обновляется без явного updated_at (legacy система). Используем check strategy — dbt считает hash от указанных колонок и сравнивает с прошлым snapshot.

Реализация

-- snapshots/products_snapshot.sql
{% snapshot products_snapshot %}

{{
  config(
    target_schema='snapshots',
    unique_key='product_id',
    strategy='check',
    check_cols=['product_name', 'category_id', 'price_usd', 'is_active'],
    invalidate_hard_deletes=True
  )
}}

SELECT
  product_id,
  product_name,
  category_id,
  price_usd,
  is_active,
  CURRENT_TIMESTAMP AS snapshot_checked_at
FROM {{ source('products_db', 'products') }}

{% endsnapshot %}

Разбор

strategy='check' — сравнение по hash колонок. Если хеш изменился — есть update.

check_cols=['product_name', 'category_id', 'price_usd', 'is_active'] — какие колонки отслеживать. ВСЕ колонки, кроме PK, лучше не указывать — добавление новой колонки в check_cols заставит dbt считать все существующие записи как “изменённые” и создать новые версии.

invalidate_hard_deletes=True — старый синтаксис (pre-1.9) для hard_deletes. Эквивалент hard_deletes: invalidate в YAML.

WARNING

check_cols pitfall: если вы изменяете список колонок в check_cols, при следующем run все строки считаются “изменёнными” (hash же другой) — будут созданы новые версии для всего. История ломается. Подходите к check_cols консервативно — добавляйте только когда точно знаете, что эта колонка должна отслеживаться.


Unit tests на критичные модели

Unit tests — это GA в dbt 1.8+, тесты с явным given/expect для бизнес-логики моделей. В capstone делаем 3 unit’а.

Unit test 1: int_marketing_attribution

Бизнес-логика: attribution-touch получает 100% credit, если single-touch, или 50/50 если first/last touch.

# unit_tests/int_marketing_attribution_unit.yml
unit_tests:
  - name: test_attribution_single_touch
    model: int_marketing_attribution
    given:
      - input: ref('stg_marketing__events')
        rows:
          - {user_id: 1, campaign_id: 'c1', event_at: '2026-05-01', event_type: 'click'}
      - input: ref('stg_orders_db__orders')
        rows:
          - {user_id: 1, order_id: 'o1', order_total_usd: 100, order_created_at: '2026-05-02'}
    expect:
      rows:
        - {user_id: 1, campaign_id: 'c1', attributed_revenue: 100.0, attribution_type: 'single_touch'}

  - name: test_attribution_first_last_touch
    model: int_marketing_attribution
    given:
      - input: ref('stg_marketing__events')
        rows:
          - {user_id: 2, campaign_id: 'c_first', event_at: '2026-05-01', event_type: 'click'}
          - {user_id: 2, campaign_id: 'c_last', event_at: '2026-05-02', event_type: 'click'}
      - input: ref('stg_orders_db__orders')
        rows:
          - {user_id: 2, order_id: 'o2', order_total_usd: 200, order_created_at: '2026-05-03'}
    expect:
      rows:
        - {user_id: 2, campaign_id: 'c_first', attributed_revenue: 100.0, attribution_type: 'first_touch'}
        - {user_id: 2, campaign_id: 'c_last', attributed_revenue: 100.0, attribution_type: 'last_touch'}

Unit test 2: mart_orders fields enrichment

# unit_tests/mart_orders_unit.yml
unit_tests:
  - name: test_mart_orders_joins_user_email
    model: mart_orders
    given:
      - input: ref('int_orders_enriched')
        rows:
          - {order_id: 'o1', user_id: 1, order_status: 'paid', order_total_usd: 100,
             order_items_count: 2, shipping_address_country: 'US',
             order_created_at: '2026-05-01', order_updated_at: '2026-05-01'}
      - input: ref('int_users_with_first_order')
        rows:
          - {user_id: 1, user_email: '[email protected]', first_order_date: '2026-05-01'}
    expect:
      rows:
        - {order_id: 'o1', user_id: 1, user_email: '[email protected]',
           first_order_date: '2026-05-01', order_status: 'paid',
           order_total_usd: 100, order_items_count: 2,
           shipping_address_country: 'US'}

Unit test 3: mart_revenue_daily aggregation

# unit_tests/mart_revenue_daily_unit.yml
unit_tests:
  - name: test_revenue_excludes_cancelled
    model: mart_revenue_daily
    given:
      - input: ref('mart_orders')
        rows:
          - {order_id: 'o1', order_status: 'paid', order_total_usd: 100,
             shipping_address_country: 'US', order_created_at: '2026-05-01'}
          - {order_id: 'o2', order_status: 'cancelled', order_total_usd: 50,
             shipping_address_country: 'US', order_created_at: '2026-05-01'}
          - {order_id: 'o3', order_status: 'fraudulent', order_total_usd: 200,
             shipping_address_country: 'US', order_created_at: '2026-05-01'}
    expect:
      rows:
        - {order_date: '2026-05-01', shipping_address_country: 'US',
           order_count: 1, unique_customers: 1, total_revenue_usd: 100.0,
           avg_order_value_usd: 100.0}

Запуск unit tests

# Только unit tests
dbt test --select test_type:unit

# Конкретный unit
dbt test --select int_marketing_attribution

# Вместе с data tests
dbt test

Unit tests запускаются БЕЗ обращения к warehouse — given/expect задают данные inline. Это позволяет тестировать бизнес-логику в CI без затрат на warehouse.


Проверка после реализации

Запустите полный build:

dbt deps  # установить packages
dbt build --full-refresh

Ожидаемый результат:

  • 30+ моделей построены
  • mart_orders, mart_revenue_daily, int_events_session — incremental (но первый build — full)
  • customers_snapshot, products_snapshot созданы со SCD2 структурой
  • Все unit tests passed

Затем имитируйте инкрементальный run:

# Изменим seed (добавим новый order)
dbt seed --select raw_orders --full-refresh
dbt build --select mart_orders+

Должно работать incremental — заметьте в логах “(incremental)” вместо “(table)” для mart_orders.


Что middle-инженер должен унести

  1. Знать, когда какую стратегию выбирать: merge (status updateable), delete+insert (daily aggregations), microbatch (event-time data).
  2. Уметь корректно использовать incremental_predicates с префиксом DBT_INTERNAL_DEST.
  3. Понимать snapshot strategies: timestamp (with updated_at) vs check (with check_cols).
  4. Знать hard_deletes options и их impact.
  5. Использовать dbt_valid_to_current для удобства downstream queries.
  6. Знать ограничения DuckDB-adapter’а и workarounds для production-features (microbatch concurrent, hard_deletes invalidate).
  7. Писать unit tests с given/expect для бизнес-логики.

Проверка знанийKnowledge check
В вашем mart_orders incremental с unique_key='order_id', strategy='merge'. Через неделю обнаруживаете: производительность падает с 30 секунд до 8 минут. dbt --debug показывает, что MERGE сканирует всю target-таблицу (10M строк). Какая конфигурационная ошибка и как исправить?
ОтветAnswer
Ошибка — отсутствие incremental_predicates с префиксом DBT_INTERNAL_DEST. Без них MERGE сканирует всю target-таблицу при каждом run в поиске matches по unique_key. На 10M строк это становится bottleneck. Решение: добавить в config: incremental_predicates=["DBT_INTERNAL_DEST.order_updated_at >= (CURRENT_DATE - INTERVAL '7 days')"]. Это ограничивает scope MERGE последними 7 днями, что соответствует возможному lookback для late-arriving updates. КРИТИЧНО: префикс DBT_INTERNAL_DEST обязателен — это alias target в merge-операции. Без префикса dbt применит predicate к source (incoming данные), что (а) не оптимизирует target scan и (б) может молча исключить данные. На 10M строк правильный predicate даёт улучшение в 10-50x (только активная партиция сканируется). Дополнительно: проверить, что в Snowflake / BigQuery таблица кластеризована по order_updated_at — это синергично с predicate. Альтернатива на partitioned warehouses (BigQuery) — использовать insert_overwrite с partition_by, что ещё эффективнее, но требует date-partitioned source.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 6. В capstone mart_orders configured как incremental с unique_key='order_id', incremental_strategy='merge'. Через неделю замечают: производительность упала с 30 секунд до 8 минут на 10M строк. Какая критичная конфигурация пропущена?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4