Каталог и источники данных
Когда DataFusion видит SELECT * FROM orders, он должен знать, что такое orders: какая у неё схема, где лежат данные, как их читать. За это отвечает система каталогов — трёхуровневая иерархия провайдеров.
Трёхуровневая иерархия
DataFusion использует модель именования catalog.schema.table, аналогичную PostgreSQL:
При запросе SELECT * FROM orders DataFusion ищет таблицу как datafusion.public.orders. Полное имя можно указать явно: SELECT * FROM my_catalog.my_schema.orders.
CatalogProvider
Верхний уровень — каталог. Содержит коллекцию схем:
pub trait CatalogProvider: Send + Sync {
/// Список доступных схем
fn schema_names(&self) -> Vec<String>;
/// Получить провайдер схемы по имени
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
/// Зарегистрировать новую схему
fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> Result<Option<Arc<dyn SchemaProvider>>>;
}
По умолчанию DataFusion создаёт каталог datafusion с одной схемой public. Для мультитенантных систем можно создать каталог на клиента.
SchemaProvider
Средний уровень — схема. Содержит коллекцию таблиц:
pub trait SchemaProvider: Send + Sync {
/// Список таблиц в схеме
fn table_names(&self) -> Vec<String>;
/// Получить провайдер таблицы по имени
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>>;
/// Зарегистрировать таблицу
fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> Result<Option<Arc<dyn TableProvider>>>;
/// Проверить существование таблицы
fn table_exist(&self, name: &str) -> bool;
}
Метод table() — асинхронный. Это позволяет реализовать lazy loading: схема может загружать метаданные таблицы из remote catalog (Hive Metastore, Iceberg REST, Unity Catalog) только при первом обращении.
TableProvider: ключевой trait
TableProvider — самый важный trait для расширения DataFusion. Он определяет, как читать данные из источника:
pub trait TableProvider: Send + Sync {
/// Схема таблицы (колонки и типы)
fn schema(&self) -> SchemaRef;
/// Тип таблицы (Base, View, Temporary)
fn table_type(&self) -> TableType;
/// Создать план сканирования
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>, // Какие колонки читать
filters: &[Expr], // Предикаты для pushdown
limit: Option<usize>, // LIMIT для pushdown
) -> Result<Arc<dyn ExecutionPlan>>;
/// Какие фильтры источник может обработать сам
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>>;
}
Метод scan() возвращает ExecutionPlan — физический оператор, который будет встроен в дерево выполнения. Параметры projection, filters, limit позволяют источнику оптимизировать чтение: читать только нужные колонки, пропускать ненужные строки, ограничивать количество.
Встроенные реализации
ListingTable: файловые источники
ListingTable — универсальный провайдер для файловых источников (Parquet, CSV, JSON, Avro):
use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingOptions};
use datafusion::datasource::file_format::parquet::ParquetFormat;
let config = ListingTableConfig::new(
ListingTableUrl::parse("s3://bucket/data/")?
)
.with_listing_options(
ListingOptions::new(Arc::new(ParquetFormat::default()))
.with_file_extension(".parquet")
.with_target_partitions(8)
)
.infer_schema(&state).await?;
let table = ListingTable::try_new(config)?;
ListingTable умеет:
- Обнаруживать файлы по паттерну в директории
- Инферить схему из первого файла (или принимать явную)
- Partition pruning по Hive-style partitioning (
year=2024/month=03/) - Predicate pushdown в Parquet (row group statistics)
MemTable: данные в памяти
MemTable хранит данные как Vec<RecordBatch>:
use datafusion::datasource::MemTable;
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, true),
]));
let batch = RecordBatch::try_new(schema.clone(), vec![
Arc::new(Int64Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec!["A", "B", "C"])),
])?;
let table = MemTable::try_new(schema, vec![vec![batch]])?;
ctx.register_table("test", Arc::new(table))?;
MemTable полезен для тестирования, временных таблиц (CREATE TABLE ... AS SELECT) и небольших lookup-таблиц.
StreamTable: потоковые источники
StreamTable представляет бесконечный поток данных (Kafka, WebSocket):
// Создание StreamTable из пользовательского потока
let provider = StreamTable::new(schema, vec![streaming_source]);
Streaming-источники работают в модели «producer push, consumer pull» — DataFusion интегрирует их через SendableRecordBatchStream.
Регистрация источников через SessionContext
На практике регистрация источников происходит через удобные методы SessionContext:
let ctx = SessionContext::new();
// Parquet-файл → ListingTable автоматически
ctx.register_parquet("orders", "data/orders.parquet", Default::default()).await?;
// CSV с опциями
ctx.register_csv("users", "data/users.csv", CsvReadOptions::new().has_header(true)).await?;
// RecordBatch в память
ctx.register_batch("lookup", batch)?;
// JSON
ctx.register_json("events", "data/events/*.json", Default::default()).await?;
Каждый register_* метод создаёт соответствующий TableProvider и регистрирует его в каталоге по умолчанию (datafusion.public).
Пользовательский TableProvider
Для нестандартных источников данных вы реализуете TableProvider:
struct ApiTable {
schema: SchemaRef,
endpoint: String,
}
#[async_trait]
impl TableProvider for ApiTable {
fn schema(&self) -> SchemaRef { self.schema.clone() }
fn table_type(&self) -> TableType { TableType::Base }
async fn scan(
&self,
_state: &dyn Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Создаём custom ExecutionPlan, который вызывает API
Ok(Arc::new(ApiExec::new(
self.endpoint.clone(),
self.schema.clone(),
projection.cloned(),
limit,
)))
}
}
Это мощный паттерн: вы можете обернуть любой источник — REST API, gRPC, базу данных, очередь — в TableProvider и работать с ним через SQL.
Модуль 05 (Расширяемость) подробно покрывает реализацию пользовательских TableProvider, включая filter pushdown, partition pruning и кастомные ExecutionPlan. Здесь мы рассматриваем архитектурную модель.
Как каталог участвует в планировании
Когда SqlToRel встречает имя таблицы в SQL, он:
- Разрешает имя через
catalog.schema.table(с учётом search path) - Вызывает
SchemaProvider::table()для полученияTableProvider - Читает
TableProvider::schema()для построенияDFSchema - Создаёт узел
LogicalPlan::TableScanс ссылкой наTableProvider
Позже, при физическом планировании, TableProvider::scan() вызывается для создания реального ExecutionPlan.
Итоги
- Три уровня каталога:
CatalogProvider→SchemaProvider→TableProvider -
TableProvider::scan()— ключевой метод: возвращаетExecutionPlanдля чтения данных -
ListingTable— универсальный провайдер для файлов (Parquet, CSV, JSON) с pushdown -
MemTable— in-memory хранение для тестов и lookup-таблиц - Пользовательский
TableProviderпозволяет обернуть любой источник в SQL-интерфейс -
SessionContext::register_*— удобные методы для регистрации стандартных форматов