Learning Platform
Глоссарий Troubleshooting
Урок 06.01 · 18 мин
Продвинутый
ScalarUDFImplUDFSignatureVolatilitycreate_udfinvoke_with_argsScalarFunctionArgs

Скалярные UDF на Rust

Мы познакомились с пользовательскими функциями в модуле 04, где создавали UDF на Python. Python API удобен для прототипирования, но для production-расширений DataFusion предоставляет полноценный Rust API с контролем над типизацией, производительностью и интеграцией с оптимизатором.

В этом уроке реализуем скалярный UDF с нуля через trait ScalarUDFImpl, разберём систему сигнатур и зарегистрируем функцию для использования в SQL.

Архитектура UDF в DataFusion

Скалярный UDF принимает одну или несколько колонок и возвращает значение для каждой строки. DataFusion вызывает его на уровне RecordBatch — функция получает массивы Arrow, а не отдельные значения.

UDF Execution Flow
SQL: SELECT my_func(col_a, col_b) FROM tSQL-запрос с вызовом пользовательской функции запускает процесс поиска и выполнения UDF
Каталог функций
ScalarUDF lookup по имениКаталог функций SessionContext ищет зарегистрированный UDF по имени из SQL-запроса
Физическое выполнение
invoke_with_args(ScalarFunctionArgs)Метод invoke_with_args получает колонки как Arrow-массивы и выполняет vectorized-вычисление
Vectorized execution
Arrow Array (результат)Результат — Arrow-массив той же длины, что и входные данные, готовый для дальнейшей обработки

Ключевое отличие от Python UDF: Rust-реализация работает напрямую с Arrow-массивами без сериализации, что даёт на порядок лучшую производительность.

Trait ScalarUDFImpl

ScalarUDFImpl — основной trait для скалярных функций. Минимальная реализация требует четыре метода:

use datafusion::logical_expr::{ScalarUDFImpl, Signature, Volatility};
use datafusion::arrow::datatypes::DataType;
use datafusion::common::Result;
use datafusion::logical_expr::ColumnarValue;
use std::any::Any;

#[derive(Debug)]
struct SquareUdf;

impl ScalarUDFImpl for SquareUdf {
    fn as_any(&self) -> &dyn Any { self }

    fn name(&self) -> &str { "square" }

    fn signature(&self) -> &Signature {
        // Ленивая инициализация через static или хранение в struct
        &Signature::uniform(1, vec![DataType::Float64, DataType::Int64], Volatility::Immutable)
    }

    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        // Тип результата зависит от типа входного аргумента
        Ok(arg_types[0].clone())
    }

    fn invoke_with_args(
        &self,
        args: datafusion::logical_expr::ScalarFunctionArgs,
    ) -> Result<ColumnarValue> {
        let input = &args.args[0];
        match input {
            ColumnarValue::Array(array) => {
                let float_array = array
                    .as_any()
                    .downcast_ref::<datafusion::arrow::array::Float64Array>()
                    .expect("expected Float64Array");

                let result: datafusion::arrow::array::Float64Array = float_array
                    .iter()
                    .map(|v| v.map(|x| x * x))
                    .collect();

                Ok(ColumnarValue::Array(std::sync::Arc::new(result)))
            }
            ColumnarValue::Scalar(value) => {
                // Для скалярных значений — обработка без создания массива
                let v = value.to_f64().unwrap_or(0.0);
                Ok(ColumnarValue::Scalar(
                    datafusion::common::ScalarValue::Float64(Some(v * v))
                ))
            }
        }
    }
}
WARNING

Метод invoke() устарел в DataFusion 52.x и в актуальной 53.0.0 (релиз 2026-04-02). Используйте invoke_with_args(ScalarFunctionArgs), который предоставляет расширенный контекст: аргументы, количество строк и информацию о типах возвращаемого значения.

Signature: система сигнатур

Signature описывает, какие типы аргументов принимает функция. Оптимизатор использует сигнатуру для валидации вызовов на этапе планирования.

Варианты Signature
UniformВсе аргументы одного типа из допустимого списка — самый простой вариант сигнатуры
ExactФиксированный набор типов в строгом порядке — для функций с чётко определённой сигнатурой
VariadicПринимает произвольное число аргументов одного типа — аналог varargs в языках программирования
OneOfНесколько допустимых перегрузок — оптимизатор выбирает подходящую на основе типов аргументов вызова

