Learning Platform
Урок 07.05 · 16 мин
Начальный
CompressiongzipzstdStreamingLog rotation
Колоночное хранение: почему Parquet + zstd сжимается лучше CSV + gzip Compression форматы: gzip, bzip2, lz4, zstd, snappy

Зачем компрессия дата-инженеру

Когда работаешь с малыми данными — компрессия незаметна. Когда работаешь с большими — она становится первой статьёй экономии.

Два главных рычага. Первый — storage: 1 ТБ raw CSV или JSONL после сжатия — это 50-100 ГБ. На AWS S3 это разница в 10x по стоимости хранения. Когда вы складываете в archive layer тысячи файлов в день, гигабайты быстро превращаются в реальные деньги.

Второй — network transfer: гонять CSV в 10 ГБ через интернет — минуты, через VPN из филиала — десятки минут. Сжатый — секунды. То же для inter-DC передач, для git LFS, для уплоада в облако.

И третий, неочевидный — скорость обработки. Парадокс: сжатый файл читается быстрее несжатого, даже с учётом времени на декомпрессию. Потому что disk I/O — самая медленная часть pipeline’а, и читать 100 МБ + распаковать в RAM сильно быстрее, чем читать 1 ГБ. Это особенно заметно на сетевых дисках и S3.

Стандартный набор stdlib

В stdlib три модуля компрессии: gzip, bz2, lzma. У всех трёх одинаковый интерфейс: открыть файл, читать, писать, закрыть. Это симметрия с обычным open.

import gzip

# чтение текстового файла
with gzip.open("events.jsonl.gz", "rt", encoding="utf-8") as f:
    for line in f:
        print(line.strip())

# запись текстового файла
with gzip.open("events.jsonl.gz", "wt", encoding="utf-8") as f:
    f.write('{"id": 1}\n')
    f.write('{"id": 2}\n')

# чтение бинарника
with gzip.open("dump.bin.gz", "rb") as f:
    data = f.read()

# запись бинарника
with gzip.open("dump.bin.gz", "wb") as f:
    f.write(b"\x00\x01\x02")

bz2.open и lzma.open работают точно так же, отличается только алгоритм. Можно даже написать функцию-универсал:

import gzip
import bz2
import lzma
from pathlib import Path

OPENERS = {".gz": gzip.open, ".bz2": bz2.open, ".xz": lzma.open}

def open_compressed(path: Path, mode: str = "rt", **kwargs):
    opener = OPENERS.get(path.suffix, open)
    return opener(path, mode, **kwargs)

# работает для .gz, .bz2, .xz и обычных файлов
with open_compressed(Path("data.csv.gz"), "rt", encoding="utf-8") as f:
    text = f.read()

Параметр compresslevel

У gzip и bz2 есть параметр compresslevel от 1 до 9. 1 — самый быстрый, 9 — самый компактный. Default — 9 для gzip (медленно, лучшее сжатие), 9 для bz2.

# быстрая запись, чуть больше места
with gzip.open("big.jsonl.gz", "wt", encoding="utf-8", compresslevel=1) as f:
    ...

# медленная запись, минимум места
with gzip.open("archive.jsonl.gz", "wt", encoding="utf-8", compresslevel=9) as f:
    ...

Правило для DE: 6 — золотая середина для большинства задач (это default для большинства tools, кроме самого gzip stdlib). 1 — когда пишете часто и важна скорость. 9 — для долгого архива, где места ценнее, чем процессор.

zstd: современный фаворит

В stdlib zstd нет, но это сейчас главный алгоритм в DE. Ставится через third-party библиотеку

zstandard
:

uv add zstandard
import zstandard as zstd
from pathlib import Path

# запись
cctx = zstd.ZstdCompressor(level=3)
with Path("big.jsonl.zst").open("wb") as f, cctx.stream_writer(f) as writer:
    writer.write(b'{"id": 1}\n')
    writer.write(b'{"id": 2}\n')

# чтение
dctx = zstd.ZstdDecompressor()
with Path("big.jsonl.zst").open("rb") as f, dctx.stream_reader(f) as reader:
    data = reader.read()

Интерфейс чуть сложнее, чем у gzip.open, потому что zstandard ниже-уровневый. Если хочется text-mode и привычной симметрии, есть удобная обёртка io.TextIOWrapper:

import io
import zstandard as zstd
from pathlib import Path

def open_zst_read(path: Path, encoding: str = "utf-8"):
    dctx = zstd.ZstdDecompressor()
    stream = dctx.stream_reader(path.open("rb"))
    return io.TextIOWrapper(stream, encoding=encoding)

with open_zst_read(Path("events.jsonl.zst")) as f:
    for line in f:
        ...

Зачем переходить на zstd. Три причины.

