Pipeline dbt-core: от CLI до warehouse
dbt run — это команда. То, что между нажатием Enter и появлением таблицы в warehouse — это pipeline из семи крупных стадий. Каждая стадия — отдельная подсистема dbt-core, со своими структурами данных, своими исключениями, своими файлами в репозитории dbt-labs/dbt-core. Senior, понимающий pipeline, может ответить на вопрос «почему вот это не работает» не «надо погуглить», а «надо открыть core/dbt/parser/manifest.py».
В этом уроке мы пройдём по всем семи стадиям, обозначим главные классы и файлы, и подготовим базу для следующих уроков, где каждая стадия будет разобрана детально.
Семь стадий
Каждая стадия принимает output предыдущей и передаёт input следующей. Manifest — главная структура данных, передаваемая через стадии 3-7. Graph — вторая, передаваемая через 4-7. Если в любом месте Manifest неконсистентен или Graph содержит cycle — pipeline падает в конкретной стадии.
DAG как mental model в dbt I Medallion-паттерн: staging / intermediate / marts (dbt II)Стадия 1: CLI (Click + argparse)
Точка входа — core/dbt/cli/main.py. Когда вы набираете dbt run --select my_model, происходит примерно следующее:
# core/dbt/cli/main.py (упрощённо)
@click.group(...)
@click.pass_context
def cli(ctx, **kwargs):
# ...
@cli.command("run")
@click.option("--select", ...)
@click.option("--exclude", ...)
@click.option("--full-refresh", is_flag=True)
def run(ctx, **kwargs):
from dbt.task.run import RunTask
task = RunTask(args=ctx.obj["flags"], manifest=ctx.obj.get("manifest"))
return task.run()
Click создаёт Context объект, в нём flags — это argparse.Namespace-подобный объект со всеми переданными опциями. Эти flags передаются в task constructor.
Зачем Click + argparse? Click даёт удобный декларативный API для команд, но dbt также экспортирует programmatic API (модуль 06 курса). Поэтому внутри есть прослойка Flags объекта, который собирается и из CLI, и из dbtRunner.invoke(["run", "--select", "my_model"]).
Ключевые файлы:
core/dbt/cli/main.py— определения командcore/dbt/cli/flags.py—Flagsdataclass, mediator между CLI/programmaticcore/dbt/cli/option_types.py— кастомные Click типы (например, для--vars)
Стадия 2: RuntimeConfig
core/dbt/config/runtime.py содержит RuntimeConfig — класс, агрегирующий всю конфигурацию dbt-проекта.
@dataclass
class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
"""
Project + Profile + parsed CLI args + computed paths.
Immutable после создания.
"""
args: Any
cli_vars: Dict[str, Any]
# ... десятки полей
RuntimeConfig собирается через RuntimeConfig.from_args(args):
- Читает
dbt_project.yml—Projectmixin - Читает
profiles.yml—Profilemixin (по--targetилиtargetиз project) - Merge’ит vars — project vars + profile vars + CLI vars
- Резолвит пути — model-paths, macro-paths, seed-paths
- Создаёт
Credentials— type-specific dataclass из адаптера (DuckDBCredentials,SnowflakeCredentials)
Если в любой точке config invalid (например, target отсутствует в profiles.yml) — DbtProjectError или DbtProfileError.
RuntimeConfig — immutable. Если в макросе вам нужны config values, они приходят через target object в Jinja (модуль 03), не через прямой доступ к RuntimeConfig.
Ключевые файлы:
core/dbt/config/runtime.py—RuntimeConfigcore/dbt/config/project.py—Project(читает dbt_project.yml)core/dbt/config/profile.py—Profile(читает profiles.yml)core/dbt/config/selectors.py—SelectorConfig(selectors.yml для модуля 10)
Стадия 3: ManifestLoader
Это самая большая стадия по объёму кода. core/dbt/parser/manifest.py — ManifestLoader класс, который:
- Обходит все packages — главный проект + все
packages.ymlзависимости (черезdbt deps) - Для каждого package:
- Парсит все
.sqlмодели черезModelParser - Парсит все
.sqlmacros черезMacroParser - Парсит
_models.yml,_sources.yml,_macros.ymlчерезSchemaParser - Парсит snapshots, seeds, tests, exposures, metrics, semantic_models, groups, selectors
- Парсит все
- Собирает
Manifest— главный artefact
# core/dbt/parser/manifest.py (упрощённо)
class ManifestLoader:
def load(self) -> Manifest:
self._load_macros()
self._load_nodes() # models, snapshots, seeds, tests
self._load_sources()
self._load_exposures()
self._load_metrics()
self._load_semantic_models()
self._load_groups()
self._patch_nodes() # YAML overrides из _models.yml
self._process_refs() # резолвит ref() -> unique_id
self._process_sources() # source() -> unique_id
return self.manifest
Partial parsing: если на диске есть target/partial_parse.msgpack и hash проектных файлов не изменился, ManifestLoader загружает Manifest из msgpack вместо полного re-parse. Это сокращает время с 60-120 сек до 1-3 сек на больших проектах. Детали — модуль 02 курса.
Output: Manifest — главный data structure, передаваемый дальше.
Ключевые файлы:
core/dbt/parser/manifest.py—ManifestLoadercore/dbt/parser/models.py—ModelParsercore/dbt/parser/macros.py—MacroParsercore/dbt/parser/schemas.py—SchemaParser(YAML)core/dbt/parser/snapshots.py,seeds.py,tests.py,sources.py— специализированные парсерыcore/dbt/contracts/graph/manifest.py—Manifestdataclass
Стадия 4: Linker и DAG
После того как Manifest собран, нужно построить граф зависимостей. Это делает Linker в core/dbt/compilation.py.
# core/dbt/compilation.py (упрощённо)
class Linker:
def __init__(self):
self.graph = nx.DiGraph() # networkx directed graph
def link_graph(self, manifest: Manifest):
for unique_id, node in manifest.nodes.items():
self.graph.add_node(unique_id, **node.to_dict())
for dep in node.depends_on.nodes:
self.graph.add_edge(dep, unique_id)
# cycle check
try:
nx.find_cycle(self.graph)
raise DbtRuntimeError("Cycle detected in DAG")
except nx.NetworkXNoCycle:
pass
return self.graph
Manifest.nodes[unique_id].depends_on.nodes — это уже резолвенные ref()-зависимости (резолвинг прошёл в стадии 3 через _process_refs()). Linker просто переводит их в edges networkx-графа.
Cycle detection — здесь же. Если ваши модели образуют цикл (A ref’ит B, B ref’ит A напрямую или через macro), DbtRuntimeError бросается с указанием конкретного цикла.
Граф serialized’ится в target/graph.gpickle для последующего использования (например, dbt list --select my_model+).
Ключевые файлы:
core/dbt/compilation.py—Linker,Compilercore/dbt/graph/queue.py—GraphQueue(используется в стадии 5)core/dbt/graph/selector.py—NodeSelector(--select,--exclude)
Стадия 5: GraphRunnableTask
Это второй critical hub после Manifest. core/dbt/task/runnable.py содержит GraphRunnableTask — базовый класс для RunTask, TestTask, BuildTask, SnapshotTask.
# core/dbt/task/runnable.py (упрощённо)
class GraphRunnableTask(ConfiguredTask):
def run(self) -> RunResult:
self.before_run(adapter, selected_uids)
self.execute_with_hooks(selected_uids)
self.after_run(adapter, result)
return result
def execute_with_hooks(self, selected_uids):
# Создаём GraphQueue из selected nodes
queue = self.get_graph_queue()
with self.adapter.connection_named("master"):
while not queue.empty():
node = queue.get(block=True)
runner = self.get_runner(node) # RunRunner для моделей
result = self.call_runner(runner)
queue.mark_done(node.unique_id)
GraphQueue — это thread-safe очередь, которая выдаёт ноды для выполнения с учётом зависимостей. Если node B зависит от A, B не выдаётся пока A не done.
Threading: threads config (по умолчанию 4, конфигурируется в profiles.yml). GraphQueue поддерживает параллелизм — каждый thread берёт node из очереди.
Selection: --select, --exclude, --selector парсятся через SelectionSpec (core/dbt/graph/selector_spec.py) и применяются перед стартом — selected_uids это Set[str] unique_id’ов, которые будут запущены.
Ключевые файлы:
core/dbt/task/runnable.py—GraphRunnableTaskcore/dbt/task/run.py—RunTask,RunRunnercore/dbt/task/test.py—TestTask,TestRunnercore/dbt/task/snapshot.py—SnapshotTask,SnapshotRunnercore/dbt/task/build.py—BuildTask(combo)
Стадия 6: Runners
Каждый node-type имеет свой Runner. Для моделей — RunRunner (core/dbt/task/run.py).
# core/dbt/task/run.py (упрощённо)
class RunRunner(CompileRunner):
def execute(self, model, manifest):
# 1. Compile — резолвим Jinja в чистый SQL
compiled_node = self.compile(manifest)
# 2. Materialize — вызываем materialization macro
materialization_macro = self.get_materialization(model)
# 3. Execute materialization — рендерит и шлёт SQL
result = MacroGenerator(materialization_macro, context)(model)
return result
Compile phase: Jinja-источник модели рендерится с execute=False context — ref(), source(), var() доступны, но statement(), run_query(), adapter.dispatch() — undefined. Result — это compiled_node.compiled_code (чистый SQL без Jinja).
Materialize phase: materialization macro (например, materialization('table', adapter='duckdb')) вызывается с execute=True context. Внутри macro используется statement('main') для отправки реального SQL в warehouse.
Output: RunResult — dataclass с status, execution_time, adapter_response, message, добавляется в общий RunResultsArtifact.
Ключевые файлы:
core/dbt/task/run.py—RunRunnercore/dbt/task/base.py—BaseRunner,ExecutionContextcore/dbt/context/providers.py—ProviderContext(контекст рендеринга)
Стадия 7: BaseAdapter
Финальная стадия — реальное выполнение SQL в warehouse. Через adapter API.
# core/dbt/adapters/base/impl.py (упрощённо, в современном dbt — отдельный package dbt-adapters)
class BaseAdapter:
def execute(self, sql, auto_begin=False, fetch=False):
connection = self.connections.get_thread_connection()
return self.connections.execute(sql, auto_begin, fetch)
def add_query(self, sql, ...):
# Один SQL через connection
...
def get_relation(self, database, schema, identifier):
# Метаданные таблицы из information_schema
...
BaseAdapter — abstract. SQLAdapter — common implementation для SQL warehouses. Конкретные adapters (DuckDBAdapter, PostgresAdapter, SnowflakeAdapter) extends SQLAdapter.
ConnectionManager управляет connections (open/close/cancel/exception_handler). Thread-local — каждый thread получает свой connection.
Output: AdapterResponse — dataclass с rows_affected, code, _message — это попадает в run_results.json.
Ключевые файлы:
dbt-adapters/dbt/adapters/base/impl.py—BaseAdapterdbt-adapters/dbt/adapters/sql/impl.py—SQLAdapterdbt-duckdb/dbt/adapters/duckdb/impl.py—DuckDBAdapter(читаемый пример adapter)dbt-adapters/dbt/adapters/base/connections.py—BaseConnectionManager
Полный путь одного dbt run --select my_model
Соберём всё вместе. Команда — dbt run --select my_model, проект — небольшой с одним моделей.
Это ~50000 строк Python-кода в dbt-core, плюс ~5000 в dbt-adapters, плюс ~3000 в каждом конкретном adapter — для того, чтобы выполнить одну простую модель. И это правильно для system такого scope: dbt поддерживает 30+ адаптеров, partial parsing, dbt Mesh, contracts, microbatch, semantic layer. Сложность распределена.
Наследование, MRO, C3 linearization в PythonПопробуй сам
- Клонируйте dbt-core локально.
git clone https://github.com/dbt-labs/dbt-core cd dbt-core git checkout v1.11.5 - Откройте
core/dbt/cli/main.py. Найдите командуrun. Посмотрите как она зарегистрирована. - Откройте
core/dbt/task/run.py. Найдите классRunTask. Посмотрите__init__иrun()методы. - Откройте
core/dbt/parser/manifest.py. Найдите классManifestLoader. Прочитайтеload()метод — это ~50 строк, читается за 2 минуты. - Поставьте
breakpoint()вRunTask.run(). Запуститеdbt runна любом тестовом проекте. Когдаpdbсработает — введитеdir(self)и посмотрите атрибуты RunTask. Найдитеmanifest,selected_uids,node_results.
После этой задачи pipeline перестанет быть абстракцией — будет конкретные файлы и классы в вашей IDE.