Path B: Your own dbt adapter from scratch
If you chose this path, you are taking on the most technically demanding capstone. The goal is to write a functional dbt adapter from scratch that passes the dbt-tests-adapter suite. Realistic timeline: 30-40 hours.
What you get:
- A deep understanding of dbt internals, at the "I wrote it myself" level.
- A real artifact: an installable Python package.
- Something potentially useful to the community (if the warehouse has no first-class support).
- A portfolio piece that clearly demonstrates senior-level skill.
In this lesson we walk through building dbt-sqlite as a reference: the simplest viable adapter for local development and learning. You can apply the same approach to dbt-pglite, dbt-clickhouse-light, or your own target warehouse.
Step 1: Choose target warehouse
The rest of this lesson uses dbt-sqlite for its examples. The same techniques apply to any warehouse.
Step 2: Setup adapter project
dbt Labs provides a cookiecutter template:
pip install cookiecutter
cookiecutter https://github.com/dbt-labs/dbt-adapter-cookiecutter
Interactive prompts:
project_name: dbt-sqlite
author_name: Your Name
author_email: you@example.com
github_username: yourname
adapter_name: sqlite
support_python_version: 3.10
Generates structure:
dbt-sqlite/
├── pyproject.toml
├── README.md
├── LICENSE
├── .github/workflows/        # CI templates
├── dbt/adapters/sqlite/
│   ├── __init__.py
│   ├── connections.py        # ConnectionManager
│   ├── impl.py               # SQLiteAdapter (Adapter API)
│   ├── credentials.py        # SQLiteCredentials
│   └── relation.py           # SQLiteRelation
├── dbt/include/sqlite/
│   ├── macros/               # adapter-specific macros
│   ├── profile_template.yml
│   └── dbt_project.yml
└── tests/
    ├── functional/           # dbt-tests-adapter integration tests
    └── unit/                 # internal tests
cd dbt-sqlite
pip install -e ".[dev]"
Step 3: Credentials class
dbt/adapters/sqlite/credentials.py:
from dataclasses import dataclass

from dbt.adapters.contracts.connection import Credentials


@dataclass
class SQLiteCredentials(Credentials):
    """SQLite adapter credentials."""

    # Required fields (defined by the Credentials base class):
    #   database: str
    #   schema: str

    # SQLite-specific:
    database_path: str = ":memory:"

    # Override defaults
    threads: int = 1

    # dbt looks the alias map up under the name _ALIASES (leading underscore).
    _ALIASES = {
        "db_path": "database_path",  # alternative name
        "path": "database_path",  # short form
    }

    @property
    def type(self) -> str:
        return "sqlite"

    @property
    def unique_field(self) -> str:
        return self.database_path

    def _connection_keys(self) -> tuple:
        """Fields shown in `dbt debug` output."""
        return ("database", "schema", "database_path", "threads")

What matters:
- @dataclass: a Python data class.
- Inheritance from Credentials: the base class in dbt.adapters.contracts.connection.
- _ALIASES: alternative field names that users can use in profiles.yml.
- type property: the adapter name.
- unique_field: the field that identifies a unique connection (for caching).
- _connection_keys: the fields shown in dbt debug output.
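To make the alias idea concrete, here is a minimal standalone sketch of how an alias map can translate profile keys into field names. SketchCredentials and from_profile are hypothetical names invented for this illustration; this is not dbt's own implementation, only the general technique under the assumption that aliases are resolved before the dataclass is built.

```python
from dataclasses import dataclass, fields
from typing import Any, ClassVar, Dict


@dataclass
class SketchCredentials:
    """Hypothetical stand-in for SQLiteCredentials; not the dbt base class."""

    database: str
    schema: str
    database_path: str = ":memory:"

    # Maps user-facing names from profiles.yml to the real field name.
    _ALIASES: ClassVar[Dict[str, str]] = {
        "db_path": "database_path",
        "path": "database_path",
    }

    @classmethod
    def from_profile(cls, raw: Dict[str, Any]) -> "SketchCredentials":
        """Rename aliased keys, then reject anything that is not a field."""
        translated: Dict[str, Any] = {}
        for key, value in raw.items():
            canonical = cls._ALIASES.get(key, key)
            if canonical in translated:
                raise ValueError(f"duplicate setting for {canonical!r}")
            translated[canonical] = value
        known = {f.name for f in fields(cls)}
        unknown = set(translated) - known
        if unknown:
            raise ValueError(f"unknown profile keys: {sorted(unknown)}")
        return cls(**translated)


creds = SketchCredentials.from_profile(
    {"database": "main", "schema": "main", "path": "dev.db"}
)
print(creds.database_path)
```

Rejecting duplicates matters because a profile that sets both path and database_path would otherwise silently keep whichever key happened to come last.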
Step 4: ConnectionManager
dbt/adapters/sqlite/connections.py:
import sqlite3
from contextlib import contextmanager
from dataclasses import dataclass

from dbt.adapters.contracts.connection import (
    AdapterResponse,
    Connection,
    ConnectionState,
)
# Exception locations for dbt-adapters 1.x; older dbt-core exposed
# similar classes under dbt.exceptions.
from dbt.adapters.exceptions.connection import FailedToConnectError
from dbt.adapters.sql import SQLConnectionManager
from dbt_common.exceptions import DbtDatabaseError, DbtRuntimeError

from dbt.adapters.sqlite.credentials import SQLiteCredentials


@dataclass
class SQLiteAdapterResponse(AdapterResponse):
    pass


class SQLiteConnectionManager(SQLConnectionManager):
    TYPE = "sqlite"

    @classmethod
    def open(cls, connection: Connection) -> Connection:
        # Reuse a connection that is already open.
        if connection.state == ConnectionState.OPEN:
            return connection

        credentials: SQLiteCredentials = connection.credentials
        try:
            handle = sqlite3.connect(
                credentials.database_path,
                isolation_level=None,  # autocommit mode
            )
            handle.execute("PRAGMA foreign_keys = ON")
            connection.handle = handle
            connection.state = ConnectionState.OPEN
            return connection
        except Exception as e:
            connection.state = ConnectionState.FAIL
            raise FailedToConnectError(str(e)) from e

    @classmethod
    def get_response(cls, cursor) -> SQLiteAdapterResponse:
        # sqlite3 reports -1 when the row count is not applicable.
        rows_affected = cursor.rowcount if cursor.rowcount > 0 else 0
        return SQLiteAdapterResponse(
            _message=f"SUCCESS {rows_affected}",
            rows_affected=rows_affected,
        )

    def cancel(self, connection: Connection):
        """SQLite doesn't support cancellation."""
        pass

    @contextmanager
    def exception_handler(self, sql: str):
        try:
            yield
        except sqlite3.DatabaseError as e:
            raise DbtDatabaseError(str(e)) from e
        except Exception as e:
            # Let dbt's own errors pass through unchanged.
            if isinstance(e, DbtRuntimeError):
                raise
            raise DbtRuntimeError(str(e)) from e
Key methods:
- open: establishes the connection and returns it with state OPEN.
- get_response: after query execution, returns an AdapterResponse with metadata (rows affected).
- cancel: cancels a running query (SQLite does not support this, so it is a no-op).
- exception_handler: a context manager that wraps query execution and translates exceptions.
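The behaviour these methods rely on can be checked without dbt installed. The sketch below uses only the standard sqlite3 module; SketchDatabaseError, open_connection and rows_affected are hypothetical names standing in for the dbt classes, so treat it as an illustration of the driver behaviour rather than adapter code.

```python
import sqlite3
from contextlib import contextmanager


class SketchDatabaseError(Exception):
    """Hypothetical stand-in for the dbt database error type."""


@contextmanager
def exception_handler(sql: str):
    """Translate driver errors into one adapter-level exception."""
    try:
        yield
    except sqlite3.DatabaseError as exc:
        raise SketchDatabaseError(f"{exc} (while running: {sql})") from exc


def open_connection(path: str = ":memory:") -> sqlite3.Connection:
    # isolation_level=None puts the sqlite3 module in autocommit mode.
    handle = sqlite3.connect(path, isolation_level=None)
    handle.execute("PRAGMA foreign_keys = ON")
    return handle


def rows_affected(cursor: sqlite3.Cursor) -> int:
    # sqlite3 reports -1 for statements such as SELECT.
    return cursor.rowcount if cursor.rowcount > 0 else 0


conn = open_connection()
conn.execute("CREATE TABLE t (id INTEGER)")
inserted = rows_affected(conn.execute("INSERT INTO t VALUES (1), (2)"))
selected = rows_affected(conn.execute("SELECT * FROM t"))

try:
    with exception_handler("SELECT * FROM missing"):
        conn.execute("SELECT * FROM missing")
    translated = False
except SketchDatabaseError:
    translated = True

print(inserted, selected, translated)
```

The clamp in rows_affected is the reason get_response never reports a negative count for SELECT statements.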
Step 5: Adapter implementation (impl.py)
dbt/adapters/sqlite/impl.py:
from typing import List

from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.sql import SQLAdapter

from dbt.adapters.sqlite.connections import SQLiteConnectionManager
from dbt.adapters.sqlite.relation import SQLiteRelation


class SQLiteAdapter(SQLAdapter):
    ConnectionManager = SQLiteConnectionManager
    Relation = SQLiteRelation

    @classmethod
    def date_function(cls) -> str:
        return "CURRENT_TIMESTAMP"

    @classmethod
    def is_cancelable(cls) -> bool:
        return False  # SQLite cannot cancel a running query

    def list_schemas(self, database: str) -> List[str]:
        # SQLite has no multiple schemas, so return the single built-in one
        return ["main"]

    def check_schema_exists(self, database: str, schema: str) -> bool:
        return schema == "main"

    def drop_schema(self, relation: BaseRelation) -> None:
        # SQLite has no schemas: no-op
        pass

    def create_schema(self, relation: BaseRelation) -> None:
        # SQLite has no schemas: no-op
        pass
SQLAdapter extends BaseAdapter for SQL-based warehouses. It provides default implementations for quoting, identifier resolution, and so on.
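The hardcoded "main" in list_schemas is the name SQLite gives to the primary database of every connection. A minimal sketch, using only the standard library, of one way to confirm this instead of taking it on faith (the two helper functions are hypothetical and take a raw connection rather than the adapter's arguments):

```python
import sqlite3
from typing import List


def list_schemas(conn: sqlite3.Connection) -> List[str]:
    """Return the names SQLite reports for its attached databases."""
    # Each row of PRAGMA database_list is (seq, name, file).
    return [row[1] for row in conn.execute("PRAGMA database_list")]


def check_schema_exists(conn: sqlite3.Connection, schema: str) -> bool:
    return schema in list_schemas(conn)


conn = sqlite3.connect(":memory:")
schemas = list_schemas(conn)
print(schemas)
```

Databases added with ATTACH would also appear in this list, which is one possible route to schema-like behaviour if you later want more than a single "main".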
Step 6: Relation class
dbt/adapters/sqlite/relation.py:
from dataclasses import dataclass, field

from dbt.adapters.base.relation import BaseRelation, Policy


@dataclass
class SQLiteQuotePolicy(Policy):
    database: bool = False
    schema: bool = False
    identifier: bool = True


@dataclass
class SQLiteIncludePolicy(Policy):
    database: bool = False
    schema: bool = False
    identifier: bool = True


@dataclass(frozen=True, eq=False, repr=False)
class SQLiteRelation(BaseRelation):
    # default_factory avoids the "mutable default" error that newer
    # Python versions raise for dataclass instances used as defaults.
    quote_policy: Policy = field(default_factory=lambda: SQLiteQuotePolicy())
    include_policy: Policy = field(default_factory=lambda: SQLiteIncludePolicy())
A Relation represents a database object (table, view, etc.) together with its naming and quoting rules. SQLite has no databases or schemas in the warehouse sense, only identifiers (table names).
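The effect of the two policies is easier to see in isolation. The following is a simplified sketch of the rendering idea, assuming that the include policy decides which name parts appear and the quote policy decides which of those are quoted; SketchPolicy and SketchRelation are hypothetical classes, not dbt's BaseRelation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SketchPolicy:
    database: bool = False
    schema: bool = False
    identifier: bool = True


@dataclass(frozen=True)
class SketchRelation:
    database: Optional[str]
    schema: Optional[str]
    identifier: str
    include_policy: SketchPolicy = SketchPolicy()
    quote_policy: SketchPolicy = SketchPolicy()

    def render(self) -> str:
        parts = []
        for name in ("database", "schema", "identifier"):
            value = getattr(self, name)
            # Skip parts that are missing or excluded by the include policy.
            if value is None or not getattr(self.include_policy, name):
                continue
            if getattr(self.quote_policy, name):
                # Double any embedded quote, as SQL identifier quoting requires.
                value = '"' + value.replace('"', '""') + '"'
            parts.append(value)
        return ".".join(parts)


sqlite_style = SketchRelation("main", "main", "orders").render()
full_style = SketchRelation(
    "main", "main", "orders",
    include_policy=SketchPolicy(database=True, schema=True, identifier=True),
).render()
print(sqlite_style)
print(full_style)
```

With the SQLite-style defaults only the quoted table name survives, which is what the macros in the next step expect.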
Step 7: Required macros
dbt expects 20+ macros in dbt/include/sqlite/macros/.
dbt/include/sqlite/macros/adapters.sql:
{% macro sqlite__get_columns_in_relation(relation) -%}
  {% call statement('get_columns_in_relation', fetch_result=True) %}
    SELECT
      name AS column_name,
      type AS data_type,
      0 AS character_maximum_length,
      0 AS numeric_precision,
      0 AS numeric_scale
    FROM pragma_table_info('{{ relation.identifier }}')
    ORDER BY cid
  {% endcall %}
  {% set table = load_result('get_columns_in_relation').table %}
  {{ return(sql_convert_columns_in_relation(table)) }}
{%- endmacro %}

{% macro sqlite__list_relations_without_caching(schema_relation) -%}
  {% call statement('list_relations', fetch_result=True) %}
    SELECT
      'main' AS database,
      name AS identifier,
      'main' AS schema,
      CASE
        WHEN type = 'table' THEN 'table'
        WHEN type = 'view' THEN 'view'
        ELSE 'table'
      END AS table_type
    FROM sqlite_master
    WHERE type IN ('table', 'view')
  {% endcall %}
  {{ return(load_result('list_relations').table) }}
{%- endmacro %}

{# The create_*_as macros only return SQL; the materialization runs it. #}
{% macro sqlite__create_view_as(relation, sql) -%}
  CREATE VIEW {{ relation.identifier }} AS {{ sql }}
{%- endmacro %}

{% macro sqlite__create_table_as(temporary, relation, sql) -%}
  CREATE TABLE {{ relation.identifier }} AS {{ sql }}
{%- endmacro %}

{# drop, rename and truncate are invoked directly, so they must execute their SQL. #}
{% macro sqlite__drop_relation(relation) -%}
  {% call statement('drop_relation', auto_begin=False) -%}
    DROP {{ relation.type }} IF EXISTS {{ relation.identifier }}
  {%- endcall %}
{%- endmacro %}

{% macro sqlite__rename_relation(from_relation, to_relation) -%}
  {% call statement('rename_relation') -%}
    ALTER TABLE {{ from_relation.identifier }} RENAME TO {{ to_relation.identifier }}
  {%- endcall %}
{%- endmacro %}

{% macro sqlite__truncate_relation(relation) -%}
  {% call statement('truncate_relation') -%}
    DELETE FROM {{ relation.identifier }}
  {%- endcall %}
{%- endmacro %}

{% macro sqlite__current_timestamp() -%}
  CURRENT_TIMESTAMP
{%- endmacro %}
This is the minimum for basic functionality. The full set of adapter macros is about 25-30; the complete list is in dbt-core/include/global_project/macros/adapters/.
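Before wiring catalog queries into macros, it can help to run them against a scratch database and look at the row shapes. A minimal sketch using the standard sqlite3 module; the table and view names are made up for the example, and the column aliases are simplified compared with the macros.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, amount REAL, note TEXT)")
conn.execute("CREATE VIEW big_orders AS SELECT * FROM orders WHERE amount > 100")

# Same source as get_columns_in_relation: the pragma_table_info table function.
columns = conn.execute(
    "SELECT name AS column_name, type AS data_type "
    "FROM pragma_table_info('orders') ORDER BY cid"
).fetchall()

# Same source as list_relations_without_caching: the sqlite_master catalog.
relations = conn.execute(
    "SELECT 'main' AS database_name, name, 'main' AS schema_name, type "
    "FROM sqlite_master WHERE type IN ('table', 'view') ORDER BY name"
).fetchall()

print(columns)
print(relations)
```

Note that the declared type comes back exactly as written in the CREATE TABLE statement, so any type normalization has to happen in the adapter.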
Step 8: Profile template
dbt/include/sqlite/profile_template.yml:
fixed:
  type: sqlite
prompts:
  database_path:
    hint: "path to your SQLite database file"
  threads:
    hint: "number of threads (typically 1 for SQLite due to single-writer)"
    default: 1
    type: int
This guides the dbt init setup process when a user creates a new project.
Step 9: Tests — dbt-tests-adapter
dbt Labs provides a standardized adapter test suite, dbt-tests-adapter:
pip install dbt-tests-adapter
tests/functional/conftest.py:
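import pytest

# Load the project fixtures that dbt-tests-adapter relies on.
pytest_plugins = ["dbt.tests.fixtures.project"]


@pytest.fixture(scope="class")
def dbt_profile_target():
    return {
        "type": "sqlite",
        "threads": 1,
        "database": "test_db",
        "schema": "main",
        "database_path": ":memory:",
    }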
tests/functional/adapter/test_basic.py:
from dbt.tests.adapter.basic.test_base import BaseSimpleMaterializations
from dbt.tests.adapter.basic.test_singular_tests import BaseSingularTests
from dbt.tests.adapter.basic.test_singular_tests_ephemeral import BaseSingularTestsEphemeral
from dbt.tests.adapter.basic.test_empty import BaseEmpty
from dbt.tests.adapter.basic.test_ephemeral import BaseEphemeral
from dbt.tests.adapter.basic.test_incremental import BaseIncremental
from dbt.tests.adapter.basic.test_generic_tests import BaseGenericTests


# Each subclass inherits the whole test case; `pass` is enough to run it.
class TestSimpleMaterializationsSQLite(BaseSimpleMaterializations):
    pass


class TestSingularTestsSQLite(BaseSingularTests):
    pass


class TestSingularTestsEphemeralSQLite(BaseSingularTestsEphemeral):
    pass


class TestEmptySQLite(BaseEmpty):
    pass


class TestEphemeralSQLite(BaseEphemeral):
    pass


class TestIncrementalSQLite(BaseIncremental):
    pass


class TestGenericTestsSQLite(BaseGenericTests):
    pass
Run tests:
pytest tests/functional/ -v
On the first run almost everything fails. This is normal; you fix the failures one at a time.
Step 10: Iterate, fix failures
pytest tests/functional/adapter/test_basic.py::TestSimpleMaterializationsSQLite -v
Failure example:
FAILED tests/functional/adapter/test_basic.py::TestSimpleMaterializationsSQLite::test_base
AttributeError: SQLiteAdapter has no attribute 'create_table'
Add the method to impl.py:
class SQLiteAdapter(SQLAdapter):
    # ... existing code ...

    def create_table(self, relation: BaseRelation, sql: str):
        sql = f"CREATE TABLE {relation.identifier} AS {sql}"
        self.execute(sql, auto_begin=False)
Re-run, fix the next failure, and iterate.
This is the most time-consuming step. Realistically, about 15-20 hours.
Step 11: Custom macros for SQLite quirks
SQLite has limitations compared with full SQL warehouses. Handle these in adapter macros:
Quirk 1: SQLite does not support INFORMATION_SCHEMA. Use pragma_table_info instead (already used in get_columns_in_relation).
Quirk 2: SQLite does not support schemas. Override all schema-related macros to be no-ops.
Quirk 3: SQLite uses || for concatenation, not CONCAT(). Override concat():
{% macro sqlite__concat(fields) -%}
{{ fields|join(' || ') }}
{%- endmacro %}
Quirk 4: SQLite date functions are different. Override:
{% macro sqlite__dateadd(datepart, interval, from_date_or_timestamp) %}
DATETIME({{ from_date_or_timestamp }}, '+{{ interval }} {{ datepart }}')
{% endmacro %}
The full list of quirks is specific to each warehouse. For the common ones, look at how other community adapters handle them.
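Quirks 3 and 4 can be sanity-checked against a real SQLite engine before they go into macros. The sketch below mirrors the two overrides with plain Python helpers (render_concat and render_dateadd are hypothetical names) and runs the generated SQL. Note one deliberate difference from the macro: the helper formats the interval with an explicit sign, which is an assumption about how negative intervals could be handled, not something the macro above does.

```python
import sqlite3
from typing import List


def render_concat(fields: List[str]) -> str:
    """Join SQL expressions with SQLite's || operator."""
    return " || ".join(fields)


def render_dateadd(datepart: str, interval: int, expr: str) -> str:
    """Build a DATETIME() call; {interval:+d} yields '+1' or '-1'."""
    return f"DATETIME({expr}, '{interval:+d} {datepart}')"


conn = sqlite3.connect(":memory:")

concat_sql = "SELECT " + render_concat(["'a'", "'-'", "'b'"])
concat_value = conn.execute(concat_sql).fetchone()[0]

forward_sql = "SELECT " + render_dateadd("day", 1, "'2024-01-31 00:00:00'")
forward_value = conn.execute(forward_sql).fetchone()[0]

backward_sql = "SELECT " + render_dateadd("day", -1, "'2024-03-01 00:00:00'")
backward_value = conn.execute(backward_sql).fetchone()[0]

print(concat_value, forward_value, backward_value)
```

Checking a negative interval and a month boundary early tends to surface date-handling surprises before the test suite does.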
Step 12: Package and publish
pyproject.toml:
[project]
name = "dbt-sqlite"
version = "0.1.0"
description = "dbt adapter for SQLite"
readme = "README.md"
requires-python = ">=3.10"
authors = [{name = "Your Name", email = "you@example.com"}]
dependencies = [
    "dbt-adapters>=1.0.0",
    "dbt-common>=1.0.0",
]

[project.optional-dependencies]
dev = [
    "pytest",
    "dbt-tests-adapter",
]

[project.urls]
Repository = "https://github.com/yourname/dbt-sqlite"
Build and (optionally) publish:
# Build
pip install build
python -m build
# Test the install locally
pip install dist/dbt_sqlite-0.1.0-py3-none-any.whl
# Use it in a dbt project (choose sqlite when dbt init asks for the adapter)
dbt init test_project
# (Optional) Publish to PyPI
pip install twine
twine upload dist/*
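The wheel in dist/ is named dbt_sqlite even though the project is called dbt-sqlite, because wheel filenames use a normalized project name with underscores. A small sketch of that rule as described in the Python packaging specifications; wheel_filename is a hypothetical helper written for this illustration, and the default tag assumes a pure-Python wheel.

```python
import re


def wheel_filename(name: str, version: str, tag: str = "py3-none-any") -> str:
    """Predict the wheel filename for a pure-Python project."""
    # Collapse runs of '-', '_' and '.' and lowercase the name,
    # then use underscores, since '-' separates filename components.
    normalized = re.sub(r"[-_.]+", "-", name).lower()
    return f"{normalized.replace('-', '_')}-{version}-{tag}.whl"


filename = wheel_filename("dbt-sqlite", "0.1.0")
print(filename)
```

This is handy in CI scripts that need to locate the built artifact without globbing.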
Step 13: Trusted Adapter Program (optional)
If you want your adapter formally recognized:
- Read about the Trusted Adapter Program in the dbt docs.
- Requirements:
  - Public GitHub repo.
  - CI with dbt-tests-adapter passing.
  - A documentation README with setup and usage examples.
  - An active maintainer (you).
  - A compatible license (typically Apache 2.0).
- Submit an application via the dbt-labs/trusted-adapters repo.
- Wait for review (weeks to months).
- If approved, your adapter is listed on the dbt docs site.
This is a dramatic visibility boost. Even community-trusted adapters (outside the formal program) get usage.
Common challenges
Minimum viable adapter checklist
To complete the capstone, the adapter must:
- Pass the BaseSimpleMaterializations test class.
- Pass the BaseEmpty test class.
- Pass the BaseSingularTests test class.
- Have a working dbt init flow with the profile template.
- Support dbt debug (validate the connection).
- Support dbt run for a simple model.
- Have a GitHub repo with a README and CI/CD.
- Have tests in the repo (passing).
- Be installable via pip from your repo.
Stretch goals:
- Pass the full dbt-tests-adapter suite.
- Support incremental materializations.
- Support snapshots.
- PyPI publication.
- Trusted Adapter Program application.
Summary
- Path B capstone = a functional adapter that passes the core dbt-tests-adapter tests.
- Choose a simple warehouse for your first adapter: SQLite is recommended.
- Setup: cookiecutter template + hatch for the dev environment.
- Implement the core classes: Credentials, ConnectionManager, Adapter, Relation.
- Write the required macros (25+) for adapter-specific behavior.
- dbt-tests-adapter suite: iterate and fix test failures one at a time.
- Custom macros for warehouse quirks (DDL limitations, case sensitivity, etc.).
- Package and publish via pyproject.toml + PyPI (optional).
- Trusted Adapter Program (optional) for formal recognition.
- Common challenges: macro namespace conventions, dbt-tests-adapter version compatibility, DBAPI 2.0 differences, transaction handling, cache invalidation.
- Ship at 80-90% with honest documentation. Don't let perfectionism kill the project.
- Realistic timeline: 30-40 hours for a minimum viable adapter.
Next: lesson 04 for path C (optimization), or lesson 05 if path B is complete.