Learning Platform
Глоссарий Troubleshooting
Урок 02.04 · 26 мин
Продвинутый
dbt-coreexecutiontracing

dbt run от bash до warehouse: пошаговый трейс

Все предыдущие уроки модуля были «карта». Этот — «трейс». Возьмём конкретный сценарий: проект с двумя моделями (stg_users, dim_users), команда dbt run. Проследим каждый шаг — какой Python-код выполняется, что попадает в Manifest, что в Graph, какой SQL отправляется в warehouse, что возвращается. С реальными временными показателями для понимания где боттлнеки.

Версия — dbt-core 1.11.5, adapter — dbt-duckdb 1.10.1.


Setup сценария

Структура проекта:

my_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│   ├── staging/
│   │   └── stg_users.sql        ← ref('raw_users')
│   └── marts/
│       └── dim_users.sql        ← ref('stg_users')
├── seeds/
│   └── raw_users.csv
└── target/
    └── partial_parse.msgpack    ← после предыдущего run

profiles.yml:

my_project:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: 'my_project.duckdb'
      threads: 4

Запускаем: dbt run.


Шаг 0: bash -> Python

bash: dbt run

dbt — это shell entry point, который установился в virtualenv через pip install dbt-core dbt-duckdb. Это python-script (bin/dbt) с shebang #!/path/to/venv/bin/python. Внутри:

# Generated entrypoint (упрощённо)
from dbt.cli.main import cli
sys.exit(cli())

cli() — это Click group из core/dbt/cli/main.py. Click парсит argv (['run']), находит @cli.command('run'), вызывает её.

Timing: 100-300 мс на cold start Python (импорты dbt-core).


Шаг 1: Click -> Flags -> RunTask

@cli.command('run') в main.py:

@cli.command("run")
@click.option("--select", ...)
@click.option("--exclude", ...)
@click.option("--full-refresh", is_flag=True)
@click.option("--threads", type=int)
# ... ~15 options
@click.pass_context
def run(ctx, **kwargs):
    from dbt.task.run import RunTask
    task = RunTask(args=ctx.obj["flags"], manifest=ctx.obj.get("manifest"))
    return task.run()

ctx.obj["flags"] — это Flags объект, собранный из CLI args и env vars (через @requires.preflight decorator на верхнем уровне CLI).

RunTask(args=flags) — конструктор GraphRunnableTask (он extends). При construction manifest ещё None — не загружен.

Timing: ~50 мс.


Шаг 2: RuntimeConfig.from_args

# core/dbt/task/run.py
class RunTask(GraphRunnableTask):
    def run(self) -> RunResult:
        self.config = RuntimeConfig.from_args(self.args)
        # ...

RuntimeConfig.from_args(args):

  1. Locate dbt_project.yml. Идёт от cwd вверх по дереву, пока не найдёт.
  2. Load Project — читает YAML, парсит, валидирует через ProjectV1 dataclass.
  3. Load Profile — читает ~/.dbt/profiles.yml (или из DBT_PROFILES_DIR env var), резолвит --target (или target: из project).
  4. Render через ProfileRendererenv_var(), var() в YAML резолвятся через Jinja.
  5. Resolve credentialsDuckDBCredentials.from_dict(...) (type-specific dataclass из dbt-duckdb).

Output — RuntimeConfig immutable object со всем нужным.

# Что внутри после from_args
config.project_name = 'my_project'
config.profile_name = 'my_project'
config.target_name = 'dev'
config.credentials = DuckDBCredentials(database='my_project.duckdb', schema='main', ...)
config.threads = 4
config.model_paths = ['models']
config.seed_paths = ['seeds']
config.macro_paths = ['macros']
# ... десятки полей

Timing: 100-200 мс на проекте средней сложности.

profiles.yml для DuckDB

Шаг 3: ManifestLoader (parse phase)

# core/dbt/task/run.py
def run(self):
    self.config = RuntimeConfig.from_args(self.args)
    self.manifest = ManifestLoader.get_full_manifest(self.config)
    # ...

