Второй внешний мир — БД
HTTP мы прошли. Теперь — другой канал общения с внешним миром, без которого DE не бывает: реляционная база. В подавляющем большинстве случаев — PostgreSQL.
В этом курсе мы не учим SQL — для этого есть SQL 01. Здесь — как говорить с Postgres из Python: какие драйверы, как открывать соединения, как не сделать SQL injection, и как загрузить туда миллион строк за пять секунд.
psycopg2 vs psycopg3 — какой брать
psycopg2 — драйвер с 2011 года, написан как сишное расширение, везде стоит. Это де-факто стандарт legacy-проектов.
psycopg3 (часто пишут просто psycopg) — переписанная с нуля версия от того же автора Daniele Varrazzo, релиз в 2021. Это то, что вы ставите в новый проект в 2026 году.
Чем psycopg3 лучше:
- Поддерживает асинхронный API (
AsyncConnection). - Чище работает с типами Postgres (range, array, JSONB).
- Поддерживает автоматически.prepared statements
- Современный COPY (синхронный и async) удобнее, чем в psycopg2.
- Активно развивается; psycopg2 в режиме maintenance.
Названия путают: pip-пакет называется psycopg, не psycopg3. Импортируется тоже как import psycopg. В этом курсе только psycopg3.
uv add "psycopg[binary]"
[binary] означает: ставится с предкомпилированным libpq (системная C-библиотека Postgres). Без него psycopg использует pure-Python драйвер, который медленнее и требует libpq отдельно. На macOS/Linux/Windows [binary] ставится мгновенно.
Первое соединение
import psycopg
conn = psycopg.connect("postgresql://user:password@localhost:5432/etl_db")
cur = conn.cursor()
cur.execute("SELECT version()")
print(cur.fetchone())
# ('PostgreSQL 16.4 on x86_64-pc-linux-gnu, compiled by gcc...',)
cur.close()
conn.close()
Что произошло:
psycopg.connect(dsn)— открыли TCP-соединение к Postgres, авторизовались. Это «дорогая» операция (TCP handshake + auth + initial setup).conn.cursor()— получили. Это и есть «исполнитель» запросов.cursorcur.execute(...)— отправили SQL серверу, получили результат.cur.fetchone()— забрали первую (и единственную) строку результата.close()— отдали ресурсы.
Главное различие: connection — это TCP-соединение и сессия с сервером (одна на весь скрипт обычно), cursor — это контекст конкретного запроса/результата (можно создавать много).
# Альтернативная форма connect — через kwargs
conn = psycopg.connect(
host="localhost",
port=5432,
user="etl_user",
password="secret",
dbname="etl_db",
)
Любой вариант валиден. dsn (
Connection — TCP-соединение и сессия. Cursor — контекст выполнения запроса и итератор по результату.
Параметризованные запросы — священное правило
Самая страшная ошибка джуниора:
# КАТЕГОРИЧЕСКИ НЕЛЬЗЯ — SQL injection ждёт вас
user_input = "1; DROP TABLE users; --"
cur.execute(f"SELECT * FROM users WHERE id = {user_input}")
Если user_input пришёл от пользователя (даже из CSV-файла, который кто-то прислал по почте), вы только что дропнули таблицу. Это не теоретическая угроза — это xkcd 327, это O'Connell' OR 1=1 --, это реальные взломы каждый день.
Правило, без исключений: для подстановки значений в SQL используйте placeholder’ы. psycopg сам корректно экранирует данные:
# ПРАВИЛЬНО — psycopg экранирует значение
cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
# Несколько параметров — кортеж в том же порядке
cur.execute(
"INSERT INTO users (name, email, created_at) VALUES (%s, %s, %s)",
("Alice", "[email protected]", datetime.now(UTC)),
)
# Именованные параметры — словарём
cur.execute(
"SELECT * FROM users WHERE name = %(name)s AND age > %(min_age)s",
{"name": "Alice", "min_age": 18},
)
Плейсхолдер у psycopg — только %s, не ?, не :name. Запомните: %s это не Python %-форматирование. Это маркер для драйвера, который сам передаёт значение отдельно и сервер вообще не видит конкатенацию.
Никогда не строите SQL через f-string или конкатенацию с данными от пользователя. Даже если «это же только наш админ его задаст» — через год это «наш админ» станет «загрузка из CSV», и привет, дроп таблицы. SQL injection — это первая строчка любого OWASP Top 10 за последние 20 лет.
Исключение, ради которого f-string иногда оправдан — имена таблиц / колонок (динамические). Их в плейсхолдер не поставить (это синтаксические элементы, не данные). Но даже тогда лучше использовать psycopg.sql.Identifier:
from psycopg import sql
cur.execute(
sql.SQL("SELECT * FROM {table} WHERE id = %s").format(
table=sql.Identifier(table_name),
),
(user_id,),
)
sql.Identifier корректно экранирует имя таблицы по правилам Postgres. f-string на данных — никогда; sql.SQL + Identifier на структурных элементах — можно.
Чтение результатов
После cur.execute("SELECT ...") результат хранится в cursor. Способы достать:
cur.execute("SELECT id, name FROM users WHERE age > %s", (18,))
cur.fetchone() # (1, 'Alice') или None если строк нет
cur.fetchmany(50) # список до 50 строк
cur.fetchall() # все оставшиеся строки списком
for row in cur: # итерация — самый Pythonic вариант
print(row)
Итерация по cursor — streaming на стороне Python. Внутри psycopg по умолчанию делает client-side cursor: тянет все строки в память сразу. На больших результатах это OOM.
Для миллионов строк есть
# Чтение миллиона строк без OOM
with conn.cursor(name="streamed") as cur:
cur.execute("SELECT id, raw_data FROM events WHERE day = %s", ("2026-05-13",))
for row in cur: # тянет порциями с сервера
process(row)
Имя у cursor — обязательно для server-side. Без имени — client-side, всё в память.
row по умолчанию — кортеж. Если хочется доступ по имени колонки:
from psycopg.rows import dict_row
with conn.cursor(row_factory=dict_row) as cur:
cur.execute("SELECT id, name FROM users")
for row in cur:
print(row["name"]) # доступ как к словарю
Это удобнее, особенно когда SELECT возвращает 10 колонок и считать индексы — больно.
Транзакции
psycopg3 по умолчанию запускает каждый блок в транзакции. Изменения не видны другим сессиям, пока вы не сделаете commit():
conn = psycopg.connect(dsn)
cur = conn.cursor()
cur.execute("INSERT INTO users (name) VALUES (%s)", ("Alice",))
# Сейчас другая сессия эту вставку НЕ увидит
conn.commit() # теперь видит
# Альтернатива — отмена:
cur.execute("INSERT INTO users (name) VALUES (%s)", ("Bob",))
conn.rollback() # Bob не вставится
autocommit=True — каждый запрос в своей транзакции. Это нужно для команд, которые не могут идти в транзакции (CREATE DATABASE, VACUUM, CREATE INDEX CONCURRENTLY). По умолчанию выключен — обычно вы хотите транзакции.
conn.autocommit = True # для административных команд
with-блоки — гарантированный cleanup
Connection и cursor — это ресурсы. Если упало исключение в середине, надо корректно закрыть.
import psycopg
with psycopg.connect(dsn) as conn:
with conn.cursor() as cur:
cur.execute("INSERT INTO users (name) VALUES (%s)", ("Alice",))
# cur закрыт
# При нормальном выходе — conn.commit(); при исключении — conn.rollback(); потом close().
Семантика блока with conn: важна:
- Нормальный выход → автоматический
commit(). - Исключение → автоматический
rollback(). - В любом случае →
close()на выходе из самого внешнегоwith.
Это production-pattern. Junior, который пишет conn = psycopg.connect(); ...; conn.close() без with — оставляет открытые соединения при ошибках. Через неделю Postgres устаёт от 1000 зависших коннектов и встаёт.
Bulk insert: COPY вместо VALUES
INSERT работает, но медленно. Загрузить 100000 строк через INSERT ... VALUES ... — это 100000 RTT, минимум 30 секунд. Через executemany — лучше, но всё равно медленно.
В psycopg3:
import psycopg
rows = [
("alice", "[email protected]", 25),
("bob", "[email protected]", 30),
# ... 100000 строк
]
with psycopg.connect(dsn) as conn:
with conn.cursor() as cur:
with cur.copy("COPY users (name, email, age) FROM STDIN") as copy:
for row in rows:
copy.write_row(row)
copy.write_row(tuple) отправляет строку в стрим. На выходе из with cur.copy(...) стрим закрывается, Postgres получает EOF и применяет вставку как одну операцию.
Замеры на простом примере (100k строк, локальный Postgres):
| Способ | Время |
|---|---|
| INSERT … VALUES (один за раз) | ~30 сек |
| executemany | ~5 сек |
| COPY … FROM STDIN | ~0.4 сек |
Для DE-задач (ETL, импорт CSV) — COPY всегда. Никаких for row in rows: cur.execute("INSERT ...").
DE-кейс: upsert через COPY + ON CONFLICT
Реальная задача: тянем из API issues, упсёртим в Postgres (вставляем новые, обновляем существующие по (repo, number)).
Прямого COPY ... ON CONFLICT нет. Стандартный паттерн — двухшаговый: COPY в staging-таблицу, потом INSERT из неё в основную с ON CONFLICT.
import psycopg
import httpx
from datetime import datetime, UTC
def upsert_issues(conn: psycopg.Connection, issues: list[dict]) -> int:
with conn.cursor() as cur:
# Шаг 1: временная staging-таблица в текущей сессии
cur.execute("""
CREATE TEMP TABLE issues_staging (
repo_id BIGINT,
number INT,
title TEXT,
state TEXT,
updated_at TIMESTAMPTZ
) ON COMMIT DROP
""")
# Шаг 2: bulk-загрузка через COPY
with cur.copy("COPY issues_staging (repo_id, number, title, state, updated_at) FROM STDIN") as copy:
for issue in issues:
copy.write_row((
issue["repository"]["id"],
issue["number"],
issue["title"],
issue["state"],
datetime.fromisoformat(issue["updated_at"].replace("Z", "+00:00")),
))
# Шаг 3: upsert из staging в основную таблицу
cur.execute("""
INSERT INTO issues (repo_id, number, title, state, updated_at)
SELECT repo_id, number, title, state, updated_at FROM issues_staging
ON CONFLICT (repo_id, number) DO UPDATE
SET title = EXCLUDED.title,
state = EXCLUDED.state,
updated_at = EXCLUDED.updated_at
WHERE issues.updated_at < EXCLUDED.updated_at
""")
return cur.rowcount
with psycopg.connect("postgresql://localhost/etl") as conn:
with httpx.Client(timeout=30) as http:
page = http.get("https://api.github.com/repos/python/cpython/issues").json()
affected = upsert_issues(conn, page)
print(f"Upserted: {affected}")
# conn.commit() произойдёт автоматически на выходе из with
Что важно:
ON COMMIT DROP— temp-таблица удаляется автоматом при commit/rollback. Не оставляет мусора.EXCLUDED.x— в ON CONFLICT UPDATE так обращаются к новым значениям из INSERT.WHERE issues.updated_at < EXCLUDED.updated_at— апдейтим только если новая версия свежее. Защита от out-of-order данных.- Транзакция накрывает всё. Если на ON CONFLICT упало — staging тоже откатывается, остаёмся в чистом состоянии.
Это типовой ETL-паттерн «получили partition из API → залили в Postgres идемпотентно». Запомните — это будет в каждом DE-проекте.
Подводные камни
Забытый conn.commit(). Сделали INSERT, скрипт закончился без commit — данных нет. Используйте with — он закоммитит сам.
Висящие транзакции. Открыли cursor, сделали SELECT, не закрыли cursor (или connection) — Postgres держит snapshot, autovacuum не может почистить мёртвые строки. Через неделю база раздувается. with cursor и with connection решают.
*SELECT . В Postgres SELECT * тащит все колонки, включая большие JSONB/BYTEA. На широких таблицах это +гигабайты трафика. Перечисляйте колонки явно.
Параметры в LIKE. Чтобы найти по подстроке:
cur.execute("SELECT * FROM logs WHERE message LIKE %s", (f"%{search}%",))
% тут — это
%s — это psycopg-placeholder. Они разные %. f-string собирает шаблон, потом psycopg экранирует его как значение. Никакого SQL injection — % внутри значения остаётся %, не плейсхолдером.
Типы. psycopg маппит Postgres-типы в Python: TIMESTAMPTZ → datetime с tzinfo, JSONB → dict, ARRAY → list, NUMERIC → Decimal. Это работает в обе стороны: положили datetime.now(UTC) — поедет в TIMESTAMPTZ корректно.
Что мы получили
- psycopg3 (
uv add psycopg[binary]), не psycopg2. - Connection — TCP/сессия; cursor — исполнитель и итератор результата.
- Параметризованные запросы через
%s. Никогда f-string на данных. - Структурные элементы (имена таблиц) — через
psycopg.sql.Identifier. - Транзакции включены по умолчанию,
conn.commit()нужен.with conn:делает это сам. - Bulk insert — только COPY. INSERT/executemany — для редких случаев.
- Upsert-паттерн: COPY в temp-таблицу + INSERT … ON CONFLICT DO UPDATE.
- Server-side cursor (
conn.cursor(name="...")) для миллионов строк.
В следующем уроке — SQLAlchemy Core: тот же Postgres, но с connection-pool из коробки и удобным DSL для динамического SQL.