Lesson 09.04 · 25 min
Advanced

ConnectionManager: open, get_response, cancel, exception_handler

ConnectionManager is the heart of the adapter at the Python level. It owns the lifecycle of warehouse connections: opening, executing, error handling, cancelling, closing. All other parts (impl.py, Relation, Column) rely on ConnectionManager as the source of truth about warehouse state.


This lesson walks through every method you need to implement.


Minimal ConnectionManager

# dbt-oceanbase/dbt/adapters/oceanbase/connections.py
from contextlib import contextmanager

import pymysql

from dbt.adapters.contracts.connection import (
    AdapterResponse,
    Connection,
    ConnectionState,
)
from dbt.adapters.exceptions import FailedToConnectError
from dbt.adapters.sql import SQLConnectionManager
from dbt_common.exceptions import DbtDatabaseError


# get_response and cancel are the abstract hooks of SQLConnectionManager
# (itself a BaseConnectionManager subclass), so a SQL adapter extends it.
class OceanBaseConnectionManager(SQLConnectionManager):
    TYPE = 'oceanbase'

    @classmethod
    def open(cls, connection: Connection) -> Connection:
        """Open connection if not already open"""
        if connection.state == ConnectionState.OPEN:
            return connection

        credentials = connection.credentials
        try:
            handle = pymysql.connect(
                host=credentials.host,
                port=credentials.port,
                user=credentials.user,
                password=credentials.password,
                database=credentials.database,
                connect_timeout=10,
            )
        except pymysql.Error as e:
            raise FailedToConnectError(str(e)) from e

        connection.handle = handle
        connection.state = ConnectionState.OPEN
        return connection

    @classmethod
    def get_response(cls, cursor) -> AdapterResponse:
        """Build AdapterResponse from cursor after execute"""
        return AdapterResponse(
            _message='OK',
            rows_affected=cursor.rowcount,
        )

    @contextmanager
    def exception_handler(self, sql: str):
        """Wrap SQL execution, convert warehouse exceptions to dbt's"""
        try:
            yield
        except pymysql.Error as e:
            self.release()
            raise DbtDatabaseError(str(e)) from e

    def cancel(self, connection: Connection):
        """Cancel running query by closing the connection"""
        connection.handle.close()

Four methods, each with a specific role. Let's go through them.


Method 1: open (opening a connection)

@classmethod
def open(cls, connection: Connection) -> Connection:
    if connection.state == ConnectionState.OPEN:
        return connection

    credentials = connection.credentials
    try:
        handle = pymysql.connect(
            host=credentials.host,
            port=credentials.port,
            user=credentials.user,
            password=credentials.password,
            database=credentials.database,
            connect_timeout=10,
        )
    except pymysql.Error as e:
        raise FailedToConnectError(str(e))

    connection.handle = handle
    connection.state = ConnectionState.OPEN
    return connection

Goal: open a connection to the warehouse and attach it to the Connection object.

Parameters:

  • connection: Connection is an object holding credentials and state. dbt-core has already created it; your job is to attach the handle.

Logic:

  1. Idempotency check: if the connection is already OPEN, return it unchanged. Connection caching relies on this.

  2. Credentials access: connection.credentials is your OceanBaseCredentials instance.

  3. Connect through the client library: pymysql.connect(...) for OceanBase, snowflake.connector.connect(...) for Snowflake, google.cloud.bigquery.Client(...) for BigQuery, duckdb.connect(...) for DuckDB.

  4. Error handling: catch the warehouse-specific exception and raise FailedToConnectError (the dbt-standard one).

  5. Assign the handle: connection.handle = handle. The handle is the connection object from the client library.

  6. Set the state: connection.state = ConnectionState.OPEN. dbt-core relies on this for caching.

Class method: this matters. dbt-core calls OceanBaseConnectionManager.open(conn) without an instance, which allows reconnecting without recreating the manager.
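The idempotency check from step 1 can be exercised without a real warehouse. The following is a minimal sketch, assuming stand-in classes `FakeConnection` and `FakeManager` (both hypothetical, not part of dbt) and `sqlite3` in place of the warehouse client:

```python
import sqlite3
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FakeConnection:
    """Hypothetical stand-in for dbt's Connection (state and handle only)."""
    path: str = ":memory:"
    state: str = "init"
    handle: Optional[Any] = None


class FakeManager:
    """Hypothetical stand-in for a ConnectionManager subclass."""
    connect_calls = 0  # counts real connects, to make the fast path visible

    @classmethod
    def open(cls, connection: FakeConnection) -> FakeConnection:
        if connection.state == "open":  # fast path: already open
            return connection
        cls.connect_calls += 1
        connection.handle = sqlite3.connect(connection.path)
        connection.state = "open"
        return connection


conn = FakeConnection()
FakeManager.open(conn)
first_handle = conn.handle
FakeManager.open(conn)  # second call must reuse the existing handle
```

Calling open twice performs one real connect; the second call returns the same handle untouched.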

Performance:

open can be slow (1-3 seconds for Snowflake). It involves:

  • TLS handshake
  • Authentication (password / OAuth / private key)
  • Session setup (set timezone, search_path, etc.)

dbt amortizes this cost through connection caching: each thread opens its connection once and reuses it for the models it runs.


Extended open with auth methods

Real-world adapters support many auth methods. A Snowflake-like example:

import time

@classmethod
def open(cls, connection: Connection) -> Connection:
    if connection.state == ConnectionState.OPEN:
        return connection

    credentials = connection.credentials

    # Build connection parameters
    conn_params = {
        'host': credentials.host,
        'port': credentials.port,
        'user': credentials.user,
        'database': credentials.database,
    }

    # Auth method. The private_key / auth_token / ssl_mode keys are illustrative:
    # pass only the keyword arguments your client library actually accepts.
    if credentials.password:
        conn_params['password'] = credentials.password
    elif credentials.private_key_path:
        with open(credentials.private_key_path, 'rb') as key_file:
            conn_params['private_key'] = key_file.read()
    elif credentials.auth_token:
        conn_params['auth_token'] = credentials.auth_token
    else:
        raise FailedToConnectError('No authentication method provided')

    # Optional connection settings
    if credentials.connect_timeout:
        conn_params['connect_timeout'] = credentials.connect_timeout

    # SSL
    if credentials.sslmode:
        conn_params['ssl_mode'] = credentials.sslmode

    # Retry logic
    for attempt in range(credentials.connect_retries + 1):
        try:
            handle = pymysql.connect(**conn_params)
        except pymysql.Error as e:
            if attempt == credentials.connect_retries:
                raise FailedToConnectError(str(e)) from e
            time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...
        else:
            connection.handle = handle
            connection.state = ConnectionState.OPEN
            return connection

    # Only reachable when connect_retries is negative
    raise FailedToConnectError('connect_retries must be >= 0')

Additional functionality:

  • Multiple auth: password, private_key, OAuth token
  • Retry с exponential backoff: для flaky connections
  • Optional settings: timeout, SSL mode
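The retry loop is easier to test when the sleep function is injectable. The following is a minimal sketch with no warehouse involved; `connect_with_retries` and `flaky_connect` are hypothetical helper names, and `ConnectionError` stands in for the client library's error class:

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def connect_with_retries(
    connect: Callable[[], T],
    retries: int,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call connect() up to retries + 1 times, sleeping 2**attempt seconds between tries."""
    for attempt in range(retries + 1):
        try:
            return connect()
        except ConnectionError:
            if attempt == retries:
                raise  # out of attempts: surface the last error
            sleep(2 ** attempt)
    raise ValueError("retries must be >= 0")


# Usage: a connect function that fails twice, then succeeds.
delays: List[float] = []
outcomes = iter([ConnectionError("down"), ConnectionError("down"), "handle"])


def flaky_connect() -> str:
    result = next(outcomes)
    if isinstance(result, Exception):
        raise result
    return result


handle = connect_with_retries(flaky_connect, retries=3, sleep=delays.append)
```

Passing `delays.append` as the sleep function records the backoff schedule instead of waiting, so a test of the retry path finishes instantly.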

Method 2: get_response (extracting the result)

@classmethod
def get_response(cls, cursor) -> AdapterResponse:
    return AdapterResponse(
        _message='OK',
        rows_affected=cursor.rowcount,
    )

Goal: after cursor.execute(sql), extract metadata about the result (rows affected, code, message). It is returned to dbt-core and written to run_results.json.

Parameters:

  • cursor: the cursor object from the client library (post-execute state)

AdapterResponse structure:

@dataclass
class AdapterResponse:
    _message: str                         # Human-readable status
    code: Optional[str] = None            # Warehouse-specific code (if any)
    rows_affected: Optional[int] = None   # Number of rows inserted/updated/deleted

    def __str__(self):
        return self._message

You need to fill in at least _message and rows_affected.

Warehouse-specific examples:

MySQL/OceanBase:

return AdapterResponse(
    _message='OK',
    rows_affected=cursor.rowcount,
)

Snowflake:

return AdapterResponse(
    _message=cursor.sfqid,                # Snowflake query ID
    code=cursor.errorcode,                 # SQL error code if any
    rows_affected=cursor.rowcount,
)

BigQuery:

result = cursor.result()
return AdapterResponse(
    _message=f'OK; job_id={result.job_id}',
    rows_affected=result.total_rows_inserted or result.row_count,
)

Warehouse-specific metadata (Snowflake query ID, BigQuery job ID) is useful for observability and debugging.
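To see what a post-execute cursor actually exposes, the same pattern can be tried against `sqlite3`. This is a sketch only: `Response` is a hypothetical stand-in that mirrors the AdapterResponse fields shown above, and the `OK <n>` message format is an assumption, not a dbt convention.

```python
import sqlite3
from dataclasses import dataclass
from typing import Optional


@dataclass
class Response:
    """Hypothetical stand-in mirroring the AdapterResponse fields."""
    _message: str
    code: Optional[str] = None
    rows_affected: Optional[int] = None

    def __str__(self) -> str:
        return self._message


def get_response(cursor: sqlite3.Cursor) -> Response:
    # sqlite3 reports rowcount == -1 when it cannot tell (e.g. for SELECT)
    rows = cursor.rowcount if cursor.rowcount >= 0 else None
    message = f"OK {rows}" if rows is not None else "OK"
    return Response(_message=message, rows_affected=rows)


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE t (id INTEGER)")
insert_response = get_response(db.execute("INSERT INTO t VALUES (1), (2)"))
select_response = get_response(db.execute("SELECT * FROM t"))
```

The takeaway is that rowcount semantics differ per driver and per statement type, so it is worth normalizing "unknown" to None rather than passing a sentinel like -1 through to run_results.json.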


Method 3: exception_handler (error handling)

@contextmanager
def exception_handler(self, sql: str):
    try:
        yield
    except pymysql.Error as e:
        self.release()
        # DbtDatabaseError is imported from dbt_common.exceptions
        raise DbtDatabaseError(str(e)) from e

Goal: a context manager that wraps SQL execution. When the warehouse raises an exception, it is converted into a dbt-standard one.

Usage (inside dbt-core):

with self.exception_handler(sql):
    cursor.execute(sql)

If cursor.execute fails with a warehouse-specific exception, exception_handler catches it and raises a dbt-friendly error.

Why this is needed:

  1. Standardize error types: dbt-core knows a handful of exception classes (DbtDatabaseError, CompilationError, DbtRuntimeError). Warehouse libraries have their own (pymysql.Error, snowflake.connector.errors.ProgrammingError, etc.). The handler converts one kind into the other.

  2. Release the connection on error: if a query failed, the connection may be in a bad state (broken transaction, dropped connection). self.release() cleans up.

  3. Better error messages: warehouse exceptions can be terse ('SQL syntax error'). exception_handler can add context (SQL text, model name).

Production-grade version:

from dbt_common.exceptions import DbtDatabaseError

@contextmanager
def exception_handler(self, sql: str):
    try:
        yield
    except pymysql.IntegrityError as e:
        # Constraint violation, key conflict
        self.release()
        raise DbtDatabaseError(
            f'IntegrityError in SQL: {sql[:200]}... ({e})'
        ) from e
    except pymysql.ProgrammingError as e:
        # Syntax error, wrong table, etc.
        self.release()
        raise DbtDatabaseError(
            f'ProgrammingError: {e}\nSQL: {sql}'
        ) from e
    except pymysql.OperationalError as e:
        # Connection lost, server died
        self.release()
        raise DbtDatabaseError(
            f'OperationalError (connection issue): {e}'
        ) from e
    except pymysql.Error as e:
        # Catch-all for any other driver error
        self.release()
        raise DbtDatabaseError(str(e)) from e
    except Exception:
        # Non-warehouse exception: clean up and re-raise unchanged
        self.release()
        raise

Different warehouse exceptions get different messages, even though they all surface as the same dbt error class. This makes error reporting much easier to act on.
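The convert-and-release pattern can be tried end to end with `sqlite3` as the driver. This is a sketch under stated assumptions: `WarehouseError` is a hypothetical stand-in for dbt's database error class, and `release` is passed in as a callback instead of being a method on the manager.

```python
import sqlite3
from contextlib import contextmanager
from typing import Callable, Iterator, List


class WarehouseError(Exception):
    """Hypothetical stand-in for dbt's database error class."""


@contextmanager
def exception_handler(sql: str, release: Callable[[], None]) -> Iterator[None]:
    try:
        yield
    except sqlite3.IntegrityError as e:  # most specific class first
        release()
        raise WarehouseError(f"integrity error: {e}") from e
    except sqlite3.Error as e:           # catch-all for the driver
        release()
        raise WarehouseError(f"{e}\nSQL: {sql[:200]}") from e


released: List[str] = []
errors: List[str] = []

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
db.execute("INSERT INTO t VALUES (1)")

# First statement violates the primary key, second one is a syntax error.
for sql in ("INSERT INTO t VALUES (1)", "SELEC 1"):
    try:
        with exception_handler(sql, release=lambda: released.append(sql)):
            db.execute(sql)
    except WarehouseError as e:
        errors.append(str(e))
```

Ordering matters here: the specific `except` clause has to come before the driver's base class, otherwise the catch-all swallows everything.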


Method 4: cancel (cancelling a query)

def cancel(self, connection: Connection):
    connection.handle.close()

Goal: cancel a running query, typically when a run is cancelled or on SIGINT (Ctrl+C).

Parameters:

  • connection: Connection is the connection with the running query

Simplest option: close the connection. Many warehouses cancel the query when the client disconnects, but this is not guaranteed.

Better: a warehouse-specific cancel.

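Snowflake:

def cancel(self, connection: Connection):
    # A fresh cursor has no sfqid, so the ID of the running query has to be
    # remembered at execute time. running_query_id is a hypothetical attribute
    # that the adapter is assumed to set itself.
    query_id = connection.running_query_id
    cursor = connection.handle.cursor()
    cursor.execute('SELECT SYSTEM$CANCEL_QUERY(%s)', (query_id,))

Snowflake supports SYSTEM$CANCEL_QUERY(query_id), which cancels the query without disconnecting. This is faster.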

Postgres:

def cancel(self, connection: Connection):
    pid = connection.handle.get_backend_pid()
    # The busy session cannot cancel itself: issue pg_cancel_backend(pid)
    # from a second connection
    second_conn = psycopg2.connect(...)
    try:
        with second_conn.cursor() as second_cur:
            second_cur.execute('SELECT pg_cancel_backend(%s)', (pid,))
    finally:
        second_conn.close()

Postgres has pg_cancel_backend(pid), a gentle cancel that stops the query but keeps the session.

BigQuery:

def cancel(self, connection: Connection):
    # Assumes the adapter keeps a reference to the last submitted job on the handle
    job = connection.handle.job   # Last running job
    if job and not job.done():
        job.cancel()

Why a custom cancel instead of just closing:

  1. Cleanup: the warehouse may leave the query running server-side even after the client disconnects. An explicit cancel frees the resources.

  2. Faster recovery: an explicit cancel frees the connection sooner than waiting for a timeout.

  3. Observability: the warehouse logs the cancel reason, which is better for auditing.
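The difference between "cancel the query" and "close the connection" can be demonstrated with the standard library: `sqlite3.Connection.interrupt()` aborts the running statement from another thread and leaves the connection usable. This is a sketch of the idea only; the recursive query and the 0.2 s timer are arbitrary choices for the demo.

```python
import sqlite3
import threading

# check_same_thread=False lets the timer thread touch the connection.
db = sqlite3.connect(":memory:", check_same_thread=False)


def cancel(connection: sqlite3.Connection) -> None:
    """Abort whatever statement is running; the connection itself stays open."""
    connection.interrupt()


# A deliberately slow query: counts through a very long recursive sequence.
SLOW_SQL = """
WITH RECURSIVE c(x) AS (
    SELECT 1 UNION ALL SELECT x + 1 FROM c WHERE x < 1000000000
)
SELECT count(*) FROM c
"""

timer = threading.Timer(0.2, cancel, args=(db,))
timer.start()
try:
    db.execute(SLOW_SQL).fetchone()
    outcome = "finished"
except sqlite3.OperationalError as e:
    outcome = f"cancelled: {e}"
finally:
    timer.cancel()

# The same connection can run the next statement right away.
still_usable = db.execute("SELECT 1").fetchone()[0]
```

Note that the cancel has to come from a different thread than the one blocked in execute, which is the same constraint that forces warehouse adapters to cancel through a second session or an out-of-band API.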


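standardize_grants_dict (for GRANT/REVOKE)

If the warehouse supports grants (Postgres, Snowflake, BigQuery), you need a method that normalizes the grants format. dbt calls it as adapter.standardize_grants_dict, so it is defined on the adapter class in impl.py: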

def standardize_grants_dict(self, grants_table: agate.Table) -> dict:
    """
    Convert warehouse's grants table to standard dict format.
    Used by dbt to compute grant diffs.
    """
    grants_dict: Dict[str, List[str]] = {}
    
    for row in grants_table:
        grantee = row['grantee']
        privilege = row['privilege_type']
        
        if privilege in grants_dict:
            grants_dict[privilege].append(grantee)
        else:
            grants_dict[privilege] = [grantee]
    
    return grants_dict

Standard format:

{
    'select': ['analyst_role', 'finance_role'],
    'insert': ['etl_role'],
    'delete': ['admin_role'],
}

Why standardize:

dbt's grants config looks like this:

models:
  +grants:
    select: ['analyst_role']

dbt compares this with the current grants on the warehouse to compute a diff. standardize_grants_dict normalizes the warehouse-specific format (rows, JSON, etc.) into the standard dict.
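Once both sides are in the standard dict shape, the diff itself is a small set-difference per privilege. The following is a minimal sketch using plain dicts instead of an agate table; `standardize` and `diff` are hypothetical helper names, not dbt functions.

```python
from typing import Dict, Iterable, List, Mapping

Grants = Dict[str, List[str]]


def standardize(rows: Iterable[Mapping[str, str]]) -> Grants:
    """Group grantees by privilege, the same shape standardize_grants_dict returns."""
    grants: Grants = {}
    for row in rows:
        grants.setdefault(row["privilege_type"], []).append(row["grantee"])
    return grants


def diff(left: Grants, right: Grants) -> Grants:
    """Grantees present in `left` but missing from `right`, per privilege."""
    result: Grants = {}
    for privilege, grantees in left.items():
        missing = [g for g in grantees if g not in right.get(privilege, [])]
        if missing:
            result[privilege] = missing
    return result


# What the warehouse currently reports (one row per grantee/privilege pair).
current = standardize([
    {"grantee": "analyst_role", "privilege_type": "select"},
    {"grantee": "old_role", "privilege_type": "select"},
    {"grantee": "etl_role", "privilege_type": "insert"},
])
# What the project config asks for.
desired: Grants = {"select": ["analyst_role", "finance_role"]}

to_grant = diff(desired, current)
to_revoke = diff(current, desired)
```

Here `to_grant` ends up as `{'select': ['finance_role']}` and `to_revoke` as `{'select': ['old_role'], 'insert': ['etl_role']}`.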

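If your warehouse does not support grants, opt out. The snippet below is schematic; check which capability flags your dbt-adapters version actually exposes:

# In impl.py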
class MyAdapter(SQLAdapter):
    @classmethod
    def capabilities(cls) -> AdapterCapabilities:
        return AdapterCapabilities.from_dict({
            'grants': False,
        })

Then dbt will not generate grant SQL for your adapter.


Threading and connection isolation

ConnectionManager is thread-aware. With threads: 8, dbt runs 8 parallel threads and each one gets its own connection.

BaseConnectionManager has built-in support (simplified sketch):

class BaseConnectionManager:
    def __init__(self, profile):
        self.profile = profile
        self.thread_connections = {}   # Dict[thread_id, Connection]
    
    def get_thread_connection(self) -> Connection:
        thread_id = threading.get_ident()
        if thread_id in self.thread_connections:
            return self.thread_connections[thread_id]
        
        # Create new connection for this thread
        conn = self.acquire_connection(...)
        self.thread_connections[thread_id] = conn
        return conn

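This is automatic. You do not need to handle threading yourself in open; the base class takes care of it.

Connection caching: a connection is reused within its own thread, not shared between threads. _connection_keys (see the previous lesson) lists the credential fields dbt reports for a connection, for example in dbt debug.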
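The per-thread registry idea can be reproduced with the standard library alone. The following is a minimal sketch, assuming a hypothetical `ThreadConnections` class that hands out string labels in place of real connections:

```python
import threading
from typing import Dict, List, Tuple


class ThreadConnections:
    """Hypothetical registry: one 'connection' per thread, created on first use."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._by_thread: Dict[int, str] = {}

    def get(self) -> str:
        key = threading.get_ident()
        with self._lock:  # the dict is shared, so guard every access
            if key not in self._by_thread:
                self._by_thread[key] = f"conn-{len(self._by_thread) + 1}"
            return self._by_thread[key]


registry = ThreadConnections()
barrier = threading.Barrier(4)
seen: List[Tuple[str, str]] = []
seen_lock = threading.Lock()


def worker() -> None:
    first = registry.get()
    barrier.wait()           # keep all threads alive so idents are not reused
    second = registry.get()  # same thread, so the same connection comes back
    with seen_lock:
        seen.append((first, second))


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Four threads end up with four distinct connections, and each thread gets the same one back on its second call. The barrier is there because thread identifiers may be recycled once a thread exits.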


Full production-grade ConnectionManager

import time
from contextlib import contextmanager
from typing import Dict, List

import agate
import pymysql

from dbt.adapters.contracts.connection import (
    AdapterResponse,
    Connection,
    ConnectionState,
)
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.exceptions import FailedToConnectError
from dbt.adapters.sql import SQLConnectionManager
from dbt_common.exceptions import DbtDatabaseError


logger = AdapterLogger('OceanBase')


class OceanBaseConnectionManager(SQLConnectionManager):
    TYPE = 'oceanbase'

    @staticmethod
    def _conn_params(credentials) -> dict:
        """Translate credentials into pymysql.connect() keyword arguments."""
        user = credentials.user
        if credentials.tenant:
            # OceanBase user string: user@tenant#cluster
            user = f'{user}@{credentials.tenant}'
            if credentials.cluster:
                user += f'#{credentials.cluster}'
        return {
            'host': credentials.host,
            'port': credentials.port,
            'user': user,
            'password': credentials.password,
            'database': credentials.database,
            'connect_timeout': 10,
            'autocommit': False,
        }

    @classmethod
    def open(cls, connection: Connection) -> Connection:
        if connection.state == ConnectionState.OPEN:
            logger.debug('Connection already open, reusing')
            return connection

        credentials = connection.credentials
        conn_params = cls._conn_params(credentials)

        for attempt in range(credentials.connect_retries + 1):
            try:
                logger.debug(f'Connecting to OceanBase: {credentials.host}:{credentials.port}')
                handle = pymysql.connect(**conn_params)
            except pymysql.Error as e:
                if attempt == credentials.connect_retries:
                    logger.error(f'All connection attempts failed: {e}')
                    raise FailedToConnectError(str(e)) from e
                backoff = 2 ** attempt
                logger.warning(
                    f'Connection attempt {attempt + 1} failed: {e}. '
                    f'Retrying in {backoff}s...'
                )
                time.sleep(backoff)
            else:
                connection.handle = handle
                connection.state = ConnectionState.OPEN
                return connection

        # Only reachable when connect_retries is negative
        raise FailedToConnectError('connect_retries must be >= 0')

    @classmethod
    def get_response(cls, cursor) -> AdapterResponse:
        return AdapterResponse(
            _message='OK',
            rows_affected=cursor.rowcount,
            code=str(cursor.connection.thread_id()) if hasattr(cursor.connection, 'thread_id') else None,
        )

    @contextmanager
    def exception_handler(self, sql: str):
        try:
            yield
        except pymysql.ProgrammingError as e:
            self.release()
            error_code = e.args[0] if e.args else 'unknown'
            error_msg = e.args[1] if len(e.args) > 1 else str(e)
            raise DbtDatabaseError(
                f'OceanBase programming error ({error_code}): {error_msg}\n'
                f'SQL: {sql[:500]}'
            ) from e
        except pymysql.IntegrityError as e:
            self.release()
            raise DbtDatabaseError(f'OceanBase integrity error: {e}') from e
        except pymysql.OperationalError as e:
            self.release()
            raise DbtDatabaseError(f'OceanBase operational error (connection?): {e}') from e
        except pymysql.Error as e:
            self.release()
            raise DbtDatabaseError(str(e)) from e
        except Exception as e:
            logger.error(f'Non-database exception in exception_handler: {e}')
            self.release()
            raise

    def cancel(self, connection: Connection):
        # KILL QUERY must come from a second session: the original session
        # is blocked by the very query being cancelled.
        thread_id = connection.handle.thread_id()
        try:
            killer = pymysql.connect(**self._conn_params(connection.credentials))
            try:
                with killer.cursor() as cursor:
                    cursor.execute(f'KILL QUERY {thread_id}')
            finally:
                killer.close()
        except pymysql.Error as e:
            logger.warning(f'Failed to kill query on OceanBase: {e}')
            connection.handle.close()

    # dbt invokes this hook as adapter.standardize_grants_dict, so in a real
    # package it belongs on the adapter class in impl.py; it is listed here
    # to keep the lesson's code in one place.
    def standardize_grants_dict(self, grants_table: agate.Table) -> dict:
        grants_dict: Dict[str, List[str]] = {}
        for row in grants_table:
            privilege = row['privilege_type']
            grantee = row['grantee']
            if privilege in grants_dict:
                grants_dict[privilege].append(grantee)
            else:
                grants_dict[privilege] = [grantee]
        return grants_dict

This is a production-quality ConnectionManager. It includes:

  • Retry with exponential backoff
  • Tenant / cluster support for OceanBase
  • Granular exception classification
  • A specific cancel through KILL QUERY
  • Logging for observability
  • standardize_grants_dict for grants

Try it yourself

  1. Create a minimal ConnectionManager:

    class MyConnectionManager(BaseConnectionManager):
        TYPE = 'myadapter'
        
        @classmethod
        def open(cls, connection):
            # TODO: your warehouse client
            pass
        
        @classmethod
        def get_response(cls, cursor):
            return AdapterResponse(_message='OK', rows_affected=cursor.rowcount)
        
        @contextmanager
        def exception_handler(self, sql):
            try:
                yield
            except Exception as e:
                raise
        
        def cancel(self, connection):
            connection.handle.close()

  2. For a test warehouse, use SQLite (through the sqlite3 library):

    import sqlite3
    
    @classmethod
    def open(cls, connection):
        if connection.state == ConnectionState.OPEN:
            return connection
        
        handle = sqlite3.connect(connection.credentials.path)
        connection.handle = handle
        connection.state = ConnectionState.OPEN
        return connection

  3. Connect through dbt:

    dbt debug --profiles-dir . --profile myadapter-test

    It should show 'OK' for every check.

  4. Bonus: add retry logic to open(). Test with wrong credentials: it should make 3 attempts with backoff.


Key takeaways

  1. ConnectionManager has 4 main methods: open, get_response, exception_handler, cancel. Plus standardize_grants_dict (on the adapter class) if the warehouse has grants.

  2. open is idempotent, with a fast path for an existing connection. It builds the connection through the client library and raises FailedToConnectError on failure.

  3. get_response extracts metadata after execute. It returns an AdapterResponse with _message, rows_affected, code.

  4. exception_handler is a context manager. It catches warehouse exceptions and converts them to dbt's database error, with granular classification (ProgrammingError, IntegrityError, etc.).

  5. cancel goes beyond closing the connection. It is warehouse-specific (Snowflake SYSTEM$CANCEL_QUERY, Postgres pg_cancel_backend, BigQuery job.cancel()).

  6. Threading: BaseConnectionManager handles thread isolation; each thread reuses its own connection. _connection_keys (in Credentials) lists the credential fields dbt reports for a connection.

  7. standardize_grants_dict normalizes the warehouse grants format into the standard dict. It is used to compute the grant diff.

Knowledge check
A senior engineer writes a ConnectionManager. After dbt run finishes, some connections stay open and are not released (a leak). Where do you start debugging?
Answer
There are several potential causes and debugging strategies.

**Cause 1: exception_handler does not call release()**

```python
@contextmanager
def exception_handler(self, sql):
    try:
        yield
    except Exception as e:
        # Missing: self.release()
        raise
```

If an exception is raised inside the yielded block, the connection stays in the `thread_connections` dict and is never cleaned up.

**Fix**:

```python
except Exception as e:
    self.release()  # ← Free the connection
    raise
```

**Cause 2: the connection is not closed after use**

`BaseConnectionManager` has a `release()` method that frees the thread's connection. If your custom code calls `get_thread_connection()` without a matching `release()`, the connection leaks.

Verify:

```python
# In your impl.py, wherever the connection is used
def execute_macro_query(self, ...):
    conn = self.connections.get_thread_connection()
    cursor = conn.handle.cursor()
    cursor.execute(sql)
    # Missing: self.connections.release()  ← leaks!
```

Fix: use the context manager pattern:

```python
with self.connection_named(name):
    # Connection automatically released
    cursor.execute(sql)
```

**Cause 3: cancel() does not close the handle**

```python
def cancel(self, connection):
    try:
        cursor = connection.handle.cursor()
        cursor.execute('KILL QUERY ...')
    except Exception:
        pass
    # Missing: connection.handle.close()
```

If KILL succeeded, the query is cancelled but the connection handle is still open. It should be closed afterwards.

**Fix**:

```python
def cancel(self, connection):
    try:
        cursor = connection.handle.cursor()
        cursor.execute('KILL QUERY ...')
    except Exception as e:
        logger.warning(f'Cancel failed: {e}')
    finally:
        connection.handle.close()
        connection.state = ConnectionState.CLOSED
```

**Cause 4: connection state not updated**

If you close the connection through `connection.handle.close()` but forget `connection.state = ConnectionState.CLOSED`, dbt-core still believes the connection is OPEN and keeps treating it as usable. Eventually every connection looks busy while actually being broken.

**Fix**: always update the state when the actual state changes:

```python
connection.handle.close()
connection.state = ConnectionState.CLOSED  # ← critical
```

**Cause 5: long-running queries are not interrupted**

On `Ctrl+C`, dbt calls cancel() on all running connections. If your cancel() blocks (for example, KILL QUERY hangs because the server is unresponsive), the thread never returns and the connection is not released.

**Fix**: put a timeout on cancel:

```python
import signal
from contextlib import contextmanager

# SIGALRM-based timeouts work only on Unix and only in the main thread
@contextmanager
def timeout_after(seconds):
    def handler(signum, frame):
        raise TimeoutError(f'Timed out after {seconds}s')
    old = signal.signal(signal.SIGALRM, handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old)

def cancel(self, connection):
    try:
        with timeout_after(5):
            cursor = connection.handle.cursor()
            cursor.execute('KILL QUERY ...')
    except TimeoutError:
        logger.warning('Cancel timed out, force closing')
    finally:
        connection.handle.close()
        connection.state = ConnectionState.CLOSED
```

**Debug strategies**:

**Strategy 1: log the connection lifecycle**

Add verbose logging:

```python
@classmethod
def open(cls, connection):
    logger.info(f'OPENING connection thread={threading.get_ident()}')
    # ... open logic ...
    logger.info(f'OPENED connection {id(connection.handle)}')
    return connection

def cancel(self, connection):
    logger.info(f'CANCELING connection {id(connection.handle)}')
    # ... cancel logic ...
    logger.info(f'CANCELED connection')
```

Run `dbt --debug run` and read the logs. Every OPEN should be followed by a CANCEL/CLOSE. An OPEN without a CLOSE is a leak.

**Strategy 2: count connections**

Monitor the warehouse connection count:

```sql
-- Snowflake
SELECT COUNT(*) FROM table(information_schema.query_history())
WHERE end_time IS NULL AND user_name = 'dbt_user'

-- Postgres
SELECT COUNT(*) FROM pg_stat_activity WHERE usename = 'dbt_user'
```

Run `dbt run` several times. The count should stay stable (idle connections released).

**Strategy 3: detect leaks with pytest**

```python
def test_no_connection_leaks():
    manager = MyConnectionManager(profile)

    for _ in range(100):
        conn = manager.acquire_connection('test')
        # ... use ...
        manager.release()

    # All released
    assert len(manager.thread_connections) == 0
```

The test fails if there are leaks.

**Strategy 4: use connection pool monitoring**

Warehouse-side metrics usually show the connection count over time. A spike during dbt run followed by decay is normal. A high plateau is a leak.

**Production discipline**:

1. **Always release in exception_handler**.
2. **Always close in cancel**.
3. **Always update connection.state** when the actual state changes.
4. **Log the connection lifecycle** for debugging.
5. **Test for leaks** in pytest.
6. **Monitor warehouse-side** in production.

This is **invisible plumbing**: the user never sees it, but bugs here critically affect production reliability.
Knowledge check
Production scenario: a Snowflake dbt run periodically hangs for 30 seconds. What is your debugging strategy?
Answer
This takes a multi-step debugging process that covers both connection management and Snowflake-specific issues.

**Possible causes**:

**Cause 1: network latency / connection setup**

Snowflake connections take 1-3 seconds to establish over TLS. If the adapter does not cache connections, every thread may wait independently.

Debug:

```bash
dbt --debug run 2>&1 | grep -E '(OPENING|OPENED|CANCEL|CLOSE)' | head -50
```

Look for many OPENING lines without a matching OPENED: those connections are hanging.

Fix: make sure connections are cached and reused (see the quiz in the previous lesson).

**Cause 2: warehouse paused / resuming**

Snowflake AUTO_SUSPEND. If the warehouse is paused, the first query waits while the warehouse resumes (10-30 seconds).

Debug:

```sql
SHOW WAREHOUSES;
-- Check state, auto_suspend setting
```

If the warehouse is 'SUSPENDED', the first query blocks waiting for the resume.

Fix:

- Raise `auto_suspend` (e.g., 600 seconds)
- `initially_suspended: false`
- Pre-warm the warehouse before dbt run:

```bash
sf_query "USE WAREHOUSE my_wh; SELECT 1;"  # Resumes warehouse
dbt run  # No wait, warehouse already running
```

**Cause 3: query queuing**

A Snowflake warehouse limits concurrent queries; the default MAX_CONCURRENCY_LEVEL is 8. With dbt at threads: 8 plus ad-hoc queries, newer queries queue.

Debug:

```sql
SELECT * FROM table(information_schema.query_history())
WHERE execution_status = 'QUEUED'
ORDER BY start_time DESC LIMIT 20;
```

Fix:

- Reduce `threads` in the profile (e.g., 4)
- Multi-cluster warehouse (auto-scale)
- Schedule dbt for off-peak hours

**Cause 4: long network calls in open()**

If `open()` does a lot of setup (set role, set warehouse, set timezone), each step is a round-trip.

```python
@classmethod
def open(cls, connection):
    handle = snowflake.connector.connect(...)

    # Each cursor.execute = round-trip
    cursor = handle.cursor()
    cursor.execute('USE ROLE analyst')          # ← network
    cursor.execute('USE WAREHOUSE my_wh')       # ← network
    cursor.execute('USE DATABASE analytics')    # ← network
    cursor.execute('USE SCHEMA marts')          # ← network
    cursor.execute("ALTER SESSION SET TIMEZONE='UTC'")  # ← network

    # Total: 5 round-trips × 200ms = 1 second per connection
```

Fix: combine them into one multi-statement call:

```python
# Multi-statement execution has to be requested explicitly via num_statements
cursor.execute('''
    USE ROLE analyst;
    USE WAREHOUSE my_wh;
    USE DATABASE analytics;
    USE SCHEMA marts;
    ALTER SESSION SET TIMEZONE='UTC';
''', num_statements=5)
```

Or pass connection parameters at connect time:

```python
handle = snowflake.connector.connect(
    role='analyst',
    warehouse='my_wh',
    database='analytics',
    schema='marts',
    session_parameters={'TIMEZONE': 'UTC'},
)
# Everything is set as part of connect, with no extra statements
```

**Cause 5: slow queries hold connections**

If one model takes 5 minutes and threads=4, all 4 threads may be waiting on slow queries.

Debug: `dbt run --debug 2>&1 | grep -E 'Running model|Finished model'`. Compare start/end times.

Fix:

- Optimize SQL (indexes, partitioning, materialization choice)
- Increase warehouse size (Snowflake)
- Slim CI (`--select state:modified+`): run only what changed

**Cause 6: connection cancellation hangs**

If `cancel()` hangs (network issue), the thread blocks indefinitely.

Debug: add a timeout to cancel (see the previous quiz).

**Cause 7: slowly-resolving DNS**

If the Snowflake hostname goes through cloud DNS with stale entries, the first connection takes long and subsequent ones are fast.

Debug:

```bash
time dig my-account.snowflakecomputing.com
```

Fix: cache DNS, use an IP if it is static.

**Cause 8: Snowflake-side metadata operations**

`SHOW TABLES` and `information_schema` queries can be slow on large databases.

Debug:

```sql
SELECT * FROM table(information_schema.query_history())
WHERE query_text LIKE '%information_schema%'
ORDER BY total_elapsed_time DESC LIMIT 10;
```

If information_schema queries take 10+ seconds, that is your slow spot.

Fix:

- Limit schemas in `schemas:` (don't scan everything)
- Use partial parsing
- Pre-compute the manifest, share it across runs

**Systematic debug protocol**:

1. `dbt --debug run`: get verbose logs
2. Time individual queries: dbt's run_results.json has `execution_time` per model
3. Check Snowflake query_history during dbt run
4. Profile connection setup: log the time spent in `open()`
5. Monitor warehouse load

Production observability: integrate with dbt-cloud / Elementary / custom logging.

This is **systematic engineering**, not just 'restart dbt'.

Check your understanding

Question 1 of 5. What are the 4 main methods of ConnectionManager, and what is each one for?