ManifestLoader.get_full_manifest(config):

  1. Check partial_parse.msgpack. Если есть target/partial_parse.msgpack и hash файлов matches — пропустить full parse.
  2. Build all_projects — главный проект + все packages.yml deps.
  3. Walk file system — найти все .sql, .py, .yml файлы.
  4. Parse macros firstMacroParser по всем macros/ директориям.
  5. Parse modelsModelParser по models/. Для каждой модели:
    • Read raw_code из файла.
    • Tree-sitter static parse — пытается извлечь refs/sources без полного Jinja render. Быстро.
    • Если tree-sitter не справился (сложный Jinja) — full Jinja parse с execute=False.
    • Build ModelNode dataclass с raw_code, ref’ы, depends_on.
    • Сложить в manifest.nodes[unique_id].
  6. Parse schemasSchemaParser по всем _models.yml, _sources.yml. Patch existing nodes (add description, columns, contract, tests).
  7. Parse snapshots, seeds, tests — отдельные парсеры.
  8. _process_refs() — резолвит ref('foo') в unique_id через manifest.ref_lookup.
  9. _process_sources() — резолвит source('schema', 'name').
  10. Build parent_map and child_map — production lookups.
  11. Write target/partial_parse.msgpack для следующего run.

Output — Manifest со всеми нодами и связями.

# Для нашего сценария
manifest.nodes['model.my_project.stg_users']  # ModelNode
manifest.nodes['model.my_project.dim_users']  # ModelNode
manifest.nodes['seed.my_project.raw_users']   # SeedNode

manifest.nodes['model.my_project.stg_users'].depends_on.nodes
# ['seed.my_project.raw_users']

manifest.nodes['model.my_project.dim_users'].depends_on.nodes
# ['model.my_project.stg_users']

Timing:

  • Cold (full parse): 5-15 сек на нашем проекте, 30-120 сек на 1000+ моделей
  • Warm (partial parse via msgpack): 100-500 мс

Это самая дорогая стадия на холодном run. Модуль 02 курса детально разбирает.


Шаг 4: Linker -> DAG

# core/dbt/task/runnable.py
def compile_manifest(self):
    linker = Linker()
    linker.link_graph(self.manifest)
    self.graph = Graph(linker.graph)
# core/dbt/compilation.py
class Linker:
    def link_graph(self, manifest):
        for unique_id, node in manifest.nodes.items():
            self.graph.add_node(unique_id, **node.to_dict())
            for parent_id in node.depends_on.nodes:
                self.graph.add_edge(parent_id, unique_id)
        # cycle check
        try:
            cycle = nx.find_cycle(self.graph)
            raise DbtRuntimeError(f"Cycle detected: {cycle}")
        except nx.NetworkXNoCycle:
            pass

После этого self.graph содержит:

seed.my_project.raw_users  ->  model.my_project.stg_users  ->  model.my_project.dim_users

Граф serialized в target/graph.gpickle.

Timing: менее 100 мс.

ref(): соединяем модели в граф (dbt I)

Шаг 5: SelectionSpec -> selected_uids

# core/dbt/task/runnable.py
def select_resources(self) -> Set[str]:
    spec = parse_difference(self.selection_arg, self.exclusion_arg)
    selector = NodeSelector(self.graph, self.manifest, ...)
    selected_uids = selector.get_selected(spec)
    return selected_uids

В нашем сценарии нет --select и нет --exclude — selection spec пустой, селектор возвращает все модели проекта (но не sources, не seeds — потому что dbt run selects только runnable nodes по умолчанию).

selected_uids = {
    'model.my_project.stg_users',
    'model.my_project.dim_users'
}

raw_users — это seed, не model. Seeds запускаются отдельной командой (dbt seed). В dbt run они skip’ятся (но граф зависимости учитывает: stg_users depends on raw_users, поэтому raw_users должен быть «уже выполнен» — что верно для seeds, они материализуются при dbt seed).

