Learning Platform
Глоссарий Troubleshooting
Урок 06.03 · 25 мин
Средний
MacrosMaterializationCustom materializationsdispatchdbt 1.9+

Materialization-aware dispatch

В прошлом уроке мы видели adapter.dispatch для macros, которые генерируют SQL fragments. В этом уроке — как dispatch применяется к самим материализациям: table, view, incremental, snapshot.

Это территория custom materializations — продвинутая тема, на которую middle-engineer попадает, когда нужно нестандартное поведение dbt: например, материализация в Iceberg-таблицу, или dual-write в две таблицы, или специфическая инкрементальная логика.


Что такое materialization в dbt

{{ config(materialized='table') }} — это вызов специальной macro materialization. dbt поставляется с базовыми реализациями:

  • materialization table
  • materialization view
  • materialization incremental
  • materialization ephemeral
  • materialization snapshot

Это всё macros в dbt-core / адаптерах. Их можно посмотреть:

# dbt-core source code, обычно в python venv
find $(dirname $(python -c 'import dbt; print(dbt.__file__)')) -name "*.sql"

Каждая materialization — это функция, которая принимает CTE и возвращает SQL для создания target relation. Она оборачивается в hooks (pre/post), компилирует ddl, делает audit-логи.


Базовый синтаксис custom materialization

