Learning Platform
Глоссарий Troubleshooting
Урок 04.04 · 24 мин
Продвинутый
jinjastatementrun-query

statement(), run_query(), load_result() — SQL на runtime

В materialization-коде и в advanced macros senior часто шлёт SQL и обрабатывает результаты. Для этого dbt предоставляет три primitive: statement(), run_query(), load_result(). Они связаны, но используются в разных сценариях.

В этом уроке мы разберём каждый, посмотрим на их Python-определения, увидим как они работают в реальных materializations dbt-core, и обсудим senior-нюансы (transactions, error handling, agate.Table).


statement() — named SQL execution

statement — это block-style Jinja macro (вызывается через {% call %}). Выполняет SQL и optionally сохраняет результат под именем.

{% call statement(name='my_query', fetch_result=True, auto_begin=True) %}
  SELECT COUNT(*) FROM {{ ref('foo') }}
{% endcall %}

{% set result = load_result('my_query') %}
{% set count = result['data'][0][0] %}

Parameters:

  • name: str — name под которым result доступен через load_result(name). 'main' — special name для materialization’s primary statement.
  • fetch_result: bool — если True, results возвращаются (для SELECT). Если False, выполняется без fetching (для DDL/DML).
  • auto_begin: bool — implicit BEGIN transaction.
run_query vs statement: introspection и side effects (dbt II)

Return через load_result(name):

{
    'response': AdapterResponse(...),  # rows_affected, code, _message
    'data': List[List[Any]],            # rows as list-of-lists (only if fetch_result=True)
    'agate_table': agate.Table,         # full agate table (only if fetch_result=True)
    'status': 'success' | 'error'
}

statement(‘main’) — особое имя

В materializations используется statement('main') как primary SQL:

{% materialization view, default %}
  
  {%- set target_relation = this.incorporate(type='view') -%}
  
  {{ run_hooks(pre_hooks) }}
  
  -- main statement
  {% call statement('main') -%}
    {{ create_view_as(target_relation, sql) }}
  {%- endcall %}
  
  {{ run_hooks(post_hooks) }}
  {{ adapter.commit() }}
  
  {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}

Почему ‘main’: dbt-core’s RunRunner tracks main statement result для:

  • RunResult.adapter_response (rows_affected)
  • run_results.json.results[].adapter_response
  • dbt show output

Materialization без statement('main') — анти-паттерн (RunResult won’t have adapter_response).


Python implementation

core/dbt/context/providers.py:

def statement(self, name, fetch_result=False, auto_begin=True):
    """The main statement macro for the materialization."""
    
    @contextmanager
    def statement_impl():
        if not self.execute:
            yield None
            return
        
        # Render the SQL inside {% call statement %} ... {% endcall %}
        sql = caller()
        
        # Execute via adapter
        response, agate_table = self.adapter.execute(sql, auto_begin=auto_begin, fetch=fetch_result)
        
        # Store result
        if fetch_result:
            self._results[name] = {
                'response': response,
                'data': agate_table.rows if agate_table else [],
                'agate_table': agate_table,
                'status': 'success'
            }
        else:
            self._results[name] = {
                'response': response,
                'status': 'success'
            }
        
        # Track 'main' specifically
        if name == 'main':
            self._main_result = self._results[name]
        
        yield response
    
    return statement_impl

self._results — это dict хранящий named results в текущей invocation. load_result(name) обращается к нему.


run_query() — convenience wrapper

run_query — упрощённый интерфейс. Без {% call %} block, без name:

{% if execute %}
  {% set result = run_query("SELECT COUNT(*) FROM " ~ ref('foo')) %}
  {% set count = result.columns[0].values()[0] %}
{% endif %}

Сигнатура:

def run_query(sql: str) -> Optional[agate.Table]:

Returns agate.Table directly (не dict). Только для SELECT (auto-set fetch_result=True). На parse phase returns None.

Implementation (упрощённо):

def run_query(self, sql):
    if not self.execute:
        return None
    response, agate_table = self.adapter.execute(sql, auto_begin=True, fetch=True)
    return agate_table

run_query — это statement minus name + fetch_result=True + auto_begin=True + returns table directly. Полезно для inline queries без save.


Когда statement vs run_query

Use casestatementrun_query
Main DDL в materialization[x] (name=‘main’)[ ]
Pre-hook DML[x] (name=‘pre_hook’)возможно
Custom SELECT для analysisвозможно[x] (simpler)
Multiple SQL с shared transaction[x] (auto_begin=True)[ ] (each opens new)
Multiple results saved[x] (load_result(...))[ ]
Inline metadata fetchвозможно[x]

Rule of thumb:

  • В materialization — statement('main') для main, statement('cleanup') для cleanup, etc.
  • В model body — run_query для simple metadata lookups.
  • В macros — depends. Если macro нужен result повторно — statement+load_result. Если one-shot — run_query.

