Capstone: incremental и snapshots
Это самый “плотный” урок capstone’а. Здесь мы реализуем три критичных production-фичи: incremental materializations для больших fact-таблиц, snapshots с SCD2 для slowly-changing dimensions, и unit tests на бизнес-логику. Это то, что отличает учебный проект от реального.
Предполагается, что у вас уже есть базовый DAG из урока 1: 8 staging моделей загружены, основные intermediate работают, marts существуют как materialized='table'.
План урока
- Incremental для
mart_orders— стратегияmergeс unique_key + incremental_predicates - Incremental для
mart_revenue_daily— стратегияinsert_overwriteдля daily aggregations - Microbatch для
int_events_session— где DuckDB позволяет - Snapshot
customers_snapshot— timestamp strategy с invalidate_hard_deletes (workaround на DuckDB) - Snapshot
products_snapshot— check strategy - Unit tests — 3 unit’а на бизнес-логику
Incremental: mart_orders
Бизнес-контекст
mart_orders — fact table заказов с обогащением: одна строка на заказ, ~10k новых записей в день, история — 3 года, объём ~10M строк.
Каждый run pipeline должен брать только новые / обновлённые заказы (status может меняться: pending -> paid -> shipped -> delivered -> returned). Стратегия merge подходит: dbt будет искать существующие записи по order_id и обновлять их.
Реализация
-- models/marts/core/mart_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge',
incremental_predicates=[
"DBT_INTERNAL_DEST.order_updated_at >= (CURRENT_DATE - INTERVAL '7 days')"
],
on_schema_change='append_new_columns'
)
}}
WITH orders AS (
SELECT * FROM {{ ref('int_orders_enriched') }}
{% if is_incremental() %}
WHERE order_updated_at >= (
SELECT COALESCE(MAX(order_updated_at), '1900-01-01'::TIMESTAMP)
FROM {{ this }}
) - INTERVAL '1 day'
{% endif %}
),
users AS (
SELECT * FROM {{ ref('int_users_with_first_order') }}
)
SELECT
o.order_id,
o.user_id,
u.user_email,
u.first_order_date,
o.order_status,
o.order_total_usd,
o.order_items_count,
o.shipping_address_country,
o.order_created_at,
o.order_updated_at,
CURRENT_TIMESTAMP AS dbt_loaded_at
FROM orders AS o
LEFT JOIN users AS u USING (user_id)
Разбор ключевых элементов
unique_key='order_id' — это primary key для merge. Если запись с таким order_id уже есть в target — она UPDATE’тся, если нет — INSERT.
incremental_strategy='merge' — стратегия. На DuckDB поддерживается append, merge, delete+insert. merge — лучший выбор для fact’ов с updateable status.
incremental_predicates — оптимизация. Без него merge сканирует всю target таблицу для поиска updates. С predicate сканирует только последние 7 дней. На таблице 10M строк это разница “60 секунд” vs “0.5 секунды”.
incremental_predicates требуют префикс DBT_INTERNAL_DEST. Это alias, которым dbt именует target в merge-операции. Без префикса DuckDB / Snowflake поймут predicate как условие на source, не target — и предикат тихо проигнорируется (или вообще исключит данные). Это subtle bug, который проявляется как “merge работает, но иногда дубли”.
on_schema_change='append_new_columns' — что делать, если в новой версии модели появилась колонка, а в target её нет. append_new_columns добавит nullable колонку в target. Альтернативы: ignore (новая колонка молча игнорируется), fail (errors), sync_all_columns (полная sync, может быть дорого).
is_incremental() блок — условие “только при инкрементальном run, не при full-refresh / первом run”. Здесь фильтруем upstream на новые/обновлённые записи. Минус 1 день backward — это lookback window для late-arriving data (события, пришедшие с задержкой).
Тестирование
После реализации:
# Первый build — полный
dbt build --select mart_orders --full-refresh
# Имитируем новые данные (изменим seeds или intermediate)
dbt build --select mart_orders
# Должно работать incremental: dbt run --debug покажет MERGE statement
В dbt run --debug вы увидите примерный SQL:
MERGE INTO mart_orders AS DBT_INTERNAL_DEST
USING mart_orders__dbt_tmp AS DBT_INTERNAL_SOURCE
ON DBT_INTERNAL_DEST.order_updated_at >= (CURRENT_DATE - INTERVAL '7 days')
AND DBT_INTERNAL_DEST.order_id = DBT_INTERNAL_SOURCE.order_id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...
Видно DBT_INTERNAL_DEST префикс — значит predicate работает корректно.
Incremental: mart_revenue_daily
Бизнес-контекст
mart_revenue_daily — daily aggregation: одна строка на (day, country). Объём — 365 дней × 30 стран = ~10k строк за год. Каждый run — пересобираем последние 7 дней (на случай late-arriving orders).
Подходящая стратегия — insert_overwrite: dbt удалит данные за дни, которые мы пересобираем, и вставит свежие.
Реализация
-- models/marts/core/mart_revenue_daily.sql
{{
config(
materialized='incremental',
incremental_strategy='delete+insert',
unique_key=['order_date', 'shipping_address_country'],
on_schema_change='append_new_columns'
)
}}
WITH base AS (
SELECT * FROM {{ ref('mart_orders') }}
{% if is_incremental() %}
WHERE order_created_at::DATE >= (CURRENT_DATE - INTERVAL '7 days')
{% endif %}
)
SELECT
order_created_at::DATE AS order_date,
shipping_address_country,
COUNT(DISTINCT order_id) AS order_count,
COUNT(DISTINCT user_id) AS unique_customers,
SUM(order_total_usd) AS total_revenue_usd,
AVG(order_total_usd) AS avg_order_value_usd
FROM base
WHERE order_status NOT IN ('cancelled', 'fraudulent')
GROUP BY 1, 2
На DuckDB insert_overwrite не реализован напрямую — используем delete+insert как эквивалент с тем же поведением. dbt-duckdb 1.10 поддерживает append, merge, delete+insert. На Snowflake / BigQuery insert_overwrite — нативная стратегия с partition-level overwrite, что эффективнее на partitioned tables.
Composite unique_key
Для delete+insert указываем composite unique_key — список колонок. dbt сгенерирует DELETE WHERE по этому списку:
DELETE FROM mart_revenue_daily
WHERE (order_date, shipping_address_country) IN (
SELECT order_date, shipping_address_country FROM mart_revenue_daily__dbt_tmp
);
INSERT INTO mart_revenue_daily SELECT * FROM mart_revenue_daily__dbt_tmp;
Это делает rebuild только последних 7 дней — старые данные не трогаются.
Microbatch для int_events_session
Бизнес-контекст
int_events_session — sessionization page-views: группируем events одного пользователя в сессии (30 min gap). Объём — десятки миллионов events. Каждый run обрабатывает только новые события.
В dbt 1.9+ для таких случаев есть microbatch incremental strategy — рассчитан на event-time data с batch-обработкой.
Реализация
-- models/intermediate/events/int_events_session.sql
{{
config(
materialized='incremental',
incremental_strategy='microbatch',
event_time='event_timestamp',
batch_size='day',
begin='2023-01-01',
lookback=1,
full_refresh=false
)
}}
WITH events_with_lag AS (
SELECT
event_id,
user_id,
event_type,
event_timestamp,
page_url,
LAG(event_timestamp) OVER (
PARTITION BY user_id
ORDER BY event_timestamp
) AS prev_event_timestamp
FROM {{ ref('stg_events__page_views') }}
),
sessionized AS (
SELECT
*,
CASE
WHEN prev_event_timestamp IS NULL
OR EXTRACT(EPOCH FROM (event_timestamp - prev_event_timestamp)) > 1800
THEN 1
ELSE 0
END AS is_new_session
FROM events_with_lag
)
SELECT
event_id,
user_id,
event_type,
event_timestamp,
page_url,
SUM(is_new_session) OVER (
PARTITION BY user_id
ORDER BY event_timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS session_number
FROM sessionized
Разбор microbatch конфигурации
incremental_strategy='microbatch' — стратегия (GA в 1.9).
event_time='event_timestamp' — колонка, по которой определяется batching.
batch_size='day' — каждый batch = 1 день данных. Альтернативы: hour, month, year.
begin='2023-01-01' — начало истории. Microbatch начинает с этой даты при full-refresh.
lookback=1 — пересобираем последний 1 batch (день) на каждый run. Это покрывает late-arriving events до 1 дня. Если ожидаются late-arrivals до 7 дней — установите lookback=7.
full_refresh=false — защита от случайного --full-refresh. Microbatch может быть очень дорогим на full refresh для длинной истории.
Microbatch на DuckDB — частичная поддержка в 1.10. Базовые случаи (batch_size=‘day’ с UTC timestamps) работают. Но: (1) Concurrent batches (concurrent_batches=true) не поддержано на DuckDB. (2) --event-time-start/end для backfills работает не на всех версиях dbt-duckdb. (3) Если ожидается late-arrivals больше lookback — нужны custom incremental_predicates.
В production на Snowflake / BigQuery microbatch более зрелый. Для capstone — простой случай (lookback=1, без concurrent) — работает.
Snapshot: customers_snapshot
Бизнес-контекст
customers — slowly-changing dimension: email, имя, address могут меняться. Нам нужна история через SCD2.
Реализация
В dbt 1.9+ snapshots декларируются в YAML:
# snapshots/customers_snapshot.yml
snapshots:
- name: customers_snapshot
relation: source('users_db', 'users')
config:
schema: snapshots
database: capstone
unique_key: user_id
strategy: timestamp
updated_at: updated_at
hard_deletes: invalidate
dbt_valid_to_current: "TIMESTAMP '9999-12-31 23:59:59'"
snapshot_meta_column_names:
dbt_valid_from: dbt_valid_from
dbt_valid_to: dbt_valid_to
dbt_scd_id: dbt_scd_id
dbt_updated_at: dbt_updated_at
dbt_is_deleted: dbt_is_deleted
Разбор ключевых элементов
strategy: timestamp — использовать колонку updated_at для определения изменений. Если значение в source изменилось, snapshot закрывает старую версию и создаёт новую.
updated_at: updated_at — колонка с timestamp последнего изменения. Должна обновляться в source при каждом изменении строки.
hard_deletes: invalidate — что делать, если строка пропала из source. invalidate ставит dbt_valid_to = now() для последней версии. Альтернативы:
ignore(default) — пропавшая строка остаётся “открытой” forever (dangerous)invalidate— закрыть строку текущим timestampnew_record— добавить новую строку сdbt_is_deleted=true
Для customers invalidate — хороший выбор: знаем дату “удаления” аккаунта.
dbt_valid_to_current — какое значение проставлять dbt_valid_to для текущих (открытых) записей. Default — NULL. Это часто ломает date-range queries: WHERE date BETWEEN dbt_valid_from AND dbt_valid_to будет фильтровать NULL’ы. Альтернатива — '9999-12-31 23:59:59', что делает запросы простыми.
snapshot_meta_column_names (1.9+) — кастомизация имён технических колонок. Например, можно переименовать dbt_scd_id в customer_scd_id для конвенции команды.
DuckDB workaround для hard_deletes
На dbt-duckdb 1.10.0 hard_deletes: invalidate имеет известный bug — не закрывает строки правильно в некоторых сценариях. Workaround — post-hook:
config:
unique_key: user_id
strategy: timestamp
updated_at: updated_at
post-hook: |
UPDATE {{ this }}
SET dbt_valid_to = CURRENT_TIMESTAMP
WHERE dbt_valid_to IS NULL
AND user_id NOT IN (SELECT user_id FROM {{ source('users_db', 'users') }})
Это закрывает строки для users, пропавших из source. Не идеальное решение, но рабочее на DuckDB.
В реальном production-проекте на Snowflake / BigQuery hard_deletes: invalidate работает нативно без workaround. Если вы используете DuckDB только для capstone, а в работе будете на Snowflake — выучите конфигурацию правильно, workaround DuckDB запомните как “так бывает на community-адаптерах”.
Snapshot: products_snapshot — check strategy
Бизнес-контекст
products обновляется без явного updated_at (legacy система). Используем check strategy — dbt считает hash от указанных колонок и сравнивает с прошлым snapshot.
Реализация
-- snapshots/products_snapshot.sql
{% snapshot products_snapshot %}
{{
config(
target_schema='snapshots',
unique_key='product_id',
strategy='check',
check_cols=['product_name', 'category_id', 'price_usd', 'is_active'],
invalidate_hard_deletes=True
)
}}
SELECT
product_id,
product_name,
category_id,
price_usd,
is_active,
CURRENT_TIMESTAMP AS snapshot_checked_at
FROM {{ source('products_db', 'products') }}
{% endsnapshot %}
Разбор
strategy='check' — сравнение по hash колонок. Если хеш изменился — есть update.
check_cols=['product_name', 'category_id', 'price_usd', 'is_active'] — какие колонки отслеживать. ВСЕ колонки, кроме PK, лучше не указывать — добавление новой колонки в check_cols заставит dbt считать все существующие записи как “изменённые” и создать новые версии.
invalidate_hard_deletes=True — старый синтаксис (pre-1.9) для hard_deletes. Эквивалент hard_deletes: invalidate в YAML.
check_cols pitfall: если вы изменяете список колонок в check_cols, при следующем run все строки считаются “изменёнными” (hash же другой) — будут созданы новые версии для всего. История ломается. Подходите к check_cols консервативно — добавляйте только когда точно знаете, что эта колонка должна отслеживаться.
Unit tests на критичные модели
Unit tests — это GA в dbt 1.8+, тесты с явным given/expect для бизнес-логики моделей. В capstone делаем 3 unit’а.
Unit test 1: int_marketing_attribution
Бизнес-логика: attribution-touch получает 100% credit, если single-touch, или 50/50 если first/last touch.
# unit_tests/int_marketing_attribution_unit.yml
unit_tests:
- name: test_attribution_single_touch
model: int_marketing_attribution
given:
- input: ref('stg_marketing__events')
rows:
- {user_id: 1, campaign_id: 'c1', event_at: '2026-05-01', event_type: 'click'}
- input: ref('stg_orders_db__orders')
rows:
- {user_id: 1, order_id: 'o1', order_total_usd: 100, order_created_at: '2026-05-02'}
expect:
rows:
- {user_id: 1, campaign_id: 'c1', attributed_revenue: 100.0, attribution_type: 'single_touch'}
- name: test_attribution_first_last_touch
model: int_marketing_attribution
given:
- input: ref('stg_marketing__events')
rows:
- {user_id: 2, campaign_id: 'c_first', event_at: '2026-05-01', event_type: 'click'}
- {user_id: 2, campaign_id: 'c_last', event_at: '2026-05-02', event_type: 'click'}
- input: ref('stg_orders_db__orders')
rows:
- {user_id: 2, order_id: 'o2', order_total_usd: 200, order_created_at: '2026-05-03'}
expect:
rows:
- {user_id: 2, campaign_id: 'c_first', attributed_revenue: 100.0, attribution_type: 'first_touch'}
- {user_id: 2, campaign_id: 'c_last', attributed_revenue: 100.0, attribution_type: 'last_touch'}
Unit test 2: mart_orders fields enrichment
# unit_tests/mart_orders_unit.yml
unit_tests:
- name: test_mart_orders_joins_user_email
model: mart_orders
given:
- input: ref('int_orders_enriched')
rows:
- {order_id: 'o1', user_id: 1, order_status: 'paid', order_total_usd: 100,
order_items_count: 2, shipping_address_country: 'US',
order_created_at: '2026-05-01', order_updated_at: '2026-05-01'}
- input: ref('int_users_with_first_order')
rows:
- {user_id: 1, user_email: '[email protected]', first_order_date: '2026-05-01'}
expect:
rows:
- {order_id: 'o1', user_id: 1, user_email: '[email protected]',
first_order_date: '2026-05-01', order_status: 'paid',
order_total_usd: 100, order_items_count: 2,
shipping_address_country: 'US'}
Unit test 3: mart_revenue_daily aggregation
# unit_tests/mart_revenue_daily_unit.yml
unit_tests:
- name: test_revenue_excludes_cancelled
model: mart_revenue_daily
given:
- input: ref('mart_orders')
rows:
- {order_id: 'o1', order_status: 'paid', order_total_usd: 100,
shipping_address_country: 'US', order_created_at: '2026-05-01'}
- {order_id: 'o2', order_status: 'cancelled', order_total_usd: 50,
shipping_address_country: 'US', order_created_at: '2026-05-01'}
- {order_id: 'o3', order_status: 'fraudulent', order_total_usd: 200,
shipping_address_country: 'US', order_created_at: '2026-05-01'}
expect:
rows:
- {order_date: '2026-05-01', shipping_address_country: 'US',
order_count: 1, unique_customers: 1, total_revenue_usd: 100.0,
avg_order_value_usd: 100.0}
Запуск unit tests
# Только unit tests
dbt test --select test_type:unit
# Конкретный unit
dbt test --select int_marketing_attribution
# Вместе с data tests
dbt test
Unit tests запускаются БЕЗ обращения к warehouse — given/expect задают данные inline. Это позволяет тестировать бизнес-логику в CI без затрат на warehouse.
Проверка после реализации
Запустите полный build:
dbt deps # установить packages
dbt build --full-refresh
Ожидаемый результат:
- 30+ моделей построены
- mart_orders, mart_revenue_daily, int_events_session — incremental (но первый build — full)
- customers_snapshot, products_snapshot созданы со SCD2 структурой
- Все unit tests passed
Затем имитируйте инкрементальный run:
# Изменим seed (добавим новый order)
dbt seed --select raw_orders --full-refresh
dbt build --select mart_orders+
Должно работать incremental — заметьте в логах “(incremental)” вместо “(table)” для mart_orders.
Что middle-инженер должен унести
- Знать, когда какую стратегию выбирать:
merge(status updateable),delete+insert(daily aggregations),microbatch(event-time data). - Уметь корректно использовать
incremental_predicatesс префиксомDBT_INTERNAL_DEST. - Понимать snapshot strategies:
timestamp(withupdated_at) vscheck(withcheck_cols). - Знать
hard_deletesoptions и их impact. - Использовать
dbt_valid_to_currentдля удобства downstream queries. - Знать ограничения DuckDB-adapter’а и workarounds для production-features (microbatch concurrent, hard_deletes invalidate).
- Писать unit tests с given/expect для бизнес-логики.