dbt-iii: анатомия materialization — registry, manifest, lookup chain dbt-iii: lifecycle materialization — кэш, full_refresh, atomicity
{% materialization {name} [, {adapter='default'}] %}
  {# ... тело: CREATE / INSERT / DROP ... #}
{% endmaterialization %}

Пример: материализация, которая всегда CREATE OR REPLACE table с timestamp в имени (для immutable snapshots):

-- macros/materializations/duckdb__immutable_table.sql
{% materialization immutable_table, adapter='duckdb' %}

  {# 1. Identifier — имя финальной таблицы с timestamp #}
  {% set target_relation = api.Relation.create(
      identifier=this.identifier ~ '_' ~ run_started_at.strftime('%Y%m%d_%H%M%S'),
      schema=this.schema,
      database=this.database,
      type='table'
  ) %}

  {# 2. Pre-hooks #}
  {{ run_hooks(pre_hooks, inside_transaction=False) }}

  {# 3. CREATE OR REPLACE TABLE с SQL модели как body #}
  {% call statement('main') -%}
    CREATE OR REPLACE TABLE {{ target_relation }} AS (
      {{ sql }}
    )
  {%- endcall %}

  {# 4. Post-hooks #}
  {{ run_hooks(post_hooks, inside_transaction=False) }}

  {# 5. Возврат отношения, которое dbt запишет в manifest #}
  {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}

Использование в модели:

-- models/marts/snapshots/customer_attributes_snapshot.sql
{{ config(materialized='immutable_table') }}

SELECT customer_id, tier, attribute_set
FROM {{ ref('stg_customer_attributes') }}

После dbt run появится таблица customer_attributes_snapshot_20260519_143022 (с timestamp). Каждый run создаёт новую таблицу — не перезаписывает старую. Это полезно для archival snapshots.


Dispatch для materializations

Custom materialization тоже выбирается dispatch’ом. Шаги:

1. Generic materialization-интерфейс — обычно не нужен (используется dispatch автоматически через adapter argument).

2. Adapter-specific implementations:

-- macros/materializations/duckdb__delta_table.sql
{% materialization delta_table, adapter='duckdb' %}
  -- DuckDB implementation
{% endmaterialization %}

-- macros/materializations/snowflake__delta_table.sql
{% materialization delta_table, adapter='snowflake' %}
  -- Snowflake implementation (Iceberg, snowtables, etc.)
{% endmaterialization %}

-- macros/materializations/default__delta_table.sql
{% materialization delta_table, default %}
  -- Fallback - может просто как table или error
  {{ exceptions.raise_compiler_error('delta_table not supported on ' ~ adapter.type()) }}
{% endmaterialization %}

В YAML модели:

{{ config(materialized='delta_table') }}

dbt по adapter.type() находит правильный variant.


materialization-aware dispatch внутри macro (1.9+)

В dbt 1.9 появился новый паттерн: macro, которая ведёт себя по-разному в зависимости от materialization, в котором используется.

Это полезно для hooks и config patterns. Например, на incremental моделях хотим автоматически добавлять merge_update_columns, на table — нет:

-- macros/auto_merge_columns.sql
{% macro auto_merge_columns() %}
  {% if config.get('materialized') == 'incremental' %}
    {% set update_cols = adapter.get_columns_in_relation(this) | map(attribute='name') | list %}
    {% set protected = ['_loaded_at', 'id'] %}
    {% set merge_cols = update_cols | reject('in', protected) | list %}
    {% do config.set('merge_update_columns', merge_cols) %}
  {% endif %}
{% endmacro %}

Использование:

-- models/my_incremental.sql
{{ config(materialized='incremental', unique_key='id') }}
{{ auto_merge_columns() }}

SELECT ... FROM ...

При materialized='incremental' macro вычисляет список колонок и устанавливает merge_update_columns через config.set(). На других материализациях — ничего не делает.

NOTE

config.set() работает в parse-time / compile-time. На run-time изменение config уже не поможет — dbt уже выбрал стратегию. Поэтому macro вызывайте до SELECT statement.


Override default materialization для адаптера

Иногда хочется поменять default behavior materialized='table' для всего проекта. Например, на DuckDB всегда добавлять index после CREATE.

Подход 1 — post-hook на проектном уровне (без override):

# dbt_project.yml
models:
  +post-hook: "{{ create_default_index() }}"

Это работает для всех моделей одинаково. Не зависит от materialization.

Подход 2 — override materialization через dispatch:

-- macros/materializations/duckdb__table.sql
{% materialization table, adapter='duckdb', override=true %}
  -- Скопировать default DuckDB table implementation
  -- ... добавить custom logic в конце
{% endmaterialization %}

override=true (или просто replacement) перезаписывает встроенный default. Это глобально применяется для всех materialized='table' на DuckDB.

DANGER

Override built-in materialization — risky. При update dbt-core / dbt-duckdb ваш override может остаться с устаревшей логикой. Тестируйте после каждого upgrade. Используйте только если действительно нужно (нет post-hook решения).


Reading materialization context

Внутри custom materialization доступны:

  • model — config-словарь модели (имя, schema, materialized, etc).
  • sql — render SQL модели (CTE из ваших WITH … SELECT).
  • pre_hooks / post_hooks — списки hooks.
  • target_relation — финальное Relation (target.identifier).
  • existing_relation — текущая таблица в warehouse (если есть).
  • run_started_at — timestamp начала run’а.

Пример использования:

{% materialization audit_table, adapter='duckdb' %}
  
  {% set target_relation = this %}
  {% set existing = adapter.get_relation(...) %}
  
  {% if existing is none %}
    {# First run — create #}
    {% call statement('main') %}
      CREATE TABLE {{ target_relation }} AS ({{ sql }})
    {% endcall %}
  {% else %}
    {# Incrementalappend with audit info #}
    {% call statement('main') %}
      INSERT INTO {{ target_relation }}
      SELECT *,
        '{{ run_started_at }}' AS _inserted_at,
        '{{ invocation_id }}' AS _invocation_id
      FROM ({{ sql }}) sub
    {% endcall %}
  {% endif %}
  
  {{ return({'relations': [target_relation]}) }}
{% endmaterialization %}

Это простая custom incremental с автоматическим audit. Подходит для лога событий, где always-append + audit columns.


Реальный пример: materialization для external location

DuckDB позволяет писать таблицы как parquet файлы в S3/local. Это полезно для archival / sharing.

-- macros/materializations/duckdb__external_parquet.sql
{% materialization external_parquet, adapter='duckdb' %}

  {% set location = config.get('location') %}
  {% if location is none %}
    {{ exceptions.raise_compiler_error("external_parquet requires 'location' config") }}
  {% endif %}

  {# COPY вместо CREATE TABLE — пишем parquet #}
  {% call statement('main') %}
    COPY ({{ sql }})
    TO '{{ location }}'
    (FORMAT PARQUET, COMPRESSION ZSTD)
  {% endcall %}

  {# Регистрируем как external table в DuckDB для downstream ref() #}
  {% set target_relation = api.Relation.create(
    identifier=this.identifier,
    schema=this.schema,
    database=this.database,
    type='view'
  ) %}

  {% call statement('register_view') %}
    CREATE OR REPLACE VIEW {{ target_relation }} AS
    SELECT * FROM read_parquet('{{ location }}')
  {% endcall %}

  {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}

Использование:

-- models/marts/archive_orders.sql
{{ config(
    materialized='external_parquet',
    location='s3://my-bucket/archive/orders.parquet'
) }}

SELECT * FROM {{ ref('fct_orders') }}
WHERE order_date < CURRENT_DATE - INTERVAL '1 year'

После dbt run archive_orders записывается как parquet в S3, плюс создаётся view в DuckDB, который читает этот parquet. Downstream ref('archive_orders') работает прозрачно.


materializations и hooks

Custom materialization сама контролирует, когда вызывать pre/post hooks. Обычно:

{% materialization my_custom, adapter='duckdb' %}
  {# Before everything — outside transaction #}
  {{ run_hooks(pre_hooks, inside_transaction=False) }}

  {% call statement('main') %}
    -- main DDL
  {% endcall %}

  {# After main DDL — outside transaction #}
  {{ run_hooks(post_hooks, inside_transaction=False) }}
{% endmaterialization %}

inside_transaction — параметр для warehouse с транзакциями (Snowflake / Postgres). На DuckDB — single-writer, транзакции автоматически на каждый statement.

Если в custom materialization вы забудете run_hooks(post_hooks) — все пользовательские +post-hook в моделях не сработают. Это распространённая ошибка.


Когда писать custom materialization

Не часто. В большинстве случаев хватает встроенных + конфигов:

  • incremental_strategy (append / delete+insert / merge / microbatch).
  • pre_hook / post_hook для audit, grants, custom DDL.
  • query_tag (Snowflake) для tagging.
  • partition_by / cluster_by (BigQuery / Snowflake).

Custom materialization оправдана когда:

  1. Внешний формат — Iceberg, Delta, parquet в lake.
  2. Dual-write — записать в две таблицы одновременно (например, prod + archive).
  3. Special DDL — версионированные таблицы (immutable_table выше).
  4. Integration с external system — снапшот в DocumentDB / Redis.
  5. Custom incremental logic — слишком сложно для встроенного incremental.

Цена custom materialization — поддержка. Каждый upgrade dbt может что-то сломать. Документируйте, тестируйте, имейте план отката.


Тестирование custom materialization

Сложно. Самые надёжные подходы:

1. Integration tests с реальными моделями:

Создайте набор тестовых моделей в tests/integration/:

-- tests/integration/models/test_external_parquet.sql
{{ config(
    materialized='external_parquet',
    location='/tmp/test_output.parquet',
    tags=['integration_test']
) }}

SELECT 1 AS id, 'test' AS name

В CI: dbt run --select tag:integration_test. Проверьте что parquet создан, view зарегистрирован, downstream ref() работает.

2. Singular tests на result:

-- tests/test_external_parquet_creates_parquet.sql
{# Запускается после dbt run #}
SELECT 1 WHERE NOT EXISTS (
    SELECT 1 FROM read_parquet('/tmp/test_output.parquet')
)

Падает если parquet не создан.

3. Unit tests (1.8+) — пока работают только для models, не materializations.

В будущих версиях dbt планирует расширить unit tests на materializations. На 2026 — integration tests.


Попробуй сам

В labs:

  1. Создайте custom materialization audit_table для DuckDB: при первом run — CREATE TABLE, при последующих — INSERT с audit columns (_inserted_at, _invocation_id).
  2. Используйте его в модели audit_events.sql. Запустите дважды — проверьте что строки накопились с разными invocation_id.
  3. Сделайте dispatch: duckdb__audit_table + default__audit_table (error). Проверьте поведение на target=duckdb.
  4. Добавьте hooks support: pre_hook / post_hook должны работать. Проверьте через простой {{ log() }} hook.
  5. Напишите integration test: tests/integration/, который запускает audit_table model и валидирует структуру таблицы.

Это упражнение даёт ощущение полного цикла custom materialization.


Ключевые выводы

  1. Materialization в dbt — это macro со специальным синтаксисом {% materialization name, adapter='X' %}.
  2. Dispatch применяется к materialization тоже: duckdb__table vs snowflake__table vs default__table.
  3. 1.9+ добавил materialization-aware patterns через config.set() и config.get('materialized') в обычных macros.
  4. Override built-in materializations возможен через override=true, но risky при upgrade dbt.
  5. Custom materialization имеет доступ к model, sql, pre_hooks, post_hooks, this, run_started_at.
  6. Use cases: external locations (parquet/Iceberg), dual-write, versioned tables, integration с non-warehouse.
  7. Testing — integration tests с реальными моделями в tests/integration/, singular tests на result.
  8. Поддержка custom materialization — налог. Каждый upgrade dbt — re-test.
Проверка знанийKnowledge check
Production проект на DuckDB. Аналитики хотят archive снапшоты — каждый dbt run создаёт immutable parquet в s3://archive/{model}_{run_date}.parquet, при этом latest доступен через ref(). Какая стратегия?
ОтветAnswer
Это типичный use case для **custom materialization**. Архитектура:\n\n```sql\n-- macros/materializations/duckdb__archive_parquet.sql\n{% materialization archive_parquet, adapter='duckdb' %}\n\n {% set base_location = config.get('archive_location') %}\n {% set run_date = run_started_at.strftime('%Y-%m-%d') %}\n {% set archive_path = base_location ~ '/' ~ this.identifier ~ '_' ~ run_date ~ '.parquet' %}\n {% set latest_path = base_location ~ '/' ~ this.identifier ~ '_latest.parquet' %}\n\n {{ run_hooks(pre_hooks, inside_transaction=False) }}\n\n {# 1. Записываем archive (timestamped) #}\n {% call statement('archive') %}\n COPY ({{ sql }})\n TO '{{ archive_path }}'\n (FORMAT PARQUET, COMPRESSION ZSTD);\n {% endcall %}\n\n {# 2. Записываем latest (overwrite) #}\n {% call statement('main') %}\n COPY ({{ sql }})\n TO '{{ latest_path }}'\n (FORMAT PARQUET, COMPRESSION ZSTD);\n {% endcall %}\n\n {# 3. View для downstream ref() #}\n {% set target_relation = api.Relation.create(\n identifier=this.identifier, schema=this.schema, database=this.database, type='view'\n ) %}\n\n {% call statement('register_view') %}\n CREATE OR REPLACE VIEW {{ target_relation }} AS\n SELECT * FROM read_parquet('{{ latest_path }}');\n {% endcall %}\n\n {{ run_hooks(post_hooks, inside_transaction=False) }}\n {{ return({'relations': [target_relation]}) }}\n\n{% endmaterialization %}\n```\n\nИспользование:\n\n```sql\n-- models/marts/archive_orders.sql\n{{ config(\n materialized='archive_parquet',\n archive_location='s3://my-bucket/archive/orders'\n) }}\n\nSELECT * FROM {{ ref('fct_orders') }}\n```\n\nЧто это даёт:\n\n1. **Каждый run пишет архив**: `s3://archive/orders/archive_orders_2026-05-19.parquet`. Immutable.\n2. **Latest всегда доступен**: `archive_orders_latest.parquet` overwrite каждый run.\n3. **Downstream ref() работает прозрачно**: view над latest parquet. Аналитики просто `SELECT * FROM {{ ref('archive_orders') }}` и видят свежие данные.\n4. **DuckDB ATTACH** к S3 через httpfs/secrets — настроено в profiles.yml.\n\nДополнительные considerations:\n\n- **Retention policy**: archive файлы накапливаются. Нужен external cleanup job (Airflow / scheduled lambda), удаляет файлы старше N дней. Не в dbt — это task для other tool.\n- **Cost**: каждый run = новый файл в S3. Если table большая, storage costs растут. Compress + partition по дате.\n- **Restore**: если нужно вернуть к версии 2026-04-15, на лету: `SELECT * FROM read_parquet('s3://.../archive_orders_2026-04-15.parquet')` в ad-hoc query.\n- **Тесты**: integration tests в tests/integration/ — запускают модель, проверяют что оба файла созданы, view работает.\n\nАльтернативы без custom materialization:\n\n**1. post-hook approach** — стандартная materialized='table', post-hook делает COPY в parquet. Проще, но less elegant: модель материализуется дважды (table в DuckDB + COPY).\n\n**2. external table + scheduled archive** — модель materialized='external_location' (если есть в dbt-duckdb), отдельный scheduled job делает COPY с timestamp suffix. Decoupled но more moving parts.\n\n**3. dbt Mesh + cross-project ref** — overkill для single project.\n\nДля чистого dbt-native подхода — custom materialization. Тратите время на её написание один раз, потом аналитики получают archive automatically.
Проверка знанийKnowledge check
Команда хочет: для всех incremental моделей в проекте автоматически добавлять merge_update_columns = все колонки кроме id, _loaded_at. Какой dbt 1.9+ паттерн использовать?
ОтветAnswer
Это classic case materialization-aware macro через `config.set()` в dbt 1.9+.\n\nРешение:\n\n```sql\n-- macros/auto_merge_columns.sql\n{% macro auto_merge_columns(protected_columns=['id', '_loaded_at']) %}\n{#\n Автоматически устанавливает merge_update_columns для incremental моделей.\n Берёт все колонки this таблицы, исключает protected, ставит в config.\n \n Args:\n protected_columns: list колонок, которые НЕ обновлять при merge\n \n Использование в модели:\n {{ config(materialized='incremental', unique_key='id') }}\n {{ auto_merge_columns() }}\n SELECT ... FROM ...\n#}\n\n {# Работает только для incremental, на других - noop #}\n {% if config.get('materialized') == 'incremental' %}\n \n {# Защита: вызываем только если таблица существует (не на первом run) #}\n {% if execute and adapter.get_relation(\n database=this.database, schema=this.schema, identifier=this.identifier\n ) is not none %}\n \n {% set cols = adapter.get_columns_in_relation(this) %}\n {% set col_names = cols | map(attribute='name') | list %}\n {% set update_cols = col_names | reject('in', protected_columns) | list %}\n \n {% do config.set('merge_update_columns', update_cols) %}\n {% do log('Auto-set merge_update_columns for ' ~ this.identifier ~ ': ' ~ update_cols, info=true) %}\n \n {% endif %}\n \n {% endif %}\n\n{% endmacro %}\n```\n\nИспользование в incremental модели:\n\n```sql\n-- models/marts/fct_orders.sql\n{{ config(\n materialized='incremental',\n unique_key='order_id',\n incremental_strategy='merge'\n) }}\n{{ auto_merge_columns(protected_columns=['order_id', 'created_at']) }}\n\nSELECT * FROM {{ ref('stg_orders') }}\n{% if is_incremental() %}\nWHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})\n{% endif %}\n```\n\nЧто происходит:\n\n1. `auto_merge_columns` запускается до SELECT statement (это compile time).\n2. Проверяет config.get('materialized') = 'incremental' — да, продолжаем.\n3. Проверяет что таблица уже существует в warehouse (не первый run).\n4. Получает список колонок таблицы.\n5. Исключает protected ([order_id, created_at]).\n6. Устанавливает config.set('merge_update_columns', [...]).\n7. dbt при материализации читает merge_update_columns и использует в MERGE SQL.\n\nПреимущества:\n\n- **Автоматизация**: одно вызов macro в модели, не нужно перечислять колонки вручную.\n- **Адаптируется к schema evolution**: если в таблицу добавили новую колонку, она автоматически попадает в update.\n- **Защита id / audit колонок**: эти не обновляются при merge.\n- **Materialization-aware**: на table / view моделях macro — noop, не влияет.\n\nКлючевые detail:\n\n- **`execute`**: dbt parse/compile может вызывать macro дважды. Первый раз `execute=False` — нет access к warehouse. Второй раз `execute=True` — есть. Защита `if execute` нужна, иначе adapter.get_columns_in_relation падает на parse-time.\n- **`adapter.get_relation`**: первый run модели — таблицы ещё нет. get_columns_in_relation падает. Защита через get_relation is not none.\n- **`config.set()`**: работает только в compile-time. Если попытаться в runtime SELECT (типа `SELECT {{ config.set(...) }}`) — не сработает (или сработает не как ожидается).\n\nЭто пример того, как dbt 1.9+ позволяет писать **smart macros**, которые реагируют на materialization context. До 1.9 это требовало custom materialization (overkill для такого).

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 6. В dbt 1.9+ появился materialization-aware pattern через config.set(). Какой типичный use case?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5