Финал модуля: pandas в продакшене
Три предыдущих урока — про синтаксис и логику. Этот — про инженерную сторону. Что выбрать для чтения файла, как не положить машину гигабайтным CSV, какие операции в pandas быстрые, какие медленные, и в какой момент пора смотреть на
Большая часть этого урока — практические заметки из реальных DE-будней. Если вы научитесь правильно выбирать формат и не убивать память — половина проблем junior’а с pandas исчезнет.
Форматы: что быстрее, почему
| Формат | read_csv | read_parquet | read_sql_query |
|---|---|---|---|
| Скорость чтения 1М строк | ~5-10 сек | ~0.3-1 сек | зависит от БД |
| Размер на диске | большой (text) | в 3-10 раз меньше | хранится в БД |
| Типы данных | теряются (всё text) | сохраняются | сохраняются |
| Schema | надо угадывать | в файле | в БД |
| Колоночность | нет | да | как ответ БД |
| Хорош для | exchange с людьми | data lake, ETL-кэш | прямая выгрузка |
Главный совет: между задачами кэшируйте в Parquet, между системами — Parquet, наружу пользователям иногда CSV. Текстовый CSV — это формат «из почтовой переписки», а не для прод-пайплайнов.
Подробнее про сам Parquet и pyarrow в уроке 04 модуля 6. Здесь — что про это знает pandas.
read_parquet — пишите и читайте Parquet
import pandas as pd
# чтение
df = pd.read_parquet("data.parquet")
# чтение только нужных колонок (column pruning)
df = pd.read_parquet("data.parquet", columns=["user_id", "amount", "event_time"])
# чтение с фильтром (predicate pushdown)
df = pd.read_parquet(
"events/",
filters=[("country", "==", "RU"), ("amount", ">=", 100)],
)
# запись
df.to_parquet("output.parquet", compression="snappy", index=False)
# запись с партиционированием — частая практика data lake
df.to_parquet(
"data/",
partition_cols=["country", "date"], # создаёт ./data/country=RU/date=2025-01-15/
)
partition_cols пишет данные в иерархию папок. Это
df = pd.read_parquet("data/") # autodetect партиций
# country и date станут колонками автоматически
read_csv для больших файлов — chunksize
CSV есть CSV. Если файл 10ГБ и не помещается в память — вы не сможете прочитать его целиком. Решение — стримить кусками:
# chunksize=10_000 — генератор, выдающий DataFrame по 10000 строк
total = 0
for chunk in pd.read_csv("huge.csv", chunksize=10_000):
chunk = chunk[chunk["amount"] > 0]
total += chunk["amount"].sum()
print(f"Total: {total}")
pd.read_csv(..., chunksize=N) возвращает
Типичный pattern: чанк за чанком, агрегируете промежуточный результат, в конце собираете финальный.
# собираем агрегат по чанкам
partial_results = []
for chunk in pd.read_csv("huge.csv", chunksize=100_000):
agg = chunk.groupby("country", as_index=False)["amount"].sum()
partial_results.append(agg)
# финальная агрегация (sum of sums)
final = pd.concat(partial_results, ignore_index=True)
final = final.groupby("country", as_index=False)["amount"].sum()
Это
Не делайте pd.concat(list_of_chunks) для сохранения всех чанков целиком — вы сразу же убьёте весь смысл chunked-чтения. Идея в том, чтобы в памяти находился только один чанк за раз, плюс маленький промежуточный агрегат.
read_sql_query — pandas + PostgreSQL
Junior DE часто читает из БД напрямую. Через SQLAlchemy:
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine("postgresql+psycopg://user:pass@localhost/db")
df = pd.read_sql_query(
"SELECT user_id, amount, created_at FROM orders WHERE created_at >= %(since)s",
engine,
params={"since": "2025-01-01"},
parse_dates=["created_at"],
)
Что важно:
- Не пишите параметры в строку запроса — используйте placeholder и
params. Иначе SQL injection. parse_datesдля timestamp-колонок — pandas сам конвертит.- Для больших ответов —
chunksize:
for chunk in pd.read_sql_query("SELECT * FROM big_table", engine, chunksize=50_000):
process(chunk)
Это работает на уровне курсора БД — драйвер выдаёт чанки, pandas конвертит в DataFrame. Память не разрастается. Подробнее про БД-доступ — в модуле 7.
Memory profiling — где ваша память
Когда DataFrame неожиданно много весит, посмотрите на потребление:
df.info(memory_usage="deep")
# полный размер с учётом строк (object-колонки прячут потребление)
df.memory_usage(deep=True)
# по колонкам, в байтах
Без deep=True object-колонка показывает «8 байт на ячейку» — на самом деле там указатели на Python-строки, а сами строки — отдельно. С deep=True цифры реальные.
Типичные виновники жирной памяти:
- object-колонки строк — конвертите в
string(pyarrow backend) или вcategoryдля повторяющихся значений. - float64 для счётчиков, которые могут быть int — приведите в
int32/Int32. - много пустых строк — оставьте только те, что нужны для отчёта (
df = df[mask]). - datetime с наносекундной точностью, когда хватит секунд — в pandas 2.x можно:
datetime64[s].
Категориальные типы — экономия на повторах
Если колонка содержит много повторов одних и тех же значений (страна, валюта, статус, тип события), category — это золото:
df["country"] = df["country"].astype("category")
Внутри pandas хранит:
- словарь уникальных значений (
["RU", "US", "DE"]) — один раз; - массив
int8/int16индексов на эти значения.
Колонка из миллиона "USA" строк — это 8 МБ как object, 1 МБ как category. И большинство операций (groupby, ==, isin) на category работают быстрее.
Когда не делать category:
- Если уникальных значений много (UUID, email, free-text) — нет выигрыша.
- Если колонка постоянно мутирует — overhead на пересчёт категорий.
Vectorization — главное правило производительности
Самая частая ошибка начинающего: цикл for по строкам DataFrame.
# плохо: Python-цикл, 100х медленнее
for i in range(len(df)):
df.loc[i, "total"] = df.loc[i, "qty"] * df.loc[i, "price"]
# хорошо: векторно
df["total"] = df["qty"] * df["price"]
Под капотом df["qty"] * df["price"] — это NumPy/Arrow поэлементное умножение в C, для миллиона строк — миллисекунды. Цикл for — это миллион вызовов .loc, каждый с проверками и созданием объектов.
Правило: если в pandas вы пишете for row in df.iterrows() — посмотрите на проблему ещё раз. В 95% случаев это можно сделать через выражения, groupby, merge, apply (медленный, но всё равно быстрее цикла),
np.wherepd.cut/pd.qcut (биннинг).
import numpy as np
# условный выбор
df["tier"] = np.where(df["amount"] > 1000, "high", "low")
# несколько веток через select
df["tier"] = np.select(
condlist=[df["amount"] >= 1000, df["amount"] >= 100],
choicelist=["high", "mid"],
default="low",
)
# биннинг в категории
df["amount_bin"] = pd.cut(df["amount"], bins=[0, 100, 500, 1000, np.inf], labels=["low", "mid", "high", "vip"])
apply — выпасть в Python только когда необходимо
Иногда логика не векторизуется. Тогда apply:
# строковая логика
df["normalized_name"] = df["name"].apply(lambda s: s.strip().lower())
# с axis=1 — по строкам (медленнее, использовать редко)
df["full_address"] = df.apply(
lambda row: f"{row['street']}, {row['city']}",
axis=1,
)
apply по столбцам быстрее, чем по строкам (axis=1). По строкам — это де-факто цикл. Часто заменимо на str.cat, векторное выражение или numba-ускоренные операции. Если нашли себя в df.apply(..., axis=1) — подумайте, нет ли векторного способа.
Для строк есть отдельный
str accessordf["name"] = df["name"].str.strip().str.lower()
df["domain"] = df["email"].str.split("@").str[1]
df["matches"] = df["text"].str.contains(r"\b\d{4}\b", regex=True)
Когда pandas не справляется
pandas — это in-memory движок. Жёсткие границы:
- Датасет больше RAM — pandas просто не сможет загрузить. Помогают chunking, read_parquet с фильтрами, partitioning.
- Многопоточность — pandas однопоточный почти везде (кроме редких операций в pyarrow). 32 ядра не помогают.
- Сложные пайплайны с lazy-оптимизацией — pandas eager, выполняет каждую строку кода сразу. Нет глобальной оптимизации.
Когда упирается:
Вариант 1: chunking + Parquet
Половина случаев решается дисциплиной: читать большое только частями, хранить в Parquet, фильтровать через predicate pushdown. Скоринг чанк-за-чанком с агрегацией. Это самый дешёвый upgrade — без новой библиотеки.
Вариант 2: Dask
Вариант 3: polars
import polars as pl
# чтение
df = pl.read_parquet("data.parquet")
# фильтр + группировка (eager)
result = (
df.filter(pl.col("amount") > 100)
.group_by("country")
.agg(pl.col("amount").sum())
)
# то же, но lazy
result = (
pl.scan_parquet("data.parquet")
.filter(pl.col("amount") > 100)
.group_by("country")
.agg(pl.col("amount").sum())
.collect() # тут происходит выполнение
)
collect() запускает выполнение. На больших данных разница на порядок.
Когда мигрировать на polars
- Не сейчас, если вы junior. Учите pandas сначала — он установлен на каждом проекте.
- Постепенно, когда:
- У вас регулярно DataFrame > 5М строк и pandas тормозит.
- В команде уже есть кто-то с polars.
- Пишете новый проект с нуля, и нет легаси pandas-кода.
API близко, но не одинаково. Polars не имеет index в pandas-стиле, не имеет MultiIndex. Группа выражений через pl.col(...). groupby → group_by. И так далее. Перенос — это работа, не автоматика.
В курсе пишем на pandas, потому что это стандарт для junior. Но знать о существовании polars нужно — на собеседовании могут спросить, и в реальной работе через год-два встретите.
DE-кейс: streaming-ETL через pandas
Соберём всё в один скрипт. Дано: 10ГБ CSV с заказами на сервере, нужно — Parquet, партиционированный по стране-дате, агрегированный.
import pandas as pd
from pathlib import Path
INPUT = Path("/data/orders_raw.csv")
OUTPUT_DIR = Path("/data/orders_parquet/")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
CHUNK = 200_000 # ~200K строк за чанк
for i, chunk in enumerate(pd.read_csv(
INPUT,
chunksize=CHUNK,
dtype={"country": "category", "user_id": "int64"},
parse_dates=["created_at"],
na_values=["", "-", "N/A"],
)):
# очистка
chunk = chunk.dropna(subset=["amount", "user_id"])
chunk["date"] = chunk["created_at"].dt.date.astype("string")
# пишем чанк партиционированно (pyarrow склеит файлы)
chunk.to_parquet(
OUTPUT_DIR,
partition_cols=["country", "date"],
compression="snappy",
engine="pyarrow",
index=False,
)
print(f"chunk {i}: {len(chunk):,} rows written")
print("Done.")
200К строк за чанк помещаются в память легко (~50-200 МБ). Целый файл — 10ГБ — никогда не загружается целиком. Запись через partition_cols создаёт иерархию /country=RU/date=2025-01-15/part-0.parquet. Дальше эти данные можно читать обратно с predicate pushdown:
df = pd.read_parquet(
"/data/orders_parquet/",
filters=[("country", "==", "RU"), ("date", ">=", "2025-01-01")],
)
Это рабочий streaming-ETL на pandas. На polars аналогично, синтаксис немного другой.
Шпаргалка производительности
| Проблема | Решение |
|---|---|
| Файл не помещается в RAM | chunksize в read_csv, partitioned Parquet |
| Много памяти на строки | dtype_backend="pyarrow" или `astype(“string" |
| Цикл по строкам | Векторизация, np.where, np.select, groupby |
| Долгий I/O CSV | Перейти на Parquet |
| Долгий groupby | Sort по ключу группы заранее, использовать observed=True для category |
| Много пайплайнов | Хранить промежуточное в Parquet между шагами |
| Apply axis=1 | Заменить на векторное выражение или str-accessor |
| Нужен параллелизм | Dask или polars |
| Размер > RAM на регулярной основе | polars lazy / DuckDB |
Итоги модуля
Четыре урока:
- dtype hygiene — читайте файлы правильно с первого раза, используйте PyArrow backend.
- Фильтрация, группировка, джоины — три кита DE-работы, аналог SQL.
- Datetime и rolling — отчёты по периодам, sliding-агрегаты.
- I/O и производительность — Parquet вместо CSV, chunking больших файлов, vectorization.
После этого модуля вы спокойно делаете ETL-pipeline на pandas, читаете чужой pandas-код и знаете, где границы инструмента и куда смотреть дальше.
Упражнение
Возьмите CSV orders.csv (можно сгенерировать произвольно на 100К строк с колонками order_id, user_id, country, amount, created_at).
Напишите ETL-скрипт etl.py, который:
- Читает CSV чанками по 10_000 строк.
- На каждом чанке группирует по
countryиdate(день изcreated_at), агрегируетamount.sum()иorder_id.count(). - Накапливает агрегаты по чанкам.
- Финально склеивает чанковые агрегаты и финально досуммирует.
- Записывает результат в Parquet
daily_country_revenue.parquet.
Критерии приёмки:
- В коде ровно один цикл
for chunk in pd.read_csv(...). - Промежуточные агрегаты — DataFrame’ы, накапливаемые в list.
- Финальная агрегация через
pd.concat+groupby. - Выходной Parquet читается обратно через
pd.read_parquetи имеет колонкиcountry, date, total_amount, n_orders.
Подсказка: структура agg-дикта {"amount": "sum", "order_id": "count"} плюс rename колонок после.
На этом модуль pandas закончен. В следующем модуле — тесты на pytest и качество кода.