Пример: функция с несколькими перегрузками

use datafusion::logical_expr::{Signature, Volatility, TypeSignature};

fn multi_signature() -> Signature {
    Signature::one_of(
        vec![
            // concat(Utf8, Utf8) → Utf8
            TypeSignature::Exact(vec![DataType::Utf8, DataType::Utf8]),
            // concat(Utf8, Utf8, Utf8) → Utf8
            TypeSignature::Exact(vec![DataType::Utf8, DataType::Utf8, DataType::Utf8]),
            // concat(Utf8...) → Utf8
            TypeSignature::Variadic(vec![DataType::Utf8]),
        ],
        Volatility::Immutable,
    )
}

Volatility: влияние на оптимизацию

Мы уже видели Volatility в модуле 04. В Rust API это enum, влияющий на оптимизатор:

УровеньПоведениеПримеры
ImmutableОдинаковый вход — всегда одинаковый выход. Оптимизатор может вычислить на этапе планированияupper(), abs(), length()
StableПостоянный в пределах одного запроса. Нельзя вычислить заранее, но можно кэшироватьcurrent_date(), конфигурационные функции
VolatileМожет меняться между вызовами. Никакого кэшированияrandom(), now(), uuid()
TIP

Выбирайте наиболее строгий уровень Volatility, который соответствует семантике вашей функции. Immutable даёт оптимизатору максимальную свободу — например, WHERE square(3) > 5 может быть вычислен на этапе планирования без сканирования данных.

Регистрация и использование

Способ 1: create_udf (convenience-функция)

Для простых случаев DataFusion предоставляет create_udf, не требующий реализации trait:

use datafusion::logical_expr::create_udf;
use datafusion::logical_expr::{Volatility, ColumnarValue};
use datafusion::arrow::datatypes::DataType;
use std::sync::Arc;

let square_udf = create_udf(
    "square",
    vec![DataType::Float64],       // Типы аргументов
    DataType::Float64,              // Тип результата
    Volatility::Immutable,
    Arc::new(|args: &[ColumnarValue]| {
        // Реализация как замыкание
        match &args[0] {
            ColumnarValue::Array(array) => {
                let input = array.as_any()
                    .downcast_ref::<datafusion::arrow::array::Float64Array>()
                    .unwrap();
                let result: datafusion::arrow::array::Float64Array =
                    input.iter().map(|v| v.map(|x| x * x)).collect();
                Ok(ColumnarValue::Array(Arc::new(result)))
            }
            _ => unimplemented!("scalar case"),
        }
    }),
);
WARNING

Функция create_udf считается устаревшей начиная с DataFusion 42.x. Для новых проектов используйте ScalarUDF::new_from_impl(impl ScalarUDFImpl) — он даёт полный контроль над сигнатурой, типизацией и поведением оптимизатора. create_udf сохранена только для обратной совместимости.

Способ 2: ScalarUDF::new_from_impl (полный контроль)

Для production-функций используйте trait ScalarUDFImpl:

use datafusion::logical_expr::ScalarUDF;
use datafusion::prelude::SessionContext;

let udf = ScalarUDF::new_from_impl(SquareUdf);
let ctx = SessionContext::new();

// Регистрация для SQL
ctx.register_udf(udf.clone());

// Использование в SQL
let df = ctx.sql("SELECT square(price) AS price_sq FROM products").await?;
df.show().await?;

// Использование в DataFrame API
use datafusion::prelude::col;
let df = ctx.table("products").await?
    .select(vec![udf.call(vec![col("price")])])?;

Полный пример: функция геохеширования

use datafusion::logical_expr::{ScalarUDFImpl, Signature, Volatility, ScalarUDF, ColumnarValue};
use datafusion::arrow::array::{Float64Array, StringArray};
use datafusion::arrow::datatypes::DataType;
use datafusion::common::Result;
use std::any::Any;
use std::sync::Arc;

#[derive(Debug)]
struct GeohashUdf {
    signature: Signature,
}

