Learning Platform
Глоссарий Troubleshooting
Урок 02.01 · 28 мин
Продвинутый
dbt-corearchitecturepipeline

Pipeline dbt-core: от CLI до warehouse

dbt run — это команда. То, что между нажатием Enter и появлением таблицы в warehouse — это pipeline из семи крупных стадий. Каждая стадия — отдельная подсистема dbt-core, со своими структурами данных, своими исключениями, своими файлами в репозитории dbt-labs/dbt-core. Senior, понимающий pipeline, может ответить на вопрос «почему вот это не работает» не «надо погуглить», а «надо открыть core/dbt/parser/manifest.py».

В этом уроке мы пройдём по всем семи стадиям, обозначим главные классы и файлы, и подготовим базу для следующих уроков, где каждая стадия будет разобрана детально.


Семь стадий

Pipeline dbt-core: семь стадий от CLI до warehouse
1. CLIcore/dbt/cli/main.py — argparse-based entry point. Парсит CLI флаги, создаёт click-команду, делегирует в task.
2. RuntimeConfigcore/dbt/config/runtime.py — собирает dbt_project.yml + profiles.yml + CLI vars в единый Config object. Resolved paths, target connection, vars merge.
3. ManifestLoadercore/dbt/parser/manifest.py — главный класс парсинга. Читает все .sql/.yml/.py файлы проекта, обходит все packages, строит Manifest object. Использует partial_parse.msgpack для инкрементальности.
4. Linker / Graphcore/dbt/compilation.py + core/dbt/graph/ — берёт Manifest, строит networkx DAG. Resolves все ref() в edges. Output: Linker object + serialized graph.gpickle.
5. GraphRunnableTaskcore/dbt/task/runnable.py — главный hub выполнения. Принимает Manifest + Graph, селектирует ноды (SelectionSpec), управляет execution через GraphQueue, dispatch'ит каждую ноду в Runner.
6. Runnerscore/dbt/task/run.py + base.py — RunRunner для моделей, TestRunner для тестов, SnapshotRunner. Каждый Runner — выполнение одной ноды: compile, execute, materialize.
7. BaseAdaptercore/dbt/adapters/base/impl.py (legacy) + dbt-adapters package — диспетчер к SQL через ConnectionManager. Каждый adapter (dbt-duckdb, dbt-postgres) extends Base.

Каждая стадия принимает output предыдущей и передаёт input следующей. Manifest — главная структура данных, передаваемая через стадии 3-7. Graph — вторая, передаваемая через 4-7. Если в любом месте Manifest неконсистентен или Graph содержит cycle — pipeline падает в конкретной стадии.

DAG как mental model в dbt I Medallion-паттерн: staging / intermediate / marts (dbt II)

Стадия 1: CLI (Click + argparse)

Точка входа — core/dbt/cli/main.py. Когда вы набираете dbt run --select my_model, происходит примерно следующее:

# core/dbt/cli/main.py (упрощённо)
@click.group(...)
@click.pass_context
def cli(ctx, **kwargs):
    # ...

@cli.command("run")
@click.option("--select", ...)
@click.option("--exclude", ...)
@click.option("--full-refresh", is_flag=True)
def run(ctx, **kwargs):
    from dbt.task.run import RunTask
    task = RunTask(args=ctx.obj["flags"], manifest=ctx.obj.get("manifest"))
    return task.run()

Click создаёт Context объект, в нём flags — это argparse.Namespace-подобный объект со всеми переданными опциями. Эти flags передаются в task constructor.

Зачем Click + argparse? Click даёт удобный декларативный API для команд, но dbt также экспортирует programmatic API (модуль 06 курса). Поэтому внутри есть прослойка Flags объекта, который собирается и из CLI, и из dbtRunner.invoke(["run", "--select", "my_model"]).

Команды dbt CLI: debug, run, test, build и компания

Ключевые файлы:

  • core/dbt/cli/main.py — определения команд
  • core/dbt/cli/flags.pyFlags dataclass, mediator между CLI/programmatic
  • core/dbt/cli/option_types.py — кастомные Click типы (например, для --vars)

Стадия 2: RuntimeConfig

core/dbt/config/runtime.py содержит RuntimeConfig — класс, агрегирующий всю конфигурацию dbt-проекта.

@dataclass
class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
    """
    Project + Profile + parsed CLI args + computed paths.
    Immutable после создания.
    """
    args: Any
    cli_vars: Dict[str, Any]
    # ... десятки полей

