Critical hubs: Manifest и GraphRunnableTask
dbt-core содержит десятки классов, но два из них — Manifest и GraphRunnableTask — это critical hubs: всё остальное либо производит их, либо потребляет. Если из всего курса вы запомните только их, вы уже на 60% поймёте dbt internals.
В этом уроке мы разберём оба класса до полей и методов, посмотрим как они взаимодействуют, и поймём почему дизайн именно такой.
Manifest — единый contract проекта
core/dbt/contracts/graph/manifest.py содержит Manifest — главный data class dbt-core. Это единая структура данных, описывающая весь проект: модели, источники, тесты, snapshot’ы, макросы, exposures, метрики, semantic models, документы, селекторы, parent_map/child_map.
# core/dbt/contracts/graph/manifest.py (упрощённо)
@dataclass
class Manifest:
nodes: Dict[str, ManifestNode] # все ноды (models, snapshots, tests, seeds, ops)
sources: Dict[str, SourceDefinition] # source() definitions
macros: Dict[str, Macro] # все макросы из всех packages
docs: Dict[str, Documentation] # .md docs
exposures: Dict[str, Exposure] # exposures.yml
metrics: Dict[str, Metric] # metrics (deprecated в 1.6+, теперь semantic_models)
groups: Dict[str, Group] # groups для governance
selectors: Dict[str, Any] # selectors.yml
disabled: Dict[str, List[GraphMemberNode]] # отключённые ноды
files: MutableMapping[str, AnyFile] # все исходные файлы (для partial parsing)
semantic_models: Dict[str, SemanticModel] # semantic_models (1.6+)
saved_queries: Dict[str, SavedQuery] # saved_queries для semantic layer
unit_tests: Dict[str, UnitTestDefinition] # unit_tests (1.8+)
parent_map: Dict[str, List[str]] # node -> parents (resolved refs)
child_map: Dict[str, List[str]] # node -> children
metadata: ManifestMetadata # dbt_version, project_name, generated_at
Это двадцать ключей верхнего уровня. Каждый — отдельный мир со своей структурой. В сериализованном виде Manifest становится target/manifest.json (артефакт, который потом используют BI tools, dbt-docs, dbt-meshify, observability).
Ключевые свойства Manifest
-
Immutable после build. Manifest строится один раз в начале
dbt runи не меняется во время execution. Все runtime-данные (RunResult и т.п.) — отдельные структуры. -
Unique IDs. Каждая нода имеет
unique_idформата{resource_type}.{project_name}.{node_name}:model.jaffle_shop.customers— модель customerssource.jaffle_shop.raw.users— source users в schema rawtest.jaffle_shop.unique_customers_customer_id— тест unique для customers.customer_idmacro.dbt_utils.union_relations— макрос из package dbt_utilssnapshot.jaffle_shop.orders_snapshot— snapshot
-
Resolved references.
node.depends_on.nodes— это уже резолвенные unique_id’ы (не «my_model», аmodel.my_project.my_model). Резолвинг делает_process_refs()вManifestLoader. -
Parent_map / child_map. Производные структуры — derived в самом конце parsing. Использует Linker для построения графа. Доступны как поля Manifest.
Структура одной ноды
Посмотрим на ManifestNode для модели — что хранится для каждой .sql модели:
# core/dbt/contracts/graph/nodes.py (упрощённо)
@dataclass
class ModelNode(ManifestNode):
unique_id: str # 'model.jaffle_shop.customers'
package_name: str # 'jaffle_shop'
name: str # 'customers'
resource_type: NodeType # NodeType.Model
database: str # 'analytics'
schema: str # 'marts' (после generate_schema_name)
alias: str # 'customers' (или override)
fqn: List[str] # ['jaffle_shop', 'marts', 'customers']
path: str # 'marts/customers.sql' (relative to model-paths)
original_file_path: str # 'models/marts/customers.sql' (absolute)
raw_code: str # Jinja-исходник как есть
compiled_code: Optional[str] # резолвенный SQL после compile
language: str # 'sql' или 'python'
description: str # docs
columns: Dict[str, ColumnInfo] # columns: из _models.yml
config: ModelConfig # materialized, schema_, tags, ...
depends_on: DependsOn # depends_on.nodes (refs) + depends_on.macros
refs: List[RefArgs] # raw ref()'ы из SQL — для introspection
sources: List[List[str]] # raw source()'ы
contract: ModelContract # contract.enforced, columns
constraints: List[ModelLevelConstraint]
version: Optional[NodeVersion] # для versioned моделей
latest_version: Optional[NodeVersion]
deprecation_date: Optional[datetime]
access: AccessType # public / private / protected
group: Optional[str] # для governance
meta: Dict[str, Any] # arbitrary meta
tags: List[str]
created_at: float # timestamp
Это 35-40 полей для одной модели. Каждое поле либо приходит из Jinja-исходника (raw_code), либо из _models.yml (description, columns, contract, access, group), либо вычисляется (depends_on, fqn, schema через generate_schema_name).
Senior, который понимает Manifest, может за час написать observability-инструмент, который:
- Находит все модели без description (для doc coverage метрики)
- Строит граф dependencies для отдельной модели (через
child_map) - Находит контракт-нарушения после диффа двух Manifest’ов (state comparison)
Это потому что Manifest — это единый contract: всё, что нужно знать о проекте, есть в одной JSON-структуре.
Model contracts: концепт enforced контракта (dbt II) Descriptions: модели, колонки, sources (dbt I)GraphRunnableTask — главный orchestrator
Второй critical hub — GraphRunnableTask в core/dbt/task/runnable.py. Это базовый класс для всех task-классов, которые обрабатывают граф нод: RunTask, TestTask, BuildTask, SnapshotTask, SeedTask, CompileTask, RetryTask.
# core/dbt/task/runnable.py (упрощённо, ~600 строк в реальности)
class GraphRunnableTask(ConfiguredTask):
def __init__(self, args, config, manifest=None):
super().__init__(args, config, manifest)
self.run_count: int = 0
self.num_nodes: int = 0
self.node_results: List[BaseResult] = []
self.started_at: float = 0.0
self._raise_next_tick: Optional[DbtRuntimeError] = None
self.previous_state: Optional[PreviousState] = None
self.run_queue: Optional[GraphQueue] = None
self.graph: Optional[Graph] = None
self.selection_arg: Optional[Any] = None
self.exclusion_arg: Optional[Any] = None
def run(self) -> RunResult:
# 1. Compile manifest if not provided
if self.manifest is None:
self.manifest = ManifestLoader.get_full_manifest(self.config)
# 2. Build graph
self.compile_manifest() # builds self.graph
# 3. Selection
selected_uids = self.select_resources()
# 4. Execute
result = self.execute_with_hooks(selected_uids)
return result
Что делает GraphRunnableTask
-
Manifest acquisition. Если task получает Manifest снаружи (programmatic API,
dbtRunnerс pre-loaded manifest), он его использует. Иначе — вызываетManifestLoader.get_full_manifest(config). -
Graph compilation.
compile_manifest()создаётLinker, который строит networkx DAG. -
Selection.
select_resources()парсит--select/--exclude/--selectorчерезSelectionSpecи возвращаетSet[str]unique_id’ов. -
Execution with hooks.
execute_with_hooks(selected_uids):- Запускает
on-run-starthooks - Создаёт
GraphQueueиз selected nodes - В цикле берёт ноды из queue, делегирует в Runner
- Aggregates results
- Запускает
on-run-endhooks
- Запускает
-
Threading. Внутри
execute_with_hooksсоздаётсяThreadPoolExecutorсthreadsпотоками. Каждый thread берёт ноду изGraphQueue.get(), выполняет, помечает done.
- Result aggregation.
node_resultsнакапливается, в конце сериализуется вRunResultsArtifact->target/run_results.json.
Почему именно GraphRunnableTask — hub
GraphRunnableTask сидит на пересечении:
- Manifest (входной артефакт)
- Graph (производный артефакт)
- SelectionSpec (как фильтровать)
- GraphQueue (как ordered’ить выполнение)
- Runners (как выполнять каждую ноду)
- adapter (как взаимодействовать с warehouse)
- events (event stream для observability)
Любая операция dbt с графом нод проходит через него. Если вы пишете программную интеграцию (модуль 06), вы взаимодействуете с GraphRunnableTask напрямую или через dbtRunner, который его обёртывает.
Как взаимодействуют Manifest и GraphRunnableTask
Manifest — статичный contract, GraphRunnableTask — динамический orchestrator. Manifest описывает «что есть», GraphRunnableTask — «как это выполнить в правильном порядке с правильной параллелизацией».
DagRun lifecycle и state machine в AirflowПрограммный доступ к Manifest
Senior часто пишет инструменты, использующие Manifest. Минимальный пример:
from dbt.cli.main import dbtRunner
runner = dbtRunner()
# Parse only — без execute
result = runner.invoke(["parse"])
manifest = result.result # type: Manifest
# Все модели проекта
for unique_id, node in manifest.nodes.items():
if node.resource_type.value == "model":
print(f"{node.name}: materialized={node.config.materialized}")
# Зависимости конкретной модели
for parent_id in manifest.parent_map["model.my_project.customers"]:
print(f"depends on: {parent_id}")
# Все модели без описания (doc coverage)
no_desc = [
n.name for n in manifest.nodes.values()
if n.resource_type.value == "model" and not n.description
]
print(f"{len(no_desc)} models without description")
Это прямой Python-доступ к internals. Manifest полностью сериализуется в JSON (target/manifest.json), но для programmatic анализа удобнее работать с объектом — type hints, dataclasses, valid attribute access.
Для analytics и observability проектов с большой кодовой базой dbt я рекомендую держать локальный virtualenv с dbt-core и работать с Manifest как Python object’ом, не как JSON. Манифесты гигантские (~50MB для 1000 моделей), JSON parsing медленный, объектный доступ через dbtRunner.invoke(["parse"]).result быстрее и type-safe.
Где Manifest сериализуется
После ManifestLoader.load():
target/manifest.json— полный сериализованный Manifest для внешних инструментов (BI, dbt-meshify, dbt Cloud, observability)target/partial_parse.msgpack— двоичный snapshot для partial parsing (модуль 02)target/graph.gpickle— networkx граф (pickle), используется некоторыми утилитами- In-memory — главное использование, передаётся в
GraphRunnableTaskбез сериализации
manifest.json — это публичный API артефакт. Schema versioned (v12 в 1.10, v13 в 1.11+). Инструменты которые читают manifest.json должны указывать поддерживаемые versions.
Попробуй сам
- Запустите
dbt parseна любом тестовом проекте. Это создастtarget/manifest.jsonбез execution. - Откройте
target/manifest.jsonв редакторе (илиjq '.nodes | keys[]' target/manifest.json). Посмотрите top-level keys, потом одну модель —jq '.nodes | to_entries[0].value' target/manifest.json. - Откройте
core/dbt/contracts/graph/nodes.pyв клонированном dbt-core. НайдитеModelNode. Сверьте поля dataclass с полями в JSON. - Откройте
core/dbt/task/runnable.py. НайдитеGraphRunnableTask. Прочитайте методexecute_with_hooks— ~80 строк, читается за 5 минут. Найдите где создаётсяThreadPoolExecutor, гдеqueue.get(), гдеqueue.mark_done(). - Поставьте
breakpoint()вGraphRunnableTask.execute_with_hooks, запуститеdbt runна тестовом проекте, инспектируйтеself.manifest,self.graph,selected_uids. Это даст ощущение «вот они, объекты, в живой Python-сессии».