impl GeohashUdf {
    fn new() -> Self {
        Self {
            signature: Signature::exact(
                vec![DataType::Float64, DataType::Float64],  // lat, lon
                Volatility::Immutable,
            ),
        }
    }

    fn encode_geohash(lat: f64, lon: f64, precision: usize) -> String {
        // Упрощённая реализация для примера
        let base32 = "0123456789bcdefghjkmnpqrstuvwxyz";
        let mut min_lat = -90.0_f64;
        let mut max_lat = 90.0;
        let mut min_lon = -180.0_f64;
        let mut max_lon = 180.0;
        let mut hash = String::with_capacity(precision);
        let mut is_lon = true;
        let mut bit = 0;
        let mut ch = 0u8;

        while hash.len() < precision {
            if is_lon {
                let mid = (min_lon + max_lon) / 2.0;
                if lon >= mid { ch |= 1 << (4 - bit); min_lon = mid; }
                else { max_lon = mid; }
            } else {
                let mid = (min_lat + max_lat) / 2.0;
                if lat >= mid { ch |= 1 << (4 - bit); min_lat = mid; }
                else { max_lat = mid; }
            }
            is_lon = !is_lon;
            bit += 1;
            if bit == 5 {
                hash.push(base32.as_bytes()[ch as usize] as char);
                bit = 0;
                ch = 0;
            }
        }
        hash
    }
}

impl ScalarUDFImpl for GeohashUdf {
    fn as_any(&self) -> &dyn Any { self }
    fn name(&self) -> &str { "geohash" }
    fn signature(&self) -> &Signature { &self.signature }
    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
        Ok(DataType::Utf8)
    }

    fn invoke_with_args(
        &self,
        args: datafusion::logical_expr::ScalarFunctionArgs,
    ) -> Result<ColumnarValue> {
        let lat_array = args.args[0].clone().into_array(1)?;
        let lon_array = args.args[1].clone().into_array(1)?;

        let lats = lat_array.as_any().downcast_ref::<Float64Array>().unwrap();
        let lons = lon_array.as_any().downcast_ref::<Float64Array>().unwrap();

        let result: StringArray = lats.iter().zip(lons.iter())
            .map(|(lat, lon)| match (lat, lon) {
                (Some(la), Some(lo)) => Some(Self::encode_geohash(la, lo, 6)),
                _ => None,
            })
            .collect();

        Ok(ColumnarValue::Array(Arc::new(result)))
    }
}

// Регистрация
async fn register_geohash(ctx: &SessionContext) -> Result<()> {
    let udf = ScalarUDF::new_from_impl(GeohashUdf::new());
    ctx.register_udf(udf);
    Ok(())
}
NOTE

Обратите внимание на паттерн: сигнатура хранится в struct и возвращается по ссылке из signature(). Это избегает пересоздания Signature на каждый вызов — важно для производительности в hot path.

return_type_from_exprs: продвинутая типизация

Для функций, где тип результата зависит от контекста (не только от типов аргументов), переопределите return_type_from_exprs:

fn return_type_from_exprs(
    &self,
    args: &[datafusion::logical_expr::Expr],
    schema: &dyn datafusion::logical_expr::expr_schema::ExprSchemable,
    arg_types: &[DataType],
) -> Result<DataType> {
    // Например: coalesce возвращает тип первого non-null аргумента
    for dt in arg_types {
        if *dt != DataType::Null {
            return Ok(dt.clone());
        }
    }
    Ok(DataType::Null)
}

Итоги

  • ScalarUDFImpl — основной trait для скалярных UDF в Rust с методами name, signature, return_type, invoke_with_args
  • invoke_with_args(ScalarFunctionArgs) — актуальный метод выполнения (не устаревший invoke)
  • Signature описывает допустимые типы: Uniform, Exact, Variadic, OneOf
  • Volatility (Immutable/Stable/Volatile) управляет оптимизацией на этапе планирования
  • create_udf — быстрый способ для простых случаев, ScalarUDF::new_from_impl — для production
  • Rust UDF работает напрямую с Arrow-массивами без сериализации — на порядок быстрее Python UDF

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какой метод trait ScalarUDFImpl является актуальным для выполнения скалярного UDF в DataFusion 52.x?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8