RuntimeConfig собирается через RuntimeConfig.from_args(args):

  1. Читает dbt_project.ymlProject mixin
  2. Читает profiles.ymlProfile mixin (по --target или target из project)
  3. Merge’ит vars — project vars + profile vars + CLI vars
  4. Резолвит пути — model-paths, macro-paths, seed-paths
  5. Создаёт Credentials — type-specific dataclass из адаптера (DuckDBCredentials, SnowflakeCredentials)

Если в любой точке config invalid (например, target отсутствует в profiles.yml) — DbtProjectError или DbtProfileError.

NOTE

RuntimeConfig — immutable. Если в макросе вам нужны config values, они приходят через target object в Jinja (модуль 03), не через прямой доступ к RuntimeConfig.

Ключевые файлы:

  • core/dbt/config/runtime.pyRuntimeConfig
  • core/dbt/config/project.pyProject (читает dbt_project.yml)
  • core/dbt/config/profile.pyProfile (читает profiles.yml)
  • core/dbt/config/selectors.pySelectorConfig (selectors.yml для модуля 10)
env_var(): чтение environment variables и секреты

Стадия 3: ManifestLoader

Это самая большая стадия по объёму кода. core/dbt/parser/manifest.pyManifestLoader класс, который:

  1. Обходит все packages — главный проект + все packages.yml зависимости (через dbt deps)
  2. Для каждого package:
    • Парсит все .sql модели через ModelParser
    • Парсит все .sql macros через MacroParser
    • Парсит _models.yml, _sources.yml, _macros.yml через SchemaParser
    • Парсит snapshots, seeds, tests, exposures, metrics, semantic_models, groups, selectors
  3. Собирает Manifest — главный artefact
# core/dbt/parser/manifest.py (упрощённо)
class ManifestLoader:
    def load(self) -> Manifest:
        self._load_macros()
        self._load_nodes()      # models, snapshots, seeds, tests
        self._load_sources()
        self._load_exposures()
        self._load_metrics()
        self._load_semantic_models()
        self._load_groups()
        self._patch_nodes()      # YAML overrides из _models.yml
        self._process_refs()     # резолвит ref() -> unique_id
        self._process_sources()  # source() -> unique_id
        return self.manifest

Partial parsing: если на диске есть target/partial_parse.msgpack и hash проектных файлов не изменился, ManifestLoader загружает Manifest из msgpack вместо полного re-parse. Это сокращает время с 60-120 сек до 1-3 сек на больших проектах. Детали — модуль 02 курса.

Output: Manifest — главный data structure, передаваемый дальше.

Ключевые файлы:

  • core/dbt/parser/manifest.pyManifestLoader
  • core/dbt/parser/models.pyModelParser
  • core/dbt/parser/macros.pyMacroParser
  • core/dbt/parser/schemas.pySchemaParser (YAML)
  • core/dbt/parser/snapshots.py, seeds.py, tests.py, sources.py — специализированные парсеры
  • core/dbt/contracts/graph/manifest.pyManifest dataclass

Стадия 4: Linker и DAG

После того как Manifest собран, нужно построить граф зависимостей. Это делает Linker в core/dbt/compilation.py.

# core/dbt/compilation.py (упрощённо)
class Linker:
    def __init__(self):
        self.graph = nx.DiGraph()  # networkx directed graph

    def link_graph(self, manifest: Manifest):
        for unique_id, node in manifest.nodes.items():
            self.graph.add_node(unique_id, **node.to_dict())
            for dep in node.depends_on.nodes:
                self.graph.add_edge(dep, unique_id)
        # cycle check
        try:
            nx.find_cycle(self.graph)
            raise DbtRuntimeError("Cycle detected in DAG")
        except nx.NetworkXNoCycle:
            pass
        return self.graph

Manifest.nodes[unique_id].depends_on.nodes — это уже резолвенные ref()-зависимости (резолвинг прошёл в стадии 3 через _process_refs()). Linker просто переводит их в edges networkx-графа.

Cycle detection — здесь же. Если ваши модели образуют цикл (A ref’ит B, B ref’ит A напрямую или через macro), DbtRuntimeError бросается с указанием конкретного цикла.

Граф serialized’ится в target/graph.gpickle для последующего использования (например, dbt list --select my_model+).

