Materialization-aware dispatch
В прошлом уроке мы видели adapter.dispatch для macros, которые генерируют SQL fragments. В этом уроке — как dispatch применяется к самим материализациям: table, view, incremental, snapshot.
Это территория custom materializations — продвинутая тема, на которую middle-engineer попадает, когда нужно нестандартное поведение dbt: например, материализация в Iceberg-таблицу, или dual-write в две таблицы, или специфическая инкрементальная логика.
Что такое materialization в dbt
{{ config(materialized='table') }} — это вызов специальной macro materialization. dbt поставляется с базовыми реализациями:
materialization tablematerialization viewmaterialization incrementalmaterialization ephemeralmaterialization snapshot
Это всё macros в dbt-core / адаптерах. Их можно посмотреть:
# dbt-core source code, обычно в python venv
find $(dirname $(python -c 'import dbt; print(dbt.__file__)')) -name "*.sql"
Каждая materialization — это функция, которая принимает CTE и возвращает SQL для создания target relation. Она оборачивается в hooks (pre/post), компилирует ddl, делает audit-логи.
Базовый синтаксис custom materialization
dbt-iii: анатомия materialization — registry, manifest, lookup chain dbt-iii: lifecycle materialization — кэш, full_refresh, atomicity{% materialization {name} [, {adapter='default'}] %}
{# ... тело: CREATE / INSERT / DROP ... #}
{% endmaterialization %}
Пример: материализация, которая всегда CREATE OR REPLACE table с timestamp в имени (для immutable snapshots):
-- macros/materializations/duckdb__immutable_table.sql
{% materialization immutable_table, adapter='duckdb' %}
{# 1. Identifier — имя финальной таблицы с timestamp #}
{% set target_relation = api.Relation.create(
identifier=this.identifier ~ '_' ~ run_started_at.strftime('%Y%m%d_%H%M%S'),
schema=this.schema,
database=this.database,
type='table'
) %}
{# 2. Pre-hooks #}
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{# 3. CREATE OR REPLACE TABLE с SQL модели как body #}
{% call statement('main') -%}
CREATE OR REPLACE TABLE {{ target_relation }} AS (
{{ sql }}
)
{%- endcall %}
{# 4. Post-hooks #}
{{ run_hooks(post_hooks, inside_transaction=False) }}
{# 5. Возврат отношения, которое dbt запишет в manifest #}
{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
Использование в модели:
-- models/marts/snapshots/customer_attributes_snapshot.sql
{{ config(materialized='immutable_table') }}
SELECT customer_id, tier, attribute_set
FROM {{ ref('stg_customer_attributes') }}
После dbt run появится таблица customer_attributes_snapshot_20260519_143022 (с timestamp). Каждый run создаёт новую таблицу — не перезаписывает старую. Это полезно для archival snapshots.
Dispatch для materializations
Custom materialization тоже выбирается dispatch’ом. Шаги:
1. Generic materialization-интерфейс — обычно не нужен (используется dispatch автоматически через adapter argument).
2. Adapter-specific implementations:
-- macros/materializations/duckdb__delta_table.sql
{% materialization delta_table, adapter='duckdb' %}
-- DuckDB implementation
{% endmaterialization %}
-- macros/materializations/snowflake__delta_table.sql
{% materialization delta_table, adapter='snowflake' %}
-- Snowflake implementation (Iceberg, snowtables, etc.)
{% endmaterialization %}
-- macros/materializations/default__delta_table.sql
{% materialization delta_table, default %}
-- Fallback - может просто как table или error
{{ exceptions.raise_compiler_error('delta_table not supported on ' ~ adapter.type()) }}
{% endmaterialization %}
В YAML модели:
{{ config(materialized='delta_table') }}
dbt по adapter.type() находит правильный variant.
materialization-aware dispatch внутри macro (1.9+)
В dbt 1.9 появился новый паттерн: macro, которая ведёт себя по-разному в зависимости от materialization, в котором используется.
Это полезно для hooks и config patterns. Например, на incremental моделях хотим автоматически добавлять merge_update_columns, на table — нет:
-- macros/auto_merge_columns.sql
{% macro auto_merge_columns() %}
{% if config.get('materialized') == 'incremental' %}
{% set update_cols = adapter.get_columns_in_relation(this) | map(attribute='name') | list %}
{% set protected = ['_loaded_at', 'id'] %}
{% set merge_cols = update_cols | reject('in', protected) | list %}
{% do config.set('merge_update_columns', merge_cols) %}
{% endif %}
{% endmacro %}
Использование:
-- models/my_incremental.sql
{{ config(materialized='incremental', unique_key='id') }}
{{ auto_merge_columns() }}
SELECT ... FROM ...
При materialized='incremental' macro вычисляет список колонок и устанавливает merge_update_columns через config.set(). На других материализациях — ничего не делает.
config.set() работает в parse-time / compile-time. На run-time изменение config уже не поможет — dbt уже выбрал стратегию. Поэтому macro вызывайте до SELECT statement.
Override default materialization для адаптера
Иногда хочется поменять default behavior materialized='table' для всего проекта. Например, на DuckDB всегда добавлять index после CREATE.
Подход 1 — post-hook на проектном уровне (без override):
# dbt_project.yml
models:
+post-hook: "{{ create_default_index() }}"
Это работает для всех моделей одинаково. Не зависит от materialization.
Подход 2 — override materialization через dispatch:
-- macros/materializations/duckdb__table.sql
{% materialization table, adapter='duckdb', override=true %}
-- Скопировать default DuckDB table implementation
-- ... добавить custom logic в конце
{% endmaterialization %}
override=true (или просто replacement) перезаписывает встроенный default. Это глобально применяется для всех materialized='table' на DuckDB.
Override built-in materialization — risky. При update dbt-core / dbt-duckdb ваш override может остаться с устаревшей логикой. Тестируйте после каждого upgrade. Используйте только если действительно нужно (нет post-hook решения).
Reading materialization context
Внутри custom materialization доступны:
model— config-словарь модели (имя, schema, materialized, etc).sql— render SQL модели (CTE из ваших WITH … SELECT).pre_hooks/post_hooks— списки hooks.target_relation— финальное Relation (target.identifier).existing_relation— текущая таблица в warehouse (если есть).run_started_at— timestamp начала run’а.
Пример использования:
{% materialization audit_table, adapter='duckdb' %}
{% set target_relation = this %}
{% set existing = adapter.get_relation(...) %}
{% if existing is none %}
{# First run — create #}
{% call statement('main') %}
CREATE TABLE {{ target_relation }} AS ({{ sql }})
{% endcall %}
{% else %}
{# Incremental — append with audit info #}
{% call statement('main') %}
INSERT INTO {{ target_relation }}
SELECT *,
'{{ run_started_at }}' AS _inserted_at,
'{{ invocation_id }}' AS _invocation_id
FROM ({{ sql }}) sub
{% endcall %}
{% endif %}
{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
Это простая custom incremental с автоматическим audit. Подходит для лога событий, где always-append + audit columns.
Реальный пример: materialization для external location
DuckDB позволяет писать таблицы как parquet файлы в S3/local. Это полезно для archival / sharing.
-- macros/materializations/duckdb__external_parquet.sql
{% materialization external_parquet, adapter='duckdb' %}
{% set location = config.get('location') %}
{% if location is none %}
{{ exceptions.raise_compiler_error("external_parquet requires 'location' config") }}
{% endif %}
{# COPY вместо CREATE TABLE — пишем parquet #}
{% call statement('main') %}
COPY ({{ sql }})
TO '{{ location }}'
(FORMAT PARQUET, COMPRESSION ZSTD)
{% endcall %}
{# Регистрируем как external table в DuckDB для downstream ref() #}
{% set target_relation = api.Relation.create(
identifier=this.identifier,
schema=this.schema,
database=this.database,
type='view'
) %}
{% call statement('register_view') %}
CREATE OR REPLACE VIEW {{ target_relation }} AS
SELECT * FROM read_parquet('{{ location }}')
{% endcall %}
{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
Использование:
-- models/marts/archive_orders.sql
{{ config(
materialized='external_parquet',
location='s3://my-bucket/archive/orders.parquet'
) }}
SELECT * FROM {{ ref('fct_orders') }}
WHERE order_date < CURRENT_DATE - INTERVAL '1 year'
После dbt run archive_orders записывается как parquet в S3, плюс создаётся view в DuckDB, который читает этот parquet. Downstream ref('archive_orders') работает прозрачно.
materializations и hooks
Custom materialization сама контролирует, когда вызывать pre/post hooks. Обычно:
{% materialization my_custom, adapter='duckdb' %}
{# Before everything — outside transaction #}
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{% call statement('main') %}
-- main DDL
{% endcall %}
{# After main DDL — outside transaction #}
{{ run_hooks(post_hooks, inside_transaction=False) }}
{% endmaterialization %}
inside_transaction — параметр для warehouse с транзакциями (Snowflake / Postgres). На DuckDB — single-writer, транзакции автоматически на каждый statement.
Если в custom materialization вы забудете run_hooks(post_hooks) — все пользовательские +post-hook в моделях не сработают. Это распространённая ошибка.
Когда писать custom materialization
Не часто. В большинстве случаев хватает встроенных + конфигов:
incremental_strategy(append / delete+insert / merge / microbatch).pre_hook/post_hookдля audit, grants, custom DDL.query_tag(Snowflake) для tagging.partition_by/cluster_by(BigQuery / Snowflake).
Custom materialization оправдана когда:
- Внешний формат — Iceberg, Delta, parquet в lake.
- Dual-write — записать в две таблицы одновременно (например, prod + archive).
- Special DDL — версионированные таблицы (immutable_table выше).
- Integration с external system — снапшот в DocumentDB / Redis.
- Custom incremental logic — слишком сложно для встроенного incremental.
Цена custom materialization — поддержка. Каждый upgrade dbt может что-то сломать. Документируйте, тестируйте, имейте план отката.
Тестирование custom materialization
Сложно. Самые надёжные подходы:
1. Integration tests с реальными моделями:
Создайте набор тестовых моделей в tests/integration/:
-- tests/integration/models/test_external_parquet.sql
{{ config(
materialized='external_parquet',
location='/tmp/test_output.parquet',
tags=['integration_test']
) }}
SELECT 1 AS id, 'test' AS name
В CI: dbt run --select tag:integration_test. Проверьте что parquet создан, view зарегистрирован, downstream ref() работает.
2. Singular tests на result:
-- tests/test_external_parquet_creates_parquet.sql
{# Запускается после dbt run #}
SELECT 1 WHERE NOT EXISTS (
SELECT 1 FROM read_parquet('/tmp/test_output.parquet')
)
Падает если parquet не создан.
3. Unit tests (1.8+) — пока работают только для models, не materializations.
В будущих версиях dbt планирует расширить unit tests на materializations. На 2026 — integration tests.
Попробуй сам
В labs:
- Создайте custom materialization
audit_tableдля DuckDB: при первом run — CREATE TABLE, при последующих — INSERT с audit columns (_inserted_at,_invocation_id). - Используйте его в модели
audit_events.sql. Запустите дважды — проверьте что строки накопились с разными invocation_id. - Сделайте dispatch:
duckdb__audit_table+default__audit_table(error). Проверьте поведение на target=duckdb. - Добавьте hooks support: pre_hook / post_hook должны работать. Проверьте через простой
{{ log() }}hook. - Напишите integration test: tests/integration/, который запускает audit_table model и валидирует структуру таблицы.
Это упражнение даёт ощущение полного цикла custom materialization.
Ключевые выводы
- Materialization в dbt — это macro со специальным синтаксисом
{% materialization name, adapter='X' %}. - Dispatch применяется к materialization тоже:
duckdb__tablevssnowflake__tablevsdefault__table. - 1.9+ добавил materialization-aware patterns через
config.set()иconfig.get('materialized')в обычных macros. - Override built-in materializations возможен через
override=true, но risky при upgrade dbt. - Custom materialization имеет доступ к
model,sql,pre_hooks,post_hooks,this,run_started_at. - Use cases: external locations (parquet/Iceberg), dual-write, versioned tables, integration с non-warehouse.
- Testing — integration tests с реальными моделями в
tests/integration/, singular tests на result. - Поддержка custom materialization — налог. Каждый upgrade dbt — re-test.