Каталожная система
В модуле 02 мы рассмотрели трёхуровневую иерархию CatalogProvider → SchemaProvider → TableProvider как архитектурную модель. Теперь реализуем собственную каталожную систему: динамическая регистрация схем и таблиц, мультитенантные каталоги, интеграция с INFORMATION_SCHEMA и TableFactory для поддержки CREATE EXTERNAL TABLE.
Иерархия каталога
DataFusion использует трёхуровневую модель именования, аналогичную PostgreSQL:
Запрос SELECT * FROM analytics.raw.events разрешается как: каталог analytics → схема raw → таблица events.
Реализация CatalogProvider
use datafusion::catalog::{CatalogProvider, SchemaProvider};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::any::Any;
struct DynamicCatalog {
schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>,
}
impl DynamicCatalog {
fn new() -> Self {
Self { schemas: RwLock::new(HashMap::new()) }
}
}
impl CatalogProvider for DynamicCatalog {
fn as_any(&self) -> &dyn Any { self }
fn schema_names(&self) -> Vec<String> {
self.schemas.read().unwrap().keys().cloned().collect()
}
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.schemas.read().unwrap().get(name).cloned()
}
fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> datafusion::common::Result<Option<Arc<dyn SchemaProvider>>> {
Ok(self.schemas.write().unwrap().insert(name.to_string(), schema))
}
}
Реализация SchemaProvider
SchemaProvider хранит коллекцию таблиц. Метод table() асинхронный — можно подгружать метаданные из удалённого каталога:
use datafusion::catalog::SchemaProvider;
use datafusion::datasource::TableProvider;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::any::Any;
use async_trait::async_trait;
struct DynamicSchema {
tables: RwLock<HashMap<String, Arc<dyn TableProvider>>>,
}
impl DynamicSchema {
fn new() -> Self {
Self { tables: RwLock::new(HashMap::new()) }
}
}
#[async_trait]
impl SchemaProvider for DynamicSchema {
fn as_any(&self) -> &dyn Any { self }
fn table_names(&self) -> Vec<String> {
self.tables.read().unwrap().keys().cloned().collect()
}
async fn table(
&self,
name: &str,
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
Ok(self.tables.read().unwrap().get(name).cloned())
}
fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
Ok(self.tables.write().unwrap().insert(name, table))
}
fn deregister_table(
&self,
name: &str,
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
Ok(self.tables.write().unwrap().remove(name))
}
fn table_exist(&self, name: &str) -> bool {
self.tables.read().unwrap().contains_key(name)
}
}
Мультитенантный каталог
Паттерн «каталог на клиента» изолирует данные между тенантами:
use datafusion::prelude::SessionContext;
use datafusion::execution::SessionStateBuilder;
async fn create_tenant_session(tenant_id: &str) -> datafusion::common::Result<SessionContext> {
let catalog = Arc::new(DynamicCatalog::new());
// Создаём схемы для тенанта
let raw_schema = Arc::new(DynamicSchema::new());
let processed_schema = Arc::new(DynamicSchema::new());
catalog.register_schema("raw", raw_schema)?;
catalog.register_schema("processed", processed_schema)?;
// Строим SessionState с кастомным каталогом
let state = SessionStateBuilder::new()
.with_default_features()
.build();
let ctx = SessionContext::new_with_state(state);
// Регистрируем каталог тенанта
ctx.register_catalog(tenant_id, catalog);
// Устанавливаем search path: по умолчанию ищем в схеме "raw"
ctx.sql(&format!(
"SET search_path = '{}.raw'",
tenant_id
)).await?;
Ok(ctx)
}
Каждый тенант получает изолированный namespace. Запрос SELECT * FROM events в контексте тенанта client_a разрешается как client_a.raw.events. Один инстанс DataFusion может обслуживать множество тенантов с полной изоляцией метаданных.
ListingTable: продвинутая конфигурация
ListingTable — встроенный TableProvider для файловых источников. Расширенная настройка:
use datafusion::datasource::listing::{
ListingTable, ListingTableConfig, ListingTableUrl, ListingOptions,
};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::prelude::SessionContext;
async fn register_partitioned_table(ctx: &SessionContext) -> datafusion::common::Result<()> {
let table_path = ListingTableUrl::parse("s3://data-lake/events/")?;
let file_format = ParquetFormat::default()
.with_enable_pruning(true); // Predicate pushdown в row groups
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension(".parquet")
.with_target_partitions(16)
.with_table_partition_cols(vec![
// Hive-style: year=2024/month=03/day=15/
("year".to_string(), DataType::Int32),
("month".to_string(), DataType::Int32),
("day".to_string(), DataType::Int32),
])
.with_file_sort_order(vec![
// Файлы отсортированы по timestamp внутри каждой партиции
vec![datafusion::logical_expr::col("timestamp").sort(true, false)],
]);
let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.infer_schema(&ctx.state()).await?;
let table = ListingTable::try_new(config)?;
ctx.register_table("events", Arc::new(table))?;
// Partition pruning: DataFusion автоматически фильтрует директории
// Этот запрос прочитает только файлы из year=2024/month=03/
ctx.sql("SELECT * FROM events WHERE year = 2024 AND month = 3").await?.show().await?;
Ok(())
}
with_file_sort_order сообщает оптимизатору, что данные уже отсортированы. Если запрос требует сортировку по timestamp, DataFusion может избежать повторной сортировки — merge-sort из предсортированных партиций значительно быстрее полного sort.
MemTable: программная таблица
MemTable полезен для unit-тестирования, кэширования промежуточных результатов и lookup-таблиц:
use datafusion::datasource::MemTable;
use datafusion::arrow::datatypes::{Schema, Field, DataType};
use datafusion::arrow::array::{Int64Array, StringArray, RecordBatch};
fn create_lookup_table() -> datafusion::common::Result<MemTable> {
let schema = Arc::new(Schema::new(vec![
Field::new("code", DataType::Utf8, false),
Field::new("label", DataType::Utf8, false),
]));
let batches = vec![
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["RU", "DE", "JP", "US"])),
Arc::new(StringArray::from(vec!["Россия", "Германия", "Япония", "США"])),
],
)?,
];
// Vec<Vec<RecordBatch>> — внешний вектор по партициям
MemTable::try_new(schema, vec![batches])
}
INFORMATION_SCHEMA
DataFusion автоматически предоставляет INFORMATION_SCHEMA при включённой опции:
let ctx = SessionContext::new();
ctx.register_table("users", Arc::new(users_table))?;
// Обзор всех таблиц
let df = ctx.sql("SELECT * FROM information_schema.tables").await?;
df.show().await?;
// Колонки конкретной таблицы
let df = ctx.sql(
"SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = 'users'"
).await?;
df.show().await?;
INFORMATION_SCHEMA включает виртуальные таблицы:
| Таблица | Содержимое |
|---|---|
tables | Все зарегистрированные таблицы и представления |
columns | Колонки с типами данных и nullable-флагами |
df_settings | Параметры конфигурации DataFusion |
schemata | Список схем |
TableFactory: CREATE EXTERNAL TABLE
TableFactory позволяет создавать таблицы через SQL CREATE EXTERNAL TABLE:
use datafusion::logical_expr::CreateExternalTable;
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use async_trait::async_trait;
#[derive(Debug)]
struct MyTableFactory;
#[async_trait]
impl datafusion::catalog::TableProviderFactory for MyTableFactory {
async fn create(
&self,
_state: &SessionState,
cmd: &CreateExternalTable,
) -> datafusion::common::Result<Arc<dyn TableProvider>> {
// cmd содержит: name, schema, location, file_type, options
let location = &cmd.location;
let options = &cmd.options;
// Создаём TableProvider на основе параметров из SQL
let connection_string = options.get("connection")
.ok_or_else(|| datafusion::error::DataFusionError::Plan(
"Missing 'connection' option".to_string()
))?;
Ok(Arc::new(DatabaseTableProvider::new(
connection_string.clone(),
cmd.schema.as_ref().into(),
)))
}
}
Регистрация фабрики:
let state = SessionStateBuilder::new()
.with_default_features()
.build();
let ctx = SessionContext::new_with_state(state);
// Регистрируем фабрику для типа 'MYDB'
ctx.state().table_factories().insert(
"MYDB".to_string(),
Arc::new(MyTableFactory),
);
// Теперь доступен SQL-синтаксис:
ctx.sql("
CREATE EXTERNAL TABLE remote_users
STORED AS MYDB
LOCATION 'users'
OPTIONS ('connection' 'postgres://localhost/mydb')
").await?;
// Используем как обычную таблицу
ctx.sql("SELECT * FROM remote_users WHERE active = true").await?.show().await?;
TableProviderFactory отделяет SQL-синтаксис от реализации. Пользователь пишет стандартный CREATE EXTERNAL TABLE, а фабрика создаёт нужный TableProvider с конфигурацией из OPTIONS.
Lazy-загрузка в SchemaProvider
Для каталогов с тысячами таблиц загрузка всех метаданных при старте неэффективна. Реализуйте lazy loading:
#[async_trait]
impl SchemaProvider for RemoteSchema {
fn table_names(&self) -> Vec<String> {
// Кэшированный список — обновляется периодически
self.cached_names.read().unwrap().clone()
}
async fn table(
&self,
name: &str,
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
// Проверяем кэш
if let Some(cached) = self.cache.read().unwrap().get(name) {
return Ok(Some(cached.clone()));
}
// Lazy load: запрос метаданных из удалённого каталога
let metadata = self.remote_catalog.get_table_metadata(name).await?;
match metadata {
Some(meta) => {
let provider = Arc::new(self.create_provider(meta)?);
self.cache.write().unwrap().insert(name.to_string(), provider.clone());
Ok(Some(provider))
}
None => Ok(None),
}
}
}
Итоги
-
CatalogProvider→SchemaProvider→TableProvider— трёхуровневая иерархия с динамической регистрацией - Мультитенантные каталоги: каталог на клиента с изолированными namespace
-
ListingTableподдерживает Hive partitioning, predicate pushdown в Parquet и file sort order -
INFORMATION_SCHEMAавтоматически отражает зарегистрированные таблицы и колонки -
TableProviderFactoryвключаетCREATE EXTERNAL TABLEдля пользовательских типов хранилищ - Lazy loading в
SchemaProvider::table()— для каталогов с тысячами таблиц