Интеграция с Python-экосистемой
DataFusion Python построен на PyArrow и обменивается данными с другими библиотеками через Apache Arrow без копирования. Это позволяет комбинировать DataFusion с Pandas, Polars, NumPy и любыми Arrow-совместимыми инструментами.
PyArrow: основа взаимодействия
DataFusion Python использует PyArrow как основной формат данных. Все операции collect() возвращают List[pa.RecordBatch], все UDF принимают и возвращают pa.Array:
from datafusion import SessionContext
import pyarrow as pa
ctx = SessionContext()
df = ctx.read_parquet("data/orders.parquet")
# collect() возвращает список RecordBatch
batches = df.collect()
for batch in batches:
print(type(batch)) # <class 'pyarrow.lib.RecordBatch'>
# to_arrow_table() объединяет в одну Table
table = df.to_arrow_table()
print(type(table)) # <class 'pyarrow.lib.Table'>
print(table.schema)
Zero-copy через C data interface
Передача данных между DataFusion (Rust) и Python происходит через Apache Arrow C data interface. Это не сериализация --- Rust и Python смотрят на одну и ту же область памяти:
Zero-copy означает, что конвертация между DataFusion и PyArrow практически бесплатна --- O(1) по памяти и времени. Данные не дублируются.
Pandas-интеграция
DataFusion в Pandas
from datafusion import SessionContext
ctx = SessionContext()
df = ctx.read_parquet("data/orders.parquet")
# Фильтрация в DataFusion, анализ в Pandas
pandas_df = (
df.filter(col("status").eq(lit("completed")))
.to_pandas()
)
# Pandas-операции
print(pandas_df.describe())
print(pandas_df.groupby("region")["amount"].mean())
Pandas в DataFusion
import pandas as pd
import pyarrow as pa
# Pandas DataFrame
pdf = pd.DataFrame({
"city": ["Moscow", "Berlin", "Tokyo"],
"population": [12_600_000, 3_700_000, 14_000_000],
})
# Через PyArrow Table
table = pa.Table.from_pandas(pdf)
ctx.register_record_batches("cities", [table.to_batches()])
df = ctx.sql("SELECT city FROM cities WHERE population > 10000000")
df.show()
Конвертация to_pandas() создаёт копию данных, так как Pandas использует собственный формат хранения (NumPy-backed). Для больших данных предпочтительнее оставаться в Arrow-формате.
Polars-интеграция
Polars нативно поддерживает Arrow, что делает обмен данными эффективным:
DataFusion в Polars
import polars as pl
ctx = SessionContext()
df = ctx.read_parquet("data/orders.parquet")
# Через Arrow Table --- минимальное копирование
arrow_table = df.to_arrow_table()
polars_df = pl.from_arrow(arrow_table)
print(polars_df.describe())
Polars в DataFusion
import polars as pl
import pyarrow as pa
polars_df = pl.DataFrame({
"name": ["Alice", "Bob"],
"score": [95, 82],
})
# Polars → Arrow → DataFusion
arrow_table = polars_df.to_arrow()
ctx.register_record_batches("students", [arrow_table.to_batches()])
df = ctx.sql("SELECT * FROM students")
df.show()
NumPy-интеграция
Для вычислений на массивах NumPy --- через PyArrow:
import numpy as np
import pyarrow as pa
# NumPy → Arrow → DataFusion
np_array = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
arrow_array = pa.array(np_array)
table = pa.table({"values": arrow_array})
ctx.register_record_batches("numbers", [table.to_batches()])
df = ctx.sql("SELECT AVG(values) as mean, STDDEV(values) as std FROM numbers")
df.show()
# DataFusion → NumPy
result = df.to_arrow_table()
mean_array = result.column("mean").to_numpy()
Arrow PyCapsule Interface — zero-copy DataFrame interchange
Начиная с PyArrow 14+ и DataFusion-Python 40+ для обмена данными между библиотеками используется Arrow PyCapsule Interface (спецификация) — стандартный Python-протокол на основе __arrow_c_array__, __arrow_c_stream__, __arrow_c_schema__. Любой объект, реализующий эти dunder-методы, обменивается данными zero-copy без знания о конкретном producer/consumer.
Что такое PyCapsule
PyCapsule (CPython C API since 2.7) — typed pointer wrapper, безопасно передающий C-указатели между Python-модулями. Arrow PyCapsule Interface оборачивает структуры C Data Interface (ArrowArray, ArrowSchema, ArrowArrayStream) в PyCapsule с известными именами — consumer извлекает указатели без сериализации.
# Producer (DataFusion / pandas / Polars / pyarrow) реализует:
class MyDataFrame:
def __arrow_c_stream__(self, requested_schema=None):
# Возвращает PyCapsule с указателем на ArrowArrayStream
...
def __arrow_c_schema__(self):
# Возвращает PyCapsule с указателем на ArrowSchema
...
Универсальный обмен DataFusion ↔ Polars 1.x ↔ pandas 2.x ↔ pyarrow
import polars as pl
import pandas as pd
import pyarrow as pa
from datafusion import SessionContext
ctx = SessionContext()
df = ctx.read_parquet("data/events.parquet")
# DataFusion → Polars 1.x (zero-copy через PyCapsule)
polars_df = pl.DataFrame(df) # Polars 1.x понимает __arrow_c_stream__ напрямую
# DataFusion → pandas 2.x (zero-copy для Arrow-backed columns; pyarrow.array dtype)
pandas_df = pd.api.interchange.from_dataframe(df)
# Polars 1.x → DataFusion (zero-copy)
polars_input = pl.DataFrame({"x": [1, 2, 3], "y": [4.0, 5.0, 6.0]})
ctx.register_dataframe("polars_data", polars_input) # читает PyCapsule интерфейс
df_from_polars = ctx.sql("SELECT * FROM polars_data WHERE x > 1")
До PyCapsule Interface каждая пара библиотек требовала explicit knowledge о конвертации (pl.from_arrow(df.to_arrow_table()), pd.DataFrame.from_records(df.to_pylist())). Теперь любой Arrow-совместимый объект обменивается через протокол — producer не знает consumer’а, consumer не знает producer’а. Это эквивалент Python iterator protocol для табличных данных.
Различие __arrow_c_array__ vs __arrow_c_stream__
| Dunder | Семантика | Когда |
|---|---|---|
__arrow_c_array__ | Один RecordBatch | Маленькие данные, eager |
__arrow_c_stream__ | Поток RecordBatch (lazy) | Большие данные, streaming |
__arrow_c_schema__ | Только схема | Inspection без загрузки данных |
DataFusion DataFrame реализует __arrow_c_stream__ — потому что DataFrame ленивый, материализация только при collect/iter. Polars 1.x DataFrame — оба (__arrow_c_array__ для in-memory + __arrow_c_stream__ для streaming).
PyCapsule vs to_pandas() / to_arrow_table() — explicit conversions всё ещё работают и возвращают копию (для pandas) или zero-copy (для pyarrow). PyCapsule Interface — современный путь для библиотек, желающих избежать explicit dependency на конкретный producer. Cite Arrow PyCapsule Interface spec.
Когда использовать DataFusion в Python
DataFusion занимает определённую нишу среди Python-инструментов для данных:
Когда DataFusion --- правильный выбор
- Встраивание движка запросов в свою систему (кастомные каталоги, провайдеры данных)
- UDF/UDAF на Python с Rust-производительностью для остальных операций
- SQL + DataFrame API в одном инструменте
- Arrow-native pipeline без сериализации между компонентами
Когда лучше другие инструменты
- Pandas --- когда нужна экосистема ML (sklearn, statsmodels, matplotlib)
- Polars --- когда нужна максимальная скорость DataFrame без SQL
- DuckDB --- когда нужен персистентный SQL с ACID и прямым чтением S3/HTTP
DataFusion не конкурирует с Pandas или Polars --- он дополняет их. Типичный pipeline: загрузка и фильтрация в DataFusion (SQL), аналитика в Pandas, визуализация в matplotlib.
Итоги
- PyArrow --- единый формат обмена данными между DataFusion и Python-экосистемой
- Zero-copy через C data interface: DataFusion (Rust) и PyArrow (Python) разделяют память
to_pandas()создаёт копию;to_arrow_table()--- практически бесплатно- Polars интегрируется через Arrow с минимальным копированием
- DataFusion оптимален для встраиваемых движков и Arrow-native pipeline