Learning Platform
Глоссарий Troubleshooting
Урок 06.05 · 20 мин
Продвинутый
UserDefinedLogicalNodeExtensionPlannerExprPlannerFunctionFactoryTableFunctionImplFileFormatFactory

Продвинутые точки расширения

UDF, TableProvider и CatalogProvider покрывают большинство задач расширения. Но DataFusion предоставляет ещё шесть точек расширения для сценариев, где стандартных механизмов недостаточно: собственные логические операторы, кастомное планирование, расширение синтаксиса SQL, табличные функции и пользовательские форматы файлов.

UserDefinedLogicalNode: собственные операторы

Когда ни один встроенный LogicalPlan-узел не подходит для вашей логики, создайте собственный через UserDefinedLogicalNodeCore:

use datafusion::logical_expr::{UserDefinedLogicalNodeCore, LogicalPlan};
use datafusion::logical_expr::Expr;
use datafusion::common::DFSchemaRef;
use std::fmt;
use std::hash::{Hash, Hasher};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct TopKNode {
    k: usize,
    order_by: Expr,
    input: LogicalPlan,
}

impl UserDefinedLogicalNodeCore for TopKNode {
    fn name(&self) -> &str { "TopK" }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        self.input.schema()
    }

    fn expressions(&self) -> Vec<Expr> {
        vec![self.order_by.clone()]
    }

    fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "TopK: k={}, order_by={}", self.k, self.order_by)
    }

    fn with_exprs_and_inputs(
        &self,
        exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> datafusion::common::Result<Self> {
        Ok(TopKNode {
            k: self.k,
            order_by: exprs[0].clone(),
            input: inputs[0].clone(),
        })
    }
}
NOTE

UserDefinedLogicalNodeCore — рекомендуемый trait вместо более старого UserDefinedLogicalNode. Он автоматически реализует UserDefinedLogicalNode и требует Clone + PartialEq + Eq + Hash, что позволяет оптимизатору сравнивать и кэшировать узлы.

Использование в плане:

use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::Extension;

fn create_topk_plan(input: LogicalPlan, k: usize, order_by: Expr) -> LogicalPlan {
    LogicalPlan::Extension(Extension {
        node: Arc::new(TopKNode { k, order_by, input }),
    })
}

ExtensionPlanner: физическое планирование

UserDefinedLogicalNode описывает логику. Для выполнения нужен физический план. ExtensionPlanner преобразует логические Extension-узлы в ExecutionPlan:

use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::execution::context::SessionState;
use async_trait::async_trait;

struct TopKPlanner;

#[async_trait]
impl ExtensionPlanner for TopKPlanner {
    async fn plan_extension(
        &self,
        _planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        _logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        _session_state: &SessionState,
    ) -> datafusion::common::Result<Option<Arc<dyn ExecutionPlan>>> {
        // Проверяем, что это наш узел
        if let Some(topk) = node.as_any().downcast_ref::<TopKNode>() {
            let input = physical_inputs[0].clone();
            let exec = TopKExec::new(input, topk.k, topk.order_by.clone());
            Ok(Some(Arc::new(exec)))
        } else {
            // Не наш узел — пропускаем
            Ok(None)
        }
    }
}

Регистрация планировщика:

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionContext;

let state = SessionStateBuilder::new()
    .with_default_features()
    .with_physical_optimizer_rules(vec![])  // Можно добавить свои правила
    .build();

// ExtensionPlanner добавляется к SessionState
// через SessionStateBuilder или кастомный PhysicalPlanner
let ctx = SessionContext::new_with_state(state);

ExprPlanner: расширение SQL-выражений

ExprPlanner позволяет перехватывать и трансформировать SQL-выражения, которые стандартный планировщик не понимает:

use datafusion::logical_expr::planner::ExprPlanner;
use datafusion::logical_expr::Expr;
use datafusion::common::Result;
use datafusion::logical_expr::sqlparser::ast as sql_ast;

