Learning Platform
Глоссарий Troubleshooting
Урок 03.04 · 14 мин
Средний
CatalogProviderSchemaProviderTableProviderListingTableMemTableData Sources

Каталог и источники данных

Когда DataFusion видит SELECT * FROM orders, он должен знать, что такое orders: какая у неё схема, где лежат данные, как их читать. За это отвечает система каталогов — трёхуровневая иерархия провайдеров.

Трёхуровневая иерархия

DataFusion использует модель именования catalog.schema.table, аналогичную PostgreSQL:

Catalog Hierarchy
CatalogProviderВерхний уровень иерархии: содержит именованные схемы. По умолчанию — 'datafusion'
SchemaProviderСредний уровень: содержит коллекцию таблиц. По умолчанию — 'public'
TableProviderПровайдер данных: определяет схему таблицы и метод scan() для создания ExecutionPlan
TableProviderКаждый TableProvider — самостоятельный источник данных (Parquet, CSV, API, Memory)
TableProviderМетод supports_filters_pushdown() позволяет источнику фильтровать данные самостоятельно

При запросе SELECT * FROM orders DataFusion ищет таблицу как datafusion.public.orders. Полное имя можно указать явно: SELECT * FROM my_catalog.my_schema.orders.

CatalogProvider

Верхний уровень — каталог. Содержит коллекцию схем:

pub trait CatalogProvider: Send + Sync {
    /// Список доступных схем
    fn schema_names(&self) -> Vec<String>;

    /// Получить провайдер схемы по имени
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;

    /// Зарегистрировать новую схему
    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>>;
}

По умолчанию DataFusion создаёт каталог datafusion с одной схемой public. Для мультитенантных систем можно создать каталог на клиента.

SchemaProvider

Средний уровень — схема. Содержит коллекцию таблиц:

pub trait SchemaProvider: Send + Sync {
    /// Список таблиц в схеме
    fn table_names(&self) -> Vec<String>;

    /// Получить провайдер таблицы по имени
    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>>;

    /// Зарегистрировать таблицу
    fn register_table(
        &self,
        name: String,
        table: Arc<dyn TableProvider>,
    ) -> Result<Option<Arc<dyn TableProvider>>>;

    /// Проверить существование таблицы
    fn table_exist(&self, name: &str) -> bool;
}
NOTE

Метод table() — асинхронный. Это позволяет реализовать lazy loading: схема может загружать метаданные таблицы из remote catalog (Hive Metastore, Iceberg REST, Unity Catalog) только при первом обращении.

TableProvider: ключевой trait

TableProvider — самый важный trait для расширения DataFusion. Он определяет, как читать данные из источника:

pub trait TableProvider: Send + Sync {
    /// Схема таблицы (колонки и типы)
    fn schema(&self) -> SchemaRef;

    /// Тип таблицы (Base, View, Temporary)
    fn table_type(&self) -> TableType;

    /// Создать план сканирования
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,  // Какие колонки читать
        filters: &[Expr],                 // Предикаты для pushdown
        limit: Option<usize>,             // LIMIT для pushdown
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Какие фильтры источник может обработать сам
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>>;
}

Метод scan() возвращает ExecutionPlan — физический оператор, который будет встроен в дерево выполнения. Параметры projection, filters, limit позволяют источнику оптимизировать чтение: читать только нужные колонки, пропускать ненужные строки, ограничивать количество.

Встроенные реализации

ListingTable: файловые источники

ListingTable — универсальный провайдер для файловых источников (Parquet, CSV, JSON, Avro):

use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingOptions};
use datafusion::datasource::file_format::parquet::ParquetFormat;

let config = ListingTableConfig::new(
    ListingTableUrl::parse("s3://bucket/data/")?
)
.with_listing_options(
    ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_file_extension(".parquet")
        .with_target_partitions(8)
)
.infer_schema(&state).await?;

let table = ListingTable::try_new(config)?;

