Скалярные UDF на Rust
Мы познакомились с пользовательскими функциями в модуле 04, где создавали UDF на Python. Python API удобен для прототипирования, но для production-расширений DataFusion предоставляет полноценный Rust API с контролем над типизацией, производительностью и интеграцией с оптимизатором.
В этом уроке реализуем скалярный UDF с нуля через trait ScalarUDFImpl, разберём систему сигнатур и зарегистрируем функцию для использования в SQL.
Архитектура UDF в DataFusion
Скалярный UDF принимает одну или несколько колонок и возвращает значение для каждой строки. DataFusion вызывает его на уровне RecordBatch — функция получает массивы Arrow, а не отдельные значения.
Ключевое отличие от Python UDF: Rust-реализация работает напрямую с Arrow-массивами без сериализации, что даёт на порядок лучшую производительность.
Trait ScalarUDFImpl
ScalarUDFImpl — основной trait для скалярных функций. Минимальная реализация требует четыре метода:
use datafusion::logical_expr::{ScalarUDFImpl, Signature, Volatility};
use datafusion::arrow::datatypes::DataType;
use datafusion::common::Result;
use datafusion::logical_expr::ColumnarValue;
use std::any::Any;
#[derive(Debug)]
struct SquareUdf;
impl ScalarUDFImpl for SquareUdf {
fn as_any(&self) -> &dyn Any { self }
fn name(&self) -> &str { "square" }
fn signature(&self) -> &Signature {
// Ленивая инициализация через static или хранение в struct
&Signature::uniform(1, vec![DataType::Float64, DataType::Int64], Volatility::Immutable)
}
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
// Тип результата зависит от типа входного аргумента
Ok(arg_types[0].clone())
}
fn invoke_with_args(
&self,
args: datafusion::logical_expr::ScalarFunctionArgs,
) -> Result<ColumnarValue> {
let input = &args.args[0];
match input {
ColumnarValue::Array(array) => {
let float_array = array
.as_any()
.downcast_ref::<datafusion::arrow::array::Float64Array>()
.expect("expected Float64Array");
let result: datafusion::arrow::array::Float64Array = float_array
.iter()
.map(|v| v.map(|x| x * x))
.collect();
Ok(ColumnarValue::Array(std::sync::Arc::new(result)))
}
ColumnarValue::Scalar(value) => {
// Для скалярных значений — обработка без создания массива
let v = value.to_f64().unwrap_or(0.0);
Ok(ColumnarValue::Scalar(
datafusion::common::ScalarValue::Float64(Some(v * v))
))
}
}
}
}
Метод invoke() устарел в DataFusion 52.x и в актуальной 53.0.0 (релиз 2026-04-02). Используйте invoke_with_args(ScalarFunctionArgs), который предоставляет расширенный контекст: аргументы, количество строк и информацию о типах возвращаемого значения.
Signature: система сигнатур
Signature описывает, какие типы аргументов принимает функция. Оптимизатор использует сигнатуру для валидации вызовов на этапе планирования.
Пример: функция с несколькими перегрузками
use datafusion::logical_expr::{Signature, Volatility, TypeSignature};
fn multi_signature() -> Signature {
Signature::one_of(
vec![
// concat(Utf8, Utf8) → Utf8
TypeSignature::Exact(vec![DataType::Utf8, DataType::Utf8]),
// concat(Utf8, Utf8, Utf8) → Utf8
TypeSignature::Exact(vec![DataType::Utf8, DataType::Utf8, DataType::Utf8]),
// concat(Utf8...) → Utf8
TypeSignature::Variadic(vec![DataType::Utf8]),
],
Volatility::Immutable,
)
}
Volatility: влияние на оптимизацию
Мы уже видели Volatility в модуле 04. В Rust API это enum, влияющий на оптимизатор:
| Уровень | Поведение | Примеры |
|---|---|---|
Immutable | Одинаковый вход — всегда одинаковый выход. Оптимизатор может вычислить на этапе планирования | upper(), abs(), length() |
Stable | Постоянный в пределах одного запроса. Нельзя вычислить заранее, но можно кэшировать | current_date(), конфигурационные функции |
Volatile | Может меняться между вызовами. Никакого кэширования | random(), now(), uuid() |
Выбирайте наиболее строгий уровень Volatility, который соответствует семантике вашей функции. Immutable даёт оптимизатору максимальную свободу — например, WHERE square(3) > 5 может быть вычислен на этапе планирования без сканирования данных.
Регистрация и использование
Способ 1: create_udf (convenience-функция)
Для простых случаев DataFusion предоставляет create_udf, не требующий реализации trait:
use datafusion::logical_expr::create_udf;
use datafusion::logical_expr::{Volatility, ColumnarValue};
use datafusion::arrow::datatypes::DataType;
use std::sync::Arc;
let square_udf = create_udf(
"square",
vec![DataType::Float64], // Типы аргументов
DataType::Float64, // Тип результата
Volatility::Immutable,
Arc::new(|args: &[ColumnarValue]| {
// Реализация как замыкание
match &args[0] {
ColumnarValue::Array(array) => {
let input = array.as_any()
.downcast_ref::<datafusion::arrow::array::Float64Array>()
.unwrap();
let result: datafusion::arrow::array::Float64Array =
input.iter().map(|v| v.map(|x| x * x)).collect();
Ok(ColumnarValue::Array(Arc::new(result)))
}
_ => unimplemented!("scalar case"),
}
}),
);
Функция create_udf считается устаревшей начиная с DataFusion 42.x. Для новых проектов используйте ScalarUDF::new_from_impl(impl ScalarUDFImpl) — он даёт полный контроль над сигнатурой, типизацией и поведением оптимизатора. create_udf сохранена только для обратной совместимости.
Способ 2: ScalarUDF::new_from_impl (полный контроль)
Для production-функций используйте trait ScalarUDFImpl:
use datafusion::logical_expr::ScalarUDF;
use datafusion::prelude::SessionContext;
let udf = ScalarUDF::new_from_impl(SquareUdf);
let ctx = SessionContext::new();
// Регистрация для SQL
ctx.register_udf(udf.clone());
// Использование в SQL
let df = ctx.sql("SELECT square(price) AS price_sq FROM products").await?;
df.show().await?;
// Использование в DataFrame API
use datafusion::prelude::col;
let df = ctx.table("products").await?
.select(vec![udf.call(vec![col("price")])])?;
Полный пример: функция геохеширования
use datafusion::logical_expr::{ScalarUDFImpl, Signature, Volatility, ScalarUDF, ColumnarValue};
use datafusion::arrow::array::{Float64Array, StringArray};
use datafusion::arrow::datatypes::DataType;
use datafusion::common::Result;
use std::any::Any;
use std::sync::Arc;
#[derive(Debug)]
struct GeohashUdf {
signature: Signature,
}
impl GeohashUdf {
fn new() -> Self {
Self {
signature: Signature::exact(
vec![DataType::Float64, DataType::Float64], // lat, lon
Volatility::Immutable,
),
}
}
fn encode_geohash(lat: f64, lon: f64, precision: usize) -> String {
// Упрощённая реализация для примера
let base32 = "0123456789bcdefghjkmnpqrstuvwxyz";
let mut min_lat = -90.0_f64;
let mut max_lat = 90.0;
let mut min_lon = -180.0_f64;
let mut max_lon = 180.0;
let mut hash = String::with_capacity(precision);
let mut is_lon = true;
let mut bit = 0;
let mut ch = 0u8;
while hash.len() < precision {
if is_lon {
let mid = (min_lon + max_lon) / 2.0;
if lon >= mid { ch |= 1 << (4 - bit); min_lon = mid; }
else { max_lon = mid; }
} else {
let mid = (min_lat + max_lat) / 2.0;
if lat >= mid { ch |= 1 << (4 - bit); min_lat = mid; }
else { max_lat = mid; }
}
is_lon = !is_lon;
bit += 1;
if bit == 5 {
hash.push(base32.as_bytes()[ch as usize] as char);
bit = 0;
ch = 0;
}
}
hash
}
}
impl ScalarUDFImpl for GeohashUdf {
fn as_any(&self) -> &dyn Any { self }
fn name(&self) -> &str { "geohash" }
fn signature(&self) -> &Signature { &self.signature }
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8)
}
fn invoke_with_args(
&self,
args: datafusion::logical_expr::ScalarFunctionArgs,
) -> Result<ColumnarValue> {
let lat_array = args.args[0].clone().into_array(1)?;
let lon_array = args.args[1].clone().into_array(1)?;
let lats = lat_array.as_any().downcast_ref::<Float64Array>().unwrap();
let lons = lon_array.as_any().downcast_ref::<Float64Array>().unwrap();
let result: StringArray = lats.iter().zip(lons.iter())
.map(|(lat, lon)| match (lat, lon) {
(Some(la), Some(lo)) => Some(Self::encode_geohash(la, lo, 6)),
_ => None,
})
.collect();
Ok(ColumnarValue::Array(Arc::new(result)))
}
}
// Регистрация
async fn register_geohash(ctx: &SessionContext) -> Result<()> {
let udf = ScalarUDF::new_from_impl(GeohashUdf::new());
ctx.register_udf(udf);
Ok(())
}
Обратите внимание на паттерн: сигнатура хранится в struct и возвращается по ссылке из signature(). Это избегает пересоздания Signature на каждый вызов — важно для производительности в hot path.
return_type_from_exprs: продвинутая типизация
Для функций, где тип результата зависит от контекста (не только от типов аргументов), переопределите return_type_from_exprs:
fn return_type_from_exprs(
&self,
args: &[datafusion::logical_expr::Expr],
schema: &dyn datafusion::logical_expr::expr_schema::ExprSchemable,
arg_types: &[DataType],
) -> Result<DataType> {
// Например: coalesce возвращает тип первого non-null аргумента
for dt in arg_types {
if *dt != DataType::Null {
return Ok(dt.clone());
}
}
Ok(DataType::Null)
}
Итоги
-
ScalarUDFImpl— основной trait для скалярных UDF в Rust с методамиname,signature,return_type,invoke_with_args -
invoke_with_args(ScalarFunctionArgs)— актуальный метод выполнения (не устаревшийinvoke) -
Signatureописывает допустимые типы:Uniform,Exact,Variadic,OneOf -
Volatility(Immutable/Stable/Volatile) управляет оптимизацией на этапе планирования -
create_udf— быстрый способ для простых случаев,ScalarUDF::new_from_impl— для production - Rust UDF работает напрямую с Arrow-массивами без сериализации — на порядок быстрее Python UDF