statement(), run_query(), load_result(): SQL at runtime
Materialization code and advanced macros often need to send SQL to the warehouse and process the results. dbt provides three primitives for this: statement(), run_query(), and load_result(). They are related but serve different scenarios.
In this lesson we go through each one, look at simplified Python models of how they behave, see how they are used in real dbt-core materializations, and discuss the senior-level nuances (transactions, error handling, agate.Table).
statement(): named SQL execution
statement is a block-style Jinja macro (invoked via {% call %}). It executes SQL and optionally stores the result under a name.
{% call statement(name='my_query', fetch_result=True, auto_begin=True) %}
SELECT COUNT(*) FROM {{ ref('foo') }}
{% endcall %}
{% set result = load_result('my_query') %}
{% set count = result['data'][0][0] %}
Parameters:
- name (str): the name under which the result is available via load_result(name). 'main' is a special name for the materialization's primary statement.
- fetch_result (bool): if True, the result rows are fetched and stored (for SELECT). If False, the SQL runs without fetching (for DDL/DML).
- auto_begin (bool): if True, dbt implicitly issues BEGIN before executing when no transaction is open.
Result returned by load_result(name):
{
    'response': AdapterResponse(...),  # rows_affected, code, _message
    'data': List[List[Any]],           # rows as a list of lists (empty if fetch_result=False)
    'table': agate.Table,              # full agate table (empty if fetch_result=False)
}
statement('main'): the special name
Materializations use statement('main') for their primary SQL:
{% materialization view, default %}
{%- set target_relation = this.incorporate(type='view') -%}
{{ run_hooks(pre_hooks) }}
-- main statement
{% call statement('main') -%}
{{ create_view_as(target_relation, sql) }}
{%- endcall %}
{{ run_hooks(post_hooks) }}
{{ adapter.commit() }}
{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
Why 'main': dbt-core's ModelRunner reads the result of the main statement to populate:
- RunResult.adapter_response (rows_affected)
- run_results.json: results[].adapter_response
- dbt show output
A materialization without statement('main') is an anti-pattern: there is no main result to build RunResult.adapter_response from, and dbt-core can fail the model run for that reason.
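Implementation (simplified Python model)
In dbt-core, statement itself is a Jinja macro shipped in the global project; the result store behind it (store_result and load_result) lives in core/dbt/context/providers.py. A simplified Python model of the behavior: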
def statement(self, caller, name=None, fetch_result=False, auto_begin=True):
    """Simplified model of the statement macro (not the dbt-core source)."""
    if not self.execute:
        # Parse phase: nothing is sent to the warehouse
        return None
    # Render the SQL inside {% call statement %} ... {% endcall %}
    sql = caller()
    # Execute via the adapter; the table is empty when fetch=False
    response, agate_table = self.adapter.execute(sql, auto_begin=auto_begin, fetch=fetch_result)
    # Store the result so load_result(name) can find it later, 'main' included
    if name is not None:
        self._results[name] = {
            'response': response,
            'data': [list(row) for row in agate_table.rows],
            'table': agate_table,
        }
    return response
self._results is the dict that holds named results for the current execution context. load_result(name) reads from it.
run_query(): convenience wrapper
run_query is a simplified interface: no {% call %} block and no name:
{% if execute %}
{% set result = run_query("SELECT COUNT(*) FROM " ~ ref('foo')) %}
{% set count = result.columns[0].values()[0] %}
{% endif %}
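Signature:
def run_query(sql: str) -> Optional[agate.Table]:
It returns an agate.Table directly (not a dict). It always fetches (fetch_result=True), so it is meant for SELECT-style queries. During the parse phase (execute is False) it returns None.
Implementation (simplified Python model; in dbt it is a Jinja macro that wraps statement):
def run_query(self, sql):
    if not self.execute:
        return None
    # Always fetches; does not issue BEGIN on its own
    response, agate_table = self.adapter.execute(sql, auto_begin=False, fetch=True)
    return agate_table
run_query is statement with a fixed internal name, fetch_result=True, and auto_begin=False, returning the table directly. It is useful for inline queries whose result you do not need to store under your own name.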
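To make the relationship between the three primitives concrete, here is a toy model, not dbt code. It assumes sqlite3 as a stand-in for the adapter, a hypothetical MiniContext class, and a plain list of rows instead of an agate.Table. The point it illustrates is that run_query is just statement plus load_result under a fixed internal name.

```python
import sqlite3
from typing import Any, Dict, List, Optional


class MiniContext:
    """Hypothetical stand-in for the dbt context; sqlite3 plays the adapter."""

    def __init__(self, execute: bool = True) -> None:
        self.execute = execute
        self.conn = sqlite3.connect(":memory:")
        self._results: Dict[str, Dict[str, Any]] = {}

    def statement(self, name: Optional[str], sql: str, fetch_result: bool = False) -> None:
        if not self.execute:  # parse phase: nothing is executed
            return
        cursor = self.conn.execute(sql)
        rows: List[List[Any]] = [list(r) for r in cursor.fetchall()] if fetch_result else []
        if name is not None:  # only named statements are stored
            self._results[name] = {
                "response": {"rows_affected": cursor.rowcount},
                "data": rows,
            }

    def load_result(self, name: str) -> Optional[Dict[str, Any]]:
        return self._results.get(name)

    def run_query(self, sql: str) -> Optional[List[List[Any]]]:
        if not self.execute:
            return None
        # Illustrative internal name; the caller never sees it
        self.statement("run_query_statement", sql, fetch_result=True)
        return self.load_result("run_query_statement")["data"]


ctx = MiniContext()
ctx.statement("setup", "CREATE TABLE foo (id INTEGER)")
ctx.statement("fill", "INSERT INTO foo VALUES (1), (2), (3)")
count = ctx.run_query("SELECT COUNT(*) FROM foo")[0][0]
print(count)
```

Named statements stay available through load_result for later steps, while run_query hands back only the rows of the last call.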
When to use statement vs run_query
| Use case | statement | run_query |
|---|---|---|
| Main DDL in a materialization | [x] (name='main') | [ ] |
| Pre-hook DML | [x] (name='pre_hook') | possible |
| Custom SELECT for analysis | possible | [x] (simpler) |
| Multiple SQL statements in a shared transaction | [x] (auto_begin is configurable) | [ ] (no auto_begin control) |
| Multiple results saved | [x] (load_result(...)) | [ ] |
| Inline metadata fetch | possible | [x] |
Rule of thumb:
- In a materialization: statement('main') for the main SQL, statement('cleanup') for cleanup, and so on.
- In a model body: run_query for simple metadata lookups.
- In macros: it depends. If the macro needs the result more than once, use statement + load_result. For a one-shot query, use run_query.
agate.Table API
The result of run_query (and load_result(...)['table']) is an agate.Table object from the third-party agate library.
table = run_query("SELECT id, name FROM users LIMIT 3")
# Columns
table.columns # List[Column]
table.columns[0].name # 'id'
table.columns[0].data_type # agate.Number
# Iterate rows
for row in table.rows:
    print(row['id'], row['name'])
# Access by column name
table.columns['id'] # Column
table.columns['id'].values() # tuple of values
# Convert to list of dicts
[dict(row) for row in table.rows]
Senior gotcha: values in an agate.Table are typed. Date columns come back as datetime.date and numbers as Decimal. If you interpolate them into SQL without thinking about quoting, the rendered SQL can be something you did not intend:
{% set max_date = run_query("SELECT MAX(d) FROM t").columns[0].values()[0] %}
-- max_date is datetime.date, not string
WHERE date > '{{ max_date }}' -- works: renders '2026-05-19'
WHERE date > {{ max_date }} -- WRONG: renders 2026-05-19 without quotes, which SQL reads as arithmetic
Always quote dates if you cast to string:
WHERE date > '{{ max_date | string }}'
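The quoting problem is easy to reproduce outside dbt. The sketch below assumes only that the value is converted to text with str(), and uses Python f-strings in place of Jinja. The column names d and amount are hypothetical.

```python
from datetime import date
from decimal import Decimal

# What a typed result cell looks like on the Python side
max_date = date(2026, 5, 19)
amount = Decimal("10.50")

# Interpolation calls str() on the value
unquoted = f"WHERE d > {max_date}"
quoted = f"WHERE d > '{max_date}'"
rendered_amount = f"WHERE amount > {amount}"

# A SQL engine parses the unquoted form as integer subtraction
as_arithmetic = 2026 - 5 - 19

print(unquoted)
print(quoted)
print(rendered_amount)
print(as_arithmetic)
```

Numbers are safe to interpolate bare, while dates need the surrounding quotes to stay a date literal.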
Senior patterns
Pattern 1: dynamic schema discovery
{% macro get_columns(rel) %}
{% if execute %}
{% set query %}
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = '{{ rel.schema }}'
AND table_name = '{{ rel.identifier }}'
{% endset %}
{% set result = run_query(query) %}
{% set cols = [] %}
{% for row in result.rows %}
{% do cols.append({'name': row['column_name'], 'type': row['data_type']}) %}
{% endfor %}
{{ return(cols) }}
{% else %}
{{ return([]) }}
{% endif %}
{% endmacro %}
Pattern 2: pre-execution validation
{% materialization audit_logged, default %}
{%- set target_relation = this -%}
-- Check that the audit table exists before proceeding.
-- run_query would raise on a missing table rather than return None, so look the relation up instead.
{% if execute %}
{% set audit_relation = adapter.get_relation(database=target.database, schema=var('audit_schema', 'audit'), identifier='runs') %}
{% if audit_relation is none %}
{{ exceptions.raise_compiler_error("audit.runs table not found") }}
{% endif %}
{% endif %}
-- main statement
{% call statement('main') -%}
{{ create_table_as(false, target_relation, sql) }}
{%- endcall %}
-- log to audit
{% call statement('audit_log') -%}
INSERT INTO {{ var('audit_schema', 'audit') }}.runs (model, run_at, rows)
VALUES ('{{ model.name }}', CURRENT_TIMESTAMP, {{ load_result('main')['response']['rows_affected'] }})
{%- endcall %}
{{ adapter.commit() }}
{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
Pattern 3: multi-statement transaction
{% materialization complex_swap, default %}
{%- set old_relation = this.incorporate(path={'identifier': this.identifier ~ '_old'}) -%}
{%- set new_relation = this.incorporate(path={'identifier': this.identifier ~ '_new'}) -%}
-- Begin transaction
{{ adapter.begin() }}
{% call statement('drop_old', auto_begin=False) -%}
DROP TABLE IF EXISTS {{ old_relation }}
{%- endcall %}
{% call statement('create_new', auto_begin=False) -%}
CREATE TABLE {{ new_relation }} AS ({{ sql }})
{%- endcall %}
{% call statement('rename', auto_begin=False) -%}
ALTER TABLE {{ this }} RENAME TO {{ old_relation.identifier }};
ALTER TABLE {{ new_relation }} RENAME TO {{ this.identifier }};
{%- endcall %}
{{ adapter.commit() }}
{{ return({'relations': [this]}) }}
{% endmaterialization %}
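auto_begin=False matters here because we manage the transaction explicitly via adapter.begin() / adapter.commit(), so no statement should issue its own BEGIN. With the default auto_begin=True, dbt issues BEGIN implicitly whenever no transaction is open, which hides where the transaction boundary actually is in a multi-statement workflow.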
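Why one explicit transaction matters can be shown with a small stand-in. The sketch assumes sqlite3 instead of a warehouse adapter, and hypothetical names (t_live, swap). A failure in the middle of the sequence rolls back every earlier statement, so the live table is never left half-swapped.

```python
import sqlite3

# isolation_level=None: the driver never issues BEGIN for us,
# which mirrors auto_begin=False with explicit begin/commit
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE t_live (id INTEGER)")
conn.execute("INSERT INTO t_live VALUES (1)")


def swap(conn: sqlite3.Connection, fail: bool) -> None:
    """Replace the contents of t_live inside one explicit transaction."""
    conn.execute("BEGIN")
    try:
        conn.execute("DELETE FROM t_live")
        if fail:
            # Simulated mid-sequence failure: the table does not exist
            conn.execute("INSERT INTO missing_table VALUES (2)")
        conn.execute("INSERT INTO t_live VALUES (2)")
        conn.execute("COMMIT")
    except sqlite3.Error:
        conn.execute("ROLLBACK")
        raise


try:
    swap(conn, fail=True)
except sqlite3.OperationalError:
    pass
after_failure = conn.execute("SELECT id FROM t_live").fetchall()

swap(conn, fail=False)
after_success = conn.execute("SELECT id FROM t_live").fetchall()

print(after_failure, after_success)
```

After the failed attempt the original row is still there; only the successful run replaces it.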
load_result() for main
After the materialization finishes, dbt stores the main result in the RunResult:
# core/dbt/task/run.py (simplified)
class ModelRunner:
    def execute(self, model, manifest):
        # ... compile the model and build its context
        materialization_macro = self.get_materialization(model)
        result = MacroGenerator(materialization_macro, context)(model)
        # Fetch the result stored by statement('main')
        main_response = context['load_result']('main')['response']
        return RunResult(
            status='success',
            adapter_response=main_response,
            execution_time=...,
            # ...
        )
RunResult.adapter_response is what ends up in run_results.json and is available in hooks through the results variable.
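Once the adapter response is in run_results.json, it can be inspected with a few lines of Python. This is a minimal sketch: the sample document and node ids are made up for illustration, and only the results, unique_id, and adapter_response fields are assumed.

```python
import json
from typing import Dict, Optional

# Hypothetical, heavily trimmed run_results.json content
SAMPLE = """
{
  "results": [
    {"unique_id": "model.my_project.foo", "status": "success",
     "adapter_response": {"_message": "SELECT 3", "code": "SELECT", "rows_affected": 3}},
    {"unique_id": "model.my_project.bar", "status": "success",
     "adapter_response": {}}
  ]
}
"""


def rows_affected_by_node(run_results_json: str) -> Dict[str, Optional[int]]:
    """Map each node id to rows_affected, or None if the adapter reported nothing."""
    doc = json.loads(run_results_json)
    out: Dict[str, Optional[int]] = {}
    for result in doc.get("results", []):
        response = result.get("adapter_response") or {}
        out[result["unique_id"]] = response.get("rows_affected")
    return out


summary = rows_affected_by_node(SAMPLE)
print(summary)
```

In a real project the string would come from reading target/run_results.json after a run.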
Error handling
If the SQL fails:
{% call statement('main') %}
CREATE TABLE non_existent.table AS SELECT 1
{% endcall %}
adapter.execute raises a database error. The exception propagates through Jinja, the materialization fails, and RunResult.status is 'error'.
What a macro can and cannot check:
{% if execute %}
{% set query = "SELECT 1" %}
{% set result = run_query(query) %}
{% if result is none %}
{{ log("Query skipped", info=True) }}
-- run_query returns None only when execute is False, so inside this block the branch is never taken
-- on a real failure adapter.execute raises and rendering stops before this check
{% endif %}
{% endif %}
For real try/catch, handle the failure on the Python side through dbt's programmatic API. Jinja has no try/catch.
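Python-side handling can be sketched as a thin wrapper around dbt's programmatic entry point. It assumes dbt-core 1.5 or later, where dbt.cli.main.dbtRunner exists and invoke returns an object with success and exception attributes. The wrapper invoke_dbt and the Fake classes are hypothetical helpers, there only so the example runs without dbt installed.

```python
from typing import Any, List, Optional, Tuple


def invoke_dbt(args: List[str], runner: Optional[Any] = None) -> Tuple[bool, Optional[str]]:
    """Run a dbt command and turn every failure mode into (ok, message)."""
    if runner is None:
        # Imported lazily so this module loads even without dbt-core installed
        from dbt.cli.main import dbtRunner
        runner = dbtRunner()
    try:
        res = runner.invoke(args)
    except Exception as exc:  # defensive: anything the runner did not handle itself
        return False, f"invocation crashed: {exc}"
    if res.success:
        return True, None
    if res.exception is not None:
        return False, f"dbt error: {res.exception}"
    return False, "one or more nodes failed"


class FakeResult:
    """Hypothetical stand-in for the runner's result object."""

    def __init__(self, success: bool, exception: Optional[BaseException] = None) -> None:
        self.success = success
        self.exception = exception


class FakeRunner:
    """Hypothetical stand-in for dbtRunner, used only for this demonstration."""

    def __init__(self, result: FakeResult) -> None:
        self._result = result

    def invoke(self, args: List[str]) -> FakeResult:
        return self._result


ok = invoke_dbt(["run"], runner=FakeRunner(FakeResult(True)))
node_failure = invoke_dbt(["run"], runner=FakeRunner(FakeResult(False)))
hard_failure = invoke_dbt(["run"], runner=FakeRunner(FakeResult(False, RuntimeError("bad profile"))))
print(ok, node_failure, hard_failure)
```

With dbt installed, calling invoke_dbt(["run", "--select", "my_model"]) without a runner argument would use the real dbtRunner; my_model is a placeholder name.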
Senior gotcha: result keys
load_result returns a dict-like object with exactly these string keys:
{% set r = load_result('main') %}
{{ r['response'] }} -- AdapterResponse
{{ r['data'] }} -- List[List]
{{ r['table'] }} -- agate.Table
There is no 'status' key, and the table lives under 'table', not 'agate_table'. In Jinja, r.response and r['response'] resolve to the same value; the subscript form makes it explicit that this is a key lookup. The same holds one level down:
{{ r['response']['rows_affected'] }}
AdapterResponse is a Python dataclass, but Jinja falls back to attribute access for string subscripts, so r['response'].rows_affected works as well.
Try it yourself
- run_query simple test:
-- macros/test_runq.sql
{% macro test_runq() %}
{% if execute %}
{% set result = run_query("SELECT 1 as one, 'hello' as msg") %}
{{ log("Got " ~ (result.rows | length) ~ " rows", info=True) }}
{{ log("First row: " ~ result.rows[0]['one'] ~ ", " ~ result.rows[0]['msg'], info=True) }}
{% endif %}
{% endmacro %}
dbt run-operation test_runq
- Statement multi-result:
-- macros/test_stmt.sql
{% macro test_stmt() %}
{% if execute %}
{% call statement('q1', fetch_result=True) %}
SELECT 1 as x
{% endcall %}
{% call statement('q2', fetch_result=True) %}
SELECT 2 as x
{% endcall %}
{{ log("q1: " ~ load_result('q1')['data'][0][0], info=True) }}
{{ log("q2: " ~ load_result('q2')['data'][0][0], info=True) }}
{% endif %}
{% endmacro %}
dbt run-operation test_stmt
- Inspect materialization main:
# Run a simple model
dbt run --select test_ctx
# Check run_results.json
cat target/run_results.json | jq '.results[0].adapter_response'
- Sandbox via dbt show:
# dbt show runs the inline query and prints the fetched rows
dbt show --inline "SELECT 1 as x, 'a' as y"
- Read core/dbt/context/providers.py and find store_result and load_result, then locate the statement and run_query macros in dbt's global project.