Learning Platform
Глоссарий Troubleshooting
Урок 05.06 · 15 мин
Средний
TableProviderobject storeSubstraitRuntimeEnvBuilderproductionDask SQLBallista

Продвинутые паттерны

DataFusion Python позволяет выходить за рамки стандартных CSV/Parquet-запросов. Кастомные провайдеры данных, облачные хранилища, сериализация планов и конфигурация runtime открывают путь к production-системам.

Custom TableProvider

TableProvider --- интерфейс для подключения произвольных источников данных. В Python можно реализовать провайдер, который DataFusion будет вызывать при обращении к таблице:

from datafusion import SessionContext
import pyarrow as pa

class ApiTableProvider:
    """Провайдер, читающий данные из API."""

    def __init__(self, data: list[dict]):
        self._data = data

    def schema(self) -> pa.Schema:
        """Схема таблицы."""
        return pa.schema([
            ("id", pa.int64()),
            ("name", pa.utf8()),
            ("value", pa.float64()),
        ])

    def scan(self, projection=None, filters=None, limit=None):
        """Чтение данных с учётом проекции и фильтров."""
        table = pa.table(
            {
                "id": [r["id"] for r in self._data],
                "name": [r["name"] for r in self._data],
                "value": [r["value"] for r in self._data],
            },
            schema=self.schema(),
        )
        if projection is not None:
            table = table.select(projection)
        if limit is not None:
            table = table.slice(0, limit)
        return table.to_batches()
NOTE

Python TableProvider --- упрощённая версия Rust TableProvider. Для production-нагрузок с pushdown предикатов и партиционированием рекомендуется Rust-реализация.

Паттерны использования

Custom TableProvider позволяет:

  • Читать данные из REST API, баз данных, message queues
  • Реализовать федеративные запросы (SQL поверх нескольких источников)
  • Абстрагировать формат хранения от SQL-слоя

Object Store: S3, GCS, Azure

DataFusion поддерживает облачные хранилища через object store интеграцию:

from datafusion import SessionContext

ctx = SessionContext()

# Чтение Parquet из S3
df = ctx.read_parquet("s3://my-bucket/data/orders.parquet")

# Регистрация таблицы из S3
ctx.register_parquet("orders", "s3://my-bucket/data/orders/")

Для аутентификации используются стандартные механизмы:

import os

# AWS credentials через переменные окружения
os.environ["AWS_ACCESS_KEY_ID"] = "..."
os.environ["AWS_SECRET_ACCESS_KEY"] = "..."
os.environ["AWS_DEFAULT_REGION"] = "eu-west-1"

# Или AWS profile
os.environ["AWS_PROFILE"] = "analytics"
TIP

DataFusion использует библиотеку object_store (Rust) для доступа к S3/GCS/Azure. Predicate pushdown и projection pruning работают и для удалённых файлов --- DataFusion читает только нужные row groups из Parquet.

Substrait: сериализация планов

Substrait --- открытый стандарт для сериализации планов запросов. DataFusion может конвертировать LogicalPlan в Substrait и обратно:

from datafusion import SessionContext
from datafusion.substrait import Serde

ctx = SessionContext()
ctx.register_parquet("orders", "data/orders.parquet")

# Создаём план
df = ctx.sql("SELECT region, SUM(amount) FROM orders GROUP BY region")

# Сериализация в Substrait (бинарный формат)
plan = df.logical_plan()
substrait_bytes = Serde.serialize_to_plan(plan, ctx)

# Десериализация и выполнение
restored_plan = Serde.deserialize_bytes(substrait_bytes, ctx)
restored_df = ctx.create_dataframe_from_logical_plan(restored_plan)
restored_df.show()

Сценарии использования Substrait:

  • Передача планов между системами (Python формирует план, Rust/Java исполняет)
  • Кэширование скомпилированных запросов
  • Аудит и логирование планов выполнения

RuntimeEnvBuilder

Для конфигурации runtime-окружения (пул памяти, дисковый менеджер) используется RuntimeEnvBuilder:

from datafusion import SessionContext, RuntimeEnvBuilder, SessionConfig

# Конфигурация runtime
runtime = (
    RuntimeEnvBuilder()
    .with_disk_manager_os()                   # OS-level temp files
    .with_fair_spill_pool(1024 * 1024 * 512)  # 512 MB пул памяти
    .build()
)

