ConnectionManager: open, get_response, cancel, exception_handler
ConnectionManager is the heart of an adapter at the Python level. It owns the lifecycle of warehouse connections: opening, executing queries, handling errors, cancelling, and closing. Every other part (impl.py, Relation, Column) relies on the ConnectionManager as the source of truth about warehouse state.
This lesson walks through every method you need to implement.
Minimal ConnectionManager
```python
# dbt-oceanbase/dbt/adapters/oceanbase/connections.py
from contextlib import contextmanager

import pymysql
from dbt.adapters.contracts.connection import (
    AdapterResponse,
    Connection,
    ConnectionState,
)
from dbt.adapters.exceptions import FailedToConnectError
# SQL adapters subclass SQLConnectionManager. It implements begin/commit/
# execute/add_query and leaves exactly the four methods below abstract.
from dbt.adapters.sql import SQLConnectionManager
from dbt_common.exceptions import DbtDatabaseError


class OceanBaseConnectionManager(SQLConnectionManager):
    TYPE = 'oceanbase'

    @classmethod
    def open(cls, connection: Connection) -> Connection:
        """Open connection if not already open"""
        if connection.state == ConnectionState.OPEN:
            return connection

        credentials = connection.credentials
        try:
            handle = pymysql.connect(
                host=credentials.host,
                port=credentials.port,
                user=credentials.user,
                password=credentials.password,
                database=credentials.database,
                connect_timeout=10,
            )
        except pymysql.Error as e:
            raise FailedToConnectError(str(e))

        connection.handle = handle
        connection.state = ConnectionState.OPEN
        return connection

    @classmethod
    def get_response(cls, cursor) -> AdapterResponse:
        """Build AdapterResponse from cursor after execute"""
        return AdapterResponse(
            _message='OK',
            rows_affected=cursor.rowcount,
        )

    @contextmanager
    def exception_handler(self, sql: str):
        """Wrap SQL execution, convert warehouse exceptions to dbt's"""
        try:
            yield
        except pymysql.Error as e:
            self.release()
            raise DbtDatabaseError(str(e)) from e

    def cancel(self, connection: Connection):
        """Cancel running query and close connection"""
        connection.handle.close()
```
There are four methods, and each has a specific role. Let's go through them.
Method 1: open (opening a connection)
```python
@classmethod
def open(cls, connection: Connection) -> Connection:
    if connection.state == ConnectionState.OPEN:
        return connection

    credentials = connection.credentials
    try:
        handle = pymysql.connect(
            host=credentials.host,
            port=credentials.port,
            user=credentials.user,
            password=credentials.password,
            database=credentials.database,
            connect_timeout=10,
        )
    except pymysql.Error as e:
        raise FailedToConnectError(str(e))

    connection.handle = handle
    connection.state = ConnectionState.OPEN
    return connection
```
Purpose: open a connection to the warehouse and attach it to the Connection object.
Parameters:
- connection (Connection): an object holding credentials and state. dbt-core has already created it; your job is to attach a handle.
Logic:
- Idempotency check: if the connection is already OPEN, return it unchanged. dbt may call open on a connection that is already open, so this fast path must stay cheap.
- Credentials access: connection.credentials is your OceanBaseCredentials instance.
- Connect through the client library: pymysql.connect(...) for OceanBase, snowflake.connector.connect(...) for Snowflake, google.cloud.bigquery.Client(...) for BigQuery, duckdb.connect(...) for DuckDB.
- Error handling: catch the warehouse-specific exception and raise FailedToConnectError, the standard dbt error.
- Assign the handle: connection.handle = handle. The handle is the connection object from the client library.
- Set the state: connection.state = ConnectionState.OPEN. dbt-core relies on this state. For example, cancel_open() skips connections that are not OPEN.
Class method: this matters. The base class declares open as a classmethod, so dbt-core can call OceanBaseConnectionManager.open(conn) without an instance. Everything the method needs comes from the Connection object, which lets dbt reconnect without recreating the manager.
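You can exercise the logic above without a warehouse. This is a minimal sketch that uses sqlite3 as a stand-in client library. Connection, ConnectionState and FailedToConnectError are simplified local stand-ins for the dbt classes of the same names; they are not imported from dbt:

```python
import sqlite3
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class ConnectionState(str, Enum):
    # Local stand-in for dbt's ConnectionState
    INIT = 'init'
    OPEN = 'open'
    CLOSED = 'closed'
    FAIL = 'fail'


@dataclass
class Connection:
    # Local stand-in: dbt fills credentials, open() fills handle and state
    credentials: dict
    state: ConnectionState = ConnectionState.INIT
    handle: Optional[Any] = None


class FailedToConnectError(Exception):
    """Local stand-in for dbt's FailedToConnectError."""


class SqliteConnectionManager:
    @classmethod
    def open(cls, connection: Connection) -> Connection:
        # Idempotency check: an already-open connection is returned as is
        if connection.state == ConnectionState.OPEN:
            return connection
        # Read credentials and connect through the client library
        try:
            handle = sqlite3.connect(connection.credentials['path'])
        except sqlite3.Error as e:
            # Translate the driver error into the adapter-level error
            connection.state = ConnectionState.FAIL
            raise FailedToConnectError(str(e)) from e
        # Attach the handle and mark the connection OPEN
        connection.handle = handle
        connection.state = ConnectionState.OPEN
        return connection
```

Calling open() twice returns the same handle. A path that cannot be opened ends in FailedToConnectError, with the state set to FAIL.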
Performance:
open can be slow (1-3 seconds for Snowflake). This is where these steps happen:
- TLS handshake
- Authentication (password / OAuth / private key)
- Session setup (set timezone, search_path, etc.)
dbt amortizes this cost. Each thread opens its connection lazily, on first use, and reuses it for the queries that follow instead of reconnecting for every statement.
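The lazy part of that amortization can be sketched as follows. LazyHandleSketch is a hypothetical name. dbt's own mechanism is, roughly, a LazyHandle stored on the Connection that is resolved through open() on first access. Here, sqlite3 stands in for the warehouse client:

```python
import sqlite3
from typing import Callable, Optional


class LazyHandleSketch:
    """Hypothetical helper: defers the expensive connect until first use."""

    def __init__(self, opener: Callable[[], sqlite3.Connection]):
        self._opener = opener
        self._handle: Optional[sqlite3.Connection] = None
        self.opens = 0  # how many times the expensive opener actually ran

    def get(self) -> sqlite3.Connection:
        if self._handle is None:
            # TLS, auth and session setup would happen here for a real warehouse
            self._handle = self._opener()
            self.opens += 1
        return self._handle


lazy = LazyHandleSketch(lambda: sqlite3.connect(':memory:'))
# Nothing has connected yet. The first get() pays the connect cost;
# the following calls reuse the same handle.
for _ in range(3):
    lazy.get().execute('SELECT 1')
```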
Extended open with auth methods
Real-world adapters support many auth methods. A Snowflake-style example:
```python
import time  # needed for the backoff sleep below


@classmethod
def open(cls, connection: Connection) -> Connection:
    if connection.state == ConnectionState.OPEN:
        return connection

    credentials = connection.credentials

    # Build connection parameters
    conn_params = {
        'host': credentials.host,
        'port': credentials.port,
        'user': credentials.user,
        'database': credentials.database,
    }

    # Auth method. Illustrative, Snowflake-style keys: pymysql.connect() does
    # not accept private_key, auth_token or ssl_mode, so use the keyword
    # arguments your own client library expects.
    if credentials.password:
        conn_params['password'] = credentials.password
    elif credentials.private_key_path:
        # builtin open(), not this method
        with open(credentials.private_key_path, 'rb') as key_file:
            conn_params['private_key'] = key_file.read()
    elif credentials.auth_token:
        conn_params['auth_token'] = credentials.auth_token
    else:
        raise FailedToConnectError('No authentication method provided')

    # Optional connection settings
    if credentials.connect_timeout:
        conn_params['connect_timeout'] = credentials.connect_timeout

    # SSL
    if credentials.sslmode:
        conn_params['ssl_mode'] = credentials.sslmode

    # Retry logic: connect_retries + 1 attempts in total
    last_exception = None
    for attempt in range(credentials.connect_retries + 1):
        try:
            handle = pymysql.connect(**conn_params)
            connection.handle = handle
            connection.state = ConnectionState.OPEN
            return connection
        except pymysql.Error as e:
            last_exception = e
            if attempt < credentials.connect_retries:
                time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...
                continue
            raise FailedToConnectError(str(e))

    # Only reached if connect_retries is negative (the loop never ran)
    raise FailedToConnectError(str(last_exception))
```
Additional functionality:
- Multiple auth methods: password, private key, OAuth token
- Retry with exponential backoff: for flaky connections
- Optional settings: timeout, SSL mode
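You can pull the retry-with-backoff part out into a small helper and test it without a warehouse. This sketch uses hypothetical names (connect_with_retry, base_delay). The sleep function is injectable, so a test does not actually wait. Like the loop above, it makes retries + 1 attempts and sleeps 1s, 2s, 4s, ... between them:

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar('T')


def connect_with_retry(
    connect: Callable[[], T],
    retries: int,
    retryable: Tuple[Type[BaseException], ...],
    sleep: Callable[[float], None] = time.sleep,
    base_delay: float = 1.0,
) -> T:
    """Call connect() up to retries + 1 times with exponential backoff."""
    if retries < 0:
        raise ValueError('retries must be >= 0')
    for attempt in range(retries + 1):
        try:
            return connect()
        except retryable:
            if attempt == retries:
                raise  # out of attempts: let the caller wrap the last error
            sleep(base_delay * 2 ** attempt)
    raise AssertionError('unreachable')
```

Inside open(), the caller would still wrap the final driver error, for example with except pymysql.Error as e: raise FailedToConnectError(str(e)). Keeping the loop separate lets you test the backoff schedule with a fake connect function.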
Method 2: get_response (extracting the result)
```python
@classmethod
def get_response(cls, cursor) -> AdapterResponse:
    return AdapterResponse(
        _message='OK',
        rows_affected=cursor.rowcount,
    )
```
Purpose: after cursor.execute(sql), extract metadata about the result (rows affected, code, message). The response is returned to dbt-core and written to run_results.json.
Parameters:
- cursor: the client library's cursor object, in its post-execute state
Structure of AdapterResponse (simplified):
```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AdapterResponse:
    _message: str                        # Human-readable status
    code: Optional[str] = None           # Warehouse-specific code (if any)
    rows_affected: Optional[int] = None  # Number of rows inserted/updated/deleted

    def __str__(self):
        return self._message
```
You need to fill in at least _message and rows_affected.
Warehouse-specific examples:
MySQL/OceanBase:
```python
return AdapterResponse(
    _message='OK',
    rows_affected=cursor.rowcount,
)
```
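Snowflake (modelled on what dbt-snowflake does):
```python
code = cursor.sqlstate or 'SUCCESS'  # SQLSTATE of the last statement, if any
return AdapterResponse(
    _message=f'{code} {cursor.rowcount}',  # human-readable, e.g. 'SUCCESS 42'
    code=code,
    rows_affected=cursor.rowcount,
)
# The Snowflake query ID is available as cursor.sfqid; dbt-snowflake
# returns it in its own AdapterResponse subclass.
```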
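BigQuery (dbt-bigquery works with query jobs rather than DB-API cursors; assume query_job is a finished google.cloud.bigquery.QueryJob):
```python
return AdapterResponse(
    _message=f'OK; job_id={query_job.job_id}',
    rows_affected=query_job.num_dml_affected_rows,  # may be None for non-DML queries
)
```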
Warehouse-specific metadata (Snowflake query ID, BigQuery job ID) is useful for observability and debugging.
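This sketch of get_response runs against sqlite3's DB-API cursor, which makes it easy to see what rowcount actually reports. AdapterResponseSketch is a local stand-in with the three fields shown above, not dbt's class. Mapping rowcount == -1 to None is a choice made in this sketch: DB-API uses -1 for "unknown", and sqlite3 reports it for SELECT:

```python
import sqlite3
from dataclasses import dataclass
from typing import Optional


@dataclass
class AdapterResponseSketch:
    # Local stand-in mirroring the fields of AdapterResponse
    _message: str
    code: Optional[str] = None
    rows_affected: Optional[int] = None

    def __str__(self) -> str:
        return self._message


def get_response(cursor: sqlite3.Cursor) -> AdapterResponseSketch:
    # DB-API: rowcount is -1 when the driver cannot tell how many rows changed
    rows = cursor.rowcount if cursor.rowcount >= 0 else None
    code = 'OK'
    message = f'{code} {rows}' if rows is not None else code
    return AdapterResponseSketch(_message=message, code=code, rows_affected=rows)


conn = sqlite3.connect(':memory:')
cur = conn.cursor()
cur.execute('CREATE TABLE t (id INTEGER, v TEXT)')
cur.executemany('INSERT INTO t VALUES (?, ?)', [(1, 'a'), (2, 'b'), (3, 'c')])
insert_response = get_response(cur)  # executemany sums the rows inserted
cur.execute("UPDATE t SET v = 'z' WHERE id >= 2")
update_response = get_response(cur)
```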
Method 3: exception_handler (error handling)
```python
from dbt_common.exceptions import DbtDatabaseError


@contextmanager
def exception_handler(self, sql: str):
    try:
        yield
    except pymysql.Error as e:
        self.release()
        raise DbtDatabaseError(str(e)) from e
```
Purpose: a context manager that wraps SQL execution. When the warehouse raises an exception, the handler converts it into a standard dbt one.
Usage (inside dbt-core, simplified from SQLConnectionManager.add_query):
```python
with self.exception_handler(sql):
    cursor.execute(sql)
```
If cursor.execute fails with a warehouse-specific exception, exception_handler catches it and raises a dbt-friendly error.
Why this is needed:
- Standardize error types: dbt knows a handful of exception classes (DbtDatabaseError, CompilationError, DbtRuntimeError in dbt_common.exceptions). Warehouse libraries have their own (pymysql.Error, snowflake.connector.errors.ProgrammingError, etc.). The handler converts one into the other.
- Release the connection on error: after a failed query, the connection may be in a bad state (broken transaction, dropped connection). self.release() cleans up by closing the current thread's connection.
- Better error messages: warehouse exceptions can be terse ('SQL syntax error'). exception_handler can add context (SQL text, model name).
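This minimal, runnable sketch covers two of the jobs above: translating the error and releasing the connection. It uses sqlite3 in place of pymysql. DatabaseErrorSketch is a hypothetical stand-in for dbt_common.exceptions.DbtDatabaseError, and the release callback stands in for self.release():

```python
import sqlite3
from contextlib import contextmanager
from typing import Callable, Iterator


class DatabaseErrorSketch(Exception):
    """Stand-in for dbt's DbtDatabaseError."""


@contextmanager
def exception_handler(sql: str, release: Callable[[], None]) -> Iterator[None]:
    try:
        yield
    except sqlite3.Error as e:
        release()  # drop the possibly broken connection before re-raising
        # Add the context the raw driver error lacks: a truncated copy of the SQL.
        # `from e` keeps the original driver exception as __cause__ for debugging.
        raise DatabaseErrorSketch(f'{e}\nSQL: {sql[:200]}') from e
```

The handler wraps only the execute call, so errors raised elsewhere in dbt pass through untouched.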
A production-grade version:
```python
from contextlib import contextmanager

import pymysql
from dbt_common.exceptions import DbtDatabaseError


@contextmanager
def exception_handler(self, sql: str):
    try:
        yield
    except pymysql.IntegrityError as e:
        # Constraint violation, key conflict
        self.release()
        raise DbtDatabaseError(
            f'IntegrityError in SQL: {sql[:200]}... ({e})'
        ) from e
    except pymysql.ProgrammingError as e:
        # Syntax error, wrong table, etc.
        self.release()
        raise DbtDatabaseError(f'ProgrammingError: {e}\nSQL: {sql}') from e
    except pymysql.OperationalError as e:
        # Connection lost, server died
        self.release()
        raise DbtDatabaseError(f'OperationalError (connection issue): {e}') from e
    except pymysql.Error as e:
        # Catch-all for other driver errors
        self.release()
        raise DbtDatabaseError(str(e)) from e
    except Exception:
        # Non-warehouse exception: clean up, then re-raise unchanged
        self.release()
        raise
```
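Each class of warehouse exception gets its own message, which makes failures easier to diagnose. Note that this classification does not by itself make dbt-core retry anything; its value is in the error reporting.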
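A table-driven version of the same classification is easy to test. This sketch uses sqlite3, whose hierarchy differs from pymysql's: a syntax error is OperationalError in sqlite3 but ProgrammingError in pymysql, so the labels must follow the driver you actually use. ERROR_LABELS and classify are hypothetical names:

```python
import sqlite3
from typing import List, Tuple, Type

# The first isinstance() match wins. The three specific classes are siblings
# under sqlite3.DatabaseError; the catch-all sqlite3.Error must come last.
ERROR_LABELS: List[Tuple[Type[Exception], str]] = [
    (sqlite3.IntegrityError, 'integrity'),      # constraint or key conflict
    (sqlite3.ProgrammingError, 'programming'),  # API misuse, e.g. closed connection
    (sqlite3.OperationalError, 'operational'),  # in sqlite3 this includes syntax errors
    (sqlite3.Error, 'database'),                # anything else from the driver
]


def classify(exc: Exception) -> str:
    for cls, label in ERROR_LABELS:
        if isinstance(exc, cls):
            return label
    return 'non-database'
```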
Method 4: cancel (cancelling a query)
```python
def cancel(self, connection: Connection):
    connection.handle.close()
```
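Purpose: cancel a running query, typically when a run is interrupted with Ctrl+C (SIGINT). dbt then calls cancel_open(), which calls cancel() for every open connection except the one belonging to the calling thread. cancel() therefore runs on a different thread from the one blocked on the query.
Parameters:
- connection (Connection): the connection with the running query
The simplest approach is to close the connection. Many warehouses abort the query once the client disconnects, but not always promptly.
A better approach is a warehouse-specific cancel: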
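Snowflake:
```python
def cancel(self, connection: Connection):
    # The busy connection belongs to another thread, so issue the cancel
    # through self.add_query, i.e. on the calling thread's own connection.
    sid = connection.handle.session_id
    self.add_query(f'select system$cancel_all_queries({sid})')
```
Snowflake supports SYSTEM$CANCEL_QUERY(query_id) and SYSTEM$CANCEL_ALL_QUERIES(session_id). Both cancel without disconnecting, which is faster. The query ID has to come from the cursor that is running the query; a freshly created cursor has none. That is why this version cancels by session.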
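Postgres:
```python
def cancel(self, connection: Connection):
    # psycopg2 exposes the server process ID of the busy connection
    pid = connection.handle.get_backend_pid()
    # Run the cancel on a second connection: the one owned by the thread
    # calling cancel(), reached via self.add_query
    self.add_query(f'select pg_cancel_backend({pid})')
```
Postgres has pg_cancel_backend(pid), a gentle cancel that stops the current query but keeps the session. pg_terminate_backend(pid) ends the whole session, and that is what dbt-postgres itself uses.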
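BigQuery:
```python
def cancel(self, connection: Connection):
    # Hypothetical: assumes your adapter stores the ID of the last submitted
    # job on the connection as `last_job_id`; connection.handle is a bigquery.Client
    job_id = getattr(connection, 'last_job_id', None)
    if job_id:
        connection.handle.cancel_job(job_id)
```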
Why a custom cancel instead of just closing the connection:
- Cleanup: the warehouse may keep running the query server-side even after the client disconnects. An explicit cancel frees those resources.
- Faster recovery: an explicit cancel frees the connection sooner than waiting for a timeout.
- Observability: the warehouse logs the cancel reason, which is better for auditing.
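sqlite3 has no KILL QUERY. Its Connection.interrupt() does, however, show the pattern a cancel relies on: the cancel comes from a different thread than the one blocked in execute(). This self-contained sketch uses hypothetical names (run_and_cancel, LONG_QUERY). It illustrates the threading pattern only, not how a networked warehouse cancels:

```python
import sqlite3
import threading
import time

# Counts to a billion: runs far longer than the sketch waits, unless interrupted
LONG_QUERY = (
    'WITH RECURSIVE c(x) AS '
    '(SELECT 1 UNION ALL SELECT x + 1 FROM c WHERE x < 1000000000) '
    'SELECT count(*) FROM c'
)


def run_and_cancel(timeout: float = 10.0) -> dict:
    # One connection used from two threads: a worker runs the query and the
    # calling thread cancels it, similar to cancel_open() in dbt
    conn = sqlite3.connect(':memory:', check_same_thread=False)
    outcome: dict = {}
    started = threading.Event()

    def worker() -> None:
        started.set()
        try:
            outcome['rows'] = conn.execute(LONG_QUERY).fetchall()
        except sqlite3.OperationalError as e:
            outcome['error'] = str(e)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    started.wait()
    deadline = time.monotonic() + timeout
    # interrupt() is a no-op while no statement is running, so repeat it
    # until the worker has actually stopped (or we give up)
    while thread.is_alive() and time.monotonic() < deadline:
        conn.interrupt()
        thread.join(0.05)
    outcome['cancelled_in_time'] = not thread.is_alive()
    if outcome['cancelled_in_time']:
        conn.close()
    return outcome
```

The worker sees sqlite3.OperationalError("interrupted"), the local analogue of the "query execution was interrupted" error a warehouse returns after KILL QUERY.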
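standardize_grants_dict: for GRANT/REVOKE
If your warehouse supports grants (Postgres, Snowflake, BigQuery), dbt needs the grants in a normalized format. This method lives on the adapter class in impl.py, not on the ConnectionManager. BaseAdapter already ships a default implementation equivalent to the one below, so override it only if your warehouse's show-grants output needs different handling:
```python
# impl.py
from typing import Dict, List

import agate
from dbt.adapters.sql import SQLAdapter


class OceanBaseAdapter(SQLAdapter):
    def standardize_grants_dict(self, grants_table: agate.Table) -> dict:
        """
        Convert warehouse's grants table to standard dict format.
        Used by dbt to compute grant diffs.
        """
        grants_dict: Dict[str, List[str]] = {}
        for row in grants_table:
            grantee = row['grantee']
            privilege = row['privilege_type']
            if privilege in grants_dict:
                grants_dict[privilege].append(grantee)
            else:
                grants_dict[privilege] = [grantee]
        return grants_dict
```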
Standard format:
```python
{
    'select': ['analyst_role', 'finance_role'],
    'insert': ['etl_role'],
    'delete': ['admin_role'],
}
```
Why standardize:
dbt's grants config looks like this:
```yaml
models:
  +grants:
    select: ['analyst_role']
```
dbt compares this config with the current grants on the warehouse to compute a diff. standardize_grants_dict normalizes the warehouse-specific format (rows, JSON, etc.) into the standard dict.
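This stdlib-only sketch shows the normalization plus the diff step that dbt performs with its result. standardize_grants and grants_diff are hypothetical names, and the rows are plain dicts standing in for agate rows. Lowercasing privilege names is an assumption made here; check what your warehouse's show-grants output returns:

```python
from typing import Dict, Iterable, List, Mapping, Tuple

Grants = Dict[str, List[str]]


def standardize_grants(rows: Iterable[Mapping[str, str]]) -> Grants:
    # privilege -> grantees, the same shape as the `+grants:` config
    grants: Grants = {}
    for row in rows:
        privilege = row['privilege_type'].lower()  # assumption: normalize case
        grants.setdefault(privilege, []).append(row['grantee'])
    return grants


def grants_diff(
    configured: Mapping[str, List[str]], current: Mapping[str, List[str]]
) -> Tuple[Grants, Grants]:
    """Return (to_grant, to_revoke) that turn `current` into `configured`."""
    to_grant: Grants = {}
    to_revoke: Grants = {}
    for privilege in sorted(set(configured) | set(current)):
        wanted = set(configured.get(privilege, []))
        existing = set(current.get(privilege, []))
        if wanted - existing:
            to_grant[privilege] = sorted(wanted - existing)
        if existing - wanted:
            to_revoke[privilege] = sorted(existing - wanted)
    return to_grant, to_revoke
```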
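If your warehouse does not support grants: dbt only generates grant SQL for models that set a grants config. To make the adapter ignore grants entirely, one option is to override the apply_grants macro for your adapter with a no-op. The sketch below assumes the adapter is named myadapter, so that dispatch resolves myadapter__apply_grants:
```jinja
{# macros/adapters.sql #}
{% macro myadapter__apply_grants(relation, grant_config, should_revoke=True) %}
  {# this warehouse has no GRANT/REVOKE: intentionally emit nothing #}
{% endmacro %}
```
dbt will then not generate grant SQL for your adapter.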
Threading and connection isolation
ConnectionManager is thread-aware. With threads: 8, dbt runs 8 parallel threads, and each one gets its own connection.
BaseConnectionManager has built-in support for this. A simplified view of the idea (not the actual dbt source):
```python
import os
import threading


class BaseConnectionManager:
    def __init__(self, profile):
        self.profile = profile
        self.lock = threading.RLock()
        self.thread_connections = {}  # Dict[(pid, thread_id), Connection]

    def get_thread_connection(self):
        key = (os.getpid(), threading.get_ident())
        with self.lock:
            # dbt registers each thread's Connection beforehand (when a node
            # starts) with a lazy handle, so open() runs on first use.
            # An unknown key is an error, not a trigger to connect.
            return self.thread_connections[key]
```
This happens automatically. You do not need to handle threading yourself in open(); the base class takes care of it.
A note on _connection_keys (see the previous lesson): it only controls which credential fields dbt debug displays. It does not decide whether connections are shared between threads; each thread always gets its own.
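You can sketch the per-thread bookkeeping with the standard library alone. ThreadConnections is a hypothetical name. Keying by (process id, thread id) follows dbt's own choice of thread identifier, and the opener stands in for open():

```python
import os
import threading
from typing import Callable, Dict, Tuple


class ThreadConnections:
    """Hypothetical sketch: one connection per (pid, thread) key, behind a lock."""

    def __init__(self, opener: Callable[[], object]):
        self._opener = opener
        self._lock = threading.RLock()
        self._conns: Dict[Tuple[int, int], object] = {}

    @staticmethod
    def key() -> Tuple[int, int]:
        return (os.getpid(), threading.get_ident())

    def get(self) -> object:
        k = self.key()
        with self._lock:
            # First call on this thread creates its connection; later calls reuse it
            if k not in self._conns:
                self._conns[k] = self._opener()
            return self._conns[k]
```

Thread idents can be reused once a thread exits. dbt's worker threads live for the whole run, so this does not matter in practice. A test, however, should keep its threads alive at the same time.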
Full production-grade ConnectionManager
```python
import time
from contextlib import contextmanager

import pymysql
from dbt.adapters.contracts.connection import (
    AdapterResponse,
    Connection,
    ConnectionState,
)
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.exceptions import FailedToConnectError
from dbt.adapters.sql import SQLConnectionManager
from dbt_common.exceptions import DbtDatabaseError

logger = AdapterLogger('OceanBase')


class OceanBaseConnectionManager(SQLConnectionManager):
    TYPE = 'oceanbase'

    @classmethod
    def open(cls, connection: Connection) -> Connection:
        if connection.state == ConnectionState.OPEN:
            logger.debug('Connection already open, reusing')
            return connection

        credentials = connection.credentials
        conn_params = {
            'host': credentials.host,
            'port': credentials.port,
            'user': credentials.user,
            'password': credentials.password,
            'database': credentials.database,
            'connect_timeout': 10,
            'autocommit': False,
        }

        # OceanBase login format: user@tenant#cluster
        if credentials.tenant:
            user_str = f'{credentials.user}@{credentials.tenant}'
            if credentials.cluster:
                user_str += f'#{credentials.cluster}'
            conn_params['user'] = user_str

        last_exception = None
        for attempt in range(credentials.connect_retries + 1):
            try:
                logger.debug(f'Connecting to OceanBase: {credentials.host}:{credentials.port}')
                handle = pymysql.connect(**conn_params)
                connection.handle = handle
                connection.state = ConnectionState.OPEN
                return connection
            except pymysql.Error as e:
                last_exception = e
                if attempt < credentials.connect_retries:
                    backoff = 2 ** attempt
                    logger.warning(
                        f'Connection attempt {attempt + 1} failed: {e}. '
                        f'Retrying in {backoff}s...'
                    )
                    time.sleep(backoff)
                    continue
                logger.error(f'All connection attempts failed: {e}')
                raise FailedToConnectError(str(e))

        raise FailedToConnectError(str(last_exception))

    @classmethod
    def get_response(cls, cursor) -> AdapterResponse:
        return AdapterResponse(
            _message='OK',
            rows_affected=cursor.rowcount,
            # Surfaces the server-side connection ID in the `code` field
            code=str(cursor.connection.thread_id()) if hasattr(cursor.connection, 'thread_id') else None,
        )

    @contextmanager
    def exception_handler(self, sql: str):
        try:
            yield
        except pymysql.ProgrammingError as e:
            self.release()
            # pymysql error args are (errno, message)
            error_code = e.args[0] if e.args else 'unknown'
            error_msg = e.args[1] if len(e.args) > 1 else str(e)
            raise DbtDatabaseError(
                f'OceanBase programming error ({error_code}): {error_msg}\n'
                f'SQL: {sql[:500]}'
            ) from e
        except pymysql.IntegrityError as e:
            self.release()
            raise DbtDatabaseError(f'OceanBase integrity error: {e}') from e
        except pymysql.OperationalError as e:
            self.release()
            raise DbtDatabaseError(f'OceanBase operational error (connection?): {e}') from e
        except pymysql.Error as e:
            self.release()
            raise DbtDatabaseError(str(e)) from e
        except Exception as e:
            logger.error(f'Non-database exception in exception_handler: {e}')
            self.release()
            raise

    def cancel(self, connection: Connection):
        # cancel() runs on another thread while this connection is busy, so
        # KILL QUERY must go through a different connection: the calling
        # thread's own one, via self.add_query.
        thread_id = connection.handle.thread_id()
        try:
            self.add_query(f'KILL QUERY {int(thread_id)}')
        except DbtDatabaseError as e:
            logger.warning(f'Failed to kill query on OceanBase: {e}')
            connection.handle.close()  # last resort: drop the connection

    # standardize_grants_dict is an adapter method: it belongs in impl.py
    # (BaseAdapter already provides a default), not on the ConnectionManager.
```
This is a production-quality ConnectionManager. It includes:
- Retry with exponential backoff
- Tenant / cluster support for OceanBase
- Granular exception classification
- Query-level cancel through KILL QUERY, issued from a separate connection
- Logging for observability
- No standardize_grants_dict: it is an adapter method that lives in impl.py
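The tenant/cluster handling in open() is easy to get wrong and needs no server to test, so it can be factored into a pure function. This sketch uses a hypothetical name (oceanbase_login_user). It reproduces the user@tenant#cluster logic of the code above, including ignoring the cluster when no tenant is set:

```python
from typing import Optional


def oceanbase_login_user(user: str, tenant: Optional[str] = None, cluster: Optional[str] = None) -> str:
    """Build the login user string: user, user@tenant, or user@tenant#cluster."""
    if not tenant:
        # Without a tenant, open() keeps the plain user name (cluster is ignored too)
        return user
    login = f'{user}@{tenant}'
    if cluster:
        login += f'#{cluster}'
    return login
```

open() could then set conn_params['user'] = oceanbase_login_user(credentials.user, credentials.tenant, credentials.cluster), and the format is covered by ordinary unit tests.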
Try it yourself
- Create a minimal ConnectionManager:
```python
from contextlib import contextmanager

from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.sql import SQLConnectionManager


class MyConnectionManager(SQLConnectionManager):
    TYPE = 'myadapter'

    @classmethod
    def open(cls, connection):
        # TODO: your warehouse client
        pass

    @classmethod
    def get_response(cls, cursor):
        return AdapterResponse(_message='OK', rows_affected=cursor.rowcount)

    @contextmanager
    def exception_handler(self, sql):
        try:
            yield
        except Exception:
            raise

    def cancel(self, connection):
        connection.handle.close()
```
- For a test warehouse, use SQLite (through the sqlite3 library):
```python
import sqlite3

from dbt.adapters.contracts.connection import ConnectionState


@classmethod
def open(cls, connection):
    if connection.state == ConnectionState.OPEN:
        return connection
    handle = sqlite3.connect(connection.credentials.path)
    connection.handle = handle
    connection.state = ConnectionState.OPEN
    return connection
```
- Connect through dbt:
```shell
dbt debug --profiles-dir . --profile myadapter-test
```
It should show 'OK' for each check.
- Bonus: add retry logic to open(). Test it with wrong credentials: it should make 3 attempts with backoff.
Key takeaways
- ConnectionManager has 4 main methods: open, get_response, exception_handler, cancel. Add standardize_grants_dict, which lives on the adapter class in impl.py, if the warehouse has grants.
- open: idempotent, with a fast path for an existing connection. It builds the connection through the client library and raises FailedToConnectError on failure.
- get_response: extracts metadata after execute. It returns an AdapterResponse with _message, rows_affected, code.
- exception_handler: a context manager. It catches warehouse exceptions and converts them to dbt's DbtDatabaseError, with granular classification (ProgrammingError, IntegrityError, etc.).
- cancel: goes beyond closing the connection when the warehouse offers a query-level cancel (Snowflake SYSTEM$CANCEL_QUERY / SYSTEM$CANCEL_ALL_QUERIES, Postgres pg_cancel_backend, BigQuery job cancellation). It runs on a different thread than the query, so the cancel goes through another connection.
- Threading: BaseConnectionManager handles thread isolation, one connection per thread. _connection_keys (in Credentials) only affects what dbt debug displays.
- standardize_grants_dict: normalizes the warehouse grants format into a standard dict, which dbt uses to compute grant diffs.