Ключевые файлы:

  • core/dbt/compilation.pyLinker, Compiler
  • core/dbt/graph/queue.pyGraphQueue (используется в стадии 5)
  • core/dbt/graph/selector.pyNodeSelector (--select, --exclude)

Стадия 5: GraphRunnableTask

Это второй critical hub после Manifest. core/dbt/task/runnable.py содержит GraphRunnableTask — базовый класс для RunTask, TestTask, BuildTask, SnapshotTask.

# core/dbt/task/runnable.py (упрощённо)
class GraphRunnableTask(ConfiguredTask):
    def run(self) -> RunResult:
        self.before_run(adapter, selected_uids)
        self.execute_with_hooks(selected_uids)
        self.after_run(adapter, result)
        return result

    def execute_with_hooks(self, selected_uids):
        # Создаём GraphQueue из selected nodes
        queue = self.get_graph_queue()
        with self.adapter.connection_named("master"):
            while not queue.empty():
                node = queue.get(block=True)
                runner = self.get_runner(node)  # RunRunner для моделей
                result = self.call_runner(runner)
                queue.mark_done(node.unique_id)

GraphQueue — это thread-safe очередь, которая выдаёт ноды для выполнения с учётом зависимостей. Если node B зависит от A, B не выдаётся пока A не done.

Threading: threads config (по умолчанию 4, конфигурируется в profiles.yml). GraphQueue поддерживает параллелизм — каждый thread берёт node из очереди.

Selection: --select, --exclude, --selector парсятся через SelectionSpec (core/dbt/graph/selector_spec.py) и применяются перед стартом — selected_uids это Set[str] unique_id’ов, которые будут запущены.

Ключевые файлы:

  • core/dbt/task/runnable.pyGraphRunnableTask
  • core/dbt/task/run.pyRunTask, RunRunner
  • core/dbt/task/test.pyTestTask, TestRunner
  • core/dbt/task/snapshot.pySnapshotTask, SnapshotRunner
  • core/dbt/task/build.pyBuildTask (combo)
Селекторы: что значит +model, model+ и @model Slim CI: state:modified+ deep dive (dbt II)

Стадия 6: Runners

Каждый node-type имеет свой Runner. Для моделей — RunRunner (core/dbt/task/run.py).

# core/dbt/task/run.py (упрощённо)
class RunRunner(CompileRunner):
    def execute(self, model, manifest):
        # 1. Compile — резолвим Jinja в чистый SQL
        compiled_node = self.compile(manifest)
        # 2. Materialize — вызываем materialization macro
        materialization_macro = self.get_materialization(model)
        # 3. Execute materialization — рендерит и шлёт SQL
        result = MacroGenerator(materialization_macro, context)(model)
        return result

Compile phase: Jinja-источник модели рендерится с execute=False context — ref(), source(), var() доступны, но statement(), run_query(), adapter.dispatch() — undefined. Result — это compiled_node.compiled_code (чистый SQL без Jinja).

Materialize phase: materialization macro (например, materialization('table', adapter='duckdb')) вызывается с execute=True context. Внутри macro используется statement('main') для отправки реального SQL в warehouse.

Output: RunResult — dataclass с status, execution_time, adapter_response, message, добавляется в общий RunResultsArtifact.

Ключевые файлы:

  • core/dbt/task/run.pyRunRunner
  • core/dbt/task/base.pyBaseRunner, ExecutionContext
  • core/dbt/context/providers.pyProviderContext (контекст рендеринга)

Стадия 7: BaseAdapter

Финальная стадия — реальное выполнение SQL в warehouse. Через adapter API.

# core/dbt/adapters/base/impl.py (упрощённо, в современном dbt — отдельный package dbt-adapters)
class BaseAdapter:
    def execute(self, sql, auto_begin=False, fetch=False):
        connection = self.connections.get_thread_connection()
        return self.connections.execute(sql, auto_begin, fetch)

    def add_query(self, sql, ...):
        # Один SQL через connection
        ...

    def get_relation(self, database, schema, identifier):
        # Метаданные таблицы из information_schema
        ...

BaseAdapter — abstract. SQLAdapter — common implementation для SQL warehouses. Конкретные adapters (DuckDBAdapter, PostgresAdapter, SnowflakeAdapter) extends SQLAdapter.

ConnectionManager управляет connections (open/close/cancel/exception_handler). Thread-local — каждый thread получает свой connection.

Output: AdapterResponse — dataclass с rows_affected, code, _message — это попадает в run_results.json.

