Learning Platform
Глоссарий Troubleshooting
Урок 03.02 · 26 мин
Продвинутый
parsermanifestmanifest-loader

Phase parse: построение Manifest шаг за шагом

NOTE

Здесь показываем построение Manifest в контексте парсинга — полная анатомия manifest.json (top-level keys, schema versions, node properties, use cases) разбирается в модуле 04. Пока фокус на том, как ManifestLoader конструирует объект шаг за шагом, а не на финальной структуре.

Предыдущий урок показал, как tree-sitter ускоряет parsing одной модели. Этот урок — как ManifestLoader собирает все модели, sources, тесты, snapshot’ы, macros в единый Manifest object. По шагам, с примерами кода из dbt-core.

Версия — v1.11.5. Базовый файл — core/dbt/parser/manifest.py (~800 строк).


Высокоуровневый flow ManifestLoader

# core/dbt/parser/manifest.py (упрощённо)
class ManifestLoader:
    @classmethod
    def get_full_manifest(cls, root_config, reset=False, write_perf_info=False) -> Manifest:
        # 1. Build all_projects (main + deps)
        all_projects = root_config.load_dependencies(base_macros_only=False)
        
        # 2. Check partial parse
        loader = cls(root_config, all_projects)
        manifest = loader.load()
        
        return manifest
    
    def load(self) -> Manifest:
        # 3. Load macros first (нужны для render YAML)
        self._load_macros()
        
        # 4. Load nodes (models, snapshots, seeds, tests)
        self._load_nodes()
        
        # 5. Load YAML resources (sources, exposures, metrics, semantic_models)
        self._load_yaml()
        
        # 6. Patch nodes with YAML configs
        self._patch_nodes()
        
        # 7. Process refs (резолвинг)
        self._process_refs()
        
        # 8. Process sources
        self._process_sources()
        
        # 9. Build parent_map / child_map
        self._build_parent_child_maps()
        
        # 10. Build static analysis indexes
        self._compute_introspective_indexes()
        
        return self.manifest

Каждый шаг — отдельная подсистема. Разберём по очереди.


Шаг 1-2: Build all_projects и partial parse check

root_config.load_dependencies() — это:

  1. Парсит packages.yml (или package-lock.yml если есть).
  2. Для каждой зависимости — клонирует/проверяет наличие в dbt_packages/.
  3. Каждый package становится отдельным Project объектом.
  4. Итоговый all_projects: Dict[str, Project] — main project + dependencies.

Если target/partial_parse.msgpack существует и валиден — ManifestLoader идёт по partial path (следующий урок). Иначе full parse.


Шаг 3: Load macros

_load_macros() обходит все macros/ директории во всех projects:

def _load_macros(self):
    for project_name, project in self.all_projects.items():
        for macro_path in project.macro_paths:
            for path in walk_files(macro_path, ext='.sql'):
                self._parse_macro_file(project, path)
    
    # Также macros inside models/ — для backwards compat
    # Также analyses/ — это analyses, но похожи на models

Каждый .sql файл в macros/ парсится через MacroParser:

class MacroParser(BaseParser):
    def parse_file(self, file: AnyFile):
        # Render Jinja statically — найти {% macro foo() %} blocks
        macro_blocks = extract_macro_blocks(file.contents)
        for block in macro_blocks:
            macro = Macro(
                unique_id=f"macro.{project_name}.{block.name}",
                package_name=project_name,
                name=block.name,
                arguments=block.args,
                macro_sql=block.body,
                # ...
            )
            self.manifest.macros[macro.unique_id] = macro

Macro dataclass хранит raw Jinja source макроса, не выполненный. Macros не выполняются на parse phase — они выполняются на runtime, когда вызываются из моделей или materializations.

Почему macros первыми: YAML files могут использовать macros через '{{ my_macro() }}' strings. Чтобы их рендерить, нужны macros готовы.


Шаг 4: Load nodes

_load_nodes() парсит все .sql файлы в models/, snapshots/, seeds/, tests/, analyses/:

def _load_nodes(self):
    for project_name, project in self.all_projects.items():
        # Models
        for path in walk_files(project.model_paths, ext=('.sql', '.py')):
            self._parse_model_file(project, path)
        
        # Snapshots
        for path in walk_files(project.snapshot_paths, ext='.sql'):
            self._parse_snapshot_file(project, path)
        
        # Seeds
        for path in walk_files(project.seed_paths, ext='.csv'):
            self._parse_seed_file(project, path)
        
        # Singular tests
        for path in walk_files(project.test_paths, ext='.sql'):
            self._parse_singular_test_file(project, path)
        
        # Analyses
        for path in walk_files(project.analysis_paths, ext='.sql'):
            self._parse_analysis_file(project, path)

