Learning Platform
Глоссарий Troubleshooting
Урок 06.06 · 16 мин
Продвинутый
Spice AIInfluxDBGreptimeDBGlareDBextensibility patternslibrary vs frameworkdecision matrix

Паттерны расширяемости

В предыдущих уроках мы разобрали отдельные точки расширения: UDF, TableProvider, каталоги, кастомные операторы. В реальных проектах эти механизмы комбинируются для построения специализированных систем. Разберём паттерны расширения на примерах production-проектов.

DataFusion: библиотека vs фреймворк

DataFusion занимает уникальную позицию — это не готовая СУБД, а библиотека для построения query engine. Два паттерна использования:

Библиотека vs Фреймворк
Библиотека (Embedding)DataFusion как библиотека: ваш main() создаёт SessionContext — полный контроль над хранением и сетью
Фреймворк (Extension)DataFusion как фреймворк: он управляет выполнением, вы подключаете источники и расширения

Различие определяет, где находится «точка сборки»:

  • Библиотека: ваш main() создаёт SessionContext и вызывает DataFusion
  • Фреймворк: DataFusion — основной runtime, вы регистрируете расширения

Spice AI: федеративный аналитический движок

Spice AI использует DataFusion для федеративных запросов через множество источников данных: PostgreSQL, MySQL, DuckDB, S3, Databricks.

Паттерн: кастомные TableProviders на каждый источник

// Каждый коннектор — отдельный TableProvider
struct PostgresTableProvider { pool: PgPool, table: String, schema: SchemaRef }
struct DuckDbTableProvider { conn: DuckDbConnection, query: String }
struct S3ParquetProvider { bucket: String, prefix: String }

// Единый каталог объединяет все источники
struct FederatedCatalog {
    providers: HashMap<String, Arc<dyn TableProvider>>,
}

Паттерн: кастомные OptimizerRules

Spice AI добавляет правила оптимизации для federation-aware планирования:

// Правило: pushdown агрегации в удалённый источник
// Вместо: Scan(postgres) → Filter → Aggregate
// Получаем: RemoteAggregate(postgres, "SELECT ... GROUP BY ...")
struct FederationPushdown;

impl OptimizerRule for FederationPushdown {
    fn name(&self) -> &str { "federation_pushdown" }

    fn rewrite(
        &self,
        plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>> {
        // Если Aggregate над TableScan удалённого источника —
        // заменяем на RemoteAggregate
        match &plan {
            LogicalPlan::Aggregate(agg) => {
                if let LogicalPlan::TableScan(scan) = agg.input.as_ref() {
                    if is_remote_source(&scan.source) {
                        return Ok(Transformed::yes(
                            create_remote_aggregate(agg, scan)?
                        ));
                    }
                }
                Ok(Transformed::no(plan))
            }
            _ => Ok(Transformed::no(plan))
        }
    }
}

Используемые точки расширения

Точка расширенияНазначение в Spice AI
TableProviderКоннекторы к PostgreSQL, MySQL, DuckDB, S3, Databricks
CatalogProviderФедеративный каталог, объединяющий все источники
OptimizerRuleFederation pushdown (агрегации, фильтры, проекции)
ScalarUDFФункции для работы с метриками, ML-инференсом

InfluxDB IOx: time-series движок

InfluxDB IOx (основа InfluxDB 3.0) использует DataFusion как библиотеку, заменяя ключевые компоненты.

Паттерн: собственное хранилище + DataFusion planning

// InfluxDB хранит данные в собственном columnar storage
// TableProvider оборачивает это хранилище
struct InfluxTableProvider {
    table_name: String,
    schema: SchemaRef,
    chunk_store: Arc<ChunkStore>,  // Собственный storage layer
}

// scan() возвращает кастомный ExecutionPlan, читающий из chunk storage
async fn scan(&self, ...) -> Result<Arc<dyn ExecutionPlan>> {
    let chunks = self.chunk_store.get_chunks(&self.table_name)?;
    Ok(Arc::new(InfluxScanExec::new(chunks, projection, filters)))
}

Паттерн: специализированные оптимизации

InfluxDB добавляет time-series-специфичные оптимизации:

  • Time range pruning: пропуск чанков вне временного диапазона запроса
  • Deduplication: удаление дупликатов по primary key + timestamp
  • Compaction-aware scan: чтение из нескольких уровней compaction с правильным merge
// Кастомный физический оператор: дедупликация по PK
struct DeduplicateExec {
    input: Arc<dyn ExecutionPlan>,
    sort_keys: Vec<PhysicalSortExpr>,
}

// Кастомное правило: вставляет DeduplicateExec после scan
struct InsertDeduplication;
impl PhysicalOptimizerRule for InsertDeduplication {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Если scan из InfluxDB и требуется дедупликация —
        // оборачиваем в DeduplicateExec
        // ...
    }
}

GreptimeDB: мультимодальная time-series СУБД

GreptimeDB объединяет метрики, логи и события в единой системе.

Паттерн: multi-engine execution

// GreptimeDB использует DataFusion для SQL,
// но добавляет альтернативные движки для PromQL
struct QueryEngine {
    sql_engine: DataFusionEngine,      // DataFusion SessionContext
    promql_engine: PromQLEngine,       // Собственный PromQL-парсер
}

