Learning Platform
Урок 09.04 · 24 мин
Начальный
pandasI/OperformancechunkingParquetpolars
Parquet, Avro, ORC: когда и почему read_parquet быстрее read_csv Polars: lazy API, query optimizer и Arrow backend

Финал модуля: pandas в продакшене

Три предыдущих урока — про синтаксис и логику. Этот — про инженерную сторону. Что выбрать для чтения файла, как не положить машину гигабайтным CSV, какие операции в pandas быстрые, какие медленные, и в какой момент пора смотреть на

polars
.

Большая часть этого урока — практические заметки из реальных DE-будней. Если вы научитесь правильно выбирать формат и не убивать память — половина проблем junior’а с pandas исчезнет.

Форматы: что быстрее, почему

Форматread_csvread_parquetread_sql_query
Скорость чтения 1М строк~5-10 сек~0.3-1 секзависит от БД
Размер на дискебольшой (text)в 3-10 раз меньшехранится в БД
Типы данныхтеряются (всё text)сохраняютсясохраняются
Schemaнадо угадыватьв файлев БД
Колоночностьнетдакак ответ БД
Хорош дляexchange с людьмиdata lake, ETL-кэшпрямая выгрузка

Главный совет: между задачами кэшируйте в Parquet, между системами — Parquet, наружу пользователям иногда CSV. Текстовый CSV — это формат «из почтовой переписки», а не для прод-пайплайнов.

Подробнее про сам Parquet и pyarrow в уроке 04 модуля 6. Здесь — что про это знает pandas.

read_parquet — пишите и читайте Parquet

import pandas as pd

# чтение
df = pd.read_parquet("data.parquet")

# чтение только нужных колонок (column pruning)
df = pd.read_parquet("data.parquet", columns=["user_id", "amount", "event_time"])

# чтение с фильтром (predicate pushdown)
df = pd.read_parquet(
    "events/",
    filters=[("country", "==", "RU"), ("amount", ">=", 100)],
)

# запись
df.to_parquet("output.parquet", compression="snappy", index=False)

# запись с партиционированием — частая практика data lake
df.to_parquet(
    "data/",
    partition_cols=["country", "date"],   # создаёт ./data/country=RU/date=2025-01-15/
)

Column pruning
и
predicate pushdown
— две суперспособности Parquet. Они работают в pandas через pyarrow engine (он по умолчанию начиная с pandas 2.0).

partition_cols пишет данные в иерархию папок. Это

Hive-style partitioning
— стандарт data lake. Читать такой dataset обратно как один DataFrame:

df = pd.read_parquet("data/")   # autodetect партиций
# country и date станут колонками автоматически

read_csv для больших файлов — chunksize

CSV есть CSV. Если файл 10ГБ и не помещается в память — вы не сможете прочитать его целиком. Решение — стримить кусками:

# chunksize=10_000 — генератор, выдающий DataFrame по 10000 строк
total = 0
for chunk in pd.read_csv("huge.csv", chunksize=10_000):
    chunk = chunk[chunk["amount"] > 0]
    total += chunk["amount"].sum()

print(f"Total: {total}")

pd.read_csv(..., chunksize=N) возвращает

TextFileReader
— итератор. На каждой итерации в память загружается только N строк. После цикла память освобождается.

Типичный pattern: чанк за чанком, агрегируете промежуточный результат, в конце собираете финальный.

# собираем агрегат по чанкам
partial_results = []
for chunk in pd.read_csv("huge.csv", chunksize=100_000):
    agg = chunk.groupby("country", as_index=False)["amount"].sum()
    partial_results.append(agg)

# финальная агрегация (sum of sums)
final = pd.concat(partial_results, ignore_index=True)
final = final.groupby("country", as_index=False)["amount"].sum()

Это

fold
по чанкам — классический ETL-приём.

WARNING

Не делайте pd.concat(list_of_chunks) для сохранения всех чанков целиком — вы сразу же убьёте весь смысл chunked-чтения. Идея в том, чтобы в памяти находился только один чанк за раз, плюс маленький промежуточный агрегат.

read_sql_query — pandas + PostgreSQL

Junior DE часто читает из БД напрямую. Через SQLAlchemy:

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg://user:pass@localhost/db")

df = pd.read_sql_query(
    "SELECT user_id, amount, created_at FROM orders WHERE created_at >= %(since)s",
    engine,
    params={"since": "2025-01-01"},
    parse_dates=["created_at"],
)

Что важно:

  1. Не пишите параметры в строку запроса — используйте placeholder и params. Иначе SQL injection.
  2. parse_dates для timestamp-колонок — pandas сам конвертит.
  3. Для больших ответов — chunksize:
for chunk in pd.read_sql_query("SELECT * FROM big_table", engine, chunksize=50_000):
    process(chunk)

Это работает на уровне курсора БД — драйвер выдаёт чанки, pandas конвертит в DataFrame. Память не разрастается. Подробнее про БД-доступ — в модуле 7.

Memory profiling — где ваша память

Когда DataFrame неожиданно много весит, посмотрите на потребление:

