Learning Platform
Глоссарий Troubleshooting
Урок 02.02 · 26 мин
Продвинутый
dbt-coremanifesttask

Critical hubs: Manifest и GraphRunnableTask

dbt-core содержит десятки классов, но два из них — Manifest и GraphRunnableTask — это critical hubs: всё остальное либо производит их, либо потребляет. Если из всего курса вы запомните только их, вы уже на 60% поймёте dbt internals.

В этом уроке мы разберём оба класса до полей и методов, посмотрим как они взаимодействуют, и поймём почему дизайн именно такой.


Manifest — единый contract проекта

core/dbt/contracts/graph/manifest.py содержит Manifest — главный data class dbt-core. Это единая структура данных, описывающая весь проект: модели, источники, тесты, snapshot’ы, макросы, exposures, метрики, semantic models, документы, селекторы, parent_map/child_map.

# core/dbt/contracts/graph/manifest.py (упрощённо)
@dataclass
class Manifest:
    nodes: Dict[str, ManifestNode]               # все ноды (models, snapshots, tests, seeds, ops)
    sources: Dict[str, SourceDefinition]         # source() definitions
    macros: Dict[str, Macro]                     # все макросы из всех packages
    docs: Dict[str, Documentation]               # .md docs
    exposures: Dict[str, Exposure]               # exposures.yml
    metrics: Dict[str, Metric]                   # metrics (deprecated в 1.6+, теперь semantic_models)
    groups: Dict[str, Group]                     # groups для governance
    selectors: Dict[str, Any]                    # selectors.yml
    disabled: Dict[str, List[GraphMemberNode]]   # отключённые ноды
    files: MutableMapping[str, AnyFile]          # все исходные файлы (для partial parsing)
    semantic_models: Dict[str, SemanticModel]    # semantic_models (1.6+)
    saved_queries: Dict[str, SavedQuery]         # saved_queries для semantic layer
    unit_tests: Dict[str, UnitTestDefinition]    # unit_tests (1.8+)
    parent_map: Dict[str, List[str]]             # node -> parents (resolved refs)
    child_map: Dict[str, List[str]]              # node -> children
    metadata: ManifestMetadata                   # dbt_version, project_name, generated_at

Это двадцать ключей верхнего уровня. Каждый — отдельный мир со своей структурой. В сериализованном виде Manifest становится target/manifest.json (артефакт, который потом используют BI tools, dbt-docs, dbt-meshify, observability).

__slots__ и @dataclass: память и удобство

Ключевые свойства Manifest

  1. Immutable после build. Manifest строится один раз в начале dbt run и не меняется во время execution. Все runtime-данные (RunResult и т.п.) — отдельные структуры.

  2. Unique IDs. Каждая нода имеет unique_id формата {resource_type}.{project_name}.{node_name}:

    • model.jaffle_shop.customers — модель customers
    • source.jaffle_shop.raw.users — source users в schema raw
    • test.jaffle_shop.unique_customers_customer_id — тест unique для customers.customer_id
    • macro.dbt_utils.union_relations — макрос из package dbt_utils
    • snapshot.jaffle_shop.orders_snapshot — snapshot
  3. Resolved references. node.depends_on.nodes — это уже резолвенные unique_id’ы (не «my_model», а model.my_project.my_model). Резолвинг делает _process_refs() в ManifestLoader.

  4. Parent_map / child_map. Производные структуры — derived в самом конце parsing. Использует Linker для построения графа. Доступны как поля Manifest.


Структура одной ноды

Посмотрим на ManifestNode для модели — что хранится для каждой .sql модели:

# core/dbt/contracts/graph/nodes.py (упрощённо)
@dataclass
class ModelNode(ManifestNode):
    unique_id: str                              # 'model.jaffle_shop.customers'
    package_name: str                           # 'jaffle_shop'
    name: str                                   # 'customers'
    resource_type: NodeType                     # NodeType.Model
    
    database: str                               # 'analytics'
    schema: str                                 # 'marts' (после generate_schema_name)
    alias: str                                  # 'customers' (или override)
    
    fqn: List[str]                              # ['jaffle_shop', 'marts', 'customers']
    
    path: str                                   # 'marts/customers.sql' (relative to model-paths)
    original_file_path: str                     # 'models/marts/customers.sql' (absolute)
    
    raw_code: str                               # Jinja-исходник как есть
    compiled_code: Optional[str]                # резолвенный SQL после compile
    language: str                               # 'sql' или 'python'
    
    description: str                            # docs
    columns: Dict[str, ColumnInfo]              # columns: из _models.yml
    
    config: ModelConfig                         # materialized, schema_, tags, ...
    
    depends_on: DependsOn                       # depends_on.nodes (refs) + depends_on.macros
    refs: List[RefArgs]                         # raw ref()'ы из SQL — для introspection
    sources: List[List[str]]                    # raw source()'ы
    
    contract: ModelContract                     # contract.enforced, columns
    constraints: List[ModelLevelConstraint]
    
    version: Optional[NodeVersion]              # для versioned моделей
    latest_version: Optional[NodeVersion]
    deprecation_date: Optional[datetime]
    
    access: AccessType                          # public / private / protected
    group: Optional[str]                        # для governance
    
    meta: Dict[str, Any]                        # arbitrary meta
    tags: List[str]
    
    created_at: float                           # timestamp