ModelParser

core/dbt/parser/models.py:

class ModelParser(BaseParser):
    def parse_file(self, file: AnyFile):
        raw_code = file.contents
        
        # Try tree-sitter static parsing
        try:
            metadata = static_parser.parse(raw_code)
            # Extract: refs, sources, configs, calls to other macros
        except StaticParsingError:
            # Fallback: full Jinja render with execute=False
            metadata = self._full_render(file)
        
        # Build ModelNode
        node = ModelNode(
            unique_id=f"model.{project_name}.{model_name}",
            package_name=project_name,
            name=model_name,
            resource_type=NodeType.Model,
            raw_code=raw_code,
            language=detect_language(file),  # 'sql' or 'python'
            depends_on=DependsOn(
                nodes=[],  # пока пустой — резолвится на шаге _process_refs
                macros=metadata.macros,  # ['macro.dbt_utils.union_relations', ...]
            ),
            refs=metadata.refs,  # [RefArgs(name='stg_users'), ...]
            sources=metadata.sources,  # [['raw', 'users'], ...]
            config=ModelConfig.from_dict(metadata.configs),
            database=resolve_database(project, model_name),
            schema=resolve_schema(project, model_name),
            alias=resolve_alias(project, model_name),
            # ... остальные поля из config
        )
        
        self.manifest.nodes[node.unique_id] = node

Важно: на этой стадии depends_on.nodes ещё пустой. У нас есть refs: [RefArgs(name='stg_users')], но мы не знаем unique_id stg_users — может быть model.my_project.stg_users или model.dbt_utils.stg_users (если из package).

Резолвинг — на шаге 7.

SnapshotParser, SeedParser, SingularTestParser

Аналогичны ModelParser, но создают SnapshotNode, SeedNode, SingularTestNode соответственно. Каждый — отдельный resource_type:

  • snapshot.{project}.{name}SnapshotNode
  • seed.{project}.{name}SeedNode
  • test.{project}.{name}TestNode (singular tests)
  • analysis.{project}.{name}AnalysisNode

Шаг 5: Load YAML resources

_load_yaml() парсит все .yml файлы:

def _load_yaml(self):
    for project in self.all_projects.values():
        for yaml_path in find_all_yaml_files(project):
            self._parse_yaml_file(project, yaml_path)

SchemaParser обрабатывает все _models.yml, _sources.yml, _macros.yml, _exposures.yml, _metrics.yml, _semantic_models.yml:

class SchemaParser:
    def parse_yaml(self, project, file):
        yaml_dict = parse_yaml_with_jinja(file.contents)
        
        # Each top-level key is a different resource type
        if 'sources' in yaml_dict:
            self._parse_sources(yaml_dict['sources'])
        if 'models' in yaml_dict:
            self._parse_model_patches(yaml_dict['models'])
        if 'exposures' in yaml_dict:
            self._parse_exposures(yaml_dict['exposures'])
        if 'metrics' in yaml_dict:
            self._parse_metrics(yaml_dict['metrics'])
        if 'semantic_models' in yaml_dict:
            self._parse_semantic_models(yaml_dict['semantic_models'])
        if 'groups' in yaml_dict:
            self._parse_groups(yaml_dict['groups'])
        # ... unit_tests, macros (descriptions), data_tests

Patches vs new resources

YAML files делают две вещи:

  1. Create new resources — sources, exposures, metrics, groups, unit_tests. У них нет соответствующих .sql files.
  2. Patch existing nodes — для models, snapshots, macros в YAML добавляются metadata: description, columns, tests, contract.

Patches накапливаются отдельно и применяются в шаге 6.

Generic tests

Если в _models.yml:

models:
  - name: customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null

SchemaParser парсит эти tests и создаёт TestNode для каждого:

TestNode(
    unique_id='test.my_project.unique_customers_customer_id',
    test_metadata=TestMetadata(name='unique', namespace='dbt', kwargs={'column_name': 'customer_id'}),
    depends_on=DependsOn(nodes=['model.my_project.customers']),
    # ...
)

Generic tests — это test-macros (test_unique, test_not_null) которые dbt применяет к указанной колонке. core/dbt/parser/generic_test_builders.py — где это построение.


Шаг 6: Patch nodes

После того как YAML собрано, нужно apply patches к existing nodes:

