run_query vs statement: introspection и side effects
В этом уроке — два механизма dbt для выполнения SQL внутри macros: run_query() и statement('main'). Они выглядят похоже, но используются для разных целей и имеют важные gotchas, особенно с execute flag.
Понимание этого — обязательный навык для middle: без него custom materializations и introspection macros работают непредсказуемо.
Базовое различие
run_query(sql) | statement('name', fetch_result=True) | |
|---|---|---|
| Когда выполняется | Compile-time / parse-time | Run-time (часть materialization) |
| Возвращает | Agate Table (Python-структура) | Управляет, как dbt запишет result в manifest |
| Использование | Introspection (получить список колонок, COUNT, max value) | Custom materialization main DDL |
execute flag | Игнорирует значение | Только при execute=True |
Простое правило:
- run_query — для чтения warehouse в compile-time, чтобы повлиять на сгенерированный SQL.
- statement(‘main’) — для записи warehouse в run-time, основная DDL custom materialization.
run_query — паттерн использования
dbt-iii: statement() и run_query() — глубокий разбор-- macros/get_max_order_date.sql
{% macro get_max_order_date() %}
{% set query %}
SELECT MAX(order_date) AS max_date FROM {{ ref('fct_orders') }}
{% endset %}
{% set result = run_query(query) %}
{% if execute %}
{% set max_date = result.columns[0].values()[0] %}
{{ return(max_date) }}
{% else %}
{{ return(none) }}
{% endif %}
{% endmacro %}
Использование в модели:
{% set max_date = get_max_order_date() %}
SELECT *
FROM {{ ref('raw_orders') }}
WHERE order_date > '{{ max_date }}'
Что происходит:
- dbt compile модели.
- Встречает
{% set max_date = get_max_order_date() %}. - Вызывает macro. Macro делает
run_query("SELECT MAX(order_date)..."). SQL отправляется warehouse’у. - Результат — Python Agate Table. Извлекаем max_date.
- max_date подставляется в WHERE clause.
- Compiled SQL:
WHERE order_date > '2026-05-19'.
Это introspection: macro смотрит на warehouse state, чтобы определить как construct SQL.
Почему нужен execute flag
dbt вызывает macros дважды:
- Parse-time — для построения dependency graph (DAG).
execute = False. Warehouse не подключён. - Compile/Run-time — для генерации SQL и выполнения.
execute = True. Warehouse подключён.
Если в parse-time вызвать run_query — он не вернёт data (warehouse не подключён). Возможные поведения:
- Старые версии dbt: возвращает empty agate table — macro молча получит NULL.
- Современные: warning или error.
Защита через if execute:
{% set result = run_query(query) %}
{% if execute %}
{# В этой ветке result содержит реальные данные #}
{% set value = result.columns[0].values()[0] %}
{% else %}
{# Parse-time: result пустой. Возвращаем placeholder. #}
{% set value = none %}
{% endif %}
Забыли if execute — macro в parse-time может молча возвращать NULL / empty. В compile-time компилит SQL с NULL значениями. Тяжёлый debug, потому что SQL valid (просто данные не такие).
Реальный use case run_query: get_column_values
dbt-utils package имеет get_column_values — возвращает список уникальных значений колонки. Используется для PIVOT и generate select statements.
Упрощённая реализация:
-- macros/get_column_values.sql
{% macro get_column_values(table, column) %}
{% set query %}
SELECT DISTINCT {{ column }} FROM {{ table }} ORDER BY 1
{% endset %}
{% set result = run_query(query) %}
{% if execute %}
{% set values = result.columns[0].values() | list %}
{{ return(values) }}
{% else %}
{{ return([]) }} {# Parse-time placeholder #}
{% endif %}
{% endmacro %}
Использование для PIVOT:
-- models/marts/sales_by_country.sql
{% set countries = get_column_values(ref('dim_countries'), 'country_code') %}
SELECT
product_id,
{% for c in countries %}
SUM(CASE WHEN country_code = '{{ c }}' THEN revenue ELSE 0 END) AS revenue_{{ c }}
{%- if not loop.last %},{% endif %}
{% endfor %}
FROM {{ ref('fct_sales') }}
GROUP BY product_id
Если в dim_countries пять стран — компилированный SQL имеет пять колонок (revenue_US, revenue_UK, etc.). Если завтра добавится Германия — SQL автоматически расширяется.
Это dynamic SQL через introspection. Возможно только с run_query.
statement(‘main’) — паттерн использования
statement используется внутри custom materializations. Чаще всего — statement('main'):
{% materialization my_custom, adapter='duckdb' %}
{% call statement('main') %}
CREATE OR REPLACE TABLE {{ target_relation }} AS ({{ sql }})
{% endcall %}
{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
{% call statement('main') %} ... {% endcall %} — это block, в котором генерится SQL. dbt:
- Берёт SQL внутри block.
- Отправляет warehouse’у.
- Записывает результат в
run_results.json(под именем ‘main’).
Важно: statement('main') это main DDL материализации. Без него materialization не делает основную работу. dbt смотрит на ‘main’ result для отчёта по run’у.
Можно иметь дополнительные statements:
{% call statement('audit_log') %}
INSERT INTO audit_log (...) VALUES (...)
{% endcall %}
{% call statement('main') %}
CREATE OR REPLACE TABLE {{ target_relation }} AS ({{ sql }})
{% endcall %}
{% call statement('grant') %}
GRANT SELECT ON {{ target_relation }} TO analyst_role
{% endcall %}
Statements выполняются последовательно в порядке появления. Каждый записывается в run_results.json под своим именем.
fetch_result в statement
{% call statement('count_rows', fetch_result=True) %}
SELECT COUNT(*) FROM {{ target_relation }}
{% endcall %}
{% set count = load_result('count_rows')['data'][0][0] %}
{{ log('Row count: ' ~ count, info=true) }}
fetch_result=True сохраняет результат query, доступный через load_result(). Это полезно когда внутри materialization нужно знать что вернул query.
Без fetch_result (default False) — statement выполняется, но результат не сохраняется. Меньше memory, но load_result не работает.
run_query vs statement — concrete example
Задача: при dbt run модели хотим логировать до и после размер таблицы.
Вариант через run_query (для table, в pre/post hooks):
-- macros/log_row_count.sql
{% macro log_row_count(table_name) %}
{% set query = "SELECT COUNT(*) FROM " ~ table_name %}
{% set result = run_query(query) %}
{% if execute %}
{% set count = result.columns[0].values()[0] %}
{{ log(table_name ~ ' row count: ' ~ count, info=true) }}
{% endif %}
{% endmacro %}
# dbt_project.yml
models:
my_project:
+pre-hook: "{{ log_row_count(this) }}" -- pre run
+post-hook: "{{ log_row_count(this) }}" -- after run
Это работает. run_query в hook — допустимо, если данные нужны для logging / side effects.
Вариант через statement (внутри custom materialization):
{% materialization logged_table, adapter='duckdb' %}
{% call statement('count_before', fetch_result=True) %}
SELECT COUNT(*) FROM {{ this }} WHERE 1=1
{% endcall %}
{% set before_count = load_result('count_before')['data'][0][0] | default(0) %}
{% call statement('main') %}
CREATE OR REPLACE TABLE {{ this }} AS ({{ sql }})
{% endcall %}
{% call statement('count_after', fetch_result=True) %}
SELECT COUNT(*) FROM {{ this }}
{% endcall %}
{% set after_count = load_result('count_after')['data'][0][0] %}
{{ log('Rows: ' ~ before_count ~ ' -> ' ~ after_count, info=true) }}
{{ return({'relations': [this]}) }}
{% endmaterialization %}
Внутри materialization используются statements. run_query тоже можно, но statements семантически правильнее для DDL/side effects.
Gotcha: run_query в parse-time может dependency-graph ломать
Допустим:
-- models/expensive_introspection.sql
{% set count = run_query("SELECT COUNT(*) FROM external.huge_table") %}
SELECT * FROM {{ ref('stg_orders') }} LIMIT {{ count }}
При dbt parse / dbt compile macro вызывается. run_query отправляет SELECT COUNT(*) в warehouse — даже если мы хотели только проверить syntax / dependencies.
Если external.huge_table — это огромная таблица, COUNT(*) дорого. Особенно при многих parse-операциях (CI, IDE auto-complete).
Лучшая практика:
- Минимизируйте run_query в моделях. Лучше в hooks / materializations.
- Кешируйте:
{% if execute %}
{% set cache_key = 'count_huge_table' %}
{% set cached = run_query("SELECT value FROM dbt_cache WHERE key = '" ~ cache_key ~ "'") %}
{% if cached and cached.columns[0].values() | length > 0 %}
{% set count = cached.columns[0].values()[0] %}
{% else %}
{% set count = run_query("SELECT COUNT(*) FROM external.huge_table").columns[0].values()[0] %}
{% do run_query("INSERT INTO dbt_cache (key, value) VALUES ('" ~ cache_key ~ "', " ~ count ~ ")") %}
{% endif %}
{% endif %}
(Это очевидно overkill для большинства случаев, но иллюстрирует идею).
- Используйте
target.nameдля conditional:
{% if target.name == 'prod' %}
{% set count = run_query(...) %}
{% else %}
{% set count = 1000 %} -- placeholder для dev
{% endif %}
Gotcha: result columns vs rows
run_query возвращает Agate Table. Извлекать значения нужно правильно.
{% set result = run_query("SELECT id, name FROM customers LIMIT 3") %}
{% if execute %}
{# Все колонки #}
{% set columns = result.columns %}
{# ['id', 'name'] #}
{# Значения первой колонки (id) — все 3 #}
{% set ids = result.columns[0].values() | list %}
{# [1, 2, 3] #}
{# Значения по rows — итерируем #}
{% for row in result.rows %}
{{ log(row[0] ~ ' ' ~ row[1], info=true) }}
{% endfor %}
{# Скалярный результат — single column, single row #}
{% set scalar = result.columns[0].values()[0] %}
{# 1 #}
{% endif %}
Частая ошибка: result.rows[0] это tuple rows, не одна row. result.columns[0] — это AgateColumn, не Python list. Нужно .values() | list.
Gotcha: load_result после statement
load_result работает только если у statement был fetch_result=True:
{% call statement('my_query', fetch_result=True) %}
SELECT 1, 2
{% endcall %}
{% set result = load_result('my_query') %}
{# result = {'response': {...}, 'data': [(1, 2)], 'agate_table': <...>} #}
Без fetch_result=True:
{% call statement('my_query') %} -- fetch_result default False
SELECT 1, 2
{% endcall %}
{% set result = load_result('my_query') %}
{# result = {'response': {...}} - no 'data'! #}
Это часто ловит junior’ов. Если planируете извлечь данные — fetch_result=True.
Когда использовать какой подход
Используйте run_query когда:
- Нужна introspection в parse/compile-time (получить colum list, max value, count для conditional SQL).
- Macro в обычной модели (не custom materialization).
- Простой SELECT с возвратом значения.
Используйте statement когда:
- Custom materialization: главная DDL должна быть в
statement('main'). - Нужно несколько SQL statements с порядком выполнения и logging.
- Нужен audit trail в run_results.json.
- Side effects (DDL, INSERT, GRANT) — semantically точнее statement.
Между ними нет жёсткой границы — оба можно использовать в одном macro. Главное — execute flag и понимание что для введения SQL в граф нужны разные пути.
Попробуй сам
В labs:
- Создайте
get_column_values_safe— введите safety: проверка что таблица существует, fallback на пустой список если нет. - Используйте в модели для dynamic PIVOT по странам.
- Запустите
dbt parse— проверьте что не падает на parse-time (благодаряif execute). - Запустите
dbt compile— посмотрите generated SQL. - Создайте custom materialization
logged_tableс two statements: ‘main’ (CREATE) и ‘audit’ (INSERT в audit_log). Запустите на простой модели — посмотрите run_results.json, оба statements должны быть. - Симулируйте error: уберите
fetch_result=Trueиз statement и попробуйтеload_result— поймёте чтоdataотсутствует.
Ключевые выводы
run_query(sql)— для introspection в parse/compile-time. Возвращает Agate Table.statement('name')— для main DDL custom materialization или multi-step SQL execution. Записывается в run_results.json.executeflag:run_queryбезif executeможет молча возвращать NULL в parse-time. Защита обязательна.fetch_result=Trueдля statement — если нужно извлечь данные черезload_result().- run_query в моделях — дорого (выполняется при каждой parse / compile). Минимизируйте, кешируйте, или conditional по target.name.
- Agate Table:
result.columns[i].values() | listдля значений колонки;result.rowsдля итерации rows. load_resultвозвращает{'response': ..., 'data': ...}. Безfetch_result=True— нетdata.- Между run_query и statement нет жёсткой границы — выбирайте semantically: introspection vs side effect.