Это 35-40 полей для одной модели. Каждое поле либо приходит из Jinja-исходника (raw_code), либо из _models.yml (description, columns, contract, access, group), либо вычисляется (depends_on, fqn, schema через generate_schema_name).

Senior, который понимает Manifest, может за час написать observability-инструмент, который:

  • Находит все модели без description (для doc coverage метрики)
  • Строит граф dependencies для отдельной модели (через child_map)
  • Находит контракт-нарушения после диффа двух Manifest’ов (state comparison)

Это потому что Manifest — это единый contract: всё, что нужно знать о проекте, есть в одной JSON-структуре.

Model contracts: концепт enforced контракта (dbt II) Descriptions: модели, колонки, sources (dbt I)

GraphRunnableTask — главный orchestrator

Второй critical hub — GraphRunnableTask в core/dbt/task/runnable.py. Это базовый класс для всех task-классов, которые обрабатывают граф нод: RunTask, TestTask, BuildTask, SnapshotTask, SeedTask, CompileTask, RetryTask.

# core/dbt/task/runnable.py (упрощённо, ~600 строк в реальности)
class GraphRunnableTask(ConfiguredTask):
    def __init__(self, args, config, manifest=None):
        super().__init__(args, config, manifest)
        self.run_count: int = 0
        self.num_nodes: int = 0
        self.node_results: List[BaseResult] = []
        self.started_at: float = 0.0
        self._raise_next_tick: Optional[DbtRuntimeError] = None
        self.previous_state: Optional[PreviousState] = None
        self.run_queue: Optional[GraphQueue] = None
        self.graph: Optional[Graph] = None
        self.selection_arg: Optional[Any] = None
        self.exclusion_arg: Optional[Any] = None

    def run(self) -> RunResult:
        # 1. Compile manifest if not provided
        if self.manifest is None:
            self.manifest = ManifestLoader.get_full_manifest(self.config)
        # 2. Build graph
        self.compile_manifest()    # builds self.graph
        # 3. Selection
        selected_uids = self.select_resources()
        # 4. Execute
        result = self.execute_with_hooks(selected_uids)
        return result

Что делает GraphRunnableTask

  1. Manifest acquisition. Если task получает Manifest снаружи (programmatic API, dbtRunner с pre-loaded manifest), он его использует. Иначе — вызывает ManifestLoader.get_full_manifest(config).

  2. Graph compilation. compile_manifest() создаёт Linker, который строит networkx DAG.

  3. Selection. select_resources() парсит --select/--exclude/--selector через SelectionSpec и возвращает Set[str] unique_id’ов.

  4. Execution with hooks. execute_with_hooks(selected_uids):

    • Запускает on-run-start hooks
    • Создаёт GraphQueue из selected nodes
    • В цикле берёт ноды из queue, делегирует в Runner
    • Aggregates results
    • Запускает on-run-end hooks
  5. Threading. Внутри execute_with_hooks создаётся ThreadPoolExecutor с threads потоками. Каждый thread берёт ноду из GraphQueue.get(), выполняет, помечает done.

GIL + threading vs multiprocessing в Python
  1. Result aggregation. node_results накапливается, в конце сериализуется в RunResultsArtifact -> target/run_results.json.

Почему именно GraphRunnableTask — hub

GraphRunnableTask сидит на пересечении:

  • Manifest (входной артефакт)
  • Graph (производный артефакт)
  • SelectionSpec (как фильтровать)
  • GraphQueue (как ordered’ить выполнение)
  • Runners (как выполнять каждую ноду)
  • adapter (как взаимодействовать с warehouse)
  • events (event stream для observability)

Любая операция dbt с графом нод проходит через него. Если вы пишете программную интеграцию (модуль 06), вы взаимодействуете с GraphRunnableTask напрямую или через dbtRunner, который его обёртывает.


Как взаимодействуют Manifest и GraphRunnableTask

Manifest -> GraphRunnableTask data flow
ManifestLoaderParses all .sql/.yml/.py. Resolves refs. Builds Manifest object.
Manifest
GraphRunnableTask.compile_manifestWraps Manifest in Graph. Linker creates networkx DiGraph. parent_map/child_map уже built в Manifest.
SelectionSpecParses --select, --exclude. selected_uids = Set[unique_id] from Manifest.nodes.
GraphQueueTopologically-sorted queue. get() returns next available node (deps satisfied). mark_done() signals completion.
ThreadPoolThreadPoolExecutor with self.config.threads workers. Each worker: queue.get() -> runner.run() -> queue.mark_done().
RunRunnerPer-node execution: compile (Jinja -> SQL), get materialization macro, execute SQL via adapter.
adapter.executeadapter.execute(sql) -> ConnectionManager -> warehouse. Returns AdapterResponse.
RunResultdataclass with status, execution_time, adapter_response, message. Appended to GraphRunnableTask.node_results.
RunResultsArtifactSerialized to target/run_results.json. Includes elapsed_time, args, results array.