def _patch_nodes(self):
    for patch in self._yaml_model_patches:
        node = self.manifest.nodes.get(patch.unique_id)
        if not node:
            continue  # YAML описывает несуществующую модель — warning
        
        # Merge metadata from YAML into node
        node.description = patch.description or node.description
        node.columns = patch.columns or node.columns
        node.config = merge_configs(node.config, patch.config)
        node.constraints = patch.constraints
        node.contract = patch.contract
        node.access = patch.access
        node.group = patch.group
        node.version = patch.version
        # ...

После patch’а Manifest ноды содержат полный set метаданных. До patch’а — только то что извлечено из .sql исходника (config(), refs, raw_code).


Шаг 7: Process refs

_process_refs() — резолвинг ref names в unique_ids:

def _process_refs(self):
    for node in self.manifest.nodes.values():
        for ref in node.refs:
            # ref: RefArgs(name='stg_users', package=None, version=None)
            target_node = self._resolve_ref(node, ref)
            if target_node is None:
                raise CompilationError(f"Model {node.name} refs unknown model {ref.name}")
            node.depends_on.nodes.append(target_node.unique_id)

_resolve_ref — это самая хитрая часть. Алгоритм:

def _resolve_ref(self, node, ref) -> Optional[Node]:
    target_name = ref.name
    target_package = ref.package or node.package_name
    target_version = ref.version or 'latest'
    
    # Try exact match in target_package
    if target_package == node.package_name:
        candidate = self.manifest.nodes.get(f"model.{target_package}.{target_name}")
        if candidate:
            return candidate
    
    # Try across all packages (для ref('foo') без package qualifier)
    for project_name in self.all_projects:
        candidate_id = f"model.{project_name}.{target_name}"
        if candidate_id in self.manifest.nodes:
            return self.manifest.nodes[candidate_id]
    
    # Try seeds
    for project_name in self.all_projects:
        candidate_id = f"seed.{project_name}.{target_name}"
        if candidate_id in self.manifest.nodes:
            return self.manifest.nodes[candidate_id]
    
    # Try snapshots
    for project_name in self.all_projects:
        candidate_id = f"snapshot.{project_name}.{target_name}"
        if candidate_id in self.manifest.nodes:
            return self.manifest.nodes[candidate_id]
    
    return None

Edge cases:

  1. ref('foo') ambiguity. Если foo существует и в main project, и в dependency package — может быть ambiguity. dbt-core предпочитает main project, но для clean code лучше явный ref('foo', package='my_project').

  2. Versioned refs. ref('foo', version=2) — резолвится в model.my_project.foo.v2. Если version нет — берёт latest_version.

  3. Disabled nodes. Если модель имеет enabled: false в config, она в manifest.disabled, не в manifest.nodes. Ref на disabled модель — ошибка “ref to disabled”.

  4. Cross-project refs (dbt Mesh) — модуль 11. ref('foo', project='other_project') — резолвится через project_dependencies.

Model versions: v1/v2, latest_version, deprecation_date, ref({version}) Декларация sources: что это и зачем (dbt I)

Шаг 8: Process sources

Аналогично для sources:

def _process_sources(self):
    for node in self.manifest.nodes.values():
        for src in node.sources:
            # src: ['raw', 'users']
            source_node = self._resolve_source(src[0], src[1])
            if source_node is None:
                raise CompilationError(f"Unknown source {src}")
            node.depends_on.nodes.append(source_node.unique_id)

Sources создаются в шаге 5 из YAML. Их unique_id формата source.{project}.{schema}.{name}.


Шаг 9: parent_map / child_map

def _build_parent_child_maps(self):
    parent_map = defaultdict(list)
    child_map = defaultdict(list)
    
    for unique_id, node in self.manifest.nodes.items():
        for parent_id in node.depends_on.nodes:
            parent_map[unique_id].append(parent_id)
            child_map[parent_id].append(unique_id)
    
    self.manifest.parent_map = dict(parent_map)
    self.manifest.child_map = dict(child_map)

После этого:

  • manifest.parent_map['model.my_project.dim_users'] = ['model.my_project.stg_users']
  • manifest.child_map['model.my_project.stg_users'] = ['model.my_project.dim_users']

Эти maps активно используются для DAG traversal, selection (+my_model+), state comparison.


Шаг 10: introspective indexes

В конце ManifestLoader строит вспомогательные индексы для быстрых lookups:

  • manifest.ref_lookup{name -> {package -> unique_id}} для быстрого _resolve_ref
  • manifest.source_lookup — аналогично для sources
  • manifest.disabled_lookup — для disabled nodes
