Зачем поверх psycopg ещё одна библиотека
В прошлом уроке psycopg3 справился со всем — параметризованные запросы, транзакции, COPY. Зачем тогда городить ещё SQLAlchemy?
Один аргумент:
psycopg сам пул не делает. Чтобы получить пул на чистом psycopg, нужно либо ставить отдельную psycopg-pool, либо использовать pgbouncer снаружи, либо писать свою обвязку.
SQLAlchemy даёт пул из коробки + удобный DSL для динамических запросов + единый API под все БД. Это — главные причины. Не «потому что круче».
Второй аргумент — переносимость. Если завтра ваш ETL перевозят с Postgres на ClickHouse / DuckDB / Snowflake — SQLAlchemy-код потребует минимум изменений (поменяется только URL и пара специфичных типов). Чистый psycopg такого не даст.
Core vs ORM — обязательно отличать
- Core — низкоуровневый. Engine, connection, raw SQL через
text()или builder (select(table).where(...)). Никаких Python-классов как «модели». Похоже на psycopg, только с пулом и кросс-БД. - ORM — поверх Core. Python-классы соответствуют таблицам, экземпляры — строкам. Запросы через
session.query(User).filter_by(name="Alice"). Удобно для бэкенда с rich domain (User, Order, Invoice — много логики).
Для DE-задач
- ETL — это пакетная обработка, не CRUD над отдельными объектами.
- Аналитические запросы — это сложные SELECT с агрегатами, JOIN-ами, оконными функциями. ORM на них только мешает.
- Запросы пишутся вручную (Senior DE пишет SQL свободно), не генерируются из объектной модели.
Поэтому в курсе мы изучаем только Core. ORM — это middle/web-direction, у нас Python 02 и SQL 01.
Установка и первый запрос
uv add "sqlalchemy>=2.0" "psycopg[binary]"
SQLAlchemy 2.x — целевая версия в 2026. У 2.0 был большой апдейт API (2023), и старые туториалы под 1.4 много чего напишут иначе. Внимательно читайте даты, когда гуглите.
from sqlalchemy import create_engine, text
engine = create_engine("postgresql+psycopg://etl_user:secret@localhost:5432/etl_db")
with engine.connect() as conn:
result = conn.execute(text("SELECT version()"))
print(result.scalar())
Что происходит:
- — создаёт engine. Это не соединение! Это объект, который умеет открывать соединения и держит пул.
create_engine(url) engine.connect()— берёт коннект из пула (если нет — создаёт). ВозвращаетConnection.text("SELECT ...")— оборачивает raw SQL в SQLAlchemy-объект, чтобы драйвер знал, что это SQL, а не безопасная строка.result.scalar()— берёт одно значение из единственной строки/колонки.- На выходе из
withконнект возвращается в пул, не закрывается.
URL у engine — это dialect+driver://user:pass@host:port/dbname. Для Postgres с psycopg3:
postgresql+psycopg://...
Для psycopg2 (legacy) — postgresql+psycopg2://.... Для SQLite — sqlite:///path/to/file.db. Для DuckDB — duckdb:///path.duckdb. Один и тот же код, разный URL.
text() и параметры
Raw SQL — самый часто используемый стиль в DE:
from sqlalchemy import text
with engine.connect() as conn:
result = conn.execute(
text("SELECT id, name FROM users WHERE age > :min_age"),
{"min_age": 18},
)
for row in result:
print(row.id, row.name)
Различия с psycopg:
- Плейсхолдер —
:name, не%s. SQLAlchemy сам подменит на правильный синтаксис драйвера. - Параметры всегда в словаре (даже один параметр).
- Доступ к колонкам по имени (
row.id) работает из коробки.
Для bulk-операций (много строк с тем же запросом) — список словарей:
conn.execute(
text("INSERT INTO users (name, age) VALUES (:name, :age)"),
[
{"name": "Alice", "age": 25},
{"name": "Bob", "age": 30},
{"name": "Charlie", "age": 35},
],
)
SQLAlchemy под капотом сделает
Engine vs Connection — разница важна
# ПЛОХО — на каждом execute открывается новый коннект из пула
for user_id in user_ids:
result = engine.execute(text("...")) # SQLAlchemy 1.x; в 2.x этого даже нет
# ХОРОШО — один коннект на весь блок
with engine.connect() as conn:
for user_id in user_ids:
result = conn.execute(text("SELECT * FROM users WHERE id = :id"), {"id": user_id})
В 2.x SQLAlchemy убрала возможность исполнять SQL прямо на engine — нужно явно брать connection. Это правильная модель: connection = единица работы, engine = ресурс.
Один engine на процесс, пул из N коннектов, каждый запрос берёт коннект и возвращает в пул
Транзакции: engine.begin() vs connect()
Два режима — обязательно знать разницу:
# Автокоммит каждого запроса
with engine.connect() as conn:
conn.execute(text("INSERT ..."))
# На выходе — НЕ коммитится! Нужно явно
conn.commit()
# Транзакция на весь блок
with engine.begin() as conn:
conn.execute(text("INSERT INTO users ..."))
conn.execute(text("INSERT INTO audit_log ..."))
# На выходе — commit; при исключении — rollback
engine.begin() — то, что вы хотите в 95% случаев. Все запросы в блоке — одна транзакция, на выходе commit/rollback автоматически.
engine.connect() оставьте для случаев, когда вам нужно несколько независимых транзакций в одном блоке или работа с autocommit.
Connection pool: параметры
По умолчанию SQLAlchemy создаёт QueuePool с такими дефолтами:
pool_size=5— постоянное количество коннектов в пуле.max_overflow=10— дополнительные коннекты при пиках. Закрываются после использования.pool_timeout=30— сколько ждать свободного коннекта при перегрузке.pool_recycle=-1— не перезапускать коннекты по времени (см. ниже).
Настройка:
engine = create_engine(
"postgresql+psycopg://...",
pool_size=10,
max_overflow=20,
pool_timeout=30,
pool_recycle=1800, # перезапускать коннект, если ему >30 мин
pool_pre_ping=True, # проверять коннект перед использованием
)
pool_pre_ping=True — обязательная настройка для DE. Postgres / firewall / VPN иногда тихо рвут idle-коннекты; следующий запрос упадёт OperationalError. С pre_ping SQLAlchemy перед каждым execute делает дешёвый SELECT 1 и при ошибке заменит коннект новым. Стоит микросекунды, спасает от рандомных падений.
pool_recycle — периодически пересоздавать коннект. Postgres-балансировщики (pgbouncer, AWS RDS proxy) иногда «забывают» коннекты после получаса idle. Поставьте 1800 (30 мин) и забудьте.
DE-кейс: ETL с pool + retry
Соединим всё, что есть, в маленький production-ready ETL:
from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError, DBAPIError
from tenacity import retry, stop_after_attempt, wait_exponential_jitter, retry_if_exception_type
import httpx
from datetime import datetime, UTC
engine = create_engine(
"postgresql+psycopg://etl_user:secret@localhost:5432/etl_db",
pool_size=5,
pool_pre_ping=True,
pool_recycle=1800,
)
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential_jitter(initial=1, max=16, jitter=2),
retry=retry_if_exception_type((OperationalError, DBAPIError)),
reraise=True,
)
def upsert_issues(issues: list[dict]) -> int:
with engine.begin() as conn:
# SQLAlchemy выберет executemany для psycopg3
result = conn.execute(
text("""
INSERT INTO issues (repo_id, number, title, state, updated_at)
VALUES (:repo_id, :number, :title, :state, :updated_at)
ON CONFLICT (repo_id, number) DO UPDATE
SET title = EXCLUDED.title,
state = EXCLUDED.state,
updated_at = EXCLUDED.updated_at
WHERE issues.updated_at < EXCLUDED.updated_at
"""),
[
{
"repo_id": i["repository"]["id"],
"number": i["number"],
"title": i["title"],
"state": i["state"],
"updated_at": datetime.fromisoformat(i["updated_at"].replace("Z", "+00:00")),
}
for i in issues
],
)
return result.rowcount
# Использование
with httpx.Client(timeout=30) as http:
issues = http.get("https://api.github.com/repos/python/cpython/issues").json()
affected = upsert_issues(issues)
print(f"Upserted: {affected}")
Что мы получили:
- Один engine на процесс — пул из 5 коннектов.
- Каждый
upsert_issuesберёт коннект из пула, делает транзакцию, возвращает коннект. - На
OperationalError(потерянный коннект, deadlock) tenacity повторит с backoff. pool_pre_pingдополнительно страхует от тихо умерших коннектов.
SQL builder (не raw text)
Кроме text(), у Core есть builder для составления SQL программно:
from sqlalchemy import MetaData, Table, Column, Integer, String, select
metadata = MetaData()
users = Table(
"users", metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
Column("age", Integer),
)
with engine.connect() as conn:
query = select(users.c.name, users.c.age).where(users.c.age > 18)
result = conn.execute(query)
for row in result:
print(row.name, row.age)
Это удобно, когда запрос строится динамически — фильтры, опциональные JOIN-ы, разный набор колонок. Для статических ETL-запросов raw text() обычно читабельнее.
В DE-практике 80% — text() для устоявшихся запросов, 20% — builder, когда часть условий приходит как параметр.
COPY через SQLAlchemy
SQLAlchemy сама COPY не реализует — это специфика Postgres. Но можно получить сырой psycopg3 connection из SQLAlchemy и использовать его API:
with engine.begin() as conn:
raw_conn = conn.connection.dbapi_connection # psycopg.Connection
with raw_conn.cursor() as cur:
with cur.copy("COPY issues_staging (...) FROM STDIN") as copy:
for row in rows:
copy.write_row(row)
# Дальше обычные text() выполняем через SQLAlchemy
conn.execute(text("INSERT INTO issues SELECT * FROM issues_staging ..."))
То есть SQLAlchemy и psycopg сочетаются — мы используем пул и транзакции от первого, нативный COPY от второго. На прод-проектах это типовой паттерн.
Anti-patterns
Создание engine на каждый запрос. create_engine — дорогая операция. Делайте один engine на процесс, на старте приложения. В Airflow — на уровне DAG/модуля.
engine.dispose() без необходимости. Закрывает пул и все коннекты в нём. Нужно только при шатдауне, после fork(), или в редких сценариях. Не вызывайте после каждого запроса.
Не пользоваться with. Без with engine.connect() коннект может не вернуться в пул при исключении. Пул быстро исчерпается, дальше pool_timeout-блокировка.
ORM для аналитики. session.query(Order).filter_by(...).all() для отчёта по миллиону строк — это OOM на пустом месте. Аналитика — text() или builder + streaming.
Что мы получили
- SQLAlchemy Core — пул коннектов и кросс-БД, без объектной модели.
- ORM в DE-задачах обычно лишний. text() + builder покрывают всё.
create_engine(url)один раз на процесс.engine.begin()для транзакций.pool_pre_ping=True,pool_recycle=1800— must-have для production-ETL.text("... :name ...")с словарём параметров. Для bulk — список словарей.- COPY — через сырой psycopg connection из SQLAlchemy.
- Retry на
OperationalError/DBAPIErrorчерез tenacity — стандарт для прод-ETL.
В последнем уроке модуля — где взять credentials, как не закоммитить токен в git, и как настроить TLS для Postgres.