#[derive(Debug)]
struct JsonExprPlanner;

impl ExprPlanner for JsonExprPlanner {
    fn plan_binary_op(
        &self,
        expr: datafusion::logical_expr::planner::RawBinaryExpr,
    ) -> Result<datafusion::logical_expr::planner::PlannerResult<datafusion::logical_expr::planner::RawBinaryExpr>> {
        // Перехватываем оператор -> для JSON-доступа
        // data -> 'key' преобразуется в json_get(data, 'key')
        match &expr.op {
            datafusion::logical_expr::Operator::ArrowAt => {
                let func = datafusion::logical_expr::expr::ScalarFunction::new_udf(
                    json_get_udf(),  // Наш UDF для JSON
                    vec![expr.left.clone(), expr.right.clone()],
                );
                Ok(datafusion::logical_expr::planner::PlannerResult::Planned(
                    Expr::ScalarFunction(func)
                ))
            }
            _ => Ok(datafusion::logical_expr::planner::PlannerResult::Original(expr)),
        }
    }
}
TIP

ExprPlanner идеален для добавления синтаксического сахара: JSON-операторы (->), регулярные выражения (~), геопространственные операции. Стандартный SQL-парсер уже знает эти токены — ExprPlanner определяет их семантику.

FunctionFactory: CREATE FUNCTION

FunctionFactory обрабатывает SQL-команду CREATE FUNCTION, создавая UDF динамически:

use datafusion::logical_expr::CreateFunction;
use datafusion::execution::context::FunctionFactory;
use datafusion::logical_expr::ScalarUDF;
use async_trait::async_trait;

#[derive(Debug)]
struct WasmFunctionFactory;

#[async_trait]
impl FunctionFactory for WasmFunctionFactory {
    type Context = SessionState;

    async fn create(
        &self,
        _state: &SessionState,
        statement: CreateFunction,
    ) -> datafusion::common::Result<RegisterFunction> {
        // statement.name — имя функции
        // statement.args — аргументы
        // statement.params.as_ — тело функции (LANGUAGE, AS, ...)

        let func_name = &statement.name;
        let body = statement.params.as_.as_ref()
            .ok_or_else(|| datafusion::error::DataFusionError::Plan(
                "CREATE FUNCTION requires AS clause".to_string()
            ))?;

        // Компилируем WASM-модуль в UDF
        let udf = compile_wasm_to_udf(func_name, body)?;

        Ok(RegisterFunction::Scalar(Arc::new(udf)))
    }
}

После регистрации фабрики:

-- Создаёт UDF из WASM-модуля
CREATE FUNCTION double_value(x DOUBLE)
RETURNS DOUBLE
LANGUAGE WASM
AS 'module.wasm:double';

-- Используем как обычную функцию
SELECT double_value(price) FROM products;

TableFunctionImpl: табличные функции (UDTF)

Табличные функции возвращают таблицу вместо скалярного значения. Используются в FROM clause:

use datafusion::logical_expr::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::logical_expr::Expr;

#[derive(Debug)]
struct GenerateSeriesFunction;

impl TableFunctionImpl for GenerateSeriesFunction {
    fn call(
        &self,
        args: &[Expr],
    ) -> datafusion::common::Result<Arc<dyn TableProvider>> {
        // Извлекаем аргументы: start, stop, step
        let start = extract_literal_i64(&args[0])?;
        let stop = extract_literal_i64(&args[1])?;
        let step = if args.len() > 2 {
            extract_literal_i64(&args[2])?
        } else {
            1
        };

        // Генерируем данные
        let values: Vec<i64> = (start..=stop).step_by(step as usize).collect();

        let schema = Arc::new(Schema::new(vec![
            Field::new("value", DataType::Int64, false),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int64Array::from(values))],
        )?;

        Ok(Arc::new(MemTable::try_new(schema, vec![vec![batch]])?))
    }
}

