Learning Platform
Урок 07.04 · 22 мин
Начальный
ParquetpyarrowColumnar storageSchemaPartitioning
Parquet, Avro, ORC: колоночные форматы в data lake Parquet: row groups, column stats и эволюция схемы

Почему Parquet — это очень важно

В прошлых уроках мы возились с CSV и JSONL — текстовыми форматами, которые умеют все. Они дёшевы для обмена, но дороги для аналитики: типов нет, файлы большие, читать нужно всё целиком, даже если нужны две колонки из двадцати.

Современный data lake устроен иначе. Когда данные оседают в аналитический слой, их конвертируют в Parquet. Это бинарный формат, в котором:

  • Внутри файла лежит схема (точные типы колонок).
  • Данные хранятся по колонкам, а не по строкам.
  • Каждая колонка сжата (обычно snappy или zstd), часто 10x меньше CSV.
  • Можно читать только нужные колонки, не парся остальные.

На Parquet работают Spark, Trino, DuckDB, Polars, BigQuery (внутри), AWS Athena (SELECT FROM s3://bucket/data/). Если ваша компания делает аналитику на больших данных — где-то под капотом лежат Parquet-файлы.

Для junior DE задача такая: уметь конвертировать данные (из CSV/JSONL/API) в Parquet, добавляя типизацию, и понимать, как partitionить вывод. Этого достаточно, чтобы покрыть 80% реальных задач. Углубление в row-groups, page-сжатие, predicate pushdown — это уже middle.

Row-based vs columnar: главная идея

Сначала концепция. Возьмём четыре строки данных:

id   country   amount
1    RU        100
2    US         50
3    RU        200
4    DE         75

В CSV или JSONL это хранится построчно — байты на диске идут в порядке 1,RU,100,2,US,50,3,RU,200,4,DE,75. Чтобы посчитать SUM(amount) WHERE country = 'RU', нужно прочитать всё, потому что country и amount каждой строки лежат рядом друг с другом в потоке.

Parquet хранит данные по колонкам:

id      : 1, 2, 3, 4
country : RU, US, RU, DE
amount  : 100, 50, 200, 75

И сразу же выигрыш: чтобы посчитать SUM(amount) WHERE country = 'RU', читаем только два блока (country и amount), а блок id вообще не трогаем. На таблицах с 50 колонками экономия чтения — 25-кратная.

Row-based vs Columnar storage

Те же данные, но разная физическая раскладка. Row-based хорош для записи строк целиком, columnar — для запросов по подмножеству колонок.

Row-based (CSV/JSONL)строки рядом
row 11, RU, 100
row 22, US, 50
row 33, RU, 200
row 44, DE, 75
плюсыOLTP, insert по строкеОдин row — один disk write
минусы аналитикичитать всё ради одной колонкиSUM(amount) — придётся пробежать всё
Columnar (Parquet)колонки рядом
col id1, 2, 3, 4
col countryRU, US, RU, DE
col amount100, 50, 200, 75
плюсыOLAP, column pruningЧитаем только нужные колонки
компрессиялучше: одинаковые типы рядомRU, RU, RU — это легко сжать. 1, 2, 3, 4 — тоже.

Есть и второй выигрыш: компрессия лучше работает на однородных данных. В колонке country много повторов «RU», «US». Любой алгоритм сжатия (snappy, gzip, zstd) на таком ряду работает на порядок эффективнее, чем когда RU соседствует с числом 100.

pyarrow: Python-библиотека для Parquet

Главная Python-библиотека для работы с Parquet —

pyarrow
. Это Python-биндинг Apache Arrow — кросс-языкового стандарта columnar данных.

uv add pyarrow

Минимальный пример записи и чтения:

import pyarrow as pa
import pyarrow.parquet as pq

# 1. Собираем таблицу из python-словарей
data = {
    "id": [1, 2, 3, 4],
    "country": ["RU", "US", "RU", "DE"],
    "amount": [100, 50, 200, 75],
}
table = pa.table(data)

print(table.schema)
# id: int64
# country: string
# amount: int64

# 2. Пишем в Parquet-файл
pq.write_table(table, "orders.parquet")

# 3. Читаем обратно
table_read = pq.read_table("orders.parquet")
print(table_read.to_pylist())
# [{'id': 1, 'country': 'RU', 'amount': 100}, ...]

Три вещи на что обратить внимание. pa.table(...) строит таблицу из dict[column_name -> list]. Тип каждой колонки arrow выводит сам, но можно задать явно (см. ниже). pq.write_table — сериализация в файл; pq.read_table — чтение. Никаких контекст-менеджеров не требуется — функции сами открывают и закрывают файлы.

Схема: явные типы

Arrow и Parquet — строго типизированные. У каждой колонки есть тип, и он сохраняется внутри файла. Это главное отличие от CSV.

Список типов, которые junior должен знать:

  • pa.int8(), int16(), int32(), int64() — целые разной разрядности.
  • pa.float32(), float64() — числа с плавающей точкой.
  • pa.string() — строки UTF-8.
  • pa.bool_() — булевы.
  • pa.timestamp("us", tz="UTC") — datetime с микросекундной точностью и зоной.
  • pa.date32() — дата без времени.
  • pa.decimal128(precision, scale) — фиксированная точность (для денег).
  • pa.list_(pa.string()) — массив строк.

Схема описывается явно через pa.schema:

import pyarrow as pa

schema = pa.schema([
    pa.field("id", pa.int64(), nullable=False),
    pa.field("country", pa.string(), nullable=False),
    pa.field("amount", pa.int64(), nullable=True),
    pa.field("created_at", pa.timestamp("us", tz="UTC"), nullable=False),
])

table = pa.table(
    {
        "id": [1, 2, 3],
        "country": ["RU", "US", "RU"],
        "amount": [100, None, 200],
        "created_at": [
            datetime(2026, 5, 1, tzinfo=UTC),
            datetime(2026, 5, 2, tzinfo=UTC),
            datetime(2026, 5, 3, tzinfo=UTC),
        ],
    },
    schema=schema,
)

Зачем явная схема. Во-первых, контроль типов: если в id случайно прилетит строка, arrow упадёт сразу. Во-вторых, nullable — указание, может ли в колонке быть NULL, ровно как в SQL NOT NULL. В-третьих, при чтении файла downstream-системы (Spark, DuckDB) увидят эти типы и не будут гадать.

nullable=False — гарантия. Если в данных окажется None в этой колонке, запись упадёт. Это правильно: лучше падать на стадии записи, чем молча сохранить и сломать downstream.

Запись из generators / API response

В реальной жизни данные приходят не из dict-литералов, а из CSV, API, JSON. Идиома такая: сначала парсим в Python-структуру (list of dicts или генератор), потом — в Arrow-таблицу, потом в Parquet.

import json
from pathlib import Path
import pyarrow as pa
import pyarrow.parquet as pq


def jsonl_to_parquet(jsonl_path: Path, parquet_path: Path, schema: pa.Schema) -> None:
    """Конвертирует JSONL в Parquet с проверкой schema."""
    rows: list[dict] = []
    with jsonl_path.open(encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                rows.append(json.loads(line))

    # pa.Table.from_pylist — удобный конструктор из списка dict'ов
    table = pa.Table.from_pylist(rows, schema=schema)
    pq.write_table(table, parquet_path, compression="snappy")

Здесь две полезные функции pyarrow:

  • pa.Table.from_pylist(rows, schema=...) — из list[dict] в Table. Каждый dict — одна строка, ключи должны совпадать с именами в схеме.
  • pa.Table.from_pydict({...}) — из dict[col_name -> list]. Колонками вверх.

Для больших JSONL (миллионы строк) делать rows: list[dict] = [] нельзя — это OOM. Тогда используется ParquetWriter в режиме append (chunks по N штук), но это уже более продвинутый паттерн, который трогаем в Module 07 при работе с pandas.

Partitioning: запись по папкам

Когда данных много (десятки миллионов строк в день), один большой Parquet — это плохо. Запросы вида «дай мне данные за май 2026» вынуждены сканировать весь файл. Решение — partitioning.

Партиционирование — это раскладка файлов по подкаталогам, чьи имена несут значения «партиционных» колонок:

data/
└── orders/
    ├── year=2026/month=04/day=30/part-0.parquet
    ├── year=2026/month=05/day=01/part-0.parquet
    └── year=2026/month=05/day=02/part-0.parquet

Когда analytical engine получает запрос WHERE year = 2026 AND month = 5, он смотрит на имена папок и читает только нужные. Это называется

partition pruning
.

Имя year=2026 — не случайное. Это Hive convention (от старого Apache Hive), и сейчас все системы (Spark, DuckDB, Athena, BigQuery External Tables) её понимают. Никогда не пишите 2026/05/01 без year=, month=, day= — это будут просто папки, а с конвенцией — это партиции.

pyarrow умеет писать partitioned dataset одной строкой:

import pyarrow.parquet as pq

pq.write_to_dataset(
    table,
    root_path="data/orders",
    partition_cols=["year", "month", "day"],
)

pyarrow возьмёт значения колонок year, month, day из таблицы, разложит данные по подпапкам, и в каждой создаст part-0.parquet с остальными колонками (партиционные значения вынесены в имена папок и не дублируются в файле).

Чтение partitioned dataset — симметрично:

import pyarrow.dataset as ds

dataset = ds.dataset("data/orders", format="parquet", partitioning="hive")
table = dataset.to_table(filter=ds.field("year") == 2026)

partitioning="hive" — указание, что папки именованы в Hive-конвенции. filter=... — partition pruning: arrow прочитает только подпапки, удовлетворяющие условию.

Компрессия: snappy, gzip, zstd

Каждая колонка в Parquet сжимается отдельно. У pyarrow на запись три популярных варианта:

АлгоритмСкоростьCompression ratioКогда выбирать
snappyочень быстраясредний (2-3x)default; быстрая запись и чтение
gzipсредняяхороший (3-5x)хочется меньше места, скорость не критична
zstdбыстраяочень хороший (4-7x)современный выбор, лучшее соотношение
noneмгновенно1xредко, для отладки

В 2026 году рекомендуемый выбор для нового кода —

zstd
. По умолчанию pyarrow ставит snappy — это legacy от Hadoop-эпохи, всё ещё ОК, но zstd объективно лучше.

pq.write_table(table, "out.parquet", compression="zstd")
pq.write_table(table, "out.parquet", compression="zstd", compression_level=3)

Параметр compression_level есть для gzip и zstd — выше число, лучше сжатие, но дольше запись. Для большинства задач default подходит.

TIP

Один Parquet-файл может содержать разные алгоритмы компрессии для разных колонок, через column_encoding. Но junior’у это не нужно: один уровень на весь файл — норма. К этому возвращаются, когда уже разбираются в storage-tuning.

DE-кейс: API → JSON → Parquet

Соберём всё на боевой задаче. API возвращает JSON со счётчиками событий. Раз в день — копим в archive (JSONL), потом конвертируем в Parquet для аналитики.

import gzip
import json
from datetime import date, datetime, UTC
from pathlib import Path
import pyarrow as pa
import pyarrow.parquet as pq


SCHEMA = pa.schema([
    pa.field("event_id", pa.int64(), nullable=False),
    pa.field("user_id", pa.int64(), nullable=False),
    pa.field("event_type", pa.string(), nullable=False),
    pa.field("amount", pa.decimal128(12, 2), nullable=True),
    pa.field("created_at", pa.timestamp("us", tz="UTC"), nullable=False),
    pa.field("year", pa.int32(), nullable=False),
    pa.field("month", pa.int32(), nullable=False),
    pa.field("day", pa.int32(), nullable=False),
])


def parse_jsonl_gz(path: Path) -> list[dict]:
    rows: list[dict] = []
    with gzip.open(path, "rt", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            raw = json.loads(line)
            ts = datetime.fromisoformat(raw["created_at"])
            rows.append({
                "event_id": raw["id"],
                "user_id": raw["user"],
                "event_type": raw["type"],
                "amount": raw.get("amount"),
                "created_at": ts,
                "year": ts.year,
                "month": ts.month,
                "day": ts.day,
            })
    return rows


def jsonl_to_partitioned_parquet(jsonl_path: Path, out_root: Path) -> None:
    rows = parse_jsonl_gz(jsonl_path)
    table = pa.Table.from_pylist(rows, schema=SCHEMA)
    pq.write_to_dataset(
        table,
        root_path=str(out_root),
        partition_cols=["year", "month", "day"],
        compression="zstd",
    )


# использование
jsonl_to_partitioned_parquet(
    Path("archive/2026-05-13.jsonl.gz"),
    Path("data/events"),
)

Что мы здесь делаем. Парсим JSONL в list[dict], одновременно превращая поля в нужные типы: created_at — в datetime (Arrow тут же его распакует в timestamp с UTC), amount — в Decimal через схему. Добавляем колонки year/month/day для партиционирования. Записываем partitioned dataset с zstd-компрессией. На выходе — папка data/events/year=2026/month=05/day=13/, в которой Parquet-файл с типизированными данными, готовый к чтению из Spark или DuckDB.

Это типичный «landing zone → analytical layer» pipeline, который вы будете писать на работе раз в неделю.

Чтение partitioned данных в downstream

Раз уж записали — посмотрим, как это потом читается:

import pyarrow.dataset as ds

dataset = ds.dataset("data/events", format="parquet", partitioning="hive")

# схема всего dataset'а
print(dataset.schema)

# фильтр по партиции + по колонке
table = dataset.to_table(
    columns=["event_id", "user_id", "amount"],   # только нужные колонки
    filter=(ds.field("year") == 2026) & (ds.field("month") == 5),
)

Два важных эффекта здесь.

Column pruningcolumns=[...] говорит arrow читать только три колонки из всего файла. Остальные физически не читаются с диска.

Partition pruningfilter=(year==2026) & (month==5) отбрасывает все папки, кроме year=2026/month=05/.... Если в dataset 365 партиций по дням, читать будем только 31 из них.

Это и есть та «магия аналитики», ради которой делают Parquet и partitioning. На больших объёмах разница между «прочитать 1 TB CSV» и «прочитать 5 GB Parquet с column+partition pruning» — это разница между «8 часов» и «10 секунд».

Что junior не делает

Pyarrow и Parquet — глубокая тема. Junior DE сейчас не разбирается в:

  • Row groups, page sizes, dictionary encoding — внутренняя физика Parquet, для middle.
  • Predicate pushdown в Arrow Compute — оптимизация чтения, осваивается на практике через Spark/DuckDB.
  • Schema evolution (что делать, если в новых файлах появилась колонка) — большой отдельный разговор, попадает в Python 02.
  • ParquetWriter для гигабайтных файлов чанками — будет в Module 07 при работе с pandas/Polars.

Запомнить три вещи: pq.write_table / pq.read_table, pq.write_to_dataset с partition_cols, схема через pa.schema. Этого хватит на первые полгода работы.

Что мы получили

  • Parquet — бинарный columnar формат для аналитики; стандарт для всех современных движков (Spark, Trino, DuckDB, Athena).
  • Columnar storage даёт column pruning: читаем только нужные колонки, экономим 5-25x на сканировании.
  • pyarrow — Python-библиотека для Parquet. Ключевые функции: pa.table, pq.write_table, pq.read_table.
  • Схема обязательная: типы колонок, nullable; pyarrow сам выведет, но лучше задать явно через pa.schema.
  • Partitioning через Hive-convention (year=2026/month=05/) — даёт partition pruning в downstream-движках. pq.write_to_dataset(..., partition_cols=[...]).
  • Компрессия: zstd для нового кода, snappy — default-legacy, gzip — если нужен ratio и читатель его поддерживает.
  • DE-pipeline: API → JSONL.gz (archive) → Parquet partitioned (analytical) — стандартная двухслойная схема data lake.

В следующем уроке — компрессия как самостоятельная тема: gzip, zstd, bz2 — что выбирать, как читать и писать на лету.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5