Production utility macro: audit log table
Этот урок — практический. Пишем полный production-grade utility macro: audit log table для трекинга всех dbt runs модели. Покажем полный цикл: design, dispatch для multi-warehouse, hooks integration, тесты, документация.
Это типичный artifact analytics engineer: каждая модель пишет audit row в общую таблицу analytics_meta.dbt_run_log с metadata о run’е (когда, что, сколько строк, был ли incremental, кто запустил). Это даёт observability над dbt без отдельного инструмента.
Что мы построим
Цели:
- Каждый
dbt runкаждой модели пишет audit row в central log table. - Колонки:
model_name,target_name,run_started_at,invocation_id,is_incremental,rows_affected,git_sha,executed_by. - Работает на DuckDB, Snowflake, BigQuery — single source of truth dispatch.
- Безопасно в production: не падает если log table отсутствует (создаст), не теряет данные при concurrent runs.
- Документация + тесты.
Это покрывает большинство production-needs для observability моделей.
dbt-iii: parse vs execute — почему if execute обязателен Data Quality: observability как governance-практикаArchitecture
Step 1: схема audit log table
Сначала договоримся о схеме:
CREATE TABLE IF NOT EXISTS analytics_meta.dbt_run_log (
invocation_id VARCHAR(36),
model_name VARCHAR(255),
target_name VARCHAR(50),
run_started_at TIMESTAMP,
run_completed_at TIMESTAMP,
is_incremental BOOLEAN,
rows_affected BIGINT,
materialization VARCHAR(50),
git_sha VARCHAR(40),
executed_by VARCHAR(100),
status VARCHAR(20), -- 'success' / 'error' / 'skip'
error_message TEXT
)
Это minimal viable schema. Можно расширять. Хранить эту DDL в migrations / seed (для seed — упрощённая create).
Step 2: macro audit_log_run — основной интерфейс
-- macros/audit/audit_log_run.sql
{% macro audit_log_run(status='success', error_message=none) %}
{#
Записывает audit row для текущего dbt run модели.
Args:
status (str): 'success' | 'error' | 'skip', default 'success'.
error_message (str): сообщение об ошибке, default none.
Returns: SQL INSERT statement.
Notes:
- Использует this, invocation_id, run_started_at — доступны в dbt context.
- Считает rows через run_query (только при execute=True).
- Создаёт audit table если её нет (CREATE TABLE IF NOT EXISTS).
- Multi-warehouse через adapter.dispatch.
Use as post-hook:
{{ config(materialized='table', post_hook="{{ audit_log_run() }}") }}
#}
{{ return(adapter.dispatch('audit_log_run', 'myproject')(status, error_message)) }}
{% endmacro %}
Это interface. Dispatch разрешает на правильную реализацию.
Step 3: default implementation
-- macros/audit/default__audit_log_run.sql
{% macro default__audit_log_run(status, error_message) %}
{% if execute %}
{# 1. Ensure audit table exists #}
{{ ensure_audit_log_table() }}
{# 2. Count rows in this model #}
{% set row_count_query %}
SELECT COUNT(*) FROM {{ this }}
{% endset %}
{% set count_result = run_query(row_count_query) %}
{% set row_count = count_result.columns[0].values()[0] | default(0) %}
{# 3. Generate INSERT #}
INSERT INTO analytics_meta.dbt_run_log (
invocation_id, model_name, target_name,
run_started_at, run_completed_at,
is_incremental, rows_affected, materialization,
git_sha, executed_by, status, error_message
) VALUES (
'{{ invocation_id }}',
'{{ this.name }}',
'{{ target.name }}',
'{{ run_started_at }}',
CURRENT_TIMESTAMP,
{{ is_incremental() | string | lower }},
{{ row_count }},
'{{ config.get("materialized", "unknown") }}',
'{{ env_var("GIT_SHA", "unknown") }}',
'{{ env_var("USER", "unknown") }}',
'{{ status }}',
{% if error_message %}'{{ error_message | replace("'", "''") }}'{% else %}NULL{% endif %}
)
{% endif %}
{% endmacro %}
Что важно:
if execute— защита от parse-time.ensure_audit_log_table— sub-macro, гарантирует существование таблицы.run_queryдля count — узнаём rows в this.run_started_at,invocation_id— dbt context variables.env_var— Git SHA и user из CI environment.- Escape апострофов в error_message — basic SQL injection защита.
Step 4: ensure_audit_log_table sub-macro
-- macros/audit/ensure_audit_log_table.sql
{% macro ensure_audit_log_table() %}
{# Создаёт analytics_meta.dbt_run_log если её нет. Idempotent. #}
{{ return(adapter.dispatch('ensure_audit_log_table', 'myproject')()) }}
{% endmacro %}
-- macros/audit/default__ensure_audit_log_table.sql
{% macro default__ensure_audit_log_table() %}
{% set ddl %}
CREATE TABLE IF NOT EXISTS analytics_meta.dbt_run_log (
invocation_id VARCHAR(36),
model_name VARCHAR(255),
target_name VARCHAR(50),
run_started_at TIMESTAMP,
run_completed_at TIMESTAMP,
is_incremental BOOLEAN,
rows_affected BIGINT,
materialization VARCHAR(50),
git_sha VARCHAR(40),
executed_by VARCHAR(100),
status VARCHAR(20),
error_message TEXT
)
{% endset %}
{% if execute %}
{% do run_query(ddl) %}
{% endif %}
{% endmacro %}
-- macros/audit/duckdb__ensure_audit_log_table.sql
{% macro duckdb__ensure_audit_log_table() %}
{% set ddl %}
CREATE SCHEMA IF NOT EXISTS analytics_meta;
CREATE TABLE IF NOT EXISTS analytics_meta.dbt_run_log (
invocation_id VARCHAR,
model_name VARCHAR,
target_name VARCHAR,
run_started_at TIMESTAMP,
run_completed_at TIMESTAMP,
is_incremental BOOLEAN,
rows_affected BIGINT,
materialization VARCHAR,
git_sha VARCHAR,
executed_by VARCHAR,
status VARCHAR,
error_message TEXT
)
{% endset %}
{% if execute %}
{% do run_query(ddl) %}
{% endif %}
{% endmacro %}
Sub-macro вынесен отдельно для testability и переиспользования (можно вызвать вручную для one-time setup).
DuckDB-specific нюансы:
- Используется
VARCHARбез length (DuckDB не нужен). CREATE SCHEMA IF NOT EXISTS analytics_meta— DuckDB schema создаётся отдельно.
Step 5: Snowflake implementation
-- macros/audit/snowflake__audit_log_run.sql
{% macro snowflake__audit_log_run(status, error_message) %}
{% if execute %}
{{ ensure_audit_log_table() }}
{% set row_count_query %}
SELECT COUNT(*) FROM {{ this }}
{% endset %}
{% set count_result = run_query(row_count_query) %}
{% set row_count = count_result.columns[0].values()[0] | default(0) %}
INSERT INTO analytics_meta.dbt_run_log (
invocation_id, model_name, target_name,
run_started_at, run_completed_at,
is_incremental, rows_affected, materialization,
git_sha, executed_by, status, error_message
)
SELECT
'{{ invocation_id }}',
'{{ this.name }}',
'{{ target.name }}',
'{{ run_started_at }}'::TIMESTAMP_NTZ,
CURRENT_TIMESTAMP::TIMESTAMP_NTZ,
{{ is_incremental() | string | lower }},
{{ row_count }},
'{{ config.get("materialized", "unknown") }}',
'{{ env_var("GIT_SHA", "unknown") }}',
CURRENT_USER(),
'{{ status }}',
{% if error_message %}'{{ error_message | replace("'", "''") }}'{% else %}NULL{% endif %}
{% endif %}
{% endmacro %}
-- macros/audit/snowflake__ensure_audit_log_table.sql
{% macro snowflake__ensure_audit_log_table() %}
{% if execute %}
{% do run_query("CREATE SCHEMA IF NOT EXISTS analytics_meta") %}
{% set ddl %}
CREATE TABLE IF NOT EXISTS analytics_meta.dbt_run_log (
invocation_id VARCHAR(36),
model_name VARCHAR(255),
target_name VARCHAR(50),
run_started_at TIMESTAMP_NTZ,
run_completed_at TIMESTAMP_NTZ,
is_incremental BOOLEAN,
rows_affected BIGINT,
materialization VARCHAR(50),
git_sha VARCHAR(40),
executed_by VARCHAR(100),
status VARCHAR(20),
error_message TEXT
)
{% endset %}
{% do run_query(ddl) %}
{% endif %}
{% endmacro %}
Snowflake-specific:
TIMESTAMP_NTZ(no timezone) для consistency.CURRENT_USER()доступен (на Snowflake — функция).- INSERT SELECT — Snowflake recommends.
Step 6: использование в моделях
Способ 1 — индивидуально в каждой модели:
-- models/marts/fct_orders.sql
{{ config(
materialized='table',
post_hook="{{ audit_log_run() }}"
) }}
SELECT * FROM {{ ref('stg_orders') }}
Способ 2 — глобально через dbt_project.yml:
# dbt_project.yml
models:
myproject:
+post-hook: "{{ audit_log_run() }}"
Способ 2 применяется ко всем моделям проекта. Audit log entry создаётся для каждого dbt run.
Способ 3 — на on-run-end для summary:
on-run-end:
- "{{ log('dbt run completed at ' ~ modules.datetime.datetime.now(), info=true) }}"
Это для overall run-level logging, complement к per-model logging.
Step 7: тесты
Unit-test через test model:
-- tests/audit/test_audit_log_run_compiles.sql
{# Assert: audit_log_run() для test model компилит без error #}
{% if execute %}
{% set compiled = audit_log_run() %}
{% if 'INSERT INTO analytics_meta.dbt_run_log' not in compiled %}
{{ exceptions.raise_compiler_error('audit_log_run did not generate INSERT, got: ' ~ compiled) }}
{% endif %}
{% endif %}
SELECT 1 WHERE 1=0
Integration test:
-- tests/audit/test_audit_log_records_run.sql
{# После dbt run какой-то test_model — должна быть row в audit_log #}
SELECT 1 AS failure
FROM (
SELECT COUNT(*) AS cnt
FROM analytics_meta.dbt_run_log
WHERE model_name = 'test_audit_model'
AND run_started_at >= CURRENT_DATE - INTERVAL '1 day'
) WHERE cnt = 0
Этот тест запускается после регулярного dbt run и проверяет что audit table получила row.
Test на dispatch:
-- tests/audit/test_audit_log_dispatch.sql
{% set compiled = audit_log_run() %}
{% if target.type == 'snowflake' %}
{% if 'CURRENT_USER()' not in compiled %}
{{ exceptions.raise_compiler_error('Expected Snowflake CURRENT_USER() in audit_log_run') }}
{% endif %}
{% elif target.type == 'duckdb' %}
{% if 'env_var' not in compiled and 'USER' not in compiled %}
{{ exceptions.raise_compiler_error('Expected env_var-based user in DuckDB audit_log_run') }}
{% endif %}
{% endif %}
SELECT 1 WHERE 1=0
Проверяет что dispatch выбрал правильную реализацию для текущего адаптера.
Step 8: документация
Файл docs/macros.md содержит два doc-блока — один для audit_log_run, другой для ensure_audit_log_table. Внутри doc-блоков можно использовать обычный markdown.
{% docs macro_audit_log_run %}
Записывает audit row для текущего dbt run модели в analytics_meta.dbt_run_log.
Use as post-hook (per-model или global в dbt_project.yml):
models:
myproject:
+post-hook: "{{ audit_log_run() }}"
Захватывает: model name, target, invocation_id, run timing, row count,
materialization, git_sha, executed_by.
Multi-warehouse: dispatch на duckdb, snowflake, default.
Ensure table: создаётся CREATE IF NOT EXISTS при первом вызове.
{% enddocs %}
{% docs macro_ensure_audit_log_table %}
Создаёт analytics_meta.dbt_run_log если её нет. Idempotent.
Можно вызвать вручную для one-time setup:
dbt run-operation ensure_audit_log_table
{% enddocs %}
И файл macros/_macros.yml описывает аргументы и связывает с doc-блоками:
# macros/_macros.yml
macros:
- name: audit_log_run
description: "{{ doc('macro_audit_log_run') }}"
arguments:
- name: status
type: str
description: "'success' | 'error' | 'skip', default 'success'"
- name: error_message
type: str
description: "Сообщение ошибки, default none"
- name: ensure_audit_log_table
description: "{{ doc('macro_ensure_audit_log_table') }}"
После dbt docs generate macros появятся в Explorer с описанием и аргументами.
Edge cases и gotchas
1. Concurrent runs. Если двое запускают dbt run одновременно, оба пишут в audit. Это нормально (две row в log). Но если concurrent runs одной модели — DuckDB заблокирует second. Snowflake / Postgres — без проблем.
2. Audit table растёт. Без cleanup — millions rows через год. Решение: retention policy. Cron-job, который DELETE FROM analytics_meta.dbt_run_log WHERE run_started_at < CURRENT_DATE - INTERVAL '90 days'.
3. Performance overhead. Каждая модель делает COUNT(*) после run. Для огромных таблиц это дорого. Альтернативы:
- Использовать
dbt_audit.sources(если adapter поддерживает stats). - Не считать count для больших моделей:
audit_log_run(skip_count_threshold=100000000).
4. Failed runs. Если model failed — post_hook не выполнится (dbt не запускает post-hook при error). Solution: использовать on-run-end для capture failures отдельно.
5. Privacy. Audit logs могут содержать PII через env_var USER. Compliance аудит — проверьте что executed_by не leaks.
Полная структура файлов
После этого урока структура macros/audit/:
macros/audit/
audit_log_run.sql # interface (dispatch)
default__audit_log_run.sql # generic
duckdb__audit_log_run.sql # DuckDB-specific (если нужно отличие)
snowflake__audit_log_run.sql # Snowflake
ensure_audit_log_table.sql # interface
default__ensure_audit_log_table.sql
duckdb__ensure_audit_log_table.sql
snowflake__ensure_audit_log_table.sql
_macros.yml # documentation
И в tests/audit/:
tests/audit/
test_audit_log_run_compiles.sql
test_audit_log_records_run.sql
test_audit_log_dispatch.sql
И в docs/:
docs/macros.md
Это полный production-grade artifact. Дублируется через packages.yml в новые проекты команды.
Попробуй сам
В labs:
- Реализуйте полную схему audit log как описано выше. Создайте все macros, dispatch, тесты.
- Добавьте post-hook в одну простую модель. Запустите
dbt run --select my_model. Проверьте что вanalytics_meta.dbt_run_logпоявилась row. - Запустите много моделей:
dbt run. Проверьте что для каждой запись в audit. - Создайте mart-модель
dbt_run_summary.sqlкоторая агрегирует логи: per-model success rate, average duration, last run timestamp. Это даёт мониторинг через BI. - Тестируйте dispatch: переключите
target: duckdb. Если есть Postgres connection, переключите target и запустите снова. Проверьте что compiled SQL использует правильный variant. - Симулируйте failure: разверните audit_log_run в pre-hook и intentionally сломайте one model. Проверьте поведение (audit log получит row про failure через on-run-end?).
Это полное упражнение — production audit infrastructure для dbt-проекта.
Ключевые выводы
- Production utility macro имеет полный цикл: design, dispatch, hooks, tests, docs.
- Dispatch architecture: interface macro плюс default плюс adapter-specific implementations.
if execute— обязательно при использовании run_query / DDL в macros.- Sub-macros для composition: ensure_audit_log_table отдельно от audit_log_run для testability.
- Hooks integration: post-hook per-model или global в dbt_project.yml для consistent audit.
- Documentation: doc blocks плюс _macros.yml — searchable в dbt Explorer.
- Tests: singular tests на compile output, integration tests на runtime behavior, dispatch tests.
- Production-considerations: concurrency, table growth (retention), performance (skip large counts), privacy (PII в env_var).