Learning Platform
Урок 08.06 · 22 мин
Начальный
SQLAlchemyConnection poolEngineETLSQL
CREATE TABLE: типы, NOT NULL, DEFAULT, IDENTITY ETL: Extract, Transform, Load — классическая архитектура

Зачем поверх psycopg ещё одна библиотека

В прошлом уроке psycopg3 справился со всем — параметризованные запросы, транзакции, COPY. Зачем тогда городить ещё SQLAlchemy?

Один аргумент:

connection pool
. Открыть соединение к Postgres — операция дорогая (TCP + TLS + auth, обычно 50-200 мс). Если у вас Airflow-таск выполняет 1000 запросов — это +50 секунд только на коннекты. С пулом — открыли 5 коннектов один раз, тысячу запросов прогнали через них.

psycopg сам пул не делает. Чтобы получить пул на чистом psycopg, нужно либо ставить отдельную psycopg-pool, либо использовать pgbouncer снаружи, либо писать свою обвязку.

SQLAlchemy даёт пул из коробки + удобный DSL для динамических запросов + единый API под все БД. Это — главные причины. Не «потому что круче».

Второй аргумент — переносимость. Если завтра ваш ETL перевозят с Postgres на ClickHouse / DuckDB / Snowflake — SQLAlchemy-код потребует минимум изменений (поменяется только URL и пара специфичных типов). Чистый psycopg такого не даст.

Core vs ORM — обязательно отличать

SQLAlchemy
— это два разных слоя:

  • Core — низкоуровневый. Engine, connection, raw SQL через text() или builder (select(table).where(...)). Никаких Python-классов как «модели». Похоже на psycopg, только с пулом и кросс-БД.
  • ORM — поверх Core. Python-классы соответствуют таблицам, экземпляры — строкам. Запросы через session.query(User).filter_by(name="Alice"). Удобно для бэкенда с rich domain (User, Order, Invoice — много логики).

Для DE-задач

ORM
почти всегда перебор:

  • ETL — это пакетная обработка, не CRUD над отдельными объектами.
  • Аналитические запросы — это сложные SELECT с агрегатами, JOIN-ами, оконными функциями. ORM на них только мешает.
  • Запросы пишутся вручную (Senior DE пишет SQL свободно), не генерируются из объектной модели.

Поэтому в курсе мы изучаем только Core. ORM — это middle/web-direction, у нас Python 02 и SQL 01.

Установка и первый запрос

uv add "sqlalchemy>=2.0" "psycopg[binary]"

SQLAlchemy 2.x — целевая версия в 2026. У 2.0 был большой апдейт API (2023), и старые туториалы под 1.4 много чего напишут иначе. Внимательно читайте даты, когда гуглите.

from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg://etl_user:secret@localhost:5432/etl_db")

with engine.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    print(result.scalar())

Что происходит:

  1. create_engine(url)
    — создаёт engine. Это не соединение! Это объект, который умеет открывать соединения и держит пул.
  2. engine.connect() — берёт коннект из пула (если нет — создаёт). Возвращает Connection.
  3. text("SELECT ...") — оборачивает raw SQL в SQLAlchemy-объект, чтобы драйвер знал, что это SQL, а не безопасная строка.
  4. result.scalar() — берёт одно значение из единственной строки/колонки.
  5. На выходе из with коннект возвращается в пул, не закрывается.

URL у engine — это dialect+driver://user:pass@host:port/dbname. Для Postgres с psycopg3:

postgresql+psycopg://...

Для psycopg2 (legacy) — postgresql+psycopg2://.... Для SQLite — sqlite:///path/to/file.db. Для DuckDB — duckdb:///path.duckdb. Один и тот же код, разный URL.

text() и параметры

Raw SQL — самый часто используемый стиль в DE:

from sqlalchemy import text

with engine.connect() as conn:
    result = conn.execute(
        text("SELECT id, name FROM users WHERE age > :min_age"),
        {"min_age": 18},
    )
    for row in result:
        print(row.id, row.name)

Различия с psycopg:

  • Плейсхолдер — :name, не %s. SQLAlchemy сам подменит на правильный синтаксис драйвера.
  • Параметры всегда в словаре (даже один параметр).
  • Доступ к колонкам по имени (row.id) работает из коробки.