// Регистрация
let ctx = SessionContext::new();
ctx.register_udtf("generate_series", Arc::new(GenerateSeriesFunction));

// Использование в SQL
let df = ctx.sql("SELECT * FROM generate_series(1, 100, 5)").await?;
NOTE

UDTF отличается от UDF тем, что возвращает набор строк, а не одно значение. Это аналог generate_series() в PostgreSQL или UNNEST(). В DataFusion UDTF создаёт TableProvider, встраиваемый в план как обычная таблица.

FileFormatFactory: пользовательские форматы

FileFormatFactory регистрирует поддержку новых файловых форматов:

use datafusion::datasource::file_format::FileFormatFactory;
use datafusion::datasource::file_format::FileFormat;

#[derive(Debug)]
struct MsgPackFormatFactory;

impl FileFormatFactory for MsgPackFormatFactory {
    fn create(
        &self,
        _state: &SessionState,
        _options: &HashMap<String, String>,
    ) -> datafusion::common::Result<Arc<dyn FileFormat>> {
        Ok(Arc::new(MsgPackFormat::default()))
    }

    fn default_file_extension(&self) -> &str {
        ".msgpack"
    }
}

После регистрации ListingTable и CREATE EXTERNAL TABLE поддерживают новый формат:

CREATE EXTERNAL TABLE telemetry
STORED AS MSGPACK
LOCATION 's3://bucket/telemetry/';

Карта точек расширения

Точки расширения DataFusion
ДанныеТочки расширения для подключения произвольных источников данных и форматов файлов
ВычисленияРасширение вычислительных возможностей: скалярные, агрегатные, оконные и табличные функции
ПланированиеРасширение планировщика: собственные логические операторы и их физическая реализация
SQL-синтаксисРасширение SQL-синтаксиса: динамическое создание функций и таблиц через DDL-команды

Комбинирование точек расширения

В реальных проектах точки расширения комбинируются. Пример: JSON-аналитический движок:

// 1. ExprPlanner: поддержка JSON-операторов в SQL
//    data -> 'key' → json_get(data, key)
let expr_planner = JsonExprPlanner;

// 2. ScalarUDF: реализация json_get, json_array_length, json_path
let json_get = ScalarUDF::new_from_impl(JsonGetUdf::new());
let json_len = ScalarUDF::new_from_impl(JsonArrayLengthUdf::new());

// 3. TableProvider: чтение NDJSON из S3
let json_source = Arc::new(NdJsonS3Provider::new("s3://logs/"));

// 4. UserDefinedLogicalNode: оптимизированный JSON-путь scan
//    вместо scan + json_get → специальный JsonScan с path pushdown
let json_scan_optimizer = JsonScanOptimizer;

// 5. Собираем всё в SessionContext
let state = SessionStateBuilder::new()
    .with_default_features()
    .with_expr_planners(vec![Arc::new(expr_planner)])
    .build();

let ctx = SessionContext::new_with_state(state);
ctx.register_udf(json_get);
ctx.register_udf(json_len);
ctx.register_table("logs", json_source)?;
WARNING

Комбинирование многих точек расширения увеличивает сложность отладки. Начинайте с UDF и TableProvider — они покрывают 90% задач. Переходите к UserDefinedLogicalNode и ExtensionPlanner только когда стандартная оптимизация не справляется с вашим use case.

Итоги

  • UserDefinedLogicalNodeCore — собственные логические операторы с Clone + Eq + Hash
  • ExtensionPlanner преобразует кастомные логические узлы в физические ExecutionPlan
  • ExprPlanner перехватывает SQL-выражения для добавления новых операторов
  • FunctionFactory обрабатывает CREATE FUNCTION для динамического создания UDF
  • TableFunctionImpl создаёт табличные функции (UDTF), возвращающие набор строк
  • FileFormatFactory регистрирует поддержку новых файловых форматов для ListingTable

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какой trait рекомендуется для создания собственного логического оператора вместо UserDefinedLogicalNode?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8