ListingTable умеет:

  • Обнаруживать файлы по паттерну в директории
  • Инферить схему из первого файла (или принимать явную)
  • Partition pruning по Hive-style partitioning (year=2024/month=03/)
  • Predicate pushdown в Parquet (row group statistics)

MemTable: данные в памяти

MemTable хранит данные как Vec<RecordBatch>:

use datafusion::datasource::MemTable;

let schema = Arc::new(Schema::new(vec![
    Field::new("id", DataType::Int64, false),
    Field::new("name", DataType::Utf8, true),
]));

let batch = RecordBatch::try_new(schema.clone(), vec![
    Arc::new(Int64Array::from(vec![1, 2, 3])),
    Arc::new(StringArray::from(vec!["A", "B", "C"])),
])?;

let table = MemTable::try_new(schema, vec![vec![batch]])?;
ctx.register_table("test", Arc::new(table))?;

MemTable полезен для тестирования, временных таблиц (CREATE TABLE ... AS SELECT) и небольших lookup-таблиц.

StreamTable: потоковые источники

StreamTable представляет бесконечный поток данных (Kafka, WebSocket):

// Создание StreamTable из пользовательского потока
let provider = StreamTable::new(schema, vec![streaming_source]);

Streaming-источники работают в модели «producer push, consumer pull» — DataFusion интегрирует их через SendableRecordBatchStream.

Регистрация источников через SessionContext

На практике регистрация источников происходит через удобные методы SessionContext:

let ctx = SessionContext::new();

// Parquet-файл → ListingTable автоматически
ctx.register_parquet("orders", "data/orders.parquet", Default::default()).await?;

// CSV с опциями
ctx.register_csv("users", "data/users.csv", CsvReadOptions::new().has_header(true)).await?;

// RecordBatch в память
ctx.register_batch("lookup", batch)?;

// JSON
ctx.register_json("events", "data/events/*.json", Default::default()).await?;

Каждый register_* метод создаёт соответствующий TableProvider и регистрирует его в каталоге по умолчанию (datafusion.public).

Пользовательский TableProvider

Для нестандартных источников данных вы реализуете TableProvider:

struct ApiTable {
    schema: SchemaRef,
    endpoint: String,
}

#[async_trait]
impl TableProvider for ApiTable {
    fn schema(&self) -> SchemaRef { self.schema.clone() }
    fn table_type(&self) -> TableType { TableType::Base }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Создаём custom ExecutionPlan, который вызывает API
        Ok(Arc::new(ApiExec::new(
            self.endpoint.clone(),
            self.schema.clone(),
            projection.cloned(),
            limit,
        )))
    }
}

Это мощный паттерн: вы можете обернуть любой источник — REST API, gRPC, базу данных, очередь — в TableProvider и работать с ним через SQL.

TIP

Модуль 05 (Расширяемость) подробно покрывает реализацию пользовательских TableProvider, включая filter pushdown, partition pruning и кастомные ExecutionPlan. Здесь мы рассматриваем архитектурную модель.

Как каталог участвует в планировании

Когда SqlToRel встречает имя таблицы в SQL, он:

  1. Разрешает имя через catalog.schema.table (с учётом search path)
  2. Вызывает SchemaProvider::table() для получения TableProvider
  3. Читает TableProvider::schema() для построения DFSchema
  4. Создаёт узел LogicalPlan::TableScan с ссылкой на TableProvider

Позже, при физическом планировании, TableProvider::scan() вызывается для создания реального ExecutionPlan.

Итоги

  • Три уровня каталога: CatalogProviderSchemaProviderTableProvider
  • TableProvider::scan() — ключевой метод: возвращает ExecutionPlan для чтения данных
  • ListingTable — универсальный провайдер для файлов (Parquet, CSV, JSON) с pushdown
  • MemTable — in-memory хранение для тестов и lookup-таблиц
  • Пользовательский TableProvider позволяет обернуть любой источник в SQL-интерфейс
  • SessionContext::register_* — удобные методы для регистрации стандартных форматов

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Система каталогов DataFusion имеет три уровня. Какова правильная иерархия?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7