df.info(memory_usage="deep")
# полный размер с учётом строк (object-колонки прячут потребление)

df.memory_usage(deep=True)
# по колонкам, в байтах

Без deep=True object-колонка показывает «8 байт на ячейку» — на самом деле там указатели на Python-строки, а сами строки — отдельно. С deep=True цифры реальные.

Типичные виновники жирной памяти:

  • object-колонки строк — конвертите в string (pyarrow backend) или в category для повторяющихся значений.
  • float64 для счётчиков, которые могут быть int — приведите в int32/Int32.
  • много пустых строк — оставьте только те, что нужны для отчёта (df = df[mask]).
  • datetime с наносекундной точностью, когда хватит секунд — в pandas 2.x можно: datetime64[s].

Категориальные типы — экономия на повторах

Если колонка содержит много повторов одних и тех же значений (страна, валюта, статус, тип события), category — это золото:

df["country"] = df["country"].astype("category")

Внутри pandas хранит:

  • словарь уникальных значений (["RU", "US", "DE"]) — один раз;
  • массив int8/int16 индексов на эти значения.

Колонка из миллиона "USA" строк — это 8 МБ как object, 1 МБ как category. И большинство операций (groupby, ==, isin) на category работают быстрее.

Когда не делать category:

  • Если уникальных значений много (UUID, email, free-text) — нет выигрыша.
  • Если колонка постоянно мутирует — overhead на пересчёт категорий.

Vectorization — главное правило производительности

Самая частая ошибка начинающего: цикл for по строкам DataFrame.

# плохо: Python-цикл, 100х медленнее
for i in range(len(df)):
    df.loc[i, "total"] = df.loc[i, "qty"] * df.loc[i, "price"]

# хорошо: векторно
df["total"] = df["qty"] * df["price"]

Под капотом df["qty"] * df["price"] — это NumPy/Arrow поэлементное умножение в C, для миллиона строк — миллисекунды. Цикл for — это миллион вызовов .loc, каждый с проверками и созданием объектов.

Правило: если в pandas вы пишете for row in df.iterrows() — посмотрите на проблему ещё раз. В 95% случаев это можно сделать через выражения, groupby, merge, apply (медленный, но всё равно быстрее цикла),

np.where
, pd.cut/pd.qcut (биннинг).

import numpy as np

# условный выбор
df["tier"] = np.where(df["amount"] > 1000, "high", "low")

# несколько веток через select
df["tier"] = np.select(
    condlist=[df["amount"] >= 1000, df["amount"] >= 100],
    choicelist=["high", "mid"],
    default="low",
)

# биннинг в категории
df["amount_bin"] = pd.cut(df["amount"], bins=[0, 100, 500, 1000, np.inf], labels=["low", "mid", "high", "vip"])

apply — выпасть в Python только когда необходимо

Иногда логика не векторизуется. Тогда apply:

# строковая логика
df["normalized_name"] = df["name"].apply(lambda s: s.strip().lower())

# с axis=1 — по строкам (медленнее, использовать редко)
df["full_address"] = df.apply(
    lambda row: f"{row['street']}, {row['city']}",
    axis=1,
)

apply по столбцам быстрее, чем по строкам (axis=1). По строкам — это де-факто цикл. Часто заменимо на str.cat, векторное выражение или numba-ускоренные операции. Если нашли себя в df.apply(..., axis=1) — подумайте, нет ли векторного способа.

Для строк есть отдельный

str accessor
:

df["name"] = df["name"].str.strip().str.lower()
df["domain"] = df["email"].str.split("@").str[1]
df["matches"] = df["text"].str.contains(r"\b\d{4}\b", regex=True)

Когда pandas не справляется

pandas — это in-memory движок. Жёсткие границы:

  • Датасет больше RAM — pandas просто не сможет загрузить. Помогают chunking, read_parquet с фильтрами, partitioning.
  • Многопоточность — pandas однопоточный почти везде (кроме редких операций в pyarrow). 32 ядра не помогают.
  • Сложные пайплайны с lazy-оптимизацией — pandas eager, выполняет каждую строку кода сразу. Нет глобальной оптимизации.

Когда упирается:

Вариант 1: chunking + Parquet

Половина случаев решается дисциплиной: читать большое только частями, хранить в Parquet, фильтровать через predicate pushdown. Скоринг чанк-за-чанком с агрегацией. Это самый дешёвый upgrade — без новой библиотеки.

Вариант 2: Dask

Dask
— pandas-совместимая обёртка для параллельной обработки. API почти 1:1 c pandas, под капотом split-apply-combine на много процессов/машин. Хорош для миграции существующего pandas-кода на cluster.

Вариант 3: polars

polars
— новая звезда. Написан на Rust, нативно multi-threaded, поддерживает lazy execution. На типичных DE-задачах в 5-30 раз быстрее pandas. API похож, но не идентичен:

import polars as pl

# чтение
df = pl.read_parquet("data.parquet")

# фильтр + группировка (eager)
result = (
    df.filter(pl.col("amount") > 100)
      .group_by("country")
      .agg(pl.col("amount").sum())
)

