Learning Platform
Глоссарий Troubleshooting
Урок 07.04 · 20 мин
Продвинутый
OptimizerRuleTransformedTreeNodetransform_uptransform_downExprRewritingSessionStateBuilder

Кастомные правила оптимизации

DataFusion позволяет добавлять собственные правила оптимизации логического плана. Это мощный механизм: вы можете переписывать запросы, инлайнить функции, добавлять domain-specific оптимизации, которые встроенный оптимизатор не знает. В этом уроке реализуем кастомное правило с нуля, используя TreeNode API для обхода плана.

Trait OptimizerRule

Минимальная реализация требует два метода:

use datafusion::common::tree_node::Transformed;
use datafusion::common::Result;
use datafusion::logical_expr::LogicalPlan;
use datafusion::optimizer::{OptimizerConfig, OptimizerRule};

#[derive(Default, Debug)]
struct MyRule;

impl OptimizerRule for MyRule {
    fn name(&self) -> &str {
        "my_rule"
    }

    fn rewrite(
        &self,
        plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>> {
        // Вернуть Transformed::no если план не изменён
        Ok(Transformed::no(plan))
    }
}

Контракт rewrite()

  • Получает план по значению (ownership transfer)
  • Возвращает Result<Transformed<LogicalPlan>>
  • Transformed::yes(plan) — план был изменён, оптимизатор может запустить повторный проход
  • Transformed::no(plan) — план не изменён, пропустить повторный проход
  • Правило не должно менять семантику запроса — только способ выполнения

OptimizerConfig

OptimizerConfig предоставляет конфигурацию оптимизатора:

fn rewrite(
    &self,
    plan: LogicalPlan,
    config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
    // Проверить настройки
    let options = config.options();
    let batch_size = options.execution.batch_size;

    // Пропустить правило если отключено
    if !config.options().optimizer.enable_round_robin_repartition {
        return Ok(Transformed::no(plan));
    }

    // ... логика трансформации ...
    Ok(Transformed::no(plan))
}

TreeNode API

LogicalPlan — дерево. Для обхода и трансформации DataFusion предоставляет trait TreeNode с методами:

МетодНаправлениеИспользование
transformРекурсивный (pre+post)Обход всего дерева с pre- и post-визитами
transform_upСнизу вверх (post-order)Обработать дочерние узлы, потом текущий
transform_downСверху вниз (pre-order)Обработать текущий узел, потом дочерние

transform_down: паттерн-матчинг сверху вниз

Обрабатывает узлы от корня к листьям. Полезно, когда решение зависит от контекста родителя:

use datafusion::common::tree_node::{Transformed, TreeNode};

fn rewrite(
    &self,
    plan: LogicalPlan,
    _config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
    plan.transform_down(|node| {
        match &node {
            LogicalPlan::Filter(filter) => {
                // Проверить предикат, возможно переписать
                if is_always_true(&filter.predicate) {
                    // Убрать фильтр, вернуть дочерний план
                    Ok(Transformed::yes(filter.input.as_ref().clone()))
                } else {
                    Ok(Transformed::no(node))
                }
            }
            _ => Ok(Transformed::no(node)),
        }
    })
}

transform_up: обработка снизу вверх

Обрабатывает листья первыми, затем поднимается к корню. Полезно, когда решение зависит от результатов дочерних узлов:

plan.transform_up(|node| {
    match &node {
        LogicalPlan::Aggregate(agg) => {
            // Дочерние узлы уже обработаны
            // Можно анализировать трансформированный input
            Ok(Transformed::no(node))
        }
        _ => Ok(Transformed::no(node)),
    }
})
NOTE

transform_down обычно эффективнее для правил, которые удаляют узлы (фильтры, проекции): удаление родительского узла сразу сокращает дерево. transform_up лучше для правил, которые объединяют или модифицируют дочерние узлы.

Пример: инлайнинг UDF

Классический пример кастомного правила — инлайнинг простых UDF. Если пользователь зарегистрировал add_one(x) = x + 1, оптимизатор может заменить вызов функции на арифметику:

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::Result;
use datafusion::logical_expr::{Expr, LogicalPlan};
use datafusion::optimizer::{OptimizerConfig, OptimizerRule};
use datafusion::prelude::lit;

/// Правило: заменить add_one(x) на x + 1
#[derive(Default, Debug)]
struct AddOneInliner;

impl OptimizerRule for AddOneInliner {
    fn name(&self) -> &str {
        "add_one_inliner"
    }