Скорость. На уровне сжатия 3 (default) zstd сжимает примерно как gzip-6, но в 3-5 раз быстрее. На decompression — в 4-5 раз быстрее gzip всегда.

Compression ratio. На высоких уровнях (zstd-19, zstd-22) сжимает заметно лучше gzip-9, не сильно проигрывая xz/lzma.

Стандарт в новых tools. Parquet, Kafka, Clickhouse, modern tarball — везде zstd. Если вы пишете данные «в zstd», вы пишете их в формат, который понимает весь современный стек.

Сравнение: gzip vs bz2 vs zstd vs xz

Цифры — порядок (на реалистичном JSON, 1 ГБ raw):

АлгоритмCompressionRatioDecompressionКогда выбирать
gzip-6~50 МБ/с~3.5x~250 МБ/суниверсал, понимают все
gzip-9~25 МБ/с~3.7x~250 МБ/сархив, чуть лучше ratio
bz2-9~10 МБ/с~4.5x~50 МБ/средко, плохая скорость
xz (lzma)~5 МБ/с~5x~80 МБ/сдолгий архив, max ratio
zstd-3~400 МБ/с~3.3x~1000 МБ/сновый default, баланс
zstd-19~10 МБ/с~5x~1000 МБ/сmax ratio, decompression быстрая

Из этой таблицы — пара выводов.

bz2 — устаревший. Медленный и в сжатии, и в распаковке. Используется почти только потому, что какой-то старый source ещё пишет .bz2 (например, Wikipedia dumps). Своими руками новый файл в .bz2 не пишите.

xz (lzma) — для долгоживущего архива, где сжатие происходит редко, а места хочется минимум. Декомпрессия не быстрая, но и не катастрофическая.

gzip — золотой стандарт для совместимости. Понимают абсолютно все, от curl до Apache, от Spark до пожилого скрипта на bash. Если файл может прийти к любому потребителю — .gz безопасный выбор.

zstd — лучший для нового DE-кода. Если вы контролируете и producer, и consumer — выбирайте zstd, и забудьте про gzip.

Streaming: компрессия не материализует файл

Главное достоинство всех компрессоров в Python — они работают потоком. Вы не должны сначала собрать весь файл в RAM, потом сжать его. Можно писать построчно — байты будут уходить в сжатый поток на диск по мере поступления.

import gzip
import json
from pathlib import Path
from collections.abc import Iterable

def write_jsonl_gz(path: Path, items: Iterable[dict]) -> None:
    """Пишет итератор словарей в gzipped JSONL построчно."""
    with gzip.open(path, "wt", encoding="utf-8") as f:
        for item in items:
            f.write(json.dumps(item, ensure_ascii=False) + "\n")

# использование
def fetch_pages():
    cursor = None
    while True:
        page = api.get_page(cursor)
        yield from page.items
        if not page.next_cursor:
            return
        cursor = page.next_cursor

write_jsonl_gz(Path("events.jsonl.gz"), fetch_pages())

Здесь fetch_pages — генератор, не материализует всю выгрузку. write_jsonl_gz потребляет его по одному элементу, пишет в gzip.open файл. В каждый момент времени в RAM лежит одна страница API.

Это та же ленивая модель из 03/01 про генераторы — и она прекрасно сочетается с компрессией. Файл может быть гигабайтным, RAM-потребление константное.

DE-кейс: ingestion zst-compressed JSONL

Очень типичный приходящий формат — .jsonl.zst. Облако или партнёр кладёт файлы в S3, и они уже сжаты zstd. Ваша задача — стримить, парсить, обрабатывать.

import io
import json
from collections.abc import Iterator
from pathlib import Path

import zstandard as zstd


def stream_jsonl_zst(path: Path) -> Iterator[dict]:
    """Читает .jsonl.zst построчно, выдавая распарсенные dict'ы."""
    dctx = zstd.ZstdDecompressor()
    with path.open("rb") as raw, dctx.stream_reader(raw) as stream:
        text = io.TextIOWrapper(stream, encoding="utf-8")
        for line in text:
            line = line.strip()
            if line:
                yield json.loads(line)


# использование — считаем события по типу
counts: dict[str, int] = {}
for event in stream_jsonl_zst(Path("events-2026-05-13.jsonl.zst")):
    counts[event["type"]] = counts.get(event["type"], 0) + 1

print(counts)

Этот код корректно работает на файле любого размера — RAM-потребление O(1). Расжатие происходит на лету, не нужно сначала декомпрессировать на диск.

DE-кейс: log rotation

Старая, но всё ещё актуальная задача — ротация логов. Когда сервис пишет в app.log, и файл растёт, периодически его «архивируют»: переименовывают в app.log.YYYY-MM-DD.gz, а пишущий процесс начинает новый чистый файл.

Простейший вариант — после rotation просто сжать старый файл:

import gzip
import shutil
from datetime import date
from pathlib import Path


