Learning Platform
Глоссарий Troubleshooting
Урок 06.04 · 18 мин
Продвинутый
CatalogProviderSchemaProviderListingTableMemTableINFORMATION_SCHEMATableFactoryCREATE EXTERNAL TABLE

Каталожная система

В модуле 02 мы рассмотрели трёхуровневую иерархию CatalogProviderSchemaProviderTableProvider как архитектурную модель. Теперь реализуем собственную каталожную систему: динамическая регистрация схем и таблиц, мультитенантные каталоги, интеграция с INFORMATION_SCHEMA и TableFactory для поддержки CREATE EXTERNAL TABLE.

Иерархия каталога

DataFusion использует трёхуровневую модель именования, аналогичную PostgreSQL:

Catalog Hierarchy: полная картина
SessionContextКорневой контейнер, хранящий список каталогов и управляющий их жизненным циклом
CatalogProvider "analytics"Каталог для аналитических данных — изолирует схемы raw и curated
CatalogProvider "ml"Каталог для ML-данных — разделение фичей и результатов моделей

Запрос SELECT * FROM analytics.raw.events разрешается как: каталог analytics → схема raw → таблица events.

Реализация CatalogProvider

use datafusion::catalog::{CatalogProvider, SchemaProvider};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::any::Any;

struct DynamicCatalog {
    schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>,
}

impl DynamicCatalog {
    fn new() -> Self {
        Self { schemas: RwLock::new(HashMap::new()) }
    }
}

impl CatalogProvider for DynamicCatalog {
    fn as_any(&self) -> &dyn Any { self }

    fn schema_names(&self) -> Vec<String> {
        self.schemas.read().unwrap().keys().cloned().collect()
    }

    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        self.schemas.read().unwrap().get(name).cloned()
    }

    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> datafusion::common::Result<Option<Arc<dyn SchemaProvider>>> {
        Ok(self.schemas.write().unwrap().insert(name.to_string(), schema))
    }
}

Реализация SchemaProvider

SchemaProvider хранит коллекцию таблиц. Метод table() асинхронный — можно подгружать метаданные из удалённого каталога:

use datafusion::catalog::SchemaProvider;
use datafusion::datasource::TableProvider;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::any::Any;
use async_trait::async_trait;

struct DynamicSchema {
    tables: RwLock<HashMap<String, Arc<dyn TableProvider>>>,
}

impl DynamicSchema {
    fn new() -> Self {
        Self { tables: RwLock::new(HashMap::new()) }
    }
}

#[async_trait]
impl SchemaProvider for DynamicSchema {
    fn as_any(&self) -> &dyn Any { self }

    fn table_names(&self) -> Vec<String> {
        self.tables.read().unwrap().keys().cloned().collect()
    }

    async fn table(
        &self,
        name: &str,
    ) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
        Ok(self.tables.read().unwrap().get(name).cloned())
    }

    fn register_table(
        &self,
        name: String,
        table: Arc<dyn TableProvider>,
    ) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
        Ok(self.tables.write().unwrap().insert(name, table))
    }

    fn deregister_table(
        &self,
        name: &str,
    ) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
        Ok(self.tables.write().unwrap().remove(name))
    }

    fn table_exist(&self, name: &str) -> bool {
        self.tables.read().unwrap().contains_key(name)
    }
}

Мультитенантный каталог

Паттерн «каталог на клиента» изолирует данные между тенантами:

use datafusion::prelude::SessionContext;
use datafusion::execution::SessionStateBuilder;

async fn create_tenant_session(tenant_id: &str) -> datafusion::common::Result<SessionContext> {
    let catalog = Arc::new(DynamicCatalog::new());

    // Создаём схемы для тенанта
    let raw_schema = Arc::new(DynamicSchema::new());
    let processed_schema = Arc::new(DynamicSchema::new());

    catalog.register_schema("raw", raw_schema)?;
    catalog.register_schema("processed", processed_schema)?;

    // Строим SessionState с кастомным каталогом
    let state = SessionStateBuilder::new()
        .with_default_features()
        .build();

    let ctx = SessionContext::new_with_state(state);

    // Регистрируем каталог тенанта
    ctx.register_catalog(tenant_id, catalog);

    // Устанавливаем search path: по умолчанию ищем в схеме "raw"
    ctx.sql(&format!(
        "SET search_path = '{}.raw'",
        tenant_id
    )).await?;

    Ok(ctx)
}
NOTE