# Псевдокод
self.manifest.ref_lookup = ManifestLookup(
    {n.name: {n.package_name: n.unique_id} for n in nodes if n.resource_type == 'model'}
)

При следующем ref() lookup’е — O(1) вместо O(n) сканирования всех nodes.


Серилизация Manifest

После того как ManifestLoader.load() завершился, Manifest — это полноценный Python object. Он:

  1. Записывается в target/manifest.json — для внешних tools. Schema versioned (модуль 04).
  2. Записывается в target/partial_parse.msgpack — для следующего partial parse.
  3. Передаётся дальше в pipeline — в GraphRunnableTask.compile_manifest().

partial_parse.msgpack — это msgpack binary serialization Manifest + checksums файлов. Следующий run будет проверять, не изменились ли файлы, и если нет — load из msgpack за 100-500мс вместо full parse за 5-15 сек.


Performance profile parse phase

На production проекте 1500 моделей (без partial parse cache):

ШагВремя%
1-2. Project + packages load100-500мс1%
3. Macros parse1-3 сек5%
4. Nodes parse (tree-sitter)5-15 сек50%
5. YAML parse2-5 сек15%
6. Patches0.5-1 сек3%
7-8. Refs/sources process1-2 сек5%
9-10. Maps + indexes0.5-1 сек3%
Manifest serialization (msgpack + json)2-5 сек18%
Total15-30 сек

С partial parse cache hit — total 1-3 сек (90%+ ускорение).

С dbt Fusion (Rust parsing) — total 0.5-1 сек cold.


Попробуй сам

  1. Запустите dbt parse и измерьте:
    time dbt parse
  2. Удалите partial_parse и replay cold:
    rm target/partial_parse.msgpack
    time dbt parse  # cold
    time dbt parse  # warm — должно быть значительно быстрее
  3. Откройте target/manifest.json — посмотрите на структуру. Найдите свою модель, посмотрите её depends_on.nodes, refs, sources.
  4. Профилируйте parse через cProfile:
    python -c "
    from dbt.cli.main import dbtRunner
    import cProfile
    cProfile.run('dbtRunner().invoke([\"parse\"])', sort='cumulative')
    " | head -50
    Топ функций — обычно _parse_* варианты и MacroGenerator.__call__.
  5. Откройте core/dbt/parser/manifest.py — найдите class ManifestLoader, читайте load() method. Это ~50 строк, описывает все 10 шагов.

Проверка знанийKnowledge check
У вас в проекте есть YAML файл models/marts/_models.yml который описывает model 'customer_segmentation' с tests и contract. Но dbt run --select customer_segmentation возвращает 'Model not found'. Сама модель models/marts/customer_segmentation.sql существует. Что могло пойти не так и как продебажить?
ОтветAnswer
Несколько возможных причин: (1) Модель в disabled state — config('enabled', false) в .sql файле или в YAML. Проверка: dbt list --select customer_segmentation --resource-type model — если возвращает empty, посмотрите grep -r 'enabled.*false' models/marts/customer_segmentation.* и в YAML. (2) Schema parse failure — если в _models.yml есть syntax error, dbt-core может не подцепить весь файл с warning. Проверка: dbt parse --no-partial-parse 2>&1 | grep -i 'customer_segmentation' и поищите warnings/errors. (3) Patch не applied — YAML описывает модель, но имя в YAML отличается от имени файла (например, 'customer_segmentations' в YAML vs 'customer_segmentation.sql'). Patch создаст orphan entry, модель тоже найдётся, но они не связаны. Проверка: jq '.nodes | to_entries | map(select(.key | contains("customer_segmentation")))' target/manifest.json — посмотрите все nodes с этим именем. (4) Wrong file extension — может быть .sql.bak или .sqlx, dbt по умолчанию только .sql и .py. Проверка: ls models/marts/customer*. (5) Path mismatch — модель в models/marts/, но dbt_project.yml имеет model-paths: ['models'] — обычно работает, но если кастомное model-paths указывает в другую директорию, не подцепится. Проверка: cat dbt_project.yml | grep model-paths. (6) Manifest staleness — partial parse не обновился. Проверка: rm target/partial_parse.msgpack && dbt parse. После debug откройте target/manifest.json, посмотрите ключ '.nodes' — должен ли там быть 'model.YOUR_PROJECT.customer_segmentation'. Это сразу скажет, добралась ли модель до Manifest или нет.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. ManifestLoader.load() имеет ~10 шагов в определённом порядке. Почему _load_macros() идёт перед _load_nodes()?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4