def rotate_log(active: Path, archive_dir: Path) -> Path:
    """Архивирует текущий лог с компрессией, освобождает имя для нового."""
    archive_dir.mkdir(parents=True, exist_ok=True)
    out = archive_dir / f"{active.stem}.{date.today().isoformat()}.gz"
    with active.open("rb") as src, gzip.open(out, "wb", compresslevel=6) as dst:
        shutil.copyfileobj(src, dst)
    active.unlink()                  # удаляем оригинал — он уже в архиве
    active.touch()                   # создаём пустой новый файл
    return out


# использование
rotate_log(Path("/var/log/app/app.log"), Path("/var/log/app/archive"))

shutil.copyfileobj — стандартный способ копировать байты между двумя «файловыми» объектами кусками, не пытаясь прочитать в RAM весь файл. Работает с любым src, у которого есть .read() и любым dst с .write() — в нашем случае это gzip.open(..., "wb").

Этот скрипт можно поставить в cron @daily — каждый день будет получать app.log.2026-05-13.gz, app.log.2026-05-14.gz, и так далее.

В production обычно используется готовый logrotate (Linux daemon), но понимать механику полезно: рано или поздно вы напишете похожий скрипт для собственного pipeline’а.

Прозрачно с pandas и pyarrow

Хорошая новость: большинство DE-библиотек умеют читать сжатые файлы напрямую, не нужно отдельной стадии decompression.

import pandas as pd

# pandas сам определяет компрессию по расширению
df = pd.read_csv("orders.csv.gz")
df = pd.read_json("events.jsonl.zst", lines=True)   # с свежими версиями pandas

# можно указать явно
df = pd.read_csv("orders.csv.gz", compression="gzip")
df = pd.read_csv("orders.csv.zst", compression="zstd")

Это сделано умно: pandas под капотом открывает файл через тот же gzip.open / zstandard, передаёт стриминговый объект в свой parser. RAM-нагрузка такая же, как при чтении несжатого, только I/O меньше.

То же с pyarrow:

import pyarrow.parquet as pq

# Parquet хранит компрессию внутри файла, ничего указывать не нужно
table = pq.read_table("orders.parquet")

Внутри файла Parquet каждая колонка сжата своим алгоритмом, который записан в метаданных. pyarrow сам выбирает нужный декомпрессор.

К pandas плотно вернёмся в Module 07 — там разбираем, как читать CSV/JSON/Parquet с компрессией в DataFrame’ы для аналитики.

Подводные камни

Не сжимайте уже сжатое. Если данные уже в Parquet (внутри snappy/zstd) — обёртка .gz поверх практически не уменьшит размер, только потратит CPU. То же для JPEG, PNG, MP4 — они сжаты от природы.

gzip.open в режиме append. Опасный режим: при добавлении к существующему .gz файлу вы создаёте «multi-stream gzip», который не все декомпрессоры читают корректно. Лучше перезаписывать или собирать в отдельные файлы (a-la log rotation).

На сетевых дисках компрессия особенно полезна. На NFS/S3 read latency высокий, и читать 100 МБ + decompress быстрее, чем читать 500 МБ. Особенно на cloud storage, где плата идёт за GB.

Уровень сжатия не возвращает потерянные данные. Декомпрессированный gzip-1 идентичен декомпрессированному gzip-9 — алгоритм lossless. Никаких артефактов, как у JPEG.

Что мы получили

  • Компрессия в DE — это экономия места (3-7x) и ускорение I/O (особенно на сети).
  • stdlib имеет gzip, bz2, lzma с одинаковым интерфейсом: open(path, "rt"|"rb"|"wt"|"wb").
  • zstd — современный фаворит: быстрее gzip и сжимает лучше. Подключается через zstandard.
  • Все компрессоры работают потоком — можно стримить гигабайты с O(1) памятью.
  • Сравнение: gzip — универсал, zstd — новый default для DE, xz — для долгого архива, bz2 — устаревший.
  • DE-паттерны: .jsonl.gz / .jsonl.zst для archive layer; log rotation с gzip; прозрачное чтение в pandas/pyarrow по расширению файла.
  • Не сжимать уже сжатое, не злоупотреблять gzip-append, не пытаться менять данные через уровень.

С этим уроком модуль закрыт. Junior теперь умеет работать со всеми распространёнными форматами данных — CSV, JSON/JSONL, YAML/TOML, Parquet — и понимает, когда какой выбирать. В упражнениях модуля собраны конкретные задачи, которые встречаются на первой неделе работы DE: починить кривой CSV, посчитать события в гигабайтном JSONL, конвертировать API-выгрузку в Parquet с partitioning.

Дальше — Module 06, внешний мир: HTTP-клиенты и БД. Туда мы пойдём с готовым инструментом для чтения любых форматов и для разумного логирования.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5