// DataFusion SessionContext настроен с:
// - Кастомным CatalogProvider (собственный metadata store)
// - Кастомными TableProviders (собственный storage engine)
// - Кастомными OptimizerRules (time-series-специфичные)
// - Кастомными PhysicalOptimizerRules (distributed execution)

Используемые точки расширения

Точка расширенияНазначение в GreptimeDB
CatalogProviderСобственный metadata store вместо in-memory каталога
TableProviderСобственный SSTable-based storage engine
UserDefinedLogicalNodeDistributed scan, distributed aggregate
ExtensionPlannerПланирование распределённого выполнения
PhysicalOptimizerRuleОптимизация shuffle, pipeline-breaking

GlareDB: федеративная SQL СУБД

GlareDB подключает 30+ источников данных (PostgreSQL, MongoDB, BigQuery, Snowflake, Excel) через единый SQL-интерфейс.

Паттерн: динамический каталог с auto-discovery

// CatalogProvider подключается к удалённому источнику
// и автоматически обнаруживает таблицы
struct RemoteCatalog {
    connector: Box<dyn Connector>,
}

impl SchemaProvider for RemoteSchema {
    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
        // Lazy: загружаем метаданные таблицы при первом обращении
        let columns = self.connector.describe_table(name).await?;
        let provider = RemoteTableProvider::new(self.connector.clone(), name, columns);
        Ok(Some(Arc::new(provider)))
    }

    fn table_names(&self) -> Vec<String> {
        // Периодически обновляемый кэш имён
        self.cached_table_names.read().unwrap().clone()
    }
}

Паттерн: tunnel-based filter pushdown

// GlareDB транслирует SQL-фильтры в нативные запросы источника
impl TableProvider for MongoTableProvider {
    fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> {
        filters.iter().map(|f| {
            if can_translate_to_mongo_query(f) {
                Ok(TableProviderFilterPushDown::Exact)
            } else {
                Ok(TableProviderFilterPushDown::Unsupported)
            }
        }).collect()
    }

    async fn scan(&self, ..., filters: &[Expr], ...) -> Result<Arc<dyn ExecutionPlan>> {
        // Транслируем DataFusion Expr в MongoDB query
        let mongo_filter = translate_to_bson(filters)?;
        // { "$and": [{ "status": { "$eq": "active" } }] }
        Ok(Arc::new(MongoScanExec::new(self.collection.clone(), mongo_filter)))
    }
}

Матрица выбора точки расширения

Когда использовать какую точку расширения
Задача → Точка расширенияМатрица выбора: начинайте с TableProvider/UDF, переходите к advanced points по необходимости

Версионирование и стабильность API

DataFusion развивается быстро — мажорные релизы каждые 1–2 месяца. Рекомендации:

Управление зависимостями

# Cargo.toml — фиксируйте patch-версию
[dependencies]
datafusion = "53.0"
arrow = "54.1"

# НЕ используйте:
# datafusion = "53"  — может получить breaking changes в 53.x
WARNING

DataFusion до версии 1.0 не гарантирует стабильность API между мажорными версиями. При обновлении с 52.x на 53.x проверяйте CHANGELOG и Migration Guide. Основные точки разрыва: сигнатуры trait-методов (invokeinvoke_with_args), конструкторы (SessionState::newSessionStateBuilder), и PlanProperties API.

Паттерн: абстракция над DataFusion API

// Собственный trait абстрагирует от деталей DataFusion API
trait MyQueryEngine {
    fn register_source(&self, name: &str, source: Box<dyn MyDataSource>) -> Result<()>;
    fn query(&self, sql: &str) -> Result<Vec<RecordBatch>>;
}

// Реализация использует DataFusion — но интерфейс стабилен
struct DataFusionEngine {
    ctx: SessionContext,
}

impl MyQueryEngine for DataFusionEngine {
    fn register_source(&self, name: &str, source: Box<dyn MyDataSource>) -> Result<()> {
        // Оборачиваем MyDataSource в TableProvider
        let provider = MyTableProviderAdapter::new(source);
        self.ctx.register_table(name, Arc::new(provider))
    }

    fn query(&self, sql: &str) -> Result<Vec<RecordBatch>> {
        let rt = tokio::runtime::Runtime::new()?;
        rt.block_on(async {
            let df = self.ctx.sql(sql).await?;
            df.collect().await
        })
    }
}
TIP

Абстракция над DataFusion API позволяет обновлять версию DataFusion, меняя только адаптер. Остальной код вашей системы работает через стабильный внутренний trait.

Итоги

  • DataFusion используется как библиотека (InfluxDB, GreptimeDB) или фреймворк (GlareDB, Spice AI)
  • Spice AI: федеративные коннекторы через TableProvider + federation pushdown через OptimizerRule
  • InfluxDB IOx: собственное хранилище + DataFusion для SQL-планирования и оптимизации
  • GreptimeDB: multi-engine (SQL + PromQL) с distributed execution через Extension nodes
  • GlareDB: динамический каталог с auto-discovery и filter pushdown в нативные запросы
  • Матрица выбора: начинайте с TableProvider/UDF, переходите к advanced points по необходимости

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. В чём ключевая разница между использованием DataFusion как библиотеки (embedding) и как фреймворка (extension)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8