Column API: mapping warehouse types to dbt types
Column is the Python representation of a single table column. When dbt-core introspects a table (get_columns_in_relation), it receives a list of Column objects. Customizing Column is how an adapter handles warehouse-specific types.
This lesson covers the anatomy of the Column class, type mapping, and how to customize the class for your warehouse.
Structure of Column
# dbt-adapters/dbt/adapters/base/column.py
@dataclass
class Column:
    column: str  # column name
    dtype: str  # base data type ('VARCHAR', 'DECIMAL', 'INTEGER', etc.)
    char_size: Optional[int] = None  # for VARCHAR(255): 255
    numeric_precision: Optional[int] = None  # for DECIMAL(15, 2): 15
    numeric_scale: Optional[int] = None  # for DECIMAL(15, 2): 2
Basic instances:
col = Column(
    column='customer_id',
    dtype='INTEGER',
)
col_varchar = Column(
    column='name',
    dtype='VARCHAR',
    char_size=255,
)
col_decimal = Column(
    column='price',
    dtype='DECIMAL',
    numeric_precision=15,
    numeric_scale=2,
)
data_type vs translate_type
The Column class has two main members for type rendering.
The data_type property returns the full type string:
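col = Column(column='price', dtype='DECIMAL', numeric_precision=15, numeric_scale=2)
print(col.data_type)
# DECIMAL(15,2)
col = Column(column='name', dtype='VARCHAR', char_size=255)
print(col.data_type)
# character varying(255)  (the base class renders every string type this way)
The result includes precision/scale or char_size. Adapter subclasses can change how strings and numerics are rendered.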
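To make the rendering rule concrete, here is a minimal sketch that runs without dbt installed. SketchColumn is a hypothetical stand-in written for this lesson, not the dbt class. Its output format (character varying(N) for strings, dtype(p,s) for numerics) is an assumption modeled on the base class, and a real adapter may render these differently.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchColumn:
    """Hypothetical stand-in for dbt's Column, for illustration only."""
    column: str
    dtype: str
    char_size: Optional[int] = None
    numeric_precision: Optional[int] = None
    numeric_scale: Optional[int] = None

    def is_string(self) -> bool:
        return self.dtype.lower() in ("text", "varchar", "character varying", "character")

    def is_numeric(self) -> bool:
        return self.dtype.lower() in ("numeric", "decimal")

    @property
    def data_type(self) -> str:
        # Strings render with their size, numerics with precision and scale,
        # and everything else falls back to the bare dtype.
        if self.is_string() and self.char_size is not None:
            return f"character varying({self.char_size})"
        if self.is_numeric() and None not in (self.numeric_precision, self.numeric_scale):
            return f"{self.dtype}({self.numeric_precision},{self.numeric_scale})"
        return self.dtype


print(SketchColumn("price", "DECIMAL", numeric_precision=15, numeric_scale=2).data_type)
print(SketchColumn("name", "VARCHAR", char_size=255).data_type)
print(SketchColumn("customer_id", "INTEGER").data_type)
```

The point of the sketch is the branching: the size fields only matter for the type families that use them, so an INTEGER column ignores char_size and the numeric fields entirely.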
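The translate_type(dtype) classmethod converts a generic type label into the adapter's type name:
class Column:
    # Abbreviated; the base table also covers TIMESTAMP, FLOAT, and BOOLEAN.
    TYPE_LABELS = {'STRING': 'TEXT', 'INTEGER': 'INT'}

    @classmethod
    def translate_type(cls, dtype: str) -> str:
        return cls.TYPE_LABELS.get(dtype.upper(), dtype)
Labels that are missing from the lookup table pass through unchanged. Override the table or the method in an adapter-specific Column.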
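The lookup-with-fallback pattern can be tried in isolation. The sketch below uses hypothetical classes (SketchColumn, SketchWarehouseColumn) and made-up table contents; it only shows that a subclass can change translations by replacing the class-level table while inheriting the method.

```python
from typing import ClassVar, Dict


class SketchColumn:
    """Hypothetical stand-in; only the translation logic is modeled."""
    # Labels that get rewritten; anything else passes through unchanged.
    TYPE_LABELS: ClassVar[Dict[str, str]] = {"STRING": "TEXT", "INTEGER": "INT"}

    @classmethod
    def translate_type(cls, dtype: str) -> str:
        # cls (not SketchColumn) is used, so subclasses see their own table.
        return cls.TYPE_LABELS.get(dtype.upper(), dtype)


class SketchWarehouseColumn(SketchColumn):
    # Replacing the table is enough; translate_type is inherited.
    TYPE_LABELS = {"STRING": "VARCHAR", "INTEGER": "BIGINT"}


print(SketchColumn.translate_type("string"))
print(SketchColumn.translate_type("GEOGRAPHY"))
print(SketchWarehouseColumn.translate_type("integer"))
```

Looking the table up through cls is the design choice that matters here: it keeps the method generic and moves all warehouse knowledge into data.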
Adapter-specific Column
# dbt-postgres/dbt/adapters/postgres/column.py (simplified illustration)
class PostgresColumn(Column):
    @classmethod
    def translate_type(cls, dtype: str) -> str:
        # Convert ANSI -> Postgres-specific
        TYPE_MAP = {
            'TEXT': 'text',
            'INTEGER': 'integer',
            'BIGINT': 'bigint',
            'BOOLEAN': 'boolean',
            'DOUBLE': 'double precision',
            'TIMESTAMP': 'timestamp without time zone',
            'TIMESTAMPTZ': 'timestamp with time zone',
        }
        return TYPE_MAP.get(dtype.upper(), dtype)

# dbt-snowflake/dbt/adapters/snowflake/column.py (simplified illustration)
class SnowflakeColumn(Column):
    @classmethod
    def translate_type(cls, dtype: str) -> str:
        TYPE_MAP = {
            'TEXT': 'VARCHAR(16777216)',  # Snowflake max VARCHAR
            'STRING': 'VARCHAR(16777216)',
            'INTEGER': 'NUMBER(38, 0)',  # Snowflake INTEGER is an alias for NUMBER(38, 0)
            'BIGINT': 'NUMBER(38, 0)',
            'DOUBLE': 'FLOAT',
            'TIMESTAMP': 'TIMESTAMP_NTZ',
            'TIMESTAMPTZ': 'TIMESTAMP_LTZ',
        }
        return TYPE_MAP.get(dtype.upper(), dtype)

# dbt-bigquery/dbt/adapters/bigquery/column.py (simplified illustration)
class BigQueryColumn(Column):
    @classmethod
    def translate_type(cls, dtype: str) -> str:
        TYPE_MAP = {
            'TEXT': 'STRING',
            'INTEGER': 'INT64',
            'BIGINT': 'INT64',
            'DOUBLE': 'FLOAT64',
            'BOOLEAN': 'BOOL',
            'TIMESTAMP': 'TIMESTAMP',
        }
        return TYPE_MAP.get(dtype.upper(), dtype)
Each warehouse uses its own type system.
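To see the three type systems side by side, the sketch below copies a subset of the maps shown above into one dictionary and renders the same generic type for each warehouse. WAREHOUSE_TYPE_MAPS and translate_everywhere are hypothetical names used only for this illustration.

```python
from typing import Dict

# Subsets of the per-warehouse maps shown above.
WAREHOUSE_TYPE_MAPS: Dict[str, Dict[str, str]] = {
    "postgres": {"TEXT": "text", "INTEGER": "integer", "DOUBLE": "double precision"},
    "snowflake": {"TEXT": "VARCHAR(16777216)", "INTEGER": "NUMBER(38, 0)", "DOUBLE": "FLOAT"},
    "bigquery": {"TEXT": "STRING", "INTEGER": "INT64", "DOUBLE": "FLOAT64"},
}


def translate_everywhere(dtype: str) -> Dict[str, str]:
    """Return the rendering of one generic type on every known warehouse."""
    key = dtype.upper()
    return {name: type_map.get(key, dtype) for name, type_map in WAREHOUSE_TYPE_MAPS.items()}


for generic in ("TEXT", "INTEGER", "DOUBLE", "DATE"):
    print(generic, translate_everywhere(generic))
```

DATE is deliberately absent from every map, so it demonstrates the pass-through behavior for types that need no translation.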
Type rendering in materializations
When dbt builds a CREATE TABLE statement:
{# Default table materialization #}
CREATE TABLE {{ relation }} (
  {%- for col in columns -%}
    {{ col.column }} {{ col.dtype }}
    {%- if not loop.last %}, {% endif %}
  {%- endfor -%}
)
col.dtype carries the warehouse-specific type when the Column was built through translate_type. If your Column class has proper translations, this template generates valid SQL.
Without an override:
-- On Snowflake:
CREATE TABLE x (id INTEGER, name TEXT);
-- [X] INTEGER is not a native Snowflake type (it is an alias for NUMBER), and TEXT is not standard
With an override:
-- Via SnowflakeColumn.translate_type:
CREATE TABLE x (id NUMBER(38, 0), name VARCHAR(16777216));
-- [OK] Snowflake-native types
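The same rendering can be sketched in plain Python to show where translation plugs in. render_create_table and snowflake_translate are hypothetical helpers for this lesson; dbt itself does this work in Jinja macros, so treat the code as a model of the idea rather than dbt's implementation.

```python
from typing import Callable, List, Tuple


def render_create_table(
    relation: str,
    columns: List[Tuple[str, str]],
    translate: Callable[[str], str],
) -> str:
    """Build a CREATE TABLE statement from (name, generic_type) pairs."""
    rendered = ", ".join(f"{name} {translate(dtype)}" for name, dtype in columns)
    return f"CREATE TABLE {relation} ({rendered})"


# Subset of the Snowflake map used earlier in this lesson.
SNOWFLAKE_TYPES = {"INTEGER": "NUMBER(38, 0)", "TEXT": "VARCHAR(16777216)"}


def snowflake_translate(dtype: str) -> str:
    return SNOWFLAKE_TYPES.get(dtype.upper(), dtype)


columns = [("id", "INTEGER"), ("name", "TEXT")]
print(render_create_table("x", columns, lambda dtype: dtype))  # no translation
print(render_create_table("x", columns, snowflake_translate))
```

Passing the translation in as a function keeps the statement builder identical for every warehouse, which is the same separation the Column class provides.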
Adapter type conversions (impl.py)
In addition to the Column class, the adapter has convert_*_type classmethods. They map CSV columns to SQL types when dbt seed runs:
import agate
from dbt.adapters.sql import SQLAdapter

class MyAdapter(SQLAdapter):
    @classmethod
    def convert_text_type(cls, agate_table, col_idx):
        return 'TEXT'

    @classmethod
    def convert_number_type(cls, agate_table, col_idx):
        # Check whether the column has any decimal places
        decimals = agate_table.aggregate(agate.MaxPrecision(col_idx))
        if decimals:
            return 'DOUBLE'
        return 'BIGINT'

    @classmethod
    def convert_boolean_type(cls, agate_table, col_idx):
        return 'BOOLEAN'

    @classmethod
    def convert_datetime_type(cls, agate_table, col_idx):
        return 'TIMESTAMP'

    @classmethod
    def convert_date_type(cls, agate_table, col_idx):
        return 'DATE'

    @classmethod
    def convert_time_type(cls, agate_table, col_idx):
        return 'TIME'
agate is a Python data analysis library that dbt uses for CSV processing. agate_table.aggregate(...) computes column statistics; MaxPrecision returns the largest number of decimal places found in the column.
Example flow:
1. dbt seed reads the CSV: countries.csv
2. An agate.Table is created from the CSV
3. For each column, a data type is inferred (text, number, boolean, date, datetime)
4. The type is mapped to a SQL type through convert_*_type
5. CREATE TABLE is issued with the SQL types
6. The CSV values are inserted
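The flow above can be imitated with the standard library alone. In this sketch the csv module stands in for agate, and infer_sql_type and infer_schema are hypothetical helpers with deliberately simple rules; dbt's real inference handles dates, nulls, and more.

```python
import csv
import io
from typing import Dict, List


def infer_sql_type(values: List[str]) -> str:
    """Pick a SQL type that fits every value in the column."""
    if all(v.lower() in ("true", "false") for v in values):
        return "BOOLEAN"
    try:
        numbers = [float(v) for v in values]
    except ValueError:
        return "TEXT"
    # Whole numbers only -> integer type, otherwise a floating type.
    return "BIGINT" if all(n.is_integer() for n in numbers) else "DOUBLE"


def infer_schema(csv_text: str) -> Dict[str, str]:
    """Read the CSV and infer one SQL type per column (needs at least one row)."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    return {name: infer_sql_type([row[name] for row in rows]) for name in rows[0]}


SEED = "id,name,price,is_active\n1,Alice,99.99,true\n2,Bob,49.99,false\n"
schema = infer_schema(SEED)
columns_sql = ", ".join(f"{name} {sql_type}" for name, sql_type in schema.items())
print(f"CREATE TABLE test_seed ({columns_sql})")
```

The decimal check plays the role that agate.MaxPrecision plays in convert_number_type: a column becomes an integer type only when no value has a fractional part.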
Adapter-specific examples:
dbt-postgres:
@classmethod
def convert_text_type(cls, agate_table, col_idx):
    return 'text'

@classmethod
def convert_number_type(cls, agate_table, col_idx):
    decimals = agate_table.aggregate(agate.MaxPrecision(col_idx))
    return 'numeric' if decimals else 'integer'
dbt-snowflake:
@classmethod
def convert_text_type(cls, agate_table, col_idx):
    return 'TEXT'
dbt-bigquery:
@classmethod
def convert_text_type(cls, agate_table, col_idx):
    return 'STRING'

@classmethod
def convert_number_type(cls, agate_table, col_idx):
    decimals = agate_table.aggregate(agate.MaxPrecision(col_idx))
    return 'FLOAT64' if decimals else 'INT64'
Column-level information in introspection
When get_columns_in_relation returns columns, each Column instance carries the full type information:
columns = adapter.get_columns_in_relation(relation)
for col in columns:
    print(f'{col.column}: {col.dtype}')
    print(f'  char_size: {col.char_size}')
    print(f'  numeric_precision: {col.numeric_precision}')
    print(f'  numeric_scale: {col.numeric_scale}')
This information is used in:
- Contracts: verify that column types match data_type in schema.yml
- Catalog generation: dbt docs generate includes types
- Schema sync: incremental on_schema_change handles new columns
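As an illustration of the contract use case, the sketch below compares declared types with introspected ones. contract_violations is a hypothetical helper, and plain dictionaries stand in for schema.yml and for the Column list; dbt's actual contract enforcement lives in macros and checks more than this.

```python
from typing import Dict, List


def contract_violations(declared: Dict[str, str], actual: Dict[str, str]) -> List[str]:
    """Compare declared column types with introspected ones, case-insensitively."""
    problems = []
    for name, want in declared.items():
        have = actual.get(name)
        if have is None:
            problems.append(f"{name}: missing from relation")
        elif have.lower() != want.lower():
            problems.append(f"{name}: declared {want}, found {have}")
    # Columns that exist in the relation but are absent from the contract.
    for name in sorted(actual.keys() - declared.keys()):
        problems.append(f"{name}: not declared in contract")
    return problems


declared = {"customer_id": "integer", "name": "varchar(255)"}
actual = {"customer_id": "INTEGER", "name": "TEXT", "created_at": "TIMESTAMP"}
for problem in contract_violations(declared, actual):
    print(problem)
```

A check like this is only as good as the type strings it receives, which is why consistent data_type rendering in the Column class matters.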
Quoting column names
Some warehouses require quoting for column names that contain special characters:
class Column:
    @property
    def quoted(self) -> str:
        return f'"{self.column}"'

# Or override per adapter:
class SnowflakeColumn(Column):
    @property
    def quoted(self) -> str:
        # Snowflake quoting trap: only quote mixed-case names or names with special characters
        if self.column.lower() != self.column or '"' in self.column:
            return f'"{self.column}"'
        return self.column
In a materialization:
{{ col.quoted }} {# yields the quoted column name #}
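One detail worth handling in a custom quoting rule is a double quote inside the name itself, which standard SQL escapes by doubling it. The sketch below is a hypothetical variant, not dbt's implementation: quote_identifier and render_column_name are illustrative names, and reserved words are not handled.

```python
import re

# Names made only of lowercase letters, digits and underscores are left bare.
_PLAIN_IDENTIFIER = re.compile(r"[a-z_][a-z0-9_]*")


def quote_identifier(name: str) -> str:
    """Wrap a name in double quotes, doubling any embedded double quote."""
    return '"' + name.replace('"', '""') + '"'


def render_column_name(name: str) -> str:
    """Quote only when the name is not a plain lowercase identifier."""
    if _PLAIN_IDENTIFIER.fullmatch(name):
        return name
    return quote_identifier(name)


for raw in ("customer_id", "Order Total", 'say "hi"'):
    print(render_column_name(raw))
```

Whether unquoted names should be lowercase or uppercase depends on how the warehouse folds identifiers, so the regular expression is the part to adapt per adapter.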
Capabilities and special types
Some warehouses have non-standard types:
Postgres:
- JSON, JSONB, UUID, INET, CIDR, arrays (INTEGER[])
- geometry, geography (PostGIS)
Snowflake:
- VARIANT: JSON-like
- OBJECT, ARRAY: semi-structured
- GEOGRAPHY, GEOMETRY
BigQuery:
- STRUCT<...>, ARRAY<...>
- GEOGRAPHY
DuckDB:
- LIST<...>, STRUCT<...>
- MAP<key, value>
- INTERVAL
If your adapter needs these, extend the Column class:
class MyAdapterColumn(Column):
    @property
    def is_json(self) -> bool:
        return self.dtype in ('JSON', 'JSONB', 'VARIANT')

    @property
    def is_array(self) -> bool:
        return self.dtype.endswith('[]') or self.dtype.startswith('ARRAY<')

    @classmethod
    def translate_type(cls, dtype):
        # Standard ANSI -> MyAdapter, plus special types.
        # TYPE_MAP is the adapter's lookup table, defined as in the earlier examples.
        return TYPE_MAP.get(dtype.upper(), dtype)
These properties are available in macros, for example {% if col.is_json %}.
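The sketch below shows how such properties might drive a decision, in the same spirit as a macro branching on col.is_json. SketchColumn and split_columns are hypothetical, the comparison is made case-insensitive, and a bare ARRAY dtype is also treated as an array; all three are assumptions for this illustration.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class SketchColumn:
    """Hypothetical stand-in with two convenience properties."""
    column: str
    dtype: str

    @property
    def is_json(self) -> bool:
        return self.dtype.upper() in ("JSON", "JSONB", "VARIANT")

    @property
    def is_array(self) -> bool:
        upper = self.dtype.upper()
        return upper.endswith("[]") or upper.startswith("ARRAY<") or upper == "ARRAY"


def split_columns(columns: List[SketchColumn]) -> Tuple[List[str], List[str]]:
    """Separate plain columns from semi-structured ones by name."""
    plain = [c.column for c in columns if not (c.is_json or c.is_array)]
    special = [c.column for c in columns if c.is_json or c.is_array]
    return plain, special


cols = [
    SketchColumn("id", "INTEGER"),
    SketchColumn("payload", "jsonb"),
    SketchColumn("tags", "TEXT[]"),
    SketchColumn("scores", "ARRAY<INT64>"),
]
print(split_columns(cols))
```

Normalizing case inside the property avoids surprises when one warehouse reports types in lowercase and another in uppercase.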
Production-grade Column class
Full example for a hypothetical OceanBase adapter:
# dbt-oceanbase/dbt/adapters/oceanbase/column.py
from dataclasses import dataclass
from typing import Optional

from dbt.adapters.base import Column


@dataclass
class OceanBaseColumn(Column):
    @classmethod
    def translate_type(cls, dtype: str) -> str:
        TYPE_MAP = {
            'TEXT': 'TEXT',
            'STRING': 'TEXT',
            'INTEGER': 'BIGINT',
            'BIGINT': 'BIGINT',
            'DOUBLE': 'DOUBLE',
            'BOOLEAN': 'BOOLEAN',
            'TIMESTAMP': 'TIMESTAMP',
            'TIMESTAMPTZ': 'TIMESTAMP',  # OceanBase doesn't distinguish
            'DATE': 'DATE',
            'TIME': 'TIME',
            'JSON': 'JSON',
        }
        return TYPE_MAP.get(dtype.upper(), dtype)

    # Plain methods, not properties: the base class calls self.is_string() etc.
    def is_string(self) -> bool:
        return self.dtype.upper() in ('TEXT', 'VARCHAR', 'CHAR', 'STRING')

    def is_numeric(self) -> bool:
        return self.dtype.upper() in (
            'INT', 'BIGINT', 'SMALLINT', 'INTEGER',
            'DOUBLE', 'FLOAT', 'NUMERIC', 'DECIMAL'
        )

    def is_integer(self) -> bool:
        return self.dtype.upper() in ('INT', 'BIGINT', 'SMALLINT', 'INTEGER')

    def can_expand_to(self, other_column):
        # Can my type expand to accommodate other_column's type?
        # Used for incremental schema sync
        if self.dtype == other_column.dtype:
            return True
        # OceanBase-specific compatibility rules
        ...
is_string, is_numeric, and the other helpers are convenience methods used in macros, for example {% if col.is_string() %}. can_expand_to takes an argument, so it must be a regular method rather than a property.
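One possible shape for an expansion rule is string widening: a column can expand when the other column is a wider string. The sketch below models only that rule with a hypothetical SketchColumn; it does not fill in the OceanBase-specific rules elided above, and the 65535 default for unbounded strings is an arbitrary assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchColumn:
    """Hypothetical stand-in used to illustrate a widening check."""
    column: str
    dtype: str
    char_size: Optional[int] = None

    def is_string(self) -> bool:
        return self.dtype.upper() in ("TEXT", "VARCHAR", "CHAR", "STRING")

    def string_size(self) -> int:
        # Assumption: unbounded or unknown sizes count as a large default.
        return 65535 if self.char_size is None else self.char_size

    def can_expand_to(self, other: "SketchColumn") -> bool:
        """True when both are strings and `other` is strictly wider."""
        if not self.is_string() or not other.is_string():
            return False
        return other.string_size() > self.string_size()


narrow = SketchColumn("name", "VARCHAR", char_size=50)
wide = SketchColumn("name", "VARCHAR", char_size=255)
number = SketchColumn("name", "INTEGER")
print(narrow.can_expand_to(wide), wide.can_expand_to(narrow), narrow.can_expand_to(number))
```

Returning False for mismatched type families keeps the check conservative: an incremental run would then fall back to whatever on_schema_change policy the model declares instead of silently altering the column.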
Try it yourself
- In a Python REPL:
from dbt.adapters.base import Column
col = Column(column='price', dtype='DECIMAL', numeric_precision=15, numeric_scale=2)
print(col.data_type)
# DECIMAL(15,2)
col2 = Column(column='name', dtype='VARCHAR', char_size=255)
print(col2.data_type)
# character varying(255)
- Try translate_type:
print(Column.translate_type('TEXT'))
# TEXT (not in the base lookup table, so it is returned unchanged)

# Subclass for a specific warehouse
class SnowflakeColumn(Column):
    @classmethod
    def translate_type(cls, dtype):
        TYPE_MAP = {'TEXT': 'VARCHAR(16777216)', 'INTEGER': 'NUMBER(38, 0)'}
        return TYPE_MAP.get(dtype.upper(), dtype)

print(SnowflakeColumn.translate_type('TEXT'))
# VARCHAR(16777216)
print(SnowflakeColumn.translate_type('INTEGER'))
# NUMBER(38, 0)
- Test a seed in a dbt project:
mkdir -p seeds/
cat > seeds/test_seed.csv << EOF
id,name,price,is_active
1,Alice,99.99,true
2,Bob,49.99,false
EOF
dbt seed --select test_seed
# Check the types of the created table; they should match your adapter's convert_*_type results
- Inspect via dbt docs generate and catalog.json:
dbt docs generate
# Then open target/catalog.json to see the column types per relation
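Reading the types out of the catalog can be scripted. The sketch below works on a hand-written sample in the general shape of catalog.json (nodes keyed by unique id, each with a columns mapping that carries a type); the sample content, the project name my_project, and the helper column_types are all assumptions for illustration, so check them against your own file.

```python
import json
from typing import Dict

# Hand-written sample in the shape of target/catalog.json (heavily trimmed).
SAMPLE_CATALOG = json.loads("""
{
  "nodes": {
    "seed.my_project.test_seed": {
      "columns": {
        "id": {"type": "integer", "index": 1, "name": "id"},
        "price": {"type": "numeric", "index": 3, "name": "price"}
      }
    }
  },
  "sources": {}
}
""")


def column_types(catalog: dict) -> Dict[str, Dict[str, str]]:
    """Map each node id to a {column name: type} dictionary."""
    return {
        unique_id: {name: info["type"] for name, info in node.get("columns", {}).items()}
        for unique_id, node in catalog.get("nodes", {}).items()
    }


for unique_id, columns in column_types(SAMPLE_CATALOG).items():
    print(unique_id, columns)
```

To run it against a real project, load the file with json.load on target/catalog.json and pass the result to column_types in place of the sample.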
Key takeaways
- Column = (column, dtype, char_size, numeric_precision, numeric_scale): represents a single column.
- data_type property: returns the rendered type with precision/scale (for example DECIMAL(15,2)).
- translate_type classmethod: converts ANSI/dbt types to warehouse-specific ones. Override per adapter.
- convert_*_type methods on the adapter: CSV -> SQL type mapping for dbt seed.
- Special types (JSON, ARRAY, STRUCT, geography): extend the Column class with convenience properties.
- Used in:
  - Materializations (CREATE TABLE column types)
  - Contracts (verify that data_type matches)
  - Catalog generation (dbt docs)
  - Schema sync (incremental on_schema_change)
- Production-grade: implement all standard types plus warehouse-specific helpers.