# то же, но lazy
result = (
    pl.scan_parquet("data.parquet")
      .filter(pl.col("amount") > 100)
      .group_by("country")
      .agg(pl.col("amount").sum())
      .collect()   # тут происходит выполнение
)

Lazy frame
— главная фича polars. Вы строите план, polars оптимизирует его (объединяет фильтры с чтением, отбрасывает колонки), и только collect() запускает выполнение. На больших данных разница на порядок.

Когда мигрировать на polars

  • Не сейчас, если вы junior. Учите pandas сначала — он установлен на каждом проекте.
  • Постепенно, когда:
    • У вас регулярно DataFrame > 5М строк и pandas тормозит.
    • В команде уже есть кто-то с polars.
    • Пишете новый проект с нуля, и нет легаси pandas-кода.

API близко, но не одинаково. Polars не имеет index в pandas-стиле, не имеет MultiIndex. Группа выражений через pl.col(...). groupbygroup_by. И так далее. Перенос — это работа, не автоматика.

В курсе пишем на pandas, потому что это стандарт для junior. Но знать о существовании polars нужно — на собеседовании могут спросить, и в реальной работе через год-два встретите.

DE-кейс: streaming-ETL через pandas

Соберём всё в один скрипт. Дано: 10ГБ CSV с заказами на сервере, нужно — Parquet, партиционированный по стране-дате, агрегированный.

import pandas as pd
from pathlib import Path

INPUT = Path("/data/orders_raw.csv")
OUTPUT_DIR = Path("/data/orders_parquet/")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

CHUNK = 200_000   # ~200K строк за чанк

for i, chunk in enumerate(pd.read_csv(
    INPUT,
    chunksize=CHUNK,
    dtype={"country": "category", "user_id": "int64"},
    parse_dates=["created_at"],
    na_values=["", "-", "N/A"],
)):
    # очистка
    chunk = chunk.dropna(subset=["amount", "user_id"])
    chunk["date"] = chunk["created_at"].dt.date.astype("string")

    # пишем чанк партиционированно (pyarrow склеит файлы)
    chunk.to_parquet(
        OUTPUT_DIR,
        partition_cols=["country", "date"],
        compression="snappy",
        engine="pyarrow",
        index=False,
    )
    print(f"chunk {i}: {len(chunk):,} rows written")

print("Done.")

200К строк за чанк помещаются в память легко (~50-200 МБ). Целый файл — 10ГБ — никогда не загружается целиком. Запись через partition_cols создаёт иерархию /country=RU/date=2025-01-15/part-0.parquet. Дальше эти данные можно читать обратно с predicate pushdown:

df = pd.read_parquet(
    "/data/orders_parquet/",
    filters=[("country", "==", "RU"), ("date", ">=", "2025-01-01")],
)

Это рабочий streaming-ETL на pandas. На polars аналогично, синтаксис немного другой.

Шпаргалка производительности

ПроблемаРешение
Файл не помещается в RAMchunksize в read_csv, partitioned Parquet
Много памяти на строкиdtype_backend="pyarrow" или `astype(“string"
Цикл по строкамВекторизация, np.where, np.select, groupby
Долгий I/O CSVПерейти на Parquet
Долгий groupbySort по ключу группы заранее, использовать observed=True для category
Много пайплайновХранить промежуточное в Parquet между шагами
Apply axis=1Заменить на векторное выражение или str-accessor
Нужен параллелизмDask или polars
Размер > RAM на регулярной основеpolars lazy / DuckDB

Итоги модуля

Четыре урока:

  1. dtype hygiene — читайте файлы правильно с первого раза, используйте PyArrow backend.
  2. Фильтрация, группировка, джоины — три кита DE-работы, аналог SQL.
  3. Datetime и rolling — отчёты по периодам, sliding-агрегаты.
  4. I/O и производительность — Parquet вместо CSV, chunking больших файлов, vectorization.

После этого модуля вы спокойно делаете ETL-pipeline на pandas, читаете чужой pandas-код и знаете, где границы инструмента и куда смотреть дальше.

Упражнение

Возьмите CSV orders.csv (можно сгенерировать произвольно на 100К строк с колонками order_id, user_id, country, amount, created_at).

Напишите ETL-скрипт etl.py, который:

  1. Читает CSV чанками по 10_000 строк.
  2. На каждом чанке группирует по country и date (день из created_at), агрегирует amount.sum() и order_id.count().
  3. Накапливает агрегаты по чанкам.
  4. Финально склеивает чанковые агрегаты и финально досуммирует.
  5. Записывает результат в Parquet daily_country_revenue.parquet.

Критерии приёмки:

  • В коде ровно один цикл for chunk in pd.read_csv(...).
  • Промежуточные агрегаты — DataFrame’ы, накапливаемые в list.
  • Финальная агрегация через pd.concat + groupby.
  • Выходной Parquet читается обратно через pd.read_parquet и имеет колонки country, date, total_amount, n_orders.

Подсказка: структура agg-дикта {"amount": "sum", "order_id": "count"} плюс rename колонок после.

На этом модуль pandas закончен. В следующем модуле — тесты на pytest и качество кода.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4