Learning Platform
Глоссарий Troubleshooting
Урок 06.05 · 30 мин
Средний
MacrosAudit logProductionHooksEnd-to-end example

Production utility macro: audit log table

Этот урок — практический. Пишем полный production-grade utility macro: audit log table для трекинга всех dbt runs модели. Покажем полный цикл: design, dispatch для multi-warehouse, hooks integration, тесты, документация.

Это типичный artifact analytics engineer: каждая модель пишет audit row в общую таблицу analytics_meta.dbt_run_log с metadata о run’е (когда, что, сколько строк, был ли incremental, кто запустил). Это даёт observability над dbt без отдельного инструмента.


Что мы построим

Цели:

  1. Каждый dbt run каждой модели пишет audit row в central log table.
  2. Колонки: model_name, target_name, run_started_at, invocation_id, is_incremental, rows_affected, git_sha, executed_by.
  3. Работает на DuckDB, Snowflake, BigQuery — single source of truth dispatch.
  4. Безопасно в production: не падает если log table отсутствует (создаст), не теряет данные при concurrent runs.
  5. Документация + тесты.

Это покрывает большинство production-needs для observability моделей.

dbt-iii: parse vs execute — почему if execute обязателен Data Quality: observability как governance-практика

Architecture

Audit log: end-to-end flow
dbt run my_modeldbt run my_model — обычный запуск. Post-hook вызывает audit macro.
post-hook: audit_log_run()Post-hook: audit_log_run(). Этот macro собирает metadata (model name, target, invocation_id, etc) и формирует INSERT в audit table.
audit_log_run() macro: ensure + count + insertMacro делает: 1) ensure audit_log table exists (CREATE IF NOT EXISTS), 2) считает rows в this model (через run_query), 3) генерирует INSERT с metadata.
analytics_meta.dbt_run_log tableAudit log в schema analytics_meta. Доступен через ref() в моделях-отчётах или прямо в BI.
BI / monitoring dashboardsДальше: модель audit_summary.sql агрегирует логи по date/model. BI dashboard видит health моделей.

Step 1: схема audit log table

Сначала договоримся о схеме:

CREATE TABLE IF NOT EXISTS analytics_meta.dbt_run_log (
    invocation_id       VARCHAR(36),
    model_name          VARCHAR(255),
    target_name         VARCHAR(50),
    run_started_at      TIMESTAMP,
    run_completed_at    TIMESTAMP,
    is_incremental      BOOLEAN,
    rows_affected       BIGINT,
    materialization     VARCHAR(50),
    git_sha             VARCHAR(40),
    executed_by         VARCHAR(100),
    status              VARCHAR(20),  -- 'success' / 'error' / 'skip'
    error_message       TEXT
)

Это minimal viable schema. Можно расширять. Хранить эту DDL в migrations / seed (для seed — упрощённая create).


Step 2: macro audit_log_run — основной интерфейс