Timing: менее 50 мс.


Шаг 6: GraphQueue + ThreadPool

# core/dbt/task/runnable.py
def execute_with_hooks(self, selected_uids):
    self.run_queue = self.get_graph_queue()  # GraphQueue
    
    with self.adapter.connection_named("master"):
        self.before_run(adapter)  # on-run-start hooks
        
        pool = ThreadPoolExecutor(max_workers=self.config.threads)
        
        while not self.run_queue.empty():
            node = self.run_queue.get(block=True)
            future = pool.submit(self.call_runner, node)
            # ... handling
        
        pool.shutdown()
        self.after_run(adapter, results)

GraphQueue имеет встроенный topological sort: возвращает ноды только после того как все их dependencies done.

В нашем сценарии (threads=4):

  • t=0: queue.get() возвращает stg_users (нет deps в selected set, raw_users — seed, не selected).
  • t=0: Worker 1 запускает RunRunner(stg_users).
  • t=200мс: stg_users done, queue.mark_done().
  • t=200мс: queue.get() возвращает dim_users (stg_users done).
  • t=200мс: Worker 2 запускает RunRunner(dim_users).
  • t=400мс: dim_users done.

С threads=4, но только 2 ноды и линейная зависимость — параллелизм не использован. На 100 моделей с разветвлённым DAG — все 4 workers загружены.

Timing setup: менее 10 мс. Execution — зависит от моделей.

Conditional materialization: table в prod, view в dev

Шаг 7a: RunRunner.compile() для stg_users

# core/dbt/task/run.py
class RunRunner(CompileRunner):
    def execute(self, model, manifest):
        compiled_node = self.compile(manifest)
        # ...
    
    def compile(self, manifest):
        # Создаём ParseProvider / RuntimeProvider context
        context = generate_runtime_model_context(model, self.config, manifest)
        # Render Jinja
        compiled_code = MacroGenerator(model.raw_code, context)()
        model.compiled_code = compiled_code
        return model

Для stg_users.sql (raw_code):

SELECT
    user_id,
    email,
    created_at
FROM {{ ref('raw_users') }}
WHERE created_at IS NOT NULL

После Jinja render с RuntimeProvider context:

SELECT
    user_id,
    email,
    created_at
FROM "my_project"."main"."raw_users"
WHERE created_at IS NOT NULL

ref('raw_users') резолвится в Relation object, который через __str__() рендерится в quoted identifier.

Записывается в target/compiled/my_project/models/staging/stg_users.sql.

Timing: 10-50 мс на простой модели. На сложных моделях с include macros может быть до секунды.


Шаг 7b: Materialization для stg_users

# core/dbt/task/run.py (упрощённо)
def execute(self, model, manifest):
    compiled_node = self.compile(manifest)
    materialization_macro = self.get_materialization(model)  # materialization_table_default
    context = generate_runtime_model_context(model, self.config, manifest)
    result = MacroGenerator(materialization_macro, context)(model)
    return result

Lookup materialization: model.config.materialized == 'view' (default из dbt_project.yml). dbt ищет:

  1. materialization_view_duckdb — adapter-specific?
  2. materialization_view_default — fallback.

Находит materialization_view_defaultinclude/global_project/macros/materializations/models/view/). Это Jinja-макрос ~100 строк:

{% materialization view, default %}
  
  {%- set existing_relation = load_cached_relation(this) -%}
  {%- set target_relation = this.incorporate(type='view') -%}
  {%- set intermediate_relation = make_intermediate_relation(target_relation) -%}

  {{ run_hooks(pre_hooks) }}
  
  -- main statement: CREATE OR REPLACE VIEW ...
  {% call statement('main') -%}
    {{ create_view_as(target_relation, sql) }}
  {%- endcall %}
  
  {{ adapter.rename_relation(target_relation, backup_relation) if existing_relation }}
  
  {{ adapter.drop_relation(intermediate_relation) }}
  {{ run_hooks(post_hooks) }}
  {{ adapter.commit() }}
  
  {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}

