Паттерны расширяемости
В предыдущих уроках мы разобрали отдельные точки расширения: UDF, TableProvider, каталоги, кастомные операторы. В реальных проектах эти механизмы комбинируются для построения специализированных систем. Разберём паттерны расширения на примерах production-проектов.
DataFusion: библиотека vs фреймворк
DataFusion занимает уникальную позицию — это не готовая СУБД, а библиотека для построения query engine. Два паттерна использования:
Различие определяет, где находится «точка сборки»:
- Библиотека: ваш
main()создаётSessionContextи вызывает DataFusion - Фреймворк: DataFusion — основной runtime, вы регистрируете расширения
Spice AI: федеративный аналитический движок
Spice AI использует DataFusion для федеративных запросов через множество источников данных: PostgreSQL, MySQL, DuckDB, S3, Databricks.
Паттерн: кастомные TableProviders на каждый источник
// Каждый коннектор — отдельный TableProvider
struct PostgresTableProvider { pool: PgPool, table: String, schema: SchemaRef }
struct DuckDbTableProvider { conn: DuckDbConnection, query: String }
struct S3ParquetProvider { bucket: String, prefix: String }
// Единый каталог объединяет все источники
struct FederatedCatalog {
providers: HashMap<String, Arc<dyn TableProvider>>,
}
Паттерн: кастомные OptimizerRules
Spice AI добавляет правила оптимизации для federation-aware планирования:
// Правило: pushdown агрегации в удалённый источник
// Вместо: Scan(postgres) → Filter → Aggregate
// Получаем: RemoteAggregate(postgres, "SELECT ... GROUP BY ...")
struct FederationPushdown;
impl OptimizerRule for FederationPushdown {
fn name(&self) -> &str { "federation_pushdown" }
fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
// Если Aggregate над TableScan удалённого источника —
// заменяем на RemoteAggregate
match &plan {
LogicalPlan::Aggregate(agg) => {
if let LogicalPlan::TableScan(scan) = agg.input.as_ref() {
if is_remote_source(&scan.source) {
return Ok(Transformed::yes(
create_remote_aggregate(agg, scan)?
));
}
}
Ok(Transformed::no(plan))
}
_ => Ok(Transformed::no(plan))
}
}
}
Используемые точки расширения
| Точка расширения | Назначение в Spice AI |
|---|---|
TableProvider | Коннекторы к PostgreSQL, MySQL, DuckDB, S3, Databricks |
CatalogProvider | Федеративный каталог, объединяющий все источники |
OptimizerRule | Federation pushdown (агрегации, фильтры, проекции) |
ScalarUDF | Функции для работы с метриками, ML-инференсом |
InfluxDB IOx: time-series движок
InfluxDB IOx (основа InfluxDB 3.0) использует DataFusion как библиотеку, заменяя ключевые компоненты.
Паттерн: собственное хранилище + DataFusion planning
// InfluxDB хранит данные в собственном columnar storage
// TableProvider оборачивает это хранилище
struct InfluxTableProvider {
table_name: String,
schema: SchemaRef,
chunk_store: Arc<ChunkStore>, // Собственный storage layer
}
// scan() возвращает кастомный ExecutionPlan, читающий из chunk storage
async fn scan(&self, ...) -> Result<Arc<dyn ExecutionPlan>> {
let chunks = self.chunk_store.get_chunks(&self.table_name)?;
Ok(Arc::new(InfluxScanExec::new(chunks, projection, filters)))
}
Паттерн: специализированные оптимизации
InfluxDB добавляет time-series-специфичные оптимизации:
- Time range pruning: пропуск чанков вне временного диапазона запроса
- Deduplication: удаление дупликатов по primary key + timestamp
- Compaction-aware scan: чтение из нескольких уровней compaction с правильным merge
// Кастомный физический оператор: дедупликация по PK
struct DeduplicateExec {
input: Arc<dyn ExecutionPlan>,
sort_keys: Vec<PhysicalSortExpr>,
}
// Кастомное правило: вставляет DeduplicateExec после scan
struct InsertDeduplication;
impl PhysicalOptimizerRule for InsertDeduplication {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
// Если scan из InfluxDB и требуется дедупликация —
// оборачиваем в DeduplicateExec
// ...
}
}
GreptimeDB: мультимодальная time-series СУБД
GreptimeDB объединяет метрики, логи и события в единой системе.
Паттерн: multi-engine execution
// GreptimeDB использует DataFusion для SQL,
// но добавляет альтернативные движки для PromQL
struct QueryEngine {
sql_engine: DataFusionEngine, // DataFusion SessionContext
promql_engine: PromQLEngine, // Собственный PromQL-парсер
}
// DataFusion SessionContext настроен с:
// - Кастомным CatalogProvider (собственный metadata store)
// - Кастомными TableProviders (собственный storage engine)
// - Кастомными OptimizerRules (time-series-специфичные)
// - Кастомными PhysicalOptimizerRules (distributed execution)
Используемые точки расширения
| Точка расширения | Назначение в GreptimeDB |
|---|---|
CatalogProvider | Собственный metadata store вместо in-memory каталога |
TableProvider | Собственный SSTable-based storage engine |
UserDefinedLogicalNode | Distributed scan, distributed aggregate |
ExtensionPlanner | Планирование распределённого выполнения |
PhysicalOptimizerRule | Оптимизация shuffle, pipeline-breaking |
GlareDB: федеративная SQL СУБД
GlareDB подключает 30+ источников данных (PostgreSQL, MongoDB, BigQuery, Snowflake, Excel) через единый SQL-интерфейс.
Паттерн: динамический каталог с auto-discovery
// CatalogProvider подключается к удалённому источнику
// и автоматически обнаруживает таблицы
struct RemoteCatalog {
connector: Box<dyn Connector>,
}
impl SchemaProvider for RemoteSchema {
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
// Lazy: загружаем метаданные таблицы при первом обращении
let columns = self.connector.describe_table(name).await?;
let provider = RemoteTableProvider::new(self.connector.clone(), name, columns);
Ok(Some(Arc::new(provider)))
}
fn table_names(&self) -> Vec<String> {
// Периодически обновляемый кэш имён
self.cached_table_names.read().unwrap().clone()
}
}
Паттерн: tunnel-based filter pushdown
// GlareDB транслирует SQL-фильтры в нативные запросы источника
impl TableProvider for MongoTableProvider {
fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> {
filters.iter().map(|f| {
if can_translate_to_mongo_query(f) {
Ok(TableProviderFilterPushDown::Exact)
} else {
Ok(TableProviderFilterPushDown::Unsupported)
}
}).collect()
}
async fn scan(&self, ..., filters: &[Expr], ...) -> Result<Arc<dyn ExecutionPlan>> {
// Транслируем DataFusion Expr в MongoDB query
let mongo_filter = translate_to_bson(filters)?;
// { "$and": [{ "status": { "$eq": "active" } }] }
Ok(Arc::new(MongoScanExec::new(self.collection.clone(), mongo_filter)))
}
}
Матрица выбора точки расширения
Версионирование и стабильность API
DataFusion развивается быстро — мажорные релизы каждые 1–2 месяца. Рекомендации:
Управление зависимостями
# Cargo.toml — фиксируйте patch-версию
[dependencies]
datafusion = "53.0"
arrow = "54.1"
# НЕ используйте:
# datafusion = "53" — может получить breaking changes в 53.x
DataFusion до версии 1.0 не гарантирует стабильность API между мажорными версиями. При обновлении с 52.x на 53.x проверяйте CHANGELOG и Migration Guide. Основные точки разрыва: сигнатуры trait-методов (invoke → invoke_with_args), конструкторы (SessionState::new → SessionStateBuilder), и PlanProperties API.
Паттерн: абстракция над DataFusion API
// Собственный trait абстрагирует от деталей DataFusion API
trait MyQueryEngine {
fn register_source(&self, name: &str, source: Box<dyn MyDataSource>) -> Result<()>;
fn query(&self, sql: &str) -> Result<Vec<RecordBatch>>;
}
// Реализация использует DataFusion — но интерфейс стабилен
struct DataFusionEngine {
ctx: SessionContext,
}
impl MyQueryEngine for DataFusionEngine {
fn register_source(&self, name: &str, source: Box<dyn MyDataSource>) -> Result<()> {
// Оборачиваем MyDataSource в TableProvider
let provider = MyTableProviderAdapter::new(source);
self.ctx.register_table(name, Arc::new(provider))
}
fn query(&self, sql: &str) -> Result<Vec<RecordBatch>> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
let df = self.ctx.sql(sql).await?;
df.collect().await
})
}
}
Абстракция над DataFusion API позволяет обновлять версию DataFusion, меняя только адаптер. Остальной код вашей системы работает через стабильный внутренний trait.
Итоги
- DataFusion используется как библиотека (InfluxDB, GreptimeDB) или фреймворк (GlareDB, Spice AI)
- Spice AI: федеративные коннекторы через TableProvider + federation pushdown через OptimizerRule
- InfluxDB IOx: собственное хранилище + DataFusion для SQL-планирования и оптимизации
- GreptimeDB: multi-engine (SQL + PromQL) с distributed execution через Extension nodes
- GlareDB: динамический каталог с auto-discovery и filter pushdown в нативные запросы
- Матрица выбора: начинайте с TableProvider/UDF, переходите к advanced points по необходимости