-- macros/audit/audit_log_run.sql
{% macro audit_log_run(status='success', error_message=none) %}
{#
  Записывает audit row для текущего dbt run модели.

  Args:
    status (str): 'success' | 'error' | 'skip', default 'success'.
    error_message (str): сообщение об ошибке, default none.

  Returns: SQL INSERT statement.

  Notes:
    - Использует this, invocation_id, run_started_at — доступны в dbt context.
    - Считает rows через run_query (только при execute=True).
    - Создаёт audit table если её нет (CREATE TABLE IF NOT EXISTS).
    - Multi-warehouse через adapter.dispatch.

  Use as post-hook:
    {{ config(materialized='table', post_hook="{{ audit_log_run() }}") }}
#}
  {{ return(adapter.dispatch('audit_log_run', 'myproject')(status, error_message)) }}
{% endmacro %}

Это interface. Dispatch разрешает на правильную реализацию.


Step 3: default implementation

-- macros/audit/default__audit_log_run.sql
{% macro default__audit_log_run(status, error_message) %}
  {% if execute %}
    {# 1. Ensure audit table exists #}
    {{ ensure_audit_log_table() }}
    
    {# 2. Count rows in this model #}
    {% set row_count_query %}
      SELECT COUNT(*) FROM {{ this }}
    {% endset %}
    {% set count_result = run_query(row_count_query) %}
    {% set row_count = count_result.columns[0].values()[0] | default(0) %}
    
    {# 3. Generate INSERT #}
    INSERT INTO analytics_meta.dbt_run_log (
        invocation_id, model_name, target_name,
        run_started_at, run_completed_at,
        is_incremental, rows_affected, materialization,
        git_sha, executed_by, status, error_message
    ) VALUES (
        '{{ invocation_id }}',
        '{{ this.name }}',
        '{{ target.name }}',
        '{{ run_started_at }}',
        CURRENT_TIMESTAMP,
        {{ is_incremental() | string | lower }},
        {{ row_count }},
        '{{ config.get("materialized", "unknown") }}',
        '{{ env_var("GIT_SHA", "unknown") }}',
        '{{ env_var("USER", "unknown") }}',
        '{{ status }}',
        {% if error_message %}'{{ error_message | replace("'", "''") }}'{% else %}NULL{% endif %}
    )
  {% endif %}
{% endmacro %}

Что важно:

  1. if execute — защита от parse-time.
  2. ensure_audit_log_table — sub-macro, гарантирует существование таблицы.
  3. run_query для count — узнаём rows в this.
  4. run_started_at, invocation_id — dbt context variables.
  5. env_var — Git SHA и user из CI environment.
  6. Escape апострофов в error_message — basic SQL injection защита.

Step 4: ensure_audit_log_table sub-macro

-- macros/audit/ensure_audit_log_table.sql
{% macro ensure_audit_log_table() %}
{# Создаёт analytics_meta.dbt_run_log если её нет. Idempotent. #}
  {{ return(adapter.dispatch('ensure_audit_log_table', 'myproject')()) }}
{% endmacro %}

-- macros/audit/default__ensure_audit_log_table.sql
{% macro default__ensure_audit_log_table() %}
  {% set ddl %}
    CREATE TABLE IF NOT EXISTS analytics_meta.dbt_run_log (
        invocation_id VARCHAR(36),
        model_name VARCHAR(255),
        target_name VARCHAR(50),
        run_started_at TIMESTAMP,
        run_completed_at TIMESTAMP,
        is_incremental BOOLEAN,
        rows_affected BIGINT,
        materialization VARCHAR(50),
        git_sha VARCHAR(40),
        executed_by VARCHAR(100),
        status VARCHAR(20),
        error_message TEXT
    )
  {% endset %}
  
  {% if execute %}
    {% do run_query(ddl) %}
  {% endif %}
{% endmacro %}

-- macros/audit/duckdb__ensure_audit_log_table.sql
{% macro duckdb__ensure_audit_log_table() %}
  {% set ddl %}
    CREATE SCHEMA IF NOT EXISTS analytics_meta;
    CREATE TABLE IF NOT EXISTS analytics_meta.dbt_run_log (
        invocation_id VARCHAR,
        model_name VARCHAR,
        target_name VARCHAR,
        run_started_at TIMESTAMP,
        run_completed_at TIMESTAMP,
        is_incremental BOOLEAN,
        rows_affected BIGINT,
        materialization VARCHAR,
        git_sha VARCHAR,
        executed_by VARCHAR,
        status VARCHAR,
        error_message TEXT
    )
  {% endset %}
  
  {% if execute %}
    {% do run_query(ddl) %}
  {% endif %}
{% endmacro %}

Sub-macro вынесен отдельно для testability и переиспользования (можно вызвать вручную для one-time setup).

DuckDB-specific нюансы:

  • Используется VARCHAR без length (DuckDB не нужен).
  • CREATE SCHEMA IF NOT EXISTS analytics_meta — DuckDB schema создаётся отдельно.

Step 5: Snowflake implementation

-- macros/audit/snowflake__audit_log_run.sql
{% macro snowflake__audit_log_run(status, error_message) %}
  {% if execute %}
    {{ ensure_audit_log_table() }}
    
    {% set row_count_query %}
      SELECT COUNT(*) FROM {{ this }}
    {% endset %}
    {% set count_result = run_query(row_count_query) %}
    {% set row_count = count_result.columns[0].values()[0] | default(0) %}
    
    INSERT INTO analytics_meta.dbt_run_log (
        invocation_id, model_name, target_name,
        run_started_at, run_completed_at,
        is_incremental, rows_affected, materialization,
        git_sha, executed_by, status, error_message
    )
    SELECT
        '{{ invocation_id }}',
        '{{ this.name }}',
        '{{ target.name }}',
        '{{ run_started_at }}'::TIMESTAMP_NTZ,
        CURRENT_TIMESTAMP::TIMESTAMP_NTZ,
        {{ is_incremental() | string | lower }},
        {{ row_count }},
        '{{ config.get("materialized", "unknown") }}',
        '{{ env_var("GIT_SHA", "unknown") }}',
        CURRENT_USER(),
        '{{ status }}',
        {% if error_message %}'{{ error_message | replace("'", "''") }}'{% else %}NULL{% endif %}
  {% endif %}
{% endmacro %}

-- macros/audit/snowflake__ensure_audit_log_table.sql
{% macro snowflake__ensure_audit_log_table() %}
  {% if execute %}
    {% do run_query("CREATE SCHEMA IF NOT EXISTS analytics_meta") %}
    {% set ddl %}
      CREATE TABLE IF NOT EXISTS analytics_meta.dbt_run_log (
          invocation_id VARCHAR(36),
          model_name VARCHAR(255),
          target_name VARCHAR(50),
          run_started_at TIMESTAMP_NTZ,
          run_completed_at TIMESTAMP_NTZ,
          is_incremental BOOLEAN,
          rows_affected BIGINT,
          materialization VARCHAR(50),
          git_sha VARCHAR(40),
          executed_by VARCHAR(100),
          status VARCHAR(20),
          error_message TEXT
      )
    {% endset %}
    {% do run_query(ddl) %}
  {% endif %}
{% endmacro %}

Snowflake-specific:

  • TIMESTAMP_NTZ (no timezone) для consistency.
  • CURRENT_USER() доступен (на Snowflake — функция).
  • INSERT SELECT — Snowflake recommends.

Step 6: использование в моделях

Способ 1 — индивидуально в каждой модели:

-- models/marts/fct_orders.sql
{{ config(
    materialized='table',
    post_hook="{{ audit_log_run() }}"
) }}

SELECT * FROM {{ ref('stg_orders') }}

Способ 2 — глобально через dbt_project.yml:

# dbt_project.yml
models:
  myproject:
    +post-hook: "{{ audit_log_run() }}"

Способ 2 применяется ко всем моделям проекта. Audit log entry создаётся для каждого dbt run.

Способ 3 — на on-run-end для summary:

on-run-end:
  - "{{ log('dbt run completed at ' ~ modules.datetime.datetime.now(), info=true) }}"

Это для overall run-level logging, complement к per-model logging.


Step 7: тесты

Unit-test через test model:

-- tests/audit/test_audit_log_run_compiles.sql
{# Assert: audit_log_run() для test model компилит без error #}
{% if execute %}
  {% set compiled = audit_log_run() %}
  {% if 'INSERT INTO analytics_meta.dbt_run_log' not in compiled %}
    {{ exceptions.raise_compiler_error('audit_log_run did not generate INSERT, got: ' ~ compiled) }}
  {% endif %}
{% endif %}

SELECT 1 WHERE 1=0

Integration test:

-- tests/audit/test_audit_log_records_run.sql
{# После dbt run какой-то test_model — должна быть row в audit_log #}
SELECT 1 AS failure
FROM (
    SELECT COUNT(*) AS cnt 
    FROM analytics_meta.dbt_run_log
    WHERE model_name = 'test_audit_model'
      AND run_started_at >= CURRENT_DATE - INTERVAL '1 day'
) WHERE cnt = 0

Этот тест запускается после регулярного dbt run и проверяет что audit table получила row.

Test на dispatch:

-- tests/audit/test_audit_log_dispatch.sql
{% set compiled = audit_log_run() %}

{% if target.type == 'snowflake' %}
  {% if 'CURRENT_USER()' not in compiled %}
    {{ exceptions.raise_compiler_error('Expected Snowflake CURRENT_USER() in audit_log_run') }}
  {% endif %}
{% elif target.type == 'duckdb' %}
  {% if 'env_var' not in compiled and 'USER' not in compiled %}
    {{ exceptions.raise_compiler_error('Expected env_var-based user in DuckDB audit_log_run') }}
  {% endif %}
{% endif %}

SELECT 1 WHERE 1=0

Проверяет что dispatch выбрал правильную реализацию для текущего адаптера.


Step 8: документация

Файл docs/macros.md содержит два doc-блока — один для audit_log_run, другой для ensure_audit_log_table. Внутри doc-блоков можно использовать обычный markdown.

{% docs macro_audit_log_run %}
Записывает audit row для текущего dbt run модели в analytics_meta.dbt_run_log.

Use as post-hook (per-model или global в dbt_project.yml):

    models:
      myproject:
        +post-hook: "{{ audit_log_run() }}"

Захватывает: model name, target, invocation_id, run timing, row count,
materialization, git_sha, executed_by.

Multi-warehouse: dispatch на duckdb, snowflake, default.

Ensure table: создаётся CREATE IF NOT EXISTS при первом вызове.
{% enddocs %}

{% docs macro_ensure_audit_log_table %}
Создаёт analytics_meta.dbt_run_log если её нет. Idempotent.

Можно вызвать вручную для one-time setup:

    dbt run-operation ensure_audit_log_table
{% enddocs %}

И файл macros/_macros.yml описывает аргументы и связывает с doc-блоками:

# macros/_macros.yml
macros:
  - name: audit_log_run
    description: "{{ doc('macro_audit_log_run') }}"
    arguments:
      - name: status
        type: str
        description: "'success' | 'error' | 'skip', default 'success'"
      - name: error_message
        type: str
        description: "Сообщение ошибки, default none"

  - name: ensure_audit_log_table
    description: "{{ doc('macro_ensure_audit_log_table') }}"

После dbt docs generate macros появятся в Explorer с описанием и аргументами.


Edge cases и gotchas

1. Concurrent runs. Если двое запускают dbt run одновременно, оба пишут в audit. Это нормально (две row в log). Но если concurrent runs одной модели — DuckDB заблокирует second. Snowflake / Postgres — без проблем.

2. Audit table растёт. Без cleanup — millions rows через год. Решение: retention policy. Cron-job, который DELETE FROM analytics_meta.dbt_run_log WHERE run_started_at < CURRENT_DATE - INTERVAL '90 days'.

3. Performance overhead. Каждая модель делает COUNT(*) после run. Для огромных таблиц это дорого. Альтернативы:

  • Использовать dbt_audit.sources (если adapter поддерживает stats).
  • Не считать count для больших моделей: audit_log_run(skip_count_threshold=100000000).

4. Failed runs. Если model failed — post_hook не выполнится (dbt не запускает post-hook при error). Solution: использовать on-run-end для capture failures отдельно.

5. Privacy. Audit logs могут содержать PII через env_var USER. Compliance аудит — проверьте что executed_by не leaks.


Полная структура файлов

После этого урока структура macros/audit/:

macros/audit/
  audit_log_run.sql              # interface (dispatch)
  default__audit_log_run.sql     # generic
  duckdb__audit_log_run.sql      # DuckDB-specific (если нужно отличие)
  snowflake__audit_log_run.sql   # Snowflake
  ensure_audit_log_table.sql     # interface
  default__ensure_audit_log_table.sql
  duckdb__ensure_audit_log_table.sql
  snowflake__ensure_audit_log_table.sql
  _macros.yml                    # documentation

И в tests/audit/:

tests/audit/
  test_audit_log_run_compiles.sql
  test_audit_log_records_run.sql
  test_audit_log_dispatch.sql

И в docs/:

docs/macros.md

Это полный production-grade artifact. Дублируется через packages.yml в новые проекты команды.


Попробуй сам

В labs:

  1. Реализуйте полную схему audit log как описано выше. Создайте все macros, dispatch, тесты.
  2. Добавьте post-hook в одну простую модель. Запустите dbt run --select my_model. Проверьте что в analytics_meta.dbt_run_log появилась row.
  3. Запустите много моделей: dbt run. Проверьте что для каждой запись в audit.
  4. Создайте mart-модель dbt_run_summary.sql которая агрегирует логи: per-model success rate, average duration, last run timestamp. Это даёт мониторинг через BI.
  5. Тестируйте dispatch: переключите target: duckdb. Если есть Postgres connection, переключите target и запустите снова. Проверьте что compiled SQL использует правильный variant.
  6. Симулируйте failure: разверните audit_log_run в pre-hook и intentionally сломайте one model. Проверьте поведение (audit log получит row про failure через on-run-end?).

Это полное упражнение — production audit infrastructure для dbt-проекта.


Ключевые выводы

  1. Production utility macro имеет полный цикл: design, dispatch, hooks, tests, docs.
  2. Dispatch architecture: interface macro плюс default плюс adapter-specific implementations.
  3. if execute — обязательно при использовании run_query / DDL в macros.
  4. Sub-macros для composition: ensure_audit_log_table отдельно от audit_log_run для testability.
  5. Hooks integration: post-hook per-model или global в dbt_project.yml для consistent audit.
  6. Documentation: doc blocks плюс _macros.yml — searchable в dbt Explorer.
  7. Tests: singular tests на compile output, integration tests на runtime behavior, dispatch tests.
  8. Production-considerations: concurrency, table growth (retention), performance (skip large counts), privacy (PII в env_var).
Проверка знанийKnowledge check
Audit log macro работает 3 месяца в проде. analytics_meta.dbt_run_log уже 50 миллионов строк, BI dashboard 'dbt health' стал медленным. Какие 3 направления оптимизации?
ОтветAnswer
Это классическая проблема audit infrastructure: linear growth без cleanup. Три направления: 1) Retention policy (немедленно). Классический cron-cleanup через on-run-end macro cleanup_audit_log(retention_days=90), внутри DELETE FROM analytics_meta.dbt_run_log WHERE run_started_at < CURRENT_DATE - INTERVAL 90 days. Запускается один раз в день (on-run-end делает каждый run, но DELETE с WHERE — fast если индекс на run_started_at). Просмотрите rate: 50M / 3 месяца = ~550K rows/day. Если 100 моделей × runs/день × ~70 runs = consistent. С retention=90 дней log будет ~50M, не растёт. Если хочется меньше — retention=30. 2) Партиционирование / clustering на warehouse (среднесрок). DuckDB не поддерживает партиционирование, но Snowflake и BigQuery поддерживают. Snowflake: CLUSTER BY (DATE(run_started_at), model_name). Запросы last 7 days for model X становятся быстрыми (pruning по cluster keys). BigQuery: PARTITION BY DATE(run_started_at) CLUSTER BY model_name. Запросы BI с filter на run_started_at используют partition pruning. Cost reduction. DuckDB workaround: materialize отдельные daily / monthly tables через incremental model с unique_key=invocation_id. 3) Pre-aggregation для dashboards (фундаментальное решение). BI dashboard не читает 50M raw rows. Pre-aggregate в модель dbt_run_summary_daily.sql: GROUP BY DATE(run_started_at), model_name, target_name с COUNT, COUNT FILTER WHERE status=success, AVG duration, SUM rows_affected. BI коннектится к dbt_run_summary_daily (не raw log). Эта таблица 100 моделей × 90 дней = 9K rows — мгновенно. Для drill-down (показать конкретные runs модели X в день Y) BI делает query в raw log с фильтром — быстро если partition / cluster keys настроены. Дополнительные оптимизации: sampling at write time (если данные slow-changing, писать audit row только для каждого N-го run, loss of granularity, win in storage); archive в parquet / Iceberg (после 90 дней — переносить в cold storage, доступ через external table); skip audit для определённых моделей (dev / staging environments — добавить config флаг); async write (вместо синхронного INSERT в post-hook — async через queue, сложнее, но не блокирует dbt run). Рекомендация по приоритету: (1) сначала retention policy — quick win; (2) потом pre-aggregation для BI — фундаментально решает медлительность dashboard; (3) если warehouse это позволяет — partitioning / clustering для drill-down speed; (4) архитектурно: переходить от write everything к write summary + sample raw.
Проверка знанийKnowledge check
Audit log macro работает в post-hook. Модель failed (error в SELECT) — post-hook НЕ выполнился. Audit table не содержит запись о failure. Как зафиксировать failure?
ОтветAnswer
Это известный паттерн dbt: post-hook не выполняется при failed run. Это design choice — если model failed, dbt не пытается продолжать execution. Решения: 1) on-run-end для failure capture. on-run-end запускается в конце всего dbt run, независимо от success/failure. Имеет access к results object со списком всех моделей и их статусов. В dbt_project.yml: on-run-end: [audit_log_failures()]. Macro audit_log_failures итерирует через results, и для каждого result где result.status == "error" формирует INSERT в analytics_meta.dbt_run_log с полями invocation_id, model_name=result.node.name, target_name, run_started_at=result.timing[0].started_at, run_completed_at=result.timing[0].completed_at, status="error", error_message=result.message (с экранированием апострофов). on-run-end итерирует через results и пишет audit row для failed моделей. 2) pre-hook плюс post-hook combo. Похитрее: пишем attempt row в pre-hook (model started), success update в post-hook. Если post-hook не выполнился (model failed) — у нас остаётся attempt row без success update. В dbt_project.yml: +pre-hook: audit_log_attempt(), +post-hook: audit_log_success(). Затем mart-модель dbt_run_log_enriched.sql: CASE WHEN status IS NULL AND attempt_at IS NOT NULL THEN failed ELSE status END AS inferred_status. Минус: attempted-but-not-completed rows нужно постоянно reconcile в model. Сложнее. 3) Use try/catch in pre-hook (новый approach). dbt 1.10+ обсуждает on-failure hooks (proposal). Пока не stable. Когда выйдет — будет первоклассное решение. 4) External monitoring (recommended для prod). on-run-end плюс структурированный JSON output: on-run-end: [emit_run_summary()]. Macro emit_run_summary итерирует через results, для каждого error пишет log line FAIL: model_name — message с info=true. В CI/CD log line picked up by Datadog log shipper. Datadog / PagerDuty получают log lines в realtime, создают incidents. Рекомендация для middle-проекта: комбо on-run-end macro audit_log_run_complete (записывает summary для ВСЕХ моделей — success и failure) — это replace индивидуальный post-hook. Индивидуальный post-hook остаётся для models где нужны specific actions (audit конкретных columns). on-run-end: [audit_log_full_run()]. Macro audit_log_full_run обрабатывает и success и error — one row per model per run, INSERT INTO analytics_meta.dbt_run_log с status=success или error и error_message=result.message при error. Это one-stop solution: каждый run каждой модели = одна row в audit, независимо от success/failure. Снимает need для разных hooks.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 6. Какие компоненты составляют полный production audit log macro?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5