statement('main') — это вызов adapter.execute с SQL. Возвращает AdapterResponse.

create_view_as(target_relation, sql) — другой Jinja-макрос, который генерирует CREATE OR REPLACE VIEW ... AS .... В нашем случае:

CREATE OR REPLACE VIEW "my_project"."main"."stg_users" AS (
SELECT
    user_id,
    email,
    created_at
FROM "my_project"."main"."raw_users"
WHERE created_at IS NOT NULL
)

Timing: macro execution 30-100 мс.


Шаг 7c: adapter.execute -> DuckDB

# Inside statement('main') macro — calls adapter.execute(sql)
# core/dbt/adapters/sql/impl.py (simplified)
def execute(self, sql, auto_begin=True, fetch=False):
    connection = self.connections.get_thread_connection()
    return self.connections.execute(sql, auto_begin, fetch)

get_thread_connection() — thread-local cache. На первом вызове в треде — открывает connection (DuckDB соединяется с файлом my_project.duckdb).

connections.execute(sql) — для DuckDB через dbt-duckdb:

# dbt-duckdb/dbt/adapters/duckdb/connections.py (упрощённо)
def execute(self, sql, auto_begin=True, fetch=False):
    with self._cursor() as cursor:
        cursor.execute(sql)
        if fetch:
            return cursor.fetchall()
        return AdapterResponse(_message=cursor.description, rows_affected=cursor.rowcount)

cursor.execute(sql) — это DuckDB-native operation. DuckDB парсит SQL, оптимизирует, выполняет в-памяти. Для view — это просто DDL, нет данных движения.

Returns AdapterResponse(code='SUCCESS', rows_affected=0).

Timing: 10-100 мс на CREATE VIEW (быстро). На больших INSERT/MERGE — секунды/минуты.


Шаг 8: RunResult -> node_results -> run_results.json

# core/dbt/task/runnable.py
def call_runner(self, runner) -> RunResult:
    started = time.time()
    try:
        result = runner.run()  # RunResult
    except Exception as e:
        result = RunResult(status=NodeStatus.Error, ...)
    finished = time.time()
    result.execution_time = finished - started
    self.node_results.append(result)
    return result

После того как stg_users и dim_users отработали:

self.node_results = [
    RunResult(status='success', unique_id='model.my_project.stg_users', execution_time=0.18, ...),
    RunResult(status='success', unique_id='model.my_project.dim_users', execution_time=0.22, ...),
]

В after_run:

# core/dbt/task/runnable.py
def after_run(self, adapter, result):
    # ... on-run-end hooks
    artifact = RunResultsArtifact.from_node_results(self.node_results, ...)
    write_json(artifact, "target/run_results.json")

run_results.json — это публичный артефакт (модуль 05 курса):

{
  "metadata": {...},
  "results": [
    {
      "unique_id": "model.my_project.stg_users",
      "status": "success",
      "execution_time": 0.18,
      "adapter_response": {"code": "SUCCESS", "rows_affected": 0},
      "timing": [...],
      "thread_id": "Thread-1"
    },
    ...
  ],
  "elapsed_time": 0.42,
  "args": {...}
}

Timing: менее 50 мс.


Полный timing budget

Timing budget одного dbt run (на cold/warm запусках)
0. bash -> PythonPython interpreter cold start. Import dbt-core (~200 modules).
1. CLI -> FlagsClick парсинг.
2. RuntimeConfigdbt_project.yml + profiles.yml + vars merge.
3. ParseCold: 5-15 сек на small project, 30-120 сек на 1000+. Warm (msgpack): 100-500 мс.
4. LinkerDAG construction + cycle check. Networkx.
5. SelectionSelectionSpec parsing + traversal.
6. ThreadPool startWorkers init, connections opened.
7. Per-node executionCompile + materialize + adapter.execute. Зависит от warehouse, queries. 10-1000 мс per model.
8. RunResults serializeJSON dump + write to target/run_results.json.