Для bulk-операций (много строк с тем же запросом) — список словарей:

conn.execute(
    text("INSERT INTO users (name, age) VALUES (:name, :age)"),
    [
        {"name": "Alice", "age": 25},
        {"name": "Bob", "age": 30},
        {"name": "Charlie", "age": 35},
    ],
)

SQLAlchemy под капотом сделает

executemany
. На небольших объёмах (до тысячи строк) — нормально; для гигабайт всё равно нужен COPY (об этом ниже).

Engine vs Connection — разница важна

# ПЛОХО — на каждом execute открывается новый коннект из пула
for user_id in user_ids:
    result = engine.execute(text("..."))    # SQLAlchemy 1.x; в 2.x этого даже нет
# ХОРОШО — один коннект на весь блок
with engine.connect() as conn:
    for user_id in user_ids:
        result = conn.execute(text("SELECT * FROM users WHERE id = :id"), {"id": user_id})

В 2.x SQLAlchemy убрала возможность исполнять SQL прямо на engine — нужно явно брать connection. Это правильная модель: connection = единица работы, engine = ресурс.

Engine, Pool, Connection

Один engine на процесс, пул из N коннектов, каждый запрос берёт коннект и возвращает в пул

ApplicationETL job
Engineодин на процессcreate_engine() один раз при старте
Engine
Pool5 коннектовQueuePool по умолчанию
Postgresbackend processes
with engine.connect()
Connectionвзят из пулаНа выходе из with — возвращается в пул, не закрывается
execute()запросы

Транзакции: engine.begin() vs connect()

Два режима — обязательно знать разницу:

# Автокоммит каждого запроса
with engine.connect() as conn:
    conn.execute(text("INSERT ..."))
    # На выходе — НЕ коммитится! Нужно явно
    conn.commit()
# Транзакция на весь блок
with engine.begin() as conn:
    conn.execute(text("INSERT INTO users ..."))
    conn.execute(text("INSERT INTO audit_log ..."))
    # На выходе — commit; при исключении — rollback

engine.begin() — то, что вы хотите в 95% случаев. Все запросы в блоке — одна транзакция, на выходе commit/rollback автоматически.

engine.connect() оставьте для случаев, когда вам нужно несколько независимых транзакций в одном блоке или работа с autocommit.

Connection pool: параметры

По умолчанию SQLAlchemy создаёт QueuePool с такими дефолтами:

  • pool_size=5 — постоянное количество коннектов в пуле.
  • max_overflow=10 — дополнительные коннекты при пиках. Закрываются после использования.
  • pool_timeout=30 — сколько ждать свободного коннекта при перегрузке.
  • pool_recycle=-1 — не перезапускать коннекты по времени (см. ниже).

Настройка:

engine = create_engine(
    "postgresql+psycopg://...",
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=1800,    # перезапускать коннект, если ему >30 мин
    pool_pre_ping=True,    # проверять коннект перед использованием
)

pool_pre_ping=Trueобязательная настройка для DE. Postgres / firewall / VPN иногда тихо рвут idle-коннекты; следующий запрос упадёт OperationalError. С pre_ping SQLAlchemy перед каждым execute делает дешёвый SELECT 1 и при ошибке заменит коннект новым. Стоит микросекунды, спасает от рандомных падений.

pool_recycle — периодически пересоздавать коннект. Postgres-балансировщики (pgbouncer, AWS RDS proxy) иногда «забывают» коннекты после получаса idle. Поставьте 1800 (30 мин) и забудьте.

DE-кейс: ETL с pool + retry

Соединим всё, что есть, в маленький production-ready ETL:

from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError, DBAPIError
from tenacity import retry, stop_after_attempt, wait_exponential_jitter, retry_if_exception_type
import httpx
from datetime import datetime, UTC


engine = create_engine(
    "postgresql+psycopg://etl_user:secret@localhost:5432/etl_db",
    pool_size=5,
    pool_pre_ping=True,
    pool_recycle=1800,
)