agate.Table API

Результат run_queryload_result(...)['agate_table']) — это agate.Table object из third-party library.

table = run_query("SELECT id, name FROM users LIMIT 3")

# Columns
table.columns  # List[Column]
table.columns[0].name  # 'id'
table.columns[0].data_type  # agate.Number

# Iterate rows
for row in table.rows:
    print(row['id'], row['name'])

# Access by column name
table.columns['id']  # Column
table.columns['id'].values()  # tuple of values

# Convert to list of dicts
[dict(row) for row in table.rows]

Senior gotcha: agate.Table значения типизированы — date columns appear as datetime.date, numbers as Decimal. Если вы concatenate в string без convert — может быть unexpected formatting:

{% set max_date = run_query("SELECT MAX(d) FROM t").columns[0].values()[0] %}
-- max_date is datetime.date, not string

WHERE date > '{{ max_date }}'  -- works but produces '2026-05-19'
WHERE date > {{ max_date }}    -- WRONG — produces 2026-05-19 без quotes

Always quote dates if you cast to string:

WHERE date > '{{ max_date | string }}'

Senior patterns

Pattern 1: dynamic schema discovery

{% macro get_columns(rel) %}
  {% if execute %}
    {% set query %}
      SELECT column_name, data_type
      FROM information_schema.columns
      WHERE table_schema = '{{ rel.schema }}'
        AND table_name = '{{ rel.identifier }}'
    {% endset %}
    
    {% set result = run_query(query) %}
    {% set cols = [] %}
    {% for row in result.rows %}
      {{ cols.append({'name': row['column_name'], 'type': row['data_type']}) }}
    {% endfor %}
    {{ return(cols) }}
  {% else %}
    {{ return([]) }}
  {% endif %}
{% endmacro %}

Pattern 2: pre-execution validation

{% materialization audit_logged, default %}
  
  {%- set target_relation = this -%}
  
  -- Check audit table exists before proceeding
  {% if execute %}
    {% set audit_check %}
      SELECT 1 FROM {{ var('audit_schema', 'audit') }}.runs LIMIT 1
    {% endset %}
    
    {% set result = run_query(audit_check) %}
    {% if result is none %}
      {{ exceptions.raise_compiler_error("audit.runs table not found") }}
    {% endif %}
  {% endif %}
  
  -- main statement
  {% call statement('main') -%}
    {{ create_table_as(false, target_relation, sql) }}
  {%- endcall %}
  
  -- log to audit
  {% call statement('audit_log') -%}
    INSERT INTO {{ var('audit_schema') }}.runs (model, run_at, rows)
    VALUES ('{{ model.name }}', CURRENT_TIMESTAMP, {{ load_result('main')['response']['rows_affected'] }})
  {%- endcall %}
  
  {{ adapter.commit() }}
  {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}

Pattern 3: multi-statement transaction

{% materialization complex_swap, default %}
  
  {%- set old_relation = adapter.get_relation(database=this.database, schema=this.schema, identifier=this.identifier ~ '_old') -%}
  {%- set new_relation = this.incorporate(path={'identifier': this.identifier ~ '_new'}) -%}
  
  -- Begin transaction
  {{ adapter.begin() }}
  
  {% call statement('drop_old', auto_begin=False) -%}
    DROP TABLE IF EXISTS {{ old_relation }}
  {%- endcall %}
  
  {% call statement('create_new', auto_begin=False) -%}
    CREATE TABLE {{ new_relation }} AS ({{ sql }})
  {%- endcall %}
  
  {% call statement('rename', auto_begin=False) -%}
    ALTER TABLE {{ this }} RENAME TO {{ old_relation.identifier }};
    ALTER TABLE {{ new_relation }} RENAME TO {{ this.identifier }};
  {%- endcall %}
  
  {{ adapter.commit() }}
  
  {{ return({'relations': [this]}) }}

{% endmaterialization %}

auto_begin=False важно — мы explicit’но управляем transaction через adapter.begin() / adapter.commit(). Default behavior — каждый statement в своей transaction (что не подходит для multi-statement workflows).


load_result() для main

После materialization выполнения, dbt сохраняет main result в RunResult:

# core/dbt/task/run.py (упрощённо)
class RunRunner:
    def execute(self, model, manifest):
        # ... compile model
        materialization_macro = self.get_materialization(model)
        result = MacroGenerator(materialization_macro, context)(model)
        
        # Получаем main result
        main_response = context._main_result.get('response')
        
        return RunResult(
            status='success',
            adapter_response=main_response,
            execution_time=...,
            # ...
        )

RunResult.adapter_response — это что попадает в run_results.json и доступно в hooks через results variable.


Error handling

Если SQL fails:

{% call statement('main') %}
  CREATE TABLE non_existent.table AS SELECT 1
{% endcall %}

