Phase parse: построение Manifest шаг за шагом
Здесь показываем построение Manifest в контексте парсинга — полная анатомия manifest.json (top-level keys, schema versions, node properties, use cases) разбирается в модуле 04. Пока фокус на том, как ManifestLoader конструирует объект шаг за шагом, а не на финальной структуре.
Предыдущий урок показал, как tree-sitter ускоряет parsing одной модели. Этот урок — как ManifestLoader собирает все модели, sources, тесты, snapshot’ы, macros в единый Manifest object. По шагам, с примерами кода из dbt-core.
Версия — v1.11.5. Базовый файл — core/dbt/parser/manifest.py (~800 строк).
Высокоуровневый flow ManifestLoader
# core/dbt/parser/manifest.py (упрощённо)
class ManifestLoader:
@classmethod
def get_full_manifest(cls, root_config, reset=False, write_perf_info=False) -> Manifest:
# 1. Build all_projects (main + deps)
all_projects = root_config.load_dependencies(base_macros_only=False)
# 2. Check partial parse
loader = cls(root_config, all_projects)
manifest = loader.load()
return manifest
def load(self) -> Manifest:
# 3. Load macros first (нужны для render YAML)
self._load_macros()
# 4. Load nodes (models, snapshots, seeds, tests)
self._load_nodes()
# 5. Load YAML resources (sources, exposures, metrics, semantic_models)
self._load_yaml()
# 6. Patch nodes with YAML configs
self._patch_nodes()
# 7. Process refs (резолвинг)
self._process_refs()
# 8. Process sources
self._process_sources()
# 9. Build parent_map / child_map
self._build_parent_child_maps()
# 10. Build static analysis indexes
self._compute_introspective_indexes()
return self.manifest
Каждый шаг — отдельная подсистема. Разберём по очереди.
Шаг 1-2: Build all_projects и partial parse check
root_config.load_dependencies() — это:
- Парсит
packages.yml(илиpackage-lock.ymlесли есть). - Для каждой зависимости — клонирует/проверяет наличие в
dbt_packages/. - Каждый package становится отдельным
Projectобъектом. - Итоговый
all_projects: Dict[str, Project]— main project + dependencies.
Если target/partial_parse.msgpack существует и валиден — ManifestLoader идёт по partial path (следующий урок). Иначе full parse.
Шаг 3: Load macros
_load_macros() обходит все macros/ директории во всех projects:
def _load_macros(self):
for project_name, project in self.all_projects.items():
for macro_path in project.macro_paths:
for path in walk_files(macro_path, ext='.sql'):
self._parse_macro_file(project, path)
# Также macros inside models/ — для backwards compat
# Также analyses/ — это analyses, но похожи на models
Каждый .sql файл в macros/ парсится через MacroParser:
class MacroParser(BaseParser):
def parse_file(self, file: AnyFile):
# Render Jinja statically — найти {% macro foo() %} blocks
macro_blocks = extract_macro_blocks(file.contents)
for block in macro_blocks:
macro = Macro(
unique_id=f"macro.{project_name}.{block.name}",
package_name=project_name,
name=block.name,
arguments=block.args,
macro_sql=block.body,
# ...
)
self.manifest.macros[macro.unique_id] = macro
Macro dataclass хранит raw Jinja source макроса, не выполненный. Macros не выполняются на parse phase — они выполняются на runtime, когда вызываются из моделей или materializations.
Почему macros первыми: YAML files могут использовать macros через '{{ my_macro() }}' strings. Чтобы их рендерить, нужны macros готовы.
Шаг 4: Load nodes
_load_nodes() парсит все .sql файлы в models/, snapshots/, seeds/, tests/, analyses/:
def _load_nodes(self):
for project_name, project in self.all_projects.items():
# Models
for path in walk_files(project.model_paths, ext=('.sql', '.py')):
self._parse_model_file(project, path)
# Snapshots
for path in walk_files(project.snapshot_paths, ext='.sql'):
self._parse_snapshot_file(project, path)
# Seeds
for path in walk_files(project.seed_paths, ext='.csv'):
self._parse_seed_file(project, path)
# Singular tests
for path in walk_files(project.test_paths, ext='.sql'):
self._parse_singular_test_file(project, path)
# Analyses
for path in walk_files(project.analysis_paths, ext='.sql'):
self._parse_analysis_file(project, path)
ModelParser
core/dbt/parser/models.py:
class ModelParser(BaseParser):
def parse_file(self, file: AnyFile):
raw_code = file.contents
# Try tree-sitter static parsing
try:
metadata = static_parser.parse(raw_code)
# Extract: refs, sources, configs, calls to other macros
except StaticParsingError:
# Fallback: full Jinja render with execute=False
metadata = self._full_render(file)
# Build ModelNode
node = ModelNode(
unique_id=f"model.{project_name}.{model_name}",
package_name=project_name,
name=model_name,
resource_type=NodeType.Model,
raw_code=raw_code,
language=detect_language(file), # 'sql' or 'python'
depends_on=DependsOn(
nodes=[], # пока пустой — резолвится на шаге _process_refs
macros=metadata.macros, # ['macro.dbt_utils.union_relations', ...]
),
refs=metadata.refs, # [RefArgs(name='stg_users'), ...]
sources=metadata.sources, # [['raw', 'users'], ...]
config=ModelConfig.from_dict(metadata.configs),
database=resolve_database(project, model_name),
schema=resolve_schema(project, model_name),
alias=resolve_alias(project, model_name),
# ... остальные поля из config
)
self.manifest.nodes[node.unique_id] = node
Важно: на этой стадии depends_on.nodes ещё пустой. У нас есть refs: [RefArgs(name='stg_users')], но мы не знаем unique_id stg_users — может быть model.my_project.stg_users или model.dbt_utils.stg_users (если из package).
Резолвинг — на шаге 7.
SnapshotParser, SeedParser, SingularTestParser
Аналогичны ModelParser, но создают SnapshotNode, SeedNode, SingularTestNode соответственно. Каждый — отдельный resource_type:
snapshot.{project}.{name}—SnapshotNodeseed.{project}.{name}—SeedNodetest.{project}.{name}—TestNode(singular tests)analysis.{project}.{name}—AnalysisNode
Шаг 5: Load YAML resources
_load_yaml() парсит все .yml файлы:
def _load_yaml(self):
for project in self.all_projects.values():
for yaml_path in find_all_yaml_files(project):
self._parse_yaml_file(project, yaml_path)
SchemaParser обрабатывает все _models.yml, _sources.yml, _macros.yml, _exposures.yml, _metrics.yml, _semantic_models.yml:
class SchemaParser:
def parse_yaml(self, project, file):
yaml_dict = parse_yaml_with_jinja(file.contents)
# Each top-level key is a different resource type
if 'sources' in yaml_dict:
self._parse_sources(yaml_dict['sources'])
if 'models' in yaml_dict:
self._parse_model_patches(yaml_dict['models'])
if 'exposures' in yaml_dict:
self._parse_exposures(yaml_dict['exposures'])
if 'metrics' in yaml_dict:
self._parse_metrics(yaml_dict['metrics'])
if 'semantic_models' in yaml_dict:
self._parse_semantic_models(yaml_dict['semantic_models'])
if 'groups' in yaml_dict:
self._parse_groups(yaml_dict['groups'])
# ... unit_tests, macros (descriptions), data_tests
Patches vs new resources
YAML files делают две вещи:
- Create new resources — sources, exposures, metrics, groups, unit_tests. У них нет соответствующих
.sqlfiles. - Patch existing nodes — для models, snapshots, macros в YAML добавляются metadata: description, columns, tests, contract.
Patches накапливаются отдельно и применяются в шаге 6.
Generic tests
Если в _models.yml:
models:
- name: customers
columns:
- name: customer_id
tests:
- unique
- not_null
SchemaParser парсит эти tests и создаёт TestNode для каждого:
TestNode(
unique_id='test.my_project.unique_customers_customer_id',
test_metadata=TestMetadata(name='unique', namespace='dbt', kwargs={'column_name': 'customer_id'}),
depends_on=DependsOn(nodes=['model.my_project.customers']),
# ...
)
Generic tests — это test-macros (test_unique, test_not_null) которые dbt применяет к указанной колонке. core/dbt/parser/generic_test_builders.py — где это построение.
Шаг 6: Patch nodes
После того как YAML собрано, нужно apply patches к existing nodes:
def _patch_nodes(self):
for patch in self._yaml_model_patches:
node = self.manifest.nodes.get(patch.unique_id)
if not node:
continue # YAML описывает несуществующую модель — warning
# Merge metadata from YAML into node
node.description = patch.description or node.description
node.columns = patch.columns or node.columns
node.config = merge_configs(node.config, patch.config)
node.constraints = patch.constraints
node.contract = patch.contract
node.access = patch.access
node.group = patch.group
node.version = patch.version
# ...
После patch’а Manifest ноды содержат полный set метаданных. До patch’а — только то что извлечено из .sql исходника (config(), refs, raw_code).
Шаг 7: Process refs
_process_refs() — резолвинг ref names в unique_ids:
def _process_refs(self):
for node in self.manifest.nodes.values():
for ref in node.refs:
# ref: RefArgs(name='stg_users', package=None, version=None)
target_node = self._resolve_ref(node, ref)
if target_node is None:
raise CompilationError(f"Model {node.name} refs unknown model {ref.name}")
node.depends_on.nodes.append(target_node.unique_id)
_resolve_ref — это самая хитрая часть. Алгоритм:
def _resolve_ref(self, node, ref) -> Optional[Node]:
target_name = ref.name
target_package = ref.package or node.package_name
target_version = ref.version or 'latest'
# Try exact match in target_package
if target_package == node.package_name:
candidate = self.manifest.nodes.get(f"model.{target_package}.{target_name}")
if candidate:
return candidate
# Try across all packages (для ref('foo') без package qualifier)
for project_name in self.all_projects:
candidate_id = f"model.{project_name}.{target_name}"
if candidate_id in self.manifest.nodes:
return self.manifest.nodes[candidate_id]
# Try seeds
for project_name in self.all_projects:
candidate_id = f"seed.{project_name}.{target_name}"
if candidate_id in self.manifest.nodes:
return self.manifest.nodes[candidate_id]
# Try snapshots
for project_name in self.all_projects:
candidate_id = f"snapshot.{project_name}.{target_name}"
if candidate_id in self.manifest.nodes:
return self.manifest.nodes[candidate_id]
return None
Edge cases:
-
ref('foo')ambiguity. Еслиfooсуществует и в main project, и в dependency package — может быть ambiguity. dbt-core предпочитает main project, но для clean code лучше явныйref('foo', package='my_project'). -
Versioned refs.
ref('foo', version=2)— резолвится вmodel.my_project.foo.v2. Если version нет — берётlatest_version. -
Disabled nodes. Если модель имеет
enabled: falseв config, она вmanifest.disabled, не вmanifest.nodes. Ref на disabled модель — ошибка “ref to disabled”. -
Cross-project refs (dbt Mesh) — модуль 11.
ref('foo', project='other_project')— резолвится через project_dependencies.
Шаг 8: Process sources
Аналогично для sources:
def _process_sources(self):
for node in self.manifest.nodes.values():
for src in node.sources:
# src: ['raw', 'users']
source_node = self._resolve_source(src[0], src[1])
if source_node is None:
raise CompilationError(f"Unknown source {src}")
node.depends_on.nodes.append(source_node.unique_id)
Sources создаются в шаге 5 из YAML. Их unique_id формата source.{project}.{schema}.{name}.
Шаг 9: parent_map / child_map
def _build_parent_child_maps(self):
parent_map = defaultdict(list)
child_map = defaultdict(list)
for unique_id, node in self.manifest.nodes.items():
for parent_id in node.depends_on.nodes:
parent_map[unique_id].append(parent_id)
child_map[parent_id].append(unique_id)
self.manifest.parent_map = dict(parent_map)
self.manifest.child_map = dict(child_map)
После этого:
manifest.parent_map['model.my_project.dim_users']=['model.my_project.stg_users']manifest.child_map['model.my_project.stg_users']=['model.my_project.dim_users']
Эти maps активно используются для DAG traversal, selection (+my_model+), state comparison.
Шаг 10: introspective indexes
В конце ManifestLoader строит вспомогательные индексы для быстрых lookups:
manifest.ref_lookup—{name -> {package -> unique_id}}для быстрого_resolve_refmanifest.source_lookup— аналогично для sourcesmanifest.disabled_lookup— для disabled nodes
# Псевдокод
self.manifest.ref_lookup = ManifestLookup(
{n.name: {n.package_name: n.unique_id} for n in nodes if n.resource_type == 'model'}
)
При следующем ref() lookup’е — O(1) вместо O(n) сканирования всех nodes.
Серилизация Manifest
После того как ManifestLoader.load() завершился, Manifest — это полноценный Python object. Он:
- Записывается в
target/manifest.json— для внешних tools. Schema versioned (модуль 04). - Записывается в
target/partial_parse.msgpack— для следующего partial parse. - Передаётся дальше в pipeline — в
GraphRunnableTask.compile_manifest().
partial_parse.msgpack — это msgpack binary serialization Manifest + checksums файлов. Следующий run будет проверять, не изменились ли файлы, и если нет — load из msgpack за 100-500мс вместо full parse за 5-15 сек.
Performance profile parse phase
На production проекте 1500 моделей (без partial parse cache):
| Шаг | Время | % |
|---|---|---|
| 1-2. Project + packages load | 100-500мс | 1% |
| 3. Macros parse | 1-3 сек | 5% |
| 4. Nodes parse (tree-sitter) | 5-15 сек | 50% |
| 5. YAML parse | 2-5 сек | 15% |
| 6. Patches | 0.5-1 сек | 3% |
| 7-8. Refs/sources process | 1-2 сек | 5% |
| 9-10. Maps + indexes | 0.5-1 сек | 3% |
| Manifest serialization (msgpack + json) | 2-5 сек | 18% |
| Total | 15-30 сек |
С partial parse cache hit — total 1-3 сек (90%+ ускорение).
С dbt Fusion (Rust parsing) — total 0.5-1 сек cold.
Попробуй сам
- Запустите
dbt parseи измерьте:time dbt parse - Удалите partial_parse и replay cold:
rm target/partial_parse.msgpack time dbt parse # cold time dbt parse # warm — должно быть значительно быстрее - Откройте
target/manifest.json— посмотрите на структуру. Найдите свою модель, посмотрите еёdepends_on.nodes,refs,sources. - Профилируйте parse через cProfile:
Топ функций — обычноpython -c " from dbt.cli.main import dbtRunner import cProfile cProfile.run('dbtRunner().invoke([\"parse\"])', sort='cumulative') " | head -50_parse_*варианты иMacroGenerator.__call__. - Откройте
core/dbt/parser/manifest.py— найдитеclass ManifestLoader, читайтеload()method. Это ~50 строк, описывает все 10 шагов.