dbt run от bash до warehouse: пошаговый трейс
Все предыдущие уроки модуля были «карта». Этот — «трейс». Возьмём конкретный сценарий: проект с двумя моделями (stg_users, dim_users), команда dbt run. Проследим каждый шаг — какой Python-код выполняется, что попадает в Manifest, что в Graph, какой SQL отправляется в warehouse, что возвращается. С реальными временными показателями для понимания где боттлнеки.
Версия — dbt-core 1.11.5, adapter — dbt-duckdb 1.10.1.
Setup сценария
Структура проекта:
my_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│ ├── staging/
│ │ └── stg_users.sql ← ref('raw_users')
│ └── marts/
│ └── dim_users.sql ← ref('stg_users')
├── seeds/
│ └── raw_users.csv
└── target/
└── partial_parse.msgpack ← после предыдущего run
profiles.yml:
my_project:
target: dev
outputs:
dev:
type: duckdb
path: 'my_project.duckdb'
threads: 4
Запускаем: dbt run.
Шаг 0: bash -> Python
bash: dbt run
dbt — это shell entry point, который установился в virtualenv через pip install dbt-core dbt-duckdb. Это python-script (bin/dbt) с shebang #!/path/to/venv/bin/python. Внутри:
# Generated entrypoint (упрощённо)
from dbt.cli.main import cli
sys.exit(cli())
cli() — это Click group из core/dbt/cli/main.py. Click парсит argv (['run']), находит @cli.command('run'), вызывает её.
Timing: 100-300 мс на cold start Python (импорты dbt-core).
Шаг 1: Click -> Flags -> RunTask
@cli.command('run') в main.py:
@cli.command("run")
@click.option("--select", ...)
@click.option("--exclude", ...)
@click.option("--full-refresh", is_flag=True)
@click.option("--threads", type=int)
# ... ~15 options
@click.pass_context
def run(ctx, **kwargs):
from dbt.task.run import RunTask
task = RunTask(args=ctx.obj["flags"], manifest=ctx.obj.get("manifest"))
return task.run()
ctx.obj["flags"] — это Flags объект, собранный из CLI args и env vars (через @requires.preflight decorator на верхнем уровне CLI).
RunTask(args=flags) — конструктор GraphRunnableTask (он extends). При construction manifest ещё None — не загружен.
Timing: ~50 мс.
Шаг 2: RuntimeConfig.from_args
# core/dbt/task/run.py
class RunTask(GraphRunnableTask):
def run(self) -> RunResult:
self.config = RuntimeConfig.from_args(self.args)
# ...
RuntimeConfig.from_args(args):
- Locate
dbt_project.yml. Идёт от cwd вверх по дереву, пока не найдёт. - Load Project — читает YAML, парсит, валидирует через
ProjectV1dataclass. - Load Profile — читает
~/.dbt/profiles.yml(или изDBT_PROFILES_DIRenv var), резолвит--target(илиtarget:из project). - Render через ProfileRenderer —
env_var(),var()в YAML резолвятся через Jinja. - Resolve credentials —
DuckDBCredentials.from_dict(...)(type-specific dataclass из dbt-duckdb).
Output — RuntimeConfig immutable object со всем нужным.
# Что внутри после from_args
config.project_name = 'my_project'
config.profile_name = 'my_project'
config.target_name = 'dev'
config.credentials = DuckDBCredentials(database='my_project.duckdb', schema='main', ...)
config.threads = 4
config.model_paths = ['models']
config.seed_paths = ['seeds']
config.macro_paths = ['macros']
# ... десятки полей
Timing: 100-200 мс на проекте средней сложности.
profiles.yml для DuckDBШаг 3: ManifestLoader (parse phase)
# core/dbt/task/run.py
def run(self):
self.config = RuntimeConfig.from_args(self.args)
self.manifest = ManifestLoader.get_full_manifest(self.config)
# ...
ManifestLoader.get_full_manifest(config):
- Check partial_parse.msgpack. Если есть
target/partial_parse.msgpackи hash файлов matches — пропустить full parse. - Build all_projects — главный проект + все packages.yml deps.
- Walk file system — найти все
.sql,.py,.ymlфайлы. - Parse macros first —
MacroParserпо всемmacros/директориям. - Parse models —
ModelParserпоmodels/. Для каждой модели:- Read raw_code из файла.
- Tree-sitter static parse — пытается извлечь refs/sources без полного Jinja render. Быстро.
- Если tree-sitter не справился (сложный Jinja) — full Jinja parse с
execute=False. - Build
ModelNodedataclass с raw_code, ref’ы, depends_on. - Сложить в
manifest.nodes[unique_id].
- Parse schemas —
SchemaParserпо всем_models.yml,_sources.yml. Patch existing nodes (add description, columns, contract, tests). - Parse snapshots, seeds, tests — отдельные парсеры.
_process_refs()— резолвитref('foo')вunique_idчерезmanifest.ref_lookup._process_sources()— резолвитsource('schema', 'name').- Build
parent_mapandchild_map— production lookups. - Write
target/partial_parse.msgpackдля следующего run.
Output — Manifest со всеми нодами и связями.
# Для нашего сценария
manifest.nodes['model.my_project.stg_users'] # ModelNode
manifest.nodes['model.my_project.dim_users'] # ModelNode
manifest.nodes['seed.my_project.raw_users'] # SeedNode
manifest.nodes['model.my_project.stg_users'].depends_on.nodes
# ['seed.my_project.raw_users']
manifest.nodes['model.my_project.dim_users'].depends_on.nodes
# ['model.my_project.stg_users']
Timing:
- Cold (full parse): 5-15 сек на нашем проекте, 30-120 сек на 1000+ моделей
- Warm (partial parse via msgpack): 100-500 мс
Это самая дорогая стадия на холодном run. Модуль 02 курса детально разбирает.
Шаг 4: Linker -> DAG
# core/dbt/task/runnable.py
def compile_manifest(self):
linker = Linker()
linker.link_graph(self.manifest)
self.graph = Graph(linker.graph)
# core/dbt/compilation.py
class Linker:
def link_graph(self, manifest):
for unique_id, node in manifest.nodes.items():
self.graph.add_node(unique_id, **node.to_dict())
for parent_id in node.depends_on.nodes:
self.graph.add_edge(parent_id, unique_id)
# cycle check
try:
cycle = nx.find_cycle(self.graph)
raise DbtRuntimeError(f"Cycle detected: {cycle}")
except nx.NetworkXNoCycle:
pass
После этого self.graph содержит:
seed.my_project.raw_users -> model.my_project.stg_users -> model.my_project.dim_users
Граф serialized в target/graph.gpickle.
Timing: менее 100 мс.
ref(): соединяем модели в граф (dbt I)Шаг 5: SelectionSpec -> selected_uids
# core/dbt/task/runnable.py
def select_resources(self) -> Set[str]:
spec = parse_difference(self.selection_arg, self.exclusion_arg)
selector = NodeSelector(self.graph, self.manifest, ...)
selected_uids = selector.get_selected(spec)
return selected_uids
В нашем сценарии нет --select и нет --exclude — selection spec пустой, селектор возвращает все модели проекта (но не sources, не seeds — потому что dbt run selects только runnable nodes по умолчанию).
selected_uids = {
'model.my_project.stg_users',
'model.my_project.dim_users'
}
raw_users — это seed, не model. Seeds запускаются отдельной командой (dbt seed). В dbt run они skip’ятся (но граф зависимости учитывает: stg_users depends on raw_users, поэтому raw_users должен быть «уже выполнен» — что верно для seeds, они материализуются при dbt seed).
Timing: менее 50 мс.
Шаг 6: GraphQueue + ThreadPool
# core/dbt/task/runnable.py
def execute_with_hooks(self, selected_uids):
self.run_queue = self.get_graph_queue() # GraphQueue
with self.adapter.connection_named("master"):
self.before_run(adapter) # on-run-start hooks
pool = ThreadPoolExecutor(max_workers=self.config.threads)
while not self.run_queue.empty():
node = self.run_queue.get(block=True)
future = pool.submit(self.call_runner, node)
# ... handling
pool.shutdown()
self.after_run(adapter, results)
GraphQueue имеет встроенный topological sort: возвращает ноды только после того как все их dependencies done.
В нашем сценарии (threads=4):
- t=0: queue.get() возвращает
stg_users(нет deps в selected set, raw_users — seed, не selected). - t=0: Worker 1 запускает
RunRunner(stg_users). - t=200мс: stg_users done, queue.mark_done().
- t=200мс: queue.get() возвращает
dim_users(stg_users done). - t=200мс: Worker 2 запускает
RunRunner(dim_users). - t=400мс: dim_users done.
С threads=4, но только 2 ноды и линейная зависимость — параллелизм не использован. На 100 моделей с разветвлённым DAG — все 4 workers загружены.
Timing setup: менее 10 мс. Execution — зависит от моделей.
Conditional materialization: table в prod, view в devШаг 7a: RunRunner.compile() для stg_users
# core/dbt/task/run.py
class RunRunner(CompileRunner):
def execute(self, model, manifest):
compiled_node = self.compile(manifest)
# ...
def compile(self, manifest):
# Создаём ParseProvider / RuntimeProvider context
context = generate_runtime_model_context(model, self.config, manifest)
# Render Jinja
compiled_code = MacroGenerator(model.raw_code, context)()
model.compiled_code = compiled_code
return model
Для stg_users.sql (raw_code):
SELECT
user_id,
email,
created_at
FROM {{ ref('raw_users') }}
WHERE created_at IS NOT NULL
После Jinja render с RuntimeProvider context:
SELECT
user_id,
email,
created_at
FROM "my_project"."main"."raw_users"
WHERE created_at IS NOT NULL
ref('raw_users') резолвится в Relation object, который через __str__() рендерится в quoted identifier.
Записывается в target/compiled/my_project/models/staging/stg_users.sql.
Timing: 10-50 мс на простой модели. На сложных моделях с include macros может быть до секунды.
Шаг 7b: Materialization для stg_users
# core/dbt/task/run.py (упрощённо)
def execute(self, model, manifest):
compiled_node = self.compile(manifest)
materialization_macro = self.get_materialization(model) # materialization_table_default
context = generate_runtime_model_context(model, self.config, manifest)
result = MacroGenerator(materialization_macro, context)(model)
return result
Lookup materialization: model.config.materialized == 'view' (default из dbt_project.yml). dbt ищет:
materialization_view_duckdb— adapter-specific?materialization_view_default— fallback.
Находит materialization_view_default (в include/global_project/macros/materializations/models/view/). Это Jinja-макрос ~100 строк:
{% materialization view, default %}
{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='view') -%}
{%- set intermediate_relation = make_intermediate_relation(target_relation) -%}
{{ run_hooks(pre_hooks) }}
-- main statement: CREATE OR REPLACE VIEW ...
{% call statement('main') -%}
{{ create_view_as(target_relation, sql) }}
{%- endcall %}
{{ adapter.rename_relation(target_relation, backup_relation) if existing_relation }}
{{ adapter.drop_relation(intermediate_relation) }}
{{ run_hooks(post_hooks) }}
{{ adapter.commit() }}
{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
statement('main') — это вызов adapter.execute с SQL. Возвращает AdapterResponse.
create_view_as(target_relation, sql) — другой Jinja-макрос, который генерирует CREATE OR REPLACE VIEW ... AS .... В нашем случае:
CREATE OR REPLACE VIEW "my_project"."main"."stg_users" AS (
SELECT
user_id,
email,
created_at
FROM "my_project"."main"."raw_users"
WHERE created_at IS NOT NULL
)
Timing: macro execution 30-100 мс.
Шаг 7c: adapter.execute -> DuckDB
# Inside statement('main') macro — calls adapter.execute(sql)
# core/dbt/adapters/sql/impl.py (simplified)
def execute(self, sql, auto_begin=True, fetch=False):
connection = self.connections.get_thread_connection()
return self.connections.execute(sql, auto_begin, fetch)
get_thread_connection() — thread-local cache. На первом вызове в треде — открывает connection (DuckDB соединяется с файлом my_project.duckdb).
connections.execute(sql) — для DuckDB через dbt-duckdb:
# dbt-duckdb/dbt/adapters/duckdb/connections.py (упрощённо)
def execute(self, sql, auto_begin=True, fetch=False):
with self._cursor() as cursor:
cursor.execute(sql)
if fetch:
return cursor.fetchall()
return AdapterResponse(_message=cursor.description, rows_affected=cursor.rowcount)
cursor.execute(sql) — это DuckDB-native operation. DuckDB парсит SQL, оптимизирует, выполняет в-памяти. Для view — это просто DDL, нет данных движения.
Returns AdapterResponse(code='SUCCESS', rows_affected=0).
Timing: 10-100 мс на CREATE VIEW (быстро). На больших INSERT/MERGE — секунды/минуты.
Шаг 8: RunResult -> node_results -> run_results.json
# core/dbt/task/runnable.py
def call_runner(self, runner) -> RunResult:
started = time.time()
try:
result = runner.run() # RunResult
except Exception as e:
result = RunResult(status=NodeStatus.Error, ...)
finished = time.time()
result.execution_time = finished - started
self.node_results.append(result)
return result
После того как stg_users и dim_users отработали:
self.node_results = [
RunResult(status='success', unique_id='model.my_project.stg_users', execution_time=0.18, ...),
RunResult(status='success', unique_id='model.my_project.dim_users', execution_time=0.22, ...),
]
В after_run:
# core/dbt/task/runnable.py
def after_run(self, adapter, result):
# ... on-run-end hooks
artifact = RunResultsArtifact.from_node_results(self.node_results, ...)
write_json(artifact, "target/run_results.json")
run_results.json — это публичный артефакт (модуль 05 курса):
{
"metadata": {...},
"results": [
{
"unique_id": "model.my_project.stg_users",
"status": "success",
"execution_time": 0.18,
"adapter_response": {"code": "SUCCESS", "rows_affected": 0},
"timing": [...],
"thread_id": "Thread-1"
},
...
],
"elapsed_time": 0.42,
"args": {...}
}
Timing: менее 50 мс.
Полный timing budget
Total для нашего сценария:
- Cold (full parse): ~5-7 сек
- Warm (partial parse): ~800 мс - 1.5 сек
На production проекте с 1000 моделей:
- Cold: 60-180 сек (parse доминирует), плюс execution time моделей
- Warm: 5-15 сек fixed overhead + execution time
Боттлнек на больших проектах — parse phase. Это причина появления dbt Fusion (модуль 12) — Rust parsing даёт 30x speedup.
Что записано на диск после run
target/
├── manifest.json ← полный Manifest (модуль 04)
├── partial_parse.msgpack ← двоичный snapshot для next run
├── graph.gpickle ← networkx граф
├── run_results.json ← результаты этого run (модуль 05)
├── catalog.json ← (только после dbt docs generate)
├── compiled/
│ └── my_project/models/staging/stg_users.sql ← compiled SQL
├── run/
│ └── my_project/models/staging/stg_users.sql ← materialization-wrapped SQL
└── logs/
└── dbt.log
Senior использует все эти файлы для observability, debugging, state comparison, BI integration. Модули 04-05 курса детально разбирают artifacts.
Попробуй сам
- Запустите
dbt run --debugна любом тестовом проекте. Debug-mode выводит каждое событие event stream. - Найдите в выводе:
Acquiring new connection,Began executing node,Finished running node,Connection released. Это event types изcore/dbt/events/types.py. - Откройте
target/run_results.json. Найдите ваши модели. Посмотритеexecution_time,adapter_response,timing(массив с разбивкой на phases). - Откройте
target/compiled/.../my_model.sqlиtarget/run/.../my_model.sql. Разница — первый это compile (чистый SQL после Jinja), второй — после materialization wrap (CREATE TABLE/VIEW + main SELECT). - Профилируйте dbt run через cProfile:
Топ функций по cumulative time покажет где tracking budget.python -c " from dbt.cli.main import dbtRunner import cProfile cProfile.run('dbtRunner().invoke([\"run\"])', sort='cumulative') " | head -40
После этой задачи у вас будет mental model «вот так dbt run выглядит изнутри во времени», не только в архитектуре.