Ключевые файлы:

  • dbt-adapters/dbt/adapters/base/impl.pyBaseAdapter
  • dbt-adapters/dbt/adapters/sql/impl.pySQLAdapter
  • dbt-duckdb/dbt/adapters/duckdb/impl.pyDuckDBAdapter (читаемый пример adapter)
  • dbt-adapters/dbt/adapters/base/connections.pyBaseConnectionManager

Полный путь одного dbt run --select my_model

Соберём всё вместе. Команда — dbt run --select my_model, проект — небольшой с одним моделей.

Один dbt run от CLI до warehouse
CLIClick парсит 'run --select my_model', создаёт Flags. ctx.invoke(RunTask).
RuntimeConfigЧитает dbt_project.yml + profiles.yml. Merge vars. Резолвит target=dev -> DuckDBCredentials.
ManifestLoaderЕсли partial_parse.msgpack валиден — load. Иначе full parse через ModelParser/MacroParser/SchemaParser.
LinkerСтроит networkx DiGraph из Manifest.nodes. Cycle check. Сериализует в graph.gpickle.
RunTaskSelectionSpec парсит '--select my_model' -> selected_uids = {'model.my_project.my_model'}. GraphQueue.
RunRunnerCompile Jinja -> SQL. Lookup materialization('table') macro. MacroGenerator(...).
DuckDBAdapteradapter.execute('CREATE TABLE ...'). ConnectionManager -> DuckDB connection -> SQL executed.
warehouseDuckDB создаёт таблицу. Возвращает rows_affected. AdapterResponse -> RunResult -> run_results.json.

Это ~50000 строк Python-кода в dbt-core, плюс ~5000 в dbt-adapters, плюс ~3000 в каждом конкретном adapter — для того, чтобы выполнить одну простую модель. И это правильно для system такого scope: dbt поддерживает 30+ адаптеров, partial parsing, dbt Mesh, contracts, microbatch, semantic layer. Сложность распределена.

Наследование, MRO, C3 linearization в Python

Попробуй сам

  1. Клонируйте dbt-core локально.
    git clone https://github.com/dbt-labs/dbt-core
    cd dbt-core
    git checkout v1.11.5
  2. Откройте core/dbt/cli/main.py. Найдите команду run. Посмотрите как она зарегистрирована.
  3. Откройте core/dbt/task/run.py. Найдите класс RunTask. Посмотрите __init__ и run() методы.
  4. Откройте core/dbt/parser/manifest.py. Найдите класс ManifestLoader. Прочитайте load() метод — это ~50 строк, читается за 2 минуты.
  5. Поставьте breakpoint() в RunTask.run(). Запустите dbt run на любом тестовом проекте. Когда pdb сработает — введите dir(self) и посмотрите атрибуты RunTask. Найдите manifest, selected_uids, node_results.

После этой задачи pipeline перестанет быть абстракцией — будет конкретные файлы и классы в вашей IDE.


Проверка знанийKnowledge check
Вы запустили dbt run --select my_model. Команда отрабатывает, но в warehouse таблица не создаётся. Какая стадия pipeline могла дать сбой, и как локализовать?
ОтветAnswer
Несколько возможных мест сбоя. (1) Стадия 5 — SelectionSpec мог не найти my_model: возможно, вы в неправильном проекте, или модель не зарегистрирована в Manifest (например, отсутствует в model-paths). Проверка: dbt list --select my_model — если возвращает пусто, проблема здесь. (2) Стадия 6 — RunRunner мог не вызваться: возможно, у модели materialization='ephemeral' (ephemeral модели не создают таблиц, только CTE при ref на них). Проверка: компилируйте модель — dbt compile --select my_model — посмотрите target/compiled/.../my_model.sql, есть ли CREATE TABLE. (3) Стадия 7 — adapter.execute мог сработать, но в неправильной схеме: возможно target.schema указывает в dev_alice, а вы смотрите в prod. Проверка: dbt show --select my_model — посмотрите fully-qualified имя таблицы из логов. (4) Логи dbt-core — самое прямое: dbt run --debug выдаёт detailed event stream. В частности, finished_node event содержит rows_affected — если 0 или null, что-то странное. Production debug этой ситуации — последовательно проверять каждую стадию через инструменты dbt (list/compile/show/debug logs).

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какие 7 стадий проходит dbt run от bash до warehouse, в правильном порядке?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4