Manifest — статичный contract, GraphRunnableTask — динамический orchestrator. Manifest описывает «что есть», GraphRunnableTask — «как это выполнить в правильном порядке с правильной параллелизацией».

DagRun lifecycle и state machine в Airflow

Программный доступ к Manifest

Senior часто пишет инструменты, использующие Manifest. Минимальный пример:

from dbt.cli.main import dbtRunner

runner = dbtRunner()
# Parse only — без execute
result = runner.invoke(["parse"])
manifest = result.result  # type: Manifest

# Все модели проекта
for unique_id, node in manifest.nodes.items():
    if node.resource_type.value == "model":
        print(f"{node.name}: materialized={node.config.materialized}")

# Зависимости конкретной модели
for parent_id in manifest.parent_map["model.my_project.customers"]:
    print(f"depends on: {parent_id}")

# Все модели без описания (doc coverage)
no_desc = [
    n.name for n in manifest.nodes.values()
    if n.resource_type.value == "model" and not n.description
]
print(f"{len(no_desc)} models without description")

Это прямой Python-доступ к internals. Manifest полностью сериализуется в JSON (target/manifest.json), но для programmatic анализа удобнее работать с объектом — type hints, dataclasses, valid attribute access.

TIP

Для analytics и observability проектов с большой кодовой базой dbt я рекомендую держать локальный virtualenv с dbt-core и работать с Manifest как Python object’ом, не как JSON. Манифесты гигантские (~50MB для 1000 моделей), JSON parsing медленный, объектный доступ через dbtRunner.invoke(["parse"]).result быстрее и type-safe.


Где Manifest сериализуется

После ManifestLoader.load():

  • target/manifest.json — полный сериализованный Manifest для внешних инструментов (BI, dbt-meshify, dbt Cloud, observability)
  • target/partial_parse.msgpack — двоичный snapshot для partial parsing (модуль 02)
  • target/graph.gpickle — networkx граф (pickle), используется некоторыми утилитами
  • In-memory — главное использование, передаётся в GraphRunnableTask без сериализации

manifest.json — это публичный API артефакт. Schema versioned (v12 в 1.10, v13 в 1.11+). Инструменты которые читают manifest.json должны указывать поддерживаемые versions.


Попробуй сам

  1. Запустите dbt parse на любом тестовом проекте. Это создаст target/manifest.json без execution.
  2. Откройте target/manifest.json в редакторе (или jq '.nodes | keys[]' target/manifest.json). Посмотрите top-level keys, потом одну модель — jq '.nodes | to_entries[0].value' target/manifest.json.
  3. Откройте core/dbt/contracts/graph/nodes.py в клонированном dbt-core. Найдите ModelNode. Сверьте поля dataclass с полями в JSON.
  4. Откройте core/dbt/task/runnable.py. Найдите GraphRunnableTask. Прочитайте метод execute_with_hooks — ~80 строк, читается за 5 минут. Найдите где создаётся ThreadPoolExecutor, где queue.get(), где queue.mark_done().
  5. Поставьте breakpoint() в GraphRunnableTask.execute_with_hooks, запустите dbt run на тестовом проекте, инспектируйте self.manifest, self.graph, selected_uids. Это даст ощущение «вот они, объекты, в живой Python-сессии».

Проверка знанийKnowledge check
Почему Manifest и GraphRunnableTask — это два разных hub'а, а не один объединённый? Какая логика разделения?
ОтветAnswer
Логика — separation of concerns по фазам жизненного цикла. Manifest — это immutable contract проекта: что в нём есть, какие зависимости, какие свойства каждой ноды. Он строится один раз и не меняется во время run. GraphRunnableTask — это dynamic orchestrator: как выполнить ноды в правильном порядке, с правильной параллелизацией, с правильным error handling. Он принимает Manifest как input и создаёт ephemeral state (queue, results, node_results) для конкретного run. Объединение в один класс нарушило бы: (1) immutability Manifest — class state менялся бы во время execution; (2) reusability — один Manifest может быть использован для нескольких runs (dbt run + dbt test, dbt compile + dbt run --select на разные нодах); (3) testability — Manifest можно строить в тестах без запуска execution; (4) programmatic API — dbtRunner может pre-loaded Manifest передать в несколько последовательных invocations. Это классический pattern: data structure (Manifest) отдельно от algorithm operating on it (GraphRunnableTask). Senior, который видит это разделение, лучше проектирует свои custom integrations.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Manifest имеет ~20 top-level keys (nodes, sources, macros, exposures, metrics, groups, semantic_models, parent_map, child_map, ...). Какая логика разделения keys?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4