Продвинутые паттерны
DataFusion Python позволяет выходить за рамки стандартных CSV/Parquet-запросов. Кастомные провайдеры данных, облачные хранилища, сериализация планов и конфигурация runtime открывают путь к production-системам.
Custom TableProvider
TableProvider --- интерфейс для подключения произвольных источников данных. В Python можно реализовать провайдер, который DataFusion будет вызывать при обращении к таблице:
from datafusion import SessionContext
import pyarrow as pa
class ApiTableProvider:
"""Провайдер, читающий данные из API."""
def __init__(self, data: list[dict]):
self._data = data
def schema(self) -> pa.Schema:
"""Схема таблицы."""
return pa.schema([
("id", pa.int64()),
("name", pa.utf8()),
("value", pa.float64()),
])
def scan(self, projection=None, filters=None, limit=None):
"""Чтение данных с учётом проекции и фильтров."""
table = pa.table(
{
"id": [r["id"] for r in self._data],
"name": [r["name"] for r in self._data],
"value": [r["value"] for r in self._data],
},
schema=self.schema(),
)
if projection is not None:
table = table.select(projection)
if limit is not None:
table = table.slice(0, limit)
return table.to_batches()
Python TableProvider --- упрощённая версия Rust TableProvider. Для production-нагрузок с pushdown предикатов и партиционированием рекомендуется Rust-реализация.
Паттерны использования
Custom TableProvider позволяет:
- Читать данные из REST API, баз данных, message queues
- Реализовать федеративные запросы (SQL поверх нескольких источников)
- Абстрагировать формат хранения от SQL-слоя
Object Store: S3, GCS, Azure
DataFusion поддерживает облачные хранилища через object store интеграцию:
from datafusion import SessionContext
ctx = SessionContext()
# Чтение Parquet из S3
df = ctx.read_parquet("s3://my-bucket/data/orders.parquet")
# Регистрация таблицы из S3
ctx.register_parquet("orders", "s3://my-bucket/data/orders/")
Для аутентификации используются стандартные механизмы:
import os
# AWS credentials через переменные окружения
os.environ["AWS_ACCESS_KEY_ID"] = "..."
os.environ["AWS_SECRET_ACCESS_KEY"] = "..."
os.environ["AWS_DEFAULT_REGION"] = "eu-west-1"
# Или AWS profile
os.environ["AWS_PROFILE"] = "analytics"
DataFusion использует библиотеку object_store (Rust) для доступа к S3/GCS/Azure. Predicate pushdown и projection pruning работают и для удалённых файлов --- DataFusion читает только нужные row groups из Parquet.
Substrait: сериализация планов
Substrait --- открытый стандарт для сериализации планов запросов. DataFusion может конвертировать LogicalPlan в Substrait и обратно:
from datafusion import SessionContext
from datafusion.substrait import Serde
ctx = SessionContext()
ctx.register_parquet("orders", "data/orders.parquet")
# Создаём план
df = ctx.sql("SELECT region, SUM(amount) FROM orders GROUP BY region")
# Сериализация в Substrait (бинарный формат)
plan = df.logical_plan()
substrait_bytes = Serde.serialize_to_plan(plan, ctx)
# Десериализация и выполнение
restored_plan = Serde.deserialize_bytes(substrait_bytes, ctx)
restored_df = ctx.create_dataframe_from_logical_plan(restored_plan)
restored_df.show()
Сценарии использования Substrait:
- Передача планов между системами (Python формирует план, Rust/Java исполняет)
- Кэширование скомпилированных запросов
- Аудит и логирование планов выполнения
RuntimeEnvBuilder
Для конфигурации runtime-окружения (пул памяти, дисковый менеджер) используется RuntimeEnvBuilder:
from datafusion import SessionContext, RuntimeEnvBuilder, SessionConfig
# Конфигурация runtime
runtime = (
RuntimeEnvBuilder()
.with_disk_manager_os() # OS-level temp files
.with_fair_spill_pool(1024 * 1024 * 512) # 512 MB пул памяти
.build()
)
# Конфигурация сессии
config = (
SessionConfig()
.with_batch_size(4096)
.with_target_partitions(8)
)
# Создание контекста с конфигурацией
ctx = SessionContext(config=config, runtime=runtime)
Устаревший класс для конфигурации runtime удалён начиная с версии 44.0 --- используйте RuntimeEnvBuilder. Аналогично, устаревшее имя контекста заменено на SessionContext.
Production-паттерны
Ограничение памяти
runtime = (
RuntimeEnvBuilder()
.with_fair_spill_pool(1024 * 1024 * 1024) # 1 GB
.with_disk_manager_os() # Spill на диск при превышении
.build()
)
ctx = SessionContext(runtime=runtime)
При превышении лимита памяти DataFusion автоматически сбрасывает промежуточные данные на диск (spilling) вместо OOM-ошибки.
Обработка ошибок
from datafusion import SessionContext
ctx = SessionContext()
try:
df = ctx.sql("SELECT * FROM nonexistent_table")
df.show()
except Exception as e:
print(f"Query failed: {e}")
# Query failed: Error during planning:
# table 'datafusion.public.nonexistent_table' not found
Конфигурация партиционирования
config = SessionConfig().with_target_partitions(16)
ctx = SessionContext(config=config)
# DataFusion распределит вычисления по 16 потокам
df = ctx.read_parquet("data/large_dataset/")
result = df.aggregate(
[col("region")],
[f.sum(col("amount")).alias("total")],
)
DataFusion как фундамент
DataFusion используется как движок запросов в нескольких крупных проектах:
Эти проекты демонстрируют главную ценность DataFusion: это не конечный продукт, а строительный блок. Вы берёте SQL-парсинг, оптимизатор и Arrow-выполнение и встраиваете в свою архитектуру.
Полный пример: production-ready pipeline
from datafusion import (
SessionContext, SessionConfig, RuntimeEnvBuilder,
col, lit,
)
from datafusion import functions as f
# Production-конфигурация
runtime = (
RuntimeEnvBuilder()
.with_fair_spill_pool(1024 * 1024 * 512) # 512 MB
.with_disk_manager_os()
.build()
)
config = (
SessionConfig()
.with_batch_size(8192)
.with_target_partitions(8)
.with_information_schema(True)
)
ctx = SessionContext(config=config, runtime=runtime)
# Регистрация данных
ctx.register_parquet("orders", "s3://analytics/orders/")
ctx.register_parquet("products", "s3://analytics/products/")
# Аналитический запрос
try:
result = ctx.sql("""
SELECT p.category,
COUNT(*) as order_count,
ROUND(SUM(o.amount), 2) as revenue,
ROUND(AVG(o.amount), 2) as avg_order
FROM orders o
JOIN products p ON o.product_id = p.id
WHERE o.status = 'completed'
AND o.order_date >= '2024-01-01'
GROUP BY p.category
HAVING COUNT(*) > 10
ORDER BY revenue DESC
""")
# Экспорт в Parquet для BI
arrow_table = result.to_arrow_table()
import pyarrow.parquet as pq
pq.write_table(arrow_table, "output/category_report.parquet")
except Exception as e:
print(f"Pipeline failed: {e}")
Итоги
- Custom TableProvider --- подключение произвольных источников данных к SQL-движку
- Object Store --- чтение S3/GCS/Azure с predicate pushdown
- Substrait --- сериализация планов для межсистемного обмена
- RuntimeEnvBuilder --- конфигурация памяти, партиционирования и spill-to-disk
- Production-паттерны --- ограничение памяти, обработка ошибок, конфигурация параллелизма
- DataFusion --- строительный блок для Dask SQL, Ballista, Apache Comet, Delta-rs