    fn rewrite(
        &self,
        plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>> {
        // Переписать выражения в каждом узле плана
        let new_expressions: Vec<Expr> = plan
            .expressions()
            .into_iter()
            .map(|expr| rewrite_add_one(expr))
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .map(|transformed| transformed.data)
            .collect();

        let inputs = plan.inputs().into_iter().cloned().collect::<Vec<_>>();
        let plan = plan.with_new_exprs(new_expressions, inputs)?;

        Ok(Transformed::yes(plan))
    }
}

fn rewrite_add_one(expr: Expr) -> Result<Transformed<Expr>> {
    expr.transform(|expr| {
        Ok(match expr {
            Expr::ScalarFunction(ref func)
                if func.func.inner().name() == "add_one" =>
            {
                let arg = func.args[0].clone();
                Transformed::yes(arg + lit(1i64))
            }
            _ => Transformed::no(expr),
        })
    })
}

После применения правила:

-- Исходный план
Projection: add_one(t.x) AS result
  TableScan: t

-- После AddOneInliner
Projection: t.x + Int64(1) AS result
  TableScan: t

Теперь SimplifyExpressions может свернуть 5 + 16 для константных вызовов.

Переписывание выражений

TreeNode API работает и с Expr. Типичные паттерны:

Замена функции

fn replace_deprecated_function(expr: Expr) -> Result<Transformed<Expr>> {
    expr.transform_up(|e| {
        Ok(match e {
            Expr::ScalarFunction(ref func)
                if func.func.inner().name() == "old_func" =>
            {
                // Заменить old_func(x) на new_func(x, 'default')
                let args = func.args.clone();
                let new_args = vec![args[0].clone(), lit("default")];
                // Создать вызов new_func с новыми аргументами
                Transformed::yes(
                    Expr::ScalarFunction(datafusion::logical_expr::expr::ScalarFunction {
                        func: get_new_func_udf(),
                        args: new_args,
                    })
                )
            }
            _ => Transformed::no(e),
        })
    })
}

Переписывание BETWEEN

fn rewrite_between(expr: Expr) -> Result<Transformed<Expr>> {
    expr.transform_up(|e| {
        let Expr::Between(between) = e else {
            return Ok(Transformed::no(e));
        };

        if between.negated {
            return Ok(Transformed::no(Expr::Between(between)));
        }

        // BETWEEN low AND high → expr >= low AND expr <= high
        let rewritten = between.expr.clone()
            .gt_eq(*between.low)
            .and(between.expr.lt_eq(*between.high));

        Ok(Transformed::yes(rewritten))
    })
}
TIP

Всегда используйте transform_up для переписывания выражений, если вложенные подвыражения тоже нужно обработать. transform_up гарантирует, что дочерние выражения уже переписаны к моменту обработки родителя.

Регистрация через SessionStateBuilder

Кастомные правила подключаются через SessionStateBuilder:

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use std::sync::Arc;

// Добавить правило к существующим
let state = SessionStateBuilder::new()
    .with_optimizer_rules(vec![
        Arc::new(AddOneInliner::default()),
    ])
    .build();

let ctx = SessionContext::new_with_state(state);

// Зарегистрировать UDF
ctx.register_udf(add_one_udf);

// Проверить оптимизацию
let plan = ctx.sql("SELECT add_one(5) AS result")
    .await?
    .into_optimized_plan()?;

println!("{}", plan.display_indent());
// Projection: Int64(6) AS result
//   EmptyRelation: rows=1
WARNING

with_optimizer_rules() задаёт полный набор правил, заменяя встроенные. Чтобы добавить правило к существующим, используйте ctx.add_optimizer_rule(Arc::new(MyRule::default())) на SessionContext, или соберите полный список правил вручную.

Тестирование правил

Правила можно тестировать изолированно через Optimizer:

use datafusion::optimizer::{Optimizer, OptimizerContext};
use std::sync::Arc;

#[tokio::test]
async fn test_add_one_inliner() {
    let ctx = SessionContext::new();
    ctx.register_udf(create_add_one_udf());

    // Получить неоптимизированный план
    let unoptimized = ctx
        .sql("SELECT add_one(x) FROM t")
        .await
        .unwrap()
        .into_unoptimized_plan();

    // Применить только наше правило
    let optimizer = Optimizer::with_rules(vec![
        Arc::new(AddOneInliner::default()),
    ]);
    let config = OptimizerContext::new();

    let optimized = optimizer
        .optimize(unoptimized, &config, |_, _| {})
        .unwrap();

    // Проверить, что add_one заменён на сложение
    let plan_str = format!("{}", optimized.display_indent());
    assert!(!plan_str.contains("add_one"));
    assert!(plan_str.contains("+ Int64(1)"));
}

Тестирование с observer

#[test]
fn test_rule_fires() {
    let mut fired = false;

    let optimized = optimizer.optimize(
        plan,
        &config,
        |_plan, rule| {
            if rule.name() == "add_one_inliner" {
                fired = true;
            }
        },
    ).unwrap();

    assert!(fired, "Rule should have been applied");
}

Domain-specific правила

Кастомные правила особенно полезны для domain-specific оптимизаций:

  • Временные ряды — переписывание агрегаций с предвычисленными rollup-таблицами
  • Геоданные — замена ST_Distance(a, b) < threshold на пространственный индекс
  • Мультитенантность — автоматическая вставка WHERE tenant_id = ? в каждый TableScan
  • Кэширование — замена повторяющихся подзапросов на чтение из кэша

Пример: автоматический tenant filter:

#[derive(Debug)]
struct TenantIsolationRule {
    tenant_id: String,
}

impl OptimizerRule for TenantIsolationRule {
    fn name(&self) -> &str { "tenant_isolation" }

    fn rewrite(
        &self,
        plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>> {
        let tenant_id = self.tenant_id.clone();

        plan.transform_down(|node| {
            match node {
                LogicalPlan::TableScan(scan) => {
                    // Добавить фильтр tenant_id к каждому scan
                    let filter = col("tenant_id").eq(lit(tenant_id.clone()));
                    let filtered = LogicalPlanBuilder::from(
                        LogicalPlan::TableScan(scan)
                    )
                    .filter(filter)?
                    .build()?;

                    Ok(Transformed::yes(filtered))
                }
                _ => Ok(Transformed::no(node)),
            }
        })
    }
}

Итоги

  • OptimizerRule::rewrite() принимает план по значению и возвращает Transformed<LogicalPlan>
  • TreeNode API: transform_down (сверху вниз), transform_up (снизу вверх), transform (оба направления)
  • Выражения (Expr) тоже реализуют TreeNode — переписываются теми же методами
  • Optimizer::with_rules() для изолированного тестирования, SessionStateBuilder для production
  • Observer callback в тестах позволяет убедиться, что правило применилось
  • Domain-specific правила — мультитенантность, временные ряды, кэширование — главная причина писать кастомные правила

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. OptimizerRule::rewrite() принимает LogicalPlan по значению (ownership), а не по ссылке. Какое преимущество это даёт?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6