Каждый тенант получает изолированный namespace. Запрос SELECT * FROM events в контексте тенанта client_a разрешается как client_a.raw.events. Один инстанс DataFusion может обслуживать множество тенантов с полной изоляцией метаданных.

ListingTable: продвинутая конфигурация

ListingTable — встроенный TableProvider для файловых источников. Расширенная настройка:

use datafusion::datasource::listing::{
    ListingTable, ListingTableConfig, ListingTableUrl, ListingOptions,
};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::prelude::SessionContext;

async fn register_partitioned_table(ctx: &SessionContext) -> datafusion::common::Result<()> {
    let table_path = ListingTableUrl::parse("s3://data-lake/events/")?;

    let file_format = ParquetFormat::default()
        .with_enable_pruning(true);  // Predicate pushdown в row groups

    let listing_options = ListingOptions::new(Arc::new(file_format))
        .with_file_extension(".parquet")
        .with_target_partitions(16)
        .with_table_partition_cols(vec![
            // Hive-style: year=2024/month=03/day=15/
            ("year".to_string(), DataType::Int32),
            ("month".to_string(), DataType::Int32),
            ("day".to_string(), DataType::Int32),
        ])
        .with_file_sort_order(vec![
            // Файлы отсортированы по timestamp внутри каждой партиции
            vec![datafusion::logical_expr::col("timestamp").sort(true, false)],
        ]);

    let config = ListingTableConfig::new(table_path)
        .with_listing_options(listing_options)
        .infer_schema(&ctx.state()).await?;

    let table = ListingTable::try_new(config)?;
    ctx.register_table("events", Arc::new(table))?;

    // Partition pruning: DataFusion автоматически фильтрует директории
    // Этот запрос прочитает только файлы из year=2024/month=03/
    ctx.sql("SELECT * FROM events WHERE year = 2024 AND month = 3").await?.show().await?;

    Ok(())
}
TIP

with_file_sort_order сообщает оптимизатору, что данные уже отсортированы. Если запрос требует сортировку по timestamp, DataFusion может избежать повторной сортировки — merge-sort из предсортированных партиций значительно быстрее полного sort.

MemTable: программная таблица

MemTable полезен для unit-тестирования, кэширования промежуточных результатов и lookup-таблиц:

use datafusion::datasource::MemTable;
use datafusion::arrow::datatypes::{Schema, Field, DataType};
use datafusion::arrow::array::{Int64Array, StringArray, RecordBatch};

fn create_lookup_table() -> datafusion::common::Result<MemTable> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("code", DataType::Utf8, false),
        Field::new("label", DataType::Utf8, false),
    ]));

    let batches = vec![
        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["RU", "DE", "JP", "US"])),
                Arc::new(StringArray::from(vec!["Россия", "Германия", "Япония", "США"])),
            ],
        )?,
    ];

    // Vec<Vec<RecordBatch>> — внешний вектор по партициям
    MemTable::try_new(schema, vec![batches])
}

INFORMATION_SCHEMA

DataFusion автоматически предоставляет INFORMATION_SCHEMA при включённой опции:

let ctx = SessionContext::new();
ctx.register_table("users", Arc::new(users_table))?;

// Обзор всех таблиц
let df = ctx.sql("SELECT * FROM information_schema.tables").await?;
df.show().await?;

// Колонки конкретной таблицы
let df = ctx.sql(
    "SELECT column_name, data_type, is_nullable
     FROM information_schema.columns
     WHERE table_name = 'users'"
).await?;
df.show().await?;

