Продвинутые точки расширения
UDF, TableProvider и CatalogProvider покрывают большинство задач расширения. Но DataFusion предоставляет ещё шесть точек расширения для сценариев, где стандартных механизмов недостаточно: собственные логические операторы, кастомное планирование, расширение синтаксиса SQL, табличные функции и пользовательские форматы файлов.
UserDefinedLogicalNode: собственные операторы
Когда ни один встроенный LogicalPlan-узел не подходит для вашей логики, создайте собственный через UserDefinedLogicalNodeCore:
use datafusion::logical_expr::{UserDefinedLogicalNodeCore, LogicalPlan};
use datafusion::logical_expr::Expr;
use datafusion::common::DFSchemaRef;
use std::fmt;
use std::hash::{Hash, Hasher};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct TopKNode {
k: usize,
order_by: Expr,
input: LogicalPlan,
}
impl UserDefinedLogicalNodeCore for TopKNode {
fn name(&self) -> &str { "TopK" }
fn inputs(&self) -> Vec<&LogicalPlan> {
vec![&self.input]
}
fn schema(&self) -> &DFSchemaRef {
self.input.schema()
}
fn expressions(&self) -> Vec<Expr> {
vec![self.order_by.clone()]
}
fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "TopK: k={}, order_by={}", self.k, self.order_by)
}
fn with_exprs_and_inputs(
&self,
exprs: Vec<Expr>,
inputs: Vec<LogicalPlan>,
) -> datafusion::common::Result<Self> {
Ok(TopKNode {
k: self.k,
order_by: exprs[0].clone(),
input: inputs[0].clone(),
})
}
}
UserDefinedLogicalNodeCore — рекомендуемый trait вместо более старого UserDefinedLogicalNode. Он автоматически реализует UserDefinedLogicalNode и требует Clone + PartialEq + Eq + Hash, что позволяет оптимизатору сравнивать и кэшировать узлы.
Использование в плане:
use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::Extension;
fn create_topk_plan(input: LogicalPlan, k: usize, order_by: Expr) -> LogicalPlan {
LogicalPlan::Extension(Extension {
node: Arc::new(TopKNode { k, order_by, input }),
})
}
ExtensionPlanner: физическое планирование
UserDefinedLogicalNode описывает логику. Для выполнения нужен физический план. ExtensionPlanner преобразует логические Extension-узлы в ExecutionPlan:
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::execution::context::SessionState;
use async_trait::async_trait;
struct TopKPlanner;
#[async_trait]
impl ExtensionPlanner for TopKPlanner {
async fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
_logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
_session_state: &SessionState,
) -> datafusion::common::Result<Option<Arc<dyn ExecutionPlan>>> {
// Проверяем, что это наш узел
if let Some(topk) = node.as_any().downcast_ref::<TopKNode>() {
let input = physical_inputs[0].clone();
let exec = TopKExec::new(input, topk.k, topk.order_by.clone());
Ok(Some(Arc::new(exec)))
} else {
// Не наш узел — пропускаем
Ok(None)
}
}
}
Регистрация планировщика:
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionContext;
let state = SessionStateBuilder::new()
.with_default_features()
.with_physical_optimizer_rules(vec![]) // Можно добавить свои правила
.build();
// ExtensionPlanner добавляется к SessionState
// через SessionStateBuilder или кастомный PhysicalPlanner
let ctx = SessionContext::new_with_state(state);
ExprPlanner: расширение SQL-выражений
ExprPlanner позволяет перехватывать и трансформировать SQL-выражения, которые стандартный планировщик не понимает:
use datafusion::logical_expr::planner::ExprPlanner;
use datafusion::logical_expr::Expr;
use datafusion::common::Result;
use datafusion::logical_expr::sqlparser::ast as sql_ast;
#[derive(Debug)]
struct JsonExprPlanner;
impl ExprPlanner for JsonExprPlanner {
fn plan_binary_op(
&self,
expr: datafusion::logical_expr::planner::RawBinaryExpr,
) -> Result<datafusion::logical_expr::planner::PlannerResult<datafusion::logical_expr::planner::RawBinaryExpr>> {
// Перехватываем оператор -> для JSON-доступа
// data -> 'key' преобразуется в json_get(data, 'key')
match &expr.op {
datafusion::logical_expr::Operator::ArrowAt => {
let func = datafusion::logical_expr::expr::ScalarFunction::new_udf(
json_get_udf(), // Наш UDF для JSON
vec![expr.left.clone(), expr.right.clone()],
);
Ok(datafusion::logical_expr::planner::PlannerResult::Planned(
Expr::ScalarFunction(func)
))
}
_ => Ok(datafusion::logical_expr::planner::PlannerResult::Original(expr)),
}
}
}
ExprPlanner идеален для добавления синтаксического сахара: JSON-операторы (->), регулярные выражения (~), геопространственные операции. Стандартный SQL-парсер уже знает эти токены — ExprPlanner определяет их семантику.
FunctionFactory: CREATE FUNCTION
FunctionFactory обрабатывает SQL-команду CREATE FUNCTION, создавая UDF динамически:
use datafusion::logical_expr::CreateFunction;
use datafusion::execution::context::FunctionFactory;
use datafusion::logical_expr::ScalarUDF;
use async_trait::async_trait;
#[derive(Debug)]
struct WasmFunctionFactory;
#[async_trait]
impl FunctionFactory for WasmFunctionFactory {
type Context = SessionState;
async fn create(
&self,
_state: &SessionState,
statement: CreateFunction,
) -> datafusion::common::Result<RegisterFunction> {
// statement.name — имя функции
// statement.args — аргументы
// statement.params.as_ — тело функции (LANGUAGE, AS, ...)
let func_name = &statement.name;
let body = statement.params.as_.as_ref()
.ok_or_else(|| datafusion::error::DataFusionError::Plan(
"CREATE FUNCTION requires AS clause".to_string()
))?;
// Компилируем WASM-модуль в UDF
let udf = compile_wasm_to_udf(func_name, body)?;
Ok(RegisterFunction::Scalar(Arc::new(udf)))
}
}
После регистрации фабрики:
-- Создаёт UDF из WASM-модуля
CREATE FUNCTION double_value(x DOUBLE)
RETURNS DOUBLE
LANGUAGE WASM
AS 'module.wasm:double';
-- Используем как обычную функцию
SELECT double_value(price) FROM products;
TableFunctionImpl: табличные функции (UDTF)
Табличные функции возвращают таблицу вместо скалярного значения. Используются в FROM clause:
use datafusion::logical_expr::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::logical_expr::Expr;
#[derive(Debug)]
struct GenerateSeriesFunction;
impl TableFunctionImpl for GenerateSeriesFunction {
fn call(
&self,
args: &[Expr],
) -> datafusion::common::Result<Arc<dyn TableProvider>> {
// Извлекаем аргументы: start, stop, step
let start = extract_literal_i64(&args[0])?;
let stop = extract_literal_i64(&args[1])?;
let step = if args.len() > 2 {
extract_literal_i64(&args[2])?
} else {
1
};
// Генерируем данные
let values: Vec<i64> = (start..=stop).step_by(step as usize).collect();
let schema = Arc::new(Schema::new(vec![
Field::new("value", DataType::Int64, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int64Array::from(values))],
)?;
Ok(Arc::new(MemTable::try_new(schema, vec![vec![batch]])?))
}
}
// Регистрация
let ctx = SessionContext::new();
ctx.register_udtf("generate_series", Arc::new(GenerateSeriesFunction));
// Использование в SQL
let df = ctx.sql("SELECT * FROM generate_series(1, 100, 5)").await?;
UDTF отличается от UDF тем, что возвращает набор строк, а не одно значение. Это аналог generate_series() в PostgreSQL или UNNEST(). В DataFusion UDTF создаёт TableProvider, встраиваемый в план как обычная таблица.
FileFormatFactory: пользовательские форматы
FileFormatFactory регистрирует поддержку новых файловых форматов:
use datafusion::datasource::file_format::FileFormatFactory;
use datafusion::datasource::file_format::FileFormat;
#[derive(Debug)]
struct MsgPackFormatFactory;
impl FileFormatFactory for MsgPackFormatFactory {
fn create(
&self,
_state: &SessionState,
_options: &HashMap<String, String>,
) -> datafusion::common::Result<Arc<dyn FileFormat>> {
Ok(Arc::new(MsgPackFormat::default()))
}
fn default_file_extension(&self) -> &str {
".msgpack"
}
}
После регистрации ListingTable и CREATE EXTERNAL TABLE поддерживают новый формат:
CREATE EXTERNAL TABLE telemetry
STORED AS MSGPACK
LOCATION 's3://bucket/telemetry/';
Карта точек расширения
Комбинирование точек расширения
В реальных проектах точки расширения комбинируются. Пример: JSON-аналитический движок:
// 1. ExprPlanner: поддержка JSON-операторов в SQL
// data -> 'key' → json_get(data, key)
let expr_planner = JsonExprPlanner;
// 2. ScalarUDF: реализация json_get, json_array_length, json_path
let json_get = ScalarUDF::new_from_impl(JsonGetUdf::new());
let json_len = ScalarUDF::new_from_impl(JsonArrayLengthUdf::new());
// 3. TableProvider: чтение NDJSON из S3
let json_source = Arc::new(NdJsonS3Provider::new("s3://logs/"));
// 4. UserDefinedLogicalNode: оптимизированный JSON-путь scan
// вместо scan + json_get → специальный JsonScan с path pushdown
let json_scan_optimizer = JsonScanOptimizer;
// 5. Собираем всё в SessionContext
let state = SessionStateBuilder::new()
.with_default_features()
.with_expr_planners(vec![Arc::new(expr_planner)])
.build();
let ctx = SessionContext::new_with_state(state);
ctx.register_udf(json_get);
ctx.register_udf(json_len);
ctx.register_table("logs", json_source)?;
Комбинирование многих точек расширения увеличивает сложность отладки. Начинайте с UDF и TableProvider — они покрывают 90% задач. Переходите к UserDefinedLogicalNode и ExtensionPlanner только когда стандартная оптимизация не справляется с вашим use case.
Итоги
-
UserDefinedLogicalNodeCore— собственные логические операторы сClone + Eq + Hash -
ExtensionPlannerпреобразует кастомные логические узлы в физическиеExecutionPlan -
ExprPlannerперехватывает SQL-выражения для добавления новых операторов -
FunctionFactoryобрабатываетCREATE FUNCTIONдля динамического создания UDF -
TableFunctionImplсоздаёт табличные функции (UDTF), возвращающие набор строк -
FileFormatFactoryрегистрирует поддержку новых файловых форматов дляListingTable