Total для нашего сценария:

  • Cold (full parse): ~5-7 сек
  • Warm (partial parse): ~800 мс - 1.5 сек

На production проекте с 1000 моделей:

  • Cold: 60-180 сек (parse доминирует), плюс execution time моделей
  • Warm: 5-15 сек fixed overhead + execution time

Боттлнек на больших проектах — parse phase. Это причина появления dbt Fusion (модуль 12) — Rust parsing даёт 30x speedup.


Что записано на диск после run

target/
├── manifest.json              ← полный Manifest (модуль 04)
├── partial_parse.msgpack      ← двоичный snapshot для next run
├── graph.gpickle              ← networkx граф
├── run_results.json           ← результаты этого run (модуль 05)
├── catalog.json               ← (только после dbt docs generate)
├── compiled/
│   └── my_project/models/staging/stg_users.sql  ← compiled SQL
├── run/
│   └── my_project/models/staging/stg_users.sql  ← materialization-wrapped SQL
└── logs/
    └── dbt.log

Senior использует все эти файлы для observability, debugging, state comparison, BI integration. Модули 04-05 курса детально разбирают artifacts.


Попробуй сам

  1. Запустите dbt run --debug на любом тестовом проекте. Debug-mode выводит каждое событие event stream.
  2. Найдите в выводе: Acquiring new connection, Began executing node, Finished running node, Connection released. Это event types из core/dbt/events/types.py.
  3. Откройте target/run_results.json. Найдите ваши модели. Посмотрите execution_time, adapter_response, timing (массив с разбивкой на phases).
  4. Откройте target/compiled/.../my_model.sql и target/run/.../my_model.sql. Разница — первый это compile (чистый SQL после Jinja), второй — после materialization wrap (CREATE TABLE/VIEW + main SELECT).
  5. Профилируйте dbt run через cProfile:
    python -c "
    from dbt.cli.main import dbtRunner
    import cProfile
    cProfile.run('dbtRunner().invoke([\"run\"])', sort='cumulative')
    " | head -40
    Топ функций по cumulative time покажет где tracking budget.

После этой задачи у вас будет mental model «вот так dbt run выглядит изнутри во времени», не только в архитектуре.


Проверка знанийKnowledge check
На production проекте 1500 моделей: dbt run --select my_model занимает 35 секунд, при том что моя модель компилируется и выполняется за 2 секунды. Где остальные 33 секунды и как сократить?
ОтветAnswer
Остальные 33 секунды — это accumulated overhead pipeline-стадий, доминируемый parse phase. Конкретно: (1) Cold parse — ~30 сек на 1500 моделей. dbt parse читает все .sql/.yml файлы, builds full Manifest, даже если вы select'ите одну модель. SelectionSpec применяется уже на post-parse Manifest. (2) Linker + Graph — ~2 сек на 1500 нод (networkx scaling). (3) Остальные стадии — ~1 сек. Сокращение: (a) Partial parsing — после первого run partial_parse.msgpack используется, parse падает до 1-3 сек. Убедитесь что partial parse не инвалидируется (--no-partial-parse не передан, env vars не меняются между runs, dbt_project.yml не меняется). (b) dbt parse уже выполнен в CI — можно использовать --defer + --state path/to/manifest для skip parsing своего проекта (но в этом сценарии вы local, не CI). (c) dbt Fusion — Rust parsing даёт 30x speedup, 30 сек -> 1 сек. Это main use case для миграции на Fusion (модуль 12). (d) Reduce project size — если возможно, split в multi-project через dbt Mesh (модуль 11), parsing каждого project независимо. (e) Optimize macro complexity — много nested macros через packages.yml усложняют parsing. Diagnose: dbt parse --no-partial-parse — даёт чистый cold parse timing. Если >30 сек на 1500 моделей — это normal scaling, нужны structural решения, не tuning.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. На project 1500 моделей, dbt run --select my_model занимает 35 секунд (cold), при том что execution самой модели ~2 сек. Где остальные 33 секунды?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4