@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential_jitter(initial=1, max=16, jitter=2),
    retry=retry_if_exception_type((OperationalError, DBAPIError)),
    reraise=True,
)
def upsert_issues(issues: list[dict]) -> int:
    with engine.begin() as conn:
        # SQLAlchemy выберет executemany для psycopg3
        result = conn.execute(
            text("""
                INSERT INTO issues (repo_id, number, title, state, updated_at)
                VALUES (:repo_id, :number, :title, :state, :updated_at)
                ON CONFLICT (repo_id, number) DO UPDATE
                  SET title = EXCLUDED.title,
                      state = EXCLUDED.state,
                      updated_at = EXCLUDED.updated_at
                  WHERE issues.updated_at < EXCLUDED.updated_at
            """),
            [
                {
                    "repo_id": i["repository"]["id"],
                    "number": i["number"],
                    "title": i["title"],
                    "state": i["state"],
                    "updated_at": datetime.fromisoformat(i["updated_at"].replace("Z", "+00:00")),
                }
                for i in issues
            ],
        )
        return result.rowcount


# Использование
with httpx.Client(timeout=30) as http:
    issues = http.get("https://api.github.com/repos/python/cpython/issues").json()
    affected = upsert_issues(issues)
    print(f"Upserted: {affected}")

Что мы получили:

  • Один engine на процесс — пул из 5 коннектов.
  • Каждый upsert_issues берёт коннект из пула, делает транзакцию, возвращает коннект.
  • На OperationalError (потерянный коннект, deadlock) tenacity повторит с backoff.
  • pool_pre_ping дополнительно страхует от тихо умерших коннектов.

SQL builder (не raw text)

Кроме text(), у Core есть builder для составления SQL программно:

from sqlalchemy import MetaData, Table, Column, Integer, String, select

metadata = MetaData()
users = Table(
    "users", metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
    Column("age", Integer),
)

with engine.connect() as conn:
    query = select(users.c.name, users.c.age).where(users.c.age > 18)
    result = conn.execute(query)
    for row in result:
        print(row.name, row.age)

Это удобно, когда запрос строится динамически — фильтры, опциональные JOIN-ы, разный набор колонок. Для статических ETL-запросов raw text() обычно читабельнее.

В DE-практике 80% — text() для устоявшихся запросов, 20% — builder, когда часть условий приходит как параметр.

COPY через SQLAlchemy

SQLAlchemy сама COPY не реализует — это специфика Postgres. Но можно получить сырой psycopg3 connection из SQLAlchemy и использовать его API:

with engine.begin() as conn:
    raw_conn = conn.connection.dbapi_connection    # psycopg.Connection
    with raw_conn.cursor() as cur:
        with cur.copy("COPY issues_staging (...) FROM STDIN") as copy:
            for row in rows:
                copy.write_row(row)
    # Дальше обычные text() выполняем через SQLAlchemy
    conn.execute(text("INSERT INTO issues SELECT * FROM issues_staging ..."))

То есть SQLAlchemy и psycopg сочетаются — мы используем пул и транзакции от первого, нативный COPY от второго. На прод-проектах это типовой паттерн.

Anti-patterns

Создание engine на каждый запрос. create_engine — дорогая операция. Делайте один engine на процесс, на старте приложения. В Airflow — на уровне DAG/модуля.

engine.dispose() без необходимости. Закрывает пул и все коннекты в нём. Нужно только при шатдауне, после fork(), или в редких сценариях. Не вызывайте после каждого запроса.

Не пользоваться with. Без with engine.connect() коннект может не вернуться в пул при исключении. Пул быстро исчерпается, дальше pool_timeout-блокировка.

ORM для аналитики. session.query(Order).filter_by(...).all() для отчёта по миллиону строк — это OOM на пустом месте. Аналитика — text() или builder + streaming.

Что мы получили

  • SQLAlchemy Core — пул коннектов и кросс-БД, без объектной модели.
  • ORM в DE-задачах обычно лишний. text() + builder покрывают всё.
  • create_engine(url) один раз на процесс. engine.begin() для транзакций.
  • pool_pre_ping=True, pool_recycle=1800 — must-have для production-ETL.
  • text("... :name ...") с словарём параметров. Для bulk — список словарей.
  • COPY — через сырой psycopg connection из SQLAlchemy.
  • Retry на OperationalError/DBAPIError через tenacity — стандарт для прод-ETL.

В последнем уроке модуля — где взять credentials, как не закоммитить токен в git, и как настроить TLS для Postgres.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7