INFORMATION_SCHEMA включает виртуальные таблицы:

ТаблицаСодержимое
tablesВсе зарегистрированные таблицы и представления
columnsКолонки с типами данных и nullable-флагами
df_settingsПараметры конфигурации DataFusion
schemataСписок схем

TableFactory: CREATE EXTERNAL TABLE

TableFactory позволяет создавать таблицы через SQL CREATE EXTERNAL TABLE:

use datafusion::logical_expr::CreateExternalTable;
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use async_trait::async_trait;

#[derive(Debug)]
struct MyTableFactory;

#[async_trait]
impl datafusion::catalog::TableProviderFactory for MyTableFactory {
    async fn create(
        &self,
        _state: &SessionState,
        cmd: &CreateExternalTable,
    ) -> datafusion::common::Result<Arc<dyn TableProvider>> {
        // cmd содержит: name, schema, location, file_type, options
        let location = &cmd.location;
        let options = &cmd.options;

        // Создаём TableProvider на основе параметров из SQL
        let connection_string = options.get("connection")
            .ok_or_else(|| datafusion::error::DataFusionError::Plan(
                "Missing 'connection' option".to_string()
            ))?;

        Ok(Arc::new(DatabaseTableProvider::new(
            connection_string.clone(),
            cmd.schema.as_ref().into(),
        )))
    }
}

Регистрация фабрики:

let state = SessionStateBuilder::new()
    .with_default_features()
    .build();
let ctx = SessionContext::new_with_state(state);

// Регистрируем фабрику для типа 'MYDB'
ctx.state().table_factories().insert(
    "MYDB".to_string(),
    Arc::new(MyTableFactory),
);

// Теперь доступен SQL-синтаксис:
ctx.sql("
    CREATE EXTERNAL TABLE remote_users
    STORED AS MYDB
    LOCATION 'users'
    OPTIONS ('connection' 'postgres://localhost/mydb')
").await?;

// Используем как обычную таблицу
ctx.sql("SELECT * FROM remote_users WHERE active = true").await?.show().await?;
NOTE

TableProviderFactory отделяет SQL-синтаксис от реализации. Пользователь пишет стандартный CREATE EXTERNAL TABLE, а фабрика создаёт нужный TableProvider с конфигурацией из OPTIONS.

Lazy-загрузка в SchemaProvider

Для каталогов с тысячами таблиц загрузка всех метаданных при старте неэффективна. Реализуйте lazy loading:

#[async_trait]
impl SchemaProvider for RemoteSchema {
    fn table_names(&self) -> Vec<String> {
        // Кэшированный список — обновляется периодически
        self.cached_names.read().unwrap().clone()
    }

    async fn table(
        &self,
        name: &str,
    ) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
        // Проверяем кэш
        if let Some(cached) = self.cache.read().unwrap().get(name) {
            return Ok(Some(cached.clone()));
        }

        // Lazy load: запрос метаданных из удалённого каталога
        let metadata = self.remote_catalog.get_table_metadata(name).await?;

        match metadata {
            Some(meta) => {
                let provider = Arc::new(self.create_provider(meta)?);
                self.cache.write().unwrap().insert(name.to_string(), provider.clone());
                Ok(Some(provider))
            }
            None => Ok(None),
        }
    }
}

Итоги

  • CatalogProviderSchemaProviderTableProvider — трёхуровневая иерархия с динамической регистрацией
  • Мультитенантные каталоги: каталог на клиента с изолированными namespace
  • ListingTable поддерживает Hive partitioning, predicate pushdown в Parquet и file sort order
  • INFORMATION_SCHEMA автоматически отражает зарегистрированные таблицы и колонки
  • TableProviderFactory включает CREATE EXTERNAL TABLE для пользовательских типов хранилищ
  • Lazy loading в SchemaProvider::table() — для каталогов с тысячами таблиц

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какой порядок разрешения имён использует DataFusion для запроса SELECT * FROM analytics.raw.events?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8