Zero-copy: передача данных без копирования
Проблема: стоимость копирования данных
В распределённых системах данные постоянно перемещаются между процессами: Python-процесс передаёт данные JVM, один сервис отправляет результат другому, драйвер собирает данные с executor’ов.
Традиционная передача данных включает 3 шага:
~5.5 сек для 1 GB, 2× потребление памяти
При таком подходе 1 GB данных требует 2 GB памяти (оригинал + копия) и ~5.5 секунд на сериализацию/десериализацию. Для 10 GB DataFrame это 55 секунд и 20 GB RAM.
Zero-copy: передача указателя вместо данных
Zero-copy — это техника передачи данных, при которой оба процесса обращаются к одной и той же физической памяти. Данные не копируются — передаётся только указатель (ссылка) на область памяти.
~0 сек, 1× потребление памяти
Результат: вместо 5.5 секунд — мгновенно. Вместо 2 GB — 1 GB.
Механизмы zero-copy
1. Shared Memory (POSIX shm)
Операционная система позволяет нескольким процессам маппить одну область физической памяти в свои виртуальные адресные пространства:
import pyarrow as pa
# Процесс A: создаёт Arrow buffer в shared memory
buffer = pa.allocate_buffer(1024 * 1024 * 1024) # 1 GB
# Заполняем данными...
# Передача: отправляем ТОЛЬКО дескриптор shared memory
# (несколько байт вместо 1 GB)
# Процесс B: подключается к той же памяти
# Видит те же данные без копирования
2. Memory-Mapped Files (mmap)
Файл на диске маппится напрямую в виртуальную память. Операционная система загружает страницы по мере обращения (lazy loading):
import pyarrow as pa
import pyarrow.ipc as ipc
# Запись Arrow IPC файла
table = pa.table({
'id': pa.array(range(10_000_000)),
'value': pa.array([float(i) * 0.1 for i in range(10_000_000)])
})
with pa.OSFile('/tmp/large_data.arrow', 'wb') as f:
writer = ipc.new_file(f, table.schema)
writer.write_table(table)
writer.close()
# Чтение через memory-mapped file (zero-copy!)
source = pa.memory_map('/tmp/large_data.arrow', 'r')
reader = ipc.open_file(source)
table_mmap = reader.read_all()
# Данные НЕ загружены в RAM -- загрузятся при обращении к конкретным колонкам
Memory-mapped Arrow файл vs обычное чтение
При обычном чтении (pa.OSFile) весь файл копируется в RAM. При memory-mapped чтении (pa.memory_map) файл маппится в виртуальное адресное пространство, но физически загружается только при обращении к конкретным данным. Для 10 GB файла, из которого нужны 2 колонки — это разница между 10 GB и ~2 GB потребления RAM.
Arrow Buffer Protocol
pyarrow.Buffer — это основной примитив для zero-copy в Arrow. Buffer — это непрерывный (contiguous) блок байтов в памяти, который может быть:
- Owned — Arrow выделил память и управляет ей
- View — ссылка на чужую память (zero-copy!)
import pyarrow as pa
import numpy as np
# NumPy array (owns memory)
np_array = np.array([1, 2, 3, 4, 5], dtype=np.int64)
# Arrow buffer как VIEW на NumPy memory (zero-copy!)
arrow_buf = pa.py_buffer(np_array)
# Arrow array из buffer (zero-copy!)
arrow_array = pa.Array.from_buffers(
pa.int64(), 5,
[None, arrow_buf]
)
# Данные НЕ копировались -- arrow_array ссылается на ту же память, что np_array
# Изменение np_array[0] = 999 будет видно через arrow_array
Zero-copy между pandas и Arrow
Конвертация между pandas DataFrame и Arrow Table — одна из самых частых операций в Spark. Arrow предоставляет zero-copy путь:
Обычная конвертация (с копированием)
import pyarrow as pa
import pandas as pd
table = pa.table({
'id': range(10_000_000),
'value': [float(i) for i in range(10_000_000)]
})
# Обычная конвертация -- КОПИРУЕТ данные
df = table.to_pandas()
# Создаёт NumPy arrays, копирует из Arrow buffers
# Для 1 GB данных: ~2 GB RAM, ~1 сек
Zero-copy конвертация
# Zero-copy: self_destruct=True
df = table.to_pandas(self_destruct=True)
# Arrow buffers передаются pandas напрямую
# Arrow Table уничтожается (self_destruct)
# Для 1 GB данных: ~1 GB RAM, ~0 сек
Anti-pattern: to_pandas() на больших данных без zero-copy
# ПЛОХО: копирует все данные
df = spark_df.toPandas() # 10 GB DataFrame → 20 GB RAM (Arrow + pandas copy)
# ЛУЧШЕ: включить Arrow optimization
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df = spark_df.toPandas() # Arrow-based, значительно быстрее
# ЛУЧШЕ ВСЕГО: self_destruct (если Arrow Table больше не нужен)
table = pa.Table.from_pandas(spark_df.toPandas())
df = table.to_pandas(self_destruct=True)Без spark.sql.execution.arrow.pyspark.enabled=true Spark использует pickle для сериализации каждой строки. С Arrow — передаёт целые RecordBatch’и, в 10-100x быстрее.
RecordBatch: streaming-friendly единица данных
Arrow RecordBatch — это “порция” данных фиксированного размера. Вместо передачи всей таблицы целиком, данные разбиваются на RecordBatch’и и передаются по одному:
import pyarrow as pa
# Создаём RecordBatch (порция данных)
batch = pa.RecordBatch.from_pydict({
'id': [1, 2, 3, 4, 5],
'name': ['Анна', 'Борис', 'Вера', 'Григорий', 'Дарья'],
'salary': [95000, 78000, 112000, 89000, 103000]
})
print(f"Rows: {batch.num_rows}") # 5
print(f"Bytes: {batch.nbytes}") # ~200 bytes
# RecordBatch позволяет streaming-обработку:
# вместо загрузки 10 GB в RAM сразу,
# обрабатываем по 64 MB за раз
Почему RecordBatch, а не целая Table?
| Аспект | Целая Table | RecordBatch (64 MB) |
|---|---|---|
| Память | Нужна вся RAM сразу | Фиксированный буфер |
| Latency | Первый результат после загрузки всех данных | Первый результат через ~10 мс |
| Streaming | Невозможно | Естественно |
| Backpressure | Нет | Контроль потока |
RecordBatch — это единица обмена в Arrow Flight (следующий урок). Spark Structured Streaming также использует batches для micro-batch обработки.
Производительность: copy vs zero-copy
Реальные замеры для 1 GB DataFrame (10 млн строк, 8 колонок):
| Операция | Copy (pickle) | Copy (Arrow) | Zero-copy (Arrow) |
|---|---|---|---|
| Serialize | 2.5 сек | 0.3 сек | 0 сек |
| Transfer | 0.5 сек | 0.5 сек | ~0 сек |
| Deserialize | 2.5 сек | 0.3 сек | 0 сек |
| Итого | 5.5 сек | 1.1 сек | < 0.01 сек |
| RAM usage | 2x (2 GB) | 2x (2 GB) | 1x (1 GB) |
Zero-copy выигрывает в 500x по времени и в 2x по памяти.
Что дальше?
В следующем уроке мы разберём Arrow Flight — высокоскоростной протокол передачи данных поверх gRPC, который использует RecordBatch для streaming-передачи Arrow-данных между сервисами. Flight заменяет JDBC/ODBC для аналитических нагрузок.