adapter.execute raises DatabaseError. Это propagates через Jinja, материализация падает, RunResult.status = ‘error’.

Catch в macro:

{% if execute %}
  {% set query = "SELECT 1" %}
  {% set result = run_query(query) %}
  
  {% if result is none %}
    {{ log("Query failed or skipped", info=True) }}
    -- но run_query не raises на failure, returns None
    -- adapter.execute raises на real failures
  {% endif %}
{% endif %}

Для proper try/catch — используйте Python-side через programmatic API. В Jinja нет try/catch.


Senior gotcha: result keys

load_result возвращает dict с string keys:

{% set r = load_result('main') %}
{{ r['response'] }}        -- AdapterResponse
{{ r['data'] }}            -- List[List]
{{ r['agate_table'] }}     -- agate.Table
{{ r['status'] }}          -- 'success' / 'error'

r.response (dot access) не работает — это dict, нужно r['response']. Тем более:

{{ r['response']['rows_affected'] }}

AdapterResponse тоже dict-like в Jinja context (не Python dataclass attribute access).


Попробуй сам

  1. Run_query simple test:

    -- macros/test_runq.sql
    `{% macro test_runq() %}`
      `{% if execute %}`
        `{% set result = run_query("SELECT 1 as one, 'hello' as msg") %}`
        `{{ log("Got " ~ (result.rows | length) ~ " rows", info=True) }}`
        `{{ log("First row: " ~ result.rows[0]['one'] ~ ", " ~ result.rows[0]['msg'], info=True) }}`
      `{% endif %}`
    `{% endmacro %}`
    dbt run-operation test_runq
  2. Statement multi-result:

    -- macros/test_stmt.sql
    `{% macro test_stmt() %}`
      `{% if execute %}`
        `{% call statement('q1', fetch_result=True) %}`
          SELECT 1 as x
        `{% endcall %}`
        `{% call statement('q2', fetch_result=True) %}`
          SELECT 2 as x
        `{% endcall %}`
        
        `{{ log("q1: " ~ load_result('q1')['data'][0][0], info=True) }}`
        `{{ log("q2: " ~ load_result('q2')['data'][0][0], info=True) }}`
      `{% endif %}`
    `{% endmacro %}`
    dbt run-operation test_stmt
  3. Inspect materialization main:

    # Run a simple model
    dbt run --select test_ctx
    
    # Check run_results.json
    cat target/run_results.json | jq '.results[0].adapter_response'
  4. Sandbox via dbt show:

    dbt show --inline "SELECT 1 as x, 'a' as y"
    # show команда использует statement internally
  5. Read core/dbt/context/providers.py — find statement and run_query methods.


Проверка знанийKnowledge check
Вы пишете custom materialization audit_logged которая после main CREATE TABLE должна выполнить INSERT в audit table. Хотите, чтобы main + INSERT были в одной transaction (либо обе success, либо rollback). Как структурировать statements и transaction management?
ОтветAnswer
Structured approach с явным transaction control: ```jinja {% materialization audit_logged, default %} {%- set target_relation = this.incorporate(type='table') -%} -- Explicit BEGIN, не полагаемся на auto_begin {{ adapter.begin() }} -- Main statement БЕЗ auto_begin (мы уже в transaction) {% call statement('main', auto_begin=False) -%} {{ create_table_as(false, target_relation, sql) }} {%- endcall %} -- Get row count for audit {%- set rows = load_result('main')['response']['rows_affected'] -%} -- Audit INSERT в той же transaction {% call statement('audit_insert', auto_begin=False) -%} INSERT INTO {{ var('audit_schema', 'audit') }}.model_runs (model_name, executed_at, rows_loaded, materialization_type) VALUES ( '{{ model.name }}', CURRENT_TIMESTAMP, {{ rows }}, 'audit_logged' ) {%- endcall %} -- Explicit commit обоих statements atomically {{ adapter.commit() }} {{ return({'relations': [target_relation]}) }} {% endmaterialization %} ```. Ключевые points: (1) adapter.begin() явно начинает transaction. (2) auto_begin=False в каждом statement — иначе они каждый раз начинают свою transaction, нарушая atomicity. (3) adapter.commit() в конце commits всё. (4) Если main fails — exception propagates, materialization fails, и transaction implicit rolled back (большинство adapters auto-rollback on uncaught exception). (5) Если audit_insert fails — главное, что main был успешен. Если транзакция rolled back, главная таблица не создаётся. Это behavior to confirm — иногда вы хотите main success даже если audit fails. Alternative: split — main в своей transaction (commit immediately), audit как best-effort с try-catch (через programmatic API или ignore_errors macro). Production tip: всегда добавляйте {% if execute %} guards вокруг statement blocks — на parse phase statements skip, но Jinja syntax все равно validated.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Senior пишет custom materialization, нужно выполнить main DDL и затем INSERT в audit table atomically. Какой transaction control approach?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5