# Конфигурация сессии
config = (
    SessionConfig()
    .with_batch_size(4096)
    .with_target_partitions(8)
)

# Создание контекста с конфигурацией
ctx = SessionContext(config=config, runtime=runtime)
WARNING

Устаревший класс для конфигурации runtime удалён начиная с версии 44.0 --- используйте RuntimeEnvBuilder. Аналогично, устаревшее имя контекста заменено на SessionContext.

Production-паттерны

Ограничение памяти

runtime = (
    RuntimeEnvBuilder()
    .with_fair_spill_pool(1024 * 1024 * 1024)  # 1 GB
    .with_disk_manager_os()                      # Spill на диск при превышении
    .build()
)

ctx = SessionContext(runtime=runtime)

При превышении лимита памяти DataFusion автоматически сбрасывает промежуточные данные на диск (spilling) вместо OOM-ошибки.

Обработка ошибок

from datafusion import SessionContext

ctx = SessionContext()

try:
    df = ctx.sql("SELECT * FROM nonexistent_table")
    df.show()
except Exception as e:
    print(f"Query failed: {e}")
    # Query failed: Error during planning:
    # table 'datafusion.public.nonexistent_table' not found

Конфигурация партиционирования

config = SessionConfig().with_target_partitions(16)
ctx = SessionContext(config=config)

# DataFusion распределит вычисления по 16 потокам
df = ctx.read_parquet("data/large_dataset/")
result = df.aggregate(
    [col("region")],
    [f.sum(col("amount")).alias("total")],
)

DataFusion как фундамент

DataFusion используется как движок запросов в нескольких крупных проектах:

Проекты на основе DataFusion
Dask SQLSQL-интерфейс для Dask — DataFusion парсит и планирует, Dask выполняет распределённо
BallistaРаспределённый DataFusion — кластер из Scheduler и Executor узлов
Apache CometНативный ускоритель Spark — DataFusion заменяет JVM-выполнение Rust-операторами
Delta-rsRust-реализация Delta Lake — DataFusion как движок запросов к Delta-таблицам

Эти проекты демонстрируют главную ценность DataFusion: это не конечный продукт, а строительный блок. Вы берёте SQL-парсинг, оптимизатор и Arrow-выполнение и встраиваете в свою архитектуру.

Полный пример: production-ready pipeline

from datafusion import (
    SessionContext, SessionConfig, RuntimeEnvBuilder,
    col, lit,
)
from datafusion import functions as f

# Production-конфигурация
runtime = (
    RuntimeEnvBuilder()
    .with_fair_spill_pool(1024 * 1024 * 512)  # 512 MB
    .with_disk_manager_os()
    .build()
)

config = (
    SessionConfig()
    .with_batch_size(8192)
    .with_target_partitions(8)
    .with_information_schema(True)
)

ctx = SessionContext(config=config, runtime=runtime)

# Регистрация данных
ctx.register_parquet("orders", "s3://analytics/orders/")
ctx.register_parquet("products", "s3://analytics/products/")

# Аналитический запрос
try:
    result = ctx.sql("""
        SELECT p.category,
               COUNT(*) as order_count,
               ROUND(SUM(o.amount), 2) as revenue,
               ROUND(AVG(o.amount), 2) as avg_order
        FROM orders o
        JOIN products p ON o.product_id = p.id
        WHERE o.status = 'completed'
          AND o.order_date >= '2024-01-01'
        GROUP BY p.category
        HAVING COUNT(*) > 10
        ORDER BY revenue DESC
    """)

    # Экспорт в Parquet для BI
    arrow_table = result.to_arrow_table()
    import pyarrow.parquet as pq
    pq.write_table(arrow_table, "output/category_report.parquet")

except Exception as e:
    print(f"Pipeline failed: {e}")

Итоги

  • Custom TableProvider --- подключение произвольных источников данных к SQL-движку
  • Object Store --- чтение S3/GCS/Azure с predicate pushdown
  • Substrait --- сериализация планов для межсистемного обмена
  • RuntimeEnvBuilder --- конфигурация памяти, партиционирования и spill-to-disk
  • Production-паттерны --- ограничение памяти, обработка ошибок, конфигурация параллелизма
  • DataFusion --- строительный блок для Dask SQL, Ballista, Apache Comet, Delta-rs

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Для чего используется Custom TableProvider в DataFusion Python?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6