Кастомные правила оптимизации
DataFusion позволяет добавлять собственные правила оптимизации логического плана. Это мощный механизм: вы можете переписывать запросы, инлайнить функции, добавлять domain-specific оптимизации, которые встроенный оптимизатор не знает. В этом уроке реализуем кастомное правило с нуля, используя TreeNode API для обхода плана.
Trait OptimizerRule
Минимальная реализация требует два метода:
use datafusion::common::tree_node::Transformed;
use datafusion::common::Result;
use datafusion::logical_expr::LogicalPlan;
use datafusion::optimizer::{OptimizerConfig, OptimizerRule};
#[derive(Default, Debug)]
struct MyRule;
impl OptimizerRule for MyRule {
fn name(&self) -> &str {
"my_rule"
}
fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
// Вернуть Transformed::no если план не изменён
Ok(Transformed::no(plan))
}
}
Контракт rewrite()
- Получает план по значению (ownership transfer)
- Возвращает
Result<Transformed<LogicalPlan>> Transformed::yes(plan)— план был изменён, оптимизатор может запустить повторный проходTransformed::no(plan)— план не изменён, пропустить повторный проход- Правило не должно менять семантику запроса — только способ выполнения
OptimizerConfig
OptimizerConfig предоставляет конфигурацию оптимизатора:
fn rewrite(
&self,
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
// Проверить настройки
let options = config.options();
let batch_size = options.execution.batch_size;
// Пропустить правило если отключено
if !config.options().optimizer.enable_round_robin_repartition {
return Ok(Transformed::no(plan));
}
// ... логика трансформации ...
Ok(Transformed::no(plan))
}
TreeNode API
LogicalPlan — дерево. Для обхода и трансформации DataFusion предоставляет trait TreeNode с методами:
| Метод | Направление | Использование |
|---|---|---|
transform | Рекурсивный (pre+post) | Обход всего дерева с pre- и post-визитами |
transform_up | Снизу вверх (post-order) | Обработать дочерние узлы, потом текущий |
transform_down | Сверху вниз (pre-order) | Обработать текущий узел, потом дочерние |
transform_down: паттерн-матчинг сверху вниз
Обрабатывает узлы от корня к листьям. Полезно, когда решение зависит от контекста родителя:
use datafusion::common::tree_node::{Transformed, TreeNode};
fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
plan.transform_down(|node| {
match &node {
LogicalPlan::Filter(filter) => {
// Проверить предикат, возможно переписать
if is_always_true(&filter.predicate) {
// Убрать фильтр, вернуть дочерний план
Ok(Transformed::yes(filter.input.as_ref().clone()))
} else {
Ok(Transformed::no(node))
}
}
_ => Ok(Transformed::no(node)),
}
})
}
transform_up: обработка снизу вверх
Обрабатывает листья первыми, затем поднимается к корню. Полезно, когда решение зависит от результатов дочерних узлов:
plan.transform_up(|node| {
match &node {
LogicalPlan::Aggregate(agg) => {
// Дочерние узлы уже обработаны
// Можно анализировать трансформированный input
Ok(Transformed::no(node))
}
_ => Ok(Transformed::no(node)),
}
})
transform_down обычно эффективнее для правил, которые удаляют узлы (фильтры, проекции): удаление родительского узла сразу сокращает дерево. transform_up лучше для правил, которые объединяют или модифицируют дочерние узлы.
Пример: инлайнинг UDF
Классический пример кастомного правила — инлайнинг простых UDF. Если пользователь зарегистрировал add_one(x) = x + 1, оптимизатор может заменить вызов функции на арифметику:
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::Result;
use datafusion::logical_expr::{Expr, LogicalPlan};
use datafusion::optimizer::{OptimizerConfig, OptimizerRule};
use datafusion::prelude::lit;
/// Правило: заменить add_one(x) на x + 1
#[derive(Default, Debug)]
struct AddOneInliner;
impl OptimizerRule for AddOneInliner {
fn name(&self) -> &str {
"add_one_inliner"
}
fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
// Переписать выражения в каждом узле плана
let new_expressions: Vec<Expr> = plan
.expressions()
.into_iter()
.map(|expr| rewrite_add_one(expr))
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|transformed| transformed.data)
.collect();
let inputs = plan.inputs().into_iter().cloned().collect::<Vec<_>>();
let plan = plan.with_new_exprs(new_expressions, inputs)?;
Ok(Transformed::yes(plan))
}
}
fn rewrite_add_one(expr: Expr) -> Result<Transformed<Expr>> {
expr.transform(|expr| {
Ok(match expr {
Expr::ScalarFunction(ref func)
if func.func.inner().name() == "add_one" =>
{
let arg = func.args[0].clone();
Transformed::yes(arg + lit(1i64))
}
_ => Transformed::no(expr),
})
})
}
После применения правила:
-- Исходный план
Projection: add_one(t.x) AS result
TableScan: t
-- После AddOneInliner
Projection: t.x + Int64(1) AS result
TableScan: t
Теперь SimplifyExpressions может свернуть 5 + 1 → 6 для константных вызовов.
Переписывание выражений
TreeNode API работает и с Expr. Типичные паттерны:
Замена функции
fn replace_deprecated_function(expr: Expr) -> Result<Transformed<Expr>> {
expr.transform_up(|e| {
Ok(match e {
Expr::ScalarFunction(ref func)
if func.func.inner().name() == "old_func" =>
{
// Заменить old_func(x) на new_func(x, 'default')
let args = func.args.clone();
let new_args = vec![args[0].clone(), lit("default")];
// Создать вызов new_func с новыми аргументами
Transformed::yes(
Expr::ScalarFunction(datafusion::logical_expr::expr::ScalarFunction {
func: get_new_func_udf(),
args: new_args,
})
)
}
_ => Transformed::no(e),
})
})
}
Переписывание BETWEEN
fn rewrite_between(expr: Expr) -> Result<Transformed<Expr>> {
expr.transform_up(|e| {
let Expr::Between(between) = e else {
return Ok(Transformed::no(e));
};
if between.negated {
return Ok(Transformed::no(Expr::Between(between)));
}
// BETWEEN low AND high → expr >= low AND expr <= high
let rewritten = between.expr.clone()
.gt_eq(*between.low)
.and(between.expr.lt_eq(*between.high));
Ok(Transformed::yes(rewritten))
})
}
Всегда используйте transform_up для переписывания выражений, если вложенные подвыражения тоже нужно обработать. transform_up гарантирует, что дочерние выражения уже переписаны к моменту обработки родителя.
Регистрация через SessionStateBuilder
Кастомные правила подключаются через SessionStateBuilder:
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use std::sync::Arc;
// Добавить правило к существующим
let state = SessionStateBuilder::new()
.with_optimizer_rules(vec![
Arc::new(AddOneInliner::default()),
])
.build();
let ctx = SessionContext::new_with_state(state);
// Зарегистрировать UDF
ctx.register_udf(add_one_udf);
// Проверить оптимизацию
let plan = ctx.sql("SELECT add_one(5) AS result")
.await?
.into_optimized_plan()?;
println!("{}", plan.display_indent());
// Projection: Int64(6) AS result
// EmptyRelation: rows=1
with_optimizer_rules() задаёт полный набор правил, заменяя встроенные. Чтобы добавить правило к существующим, используйте ctx.add_optimizer_rule(Arc::new(MyRule::default())) на SessionContext, или соберите полный список правил вручную.
Тестирование правил
Правила можно тестировать изолированно через Optimizer:
use datafusion::optimizer::{Optimizer, OptimizerContext};
use std::sync::Arc;
#[tokio::test]
async fn test_add_one_inliner() {
let ctx = SessionContext::new();
ctx.register_udf(create_add_one_udf());
// Получить неоптимизированный план
let unoptimized = ctx
.sql("SELECT add_one(x) FROM t")
.await
.unwrap()
.into_unoptimized_plan();
// Применить только наше правило
let optimizer = Optimizer::with_rules(vec![
Arc::new(AddOneInliner::default()),
]);
let config = OptimizerContext::new();
let optimized = optimizer
.optimize(unoptimized, &config, |_, _| {})
.unwrap();
// Проверить, что add_one заменён на сложение
let plan_str = format!("{}", optimized.display_indent());
assert!(!plan_str.contains("add_one"));
assert!(plan_str.contains("+ Int64(1)"));
}
Тестирование с observer
#[test]
fn test_rule_fires() {
let mut fired = false;
let optimized = optimizer.optimize(
plan,
&config,
|_plan, rule| {
if rule.name() == "add_one_inliner" {
fired = true;
}
},
).unwrap();
assert!(fired, "Rule should have been applied");
}
Domain-specific правила
Кастомные правила особенно полезны для domain-specific оптимизаций:
- Временные ряды — переписывание агрегаций с предвычисленными rollup-таблицами
- Геоданные — замена
ST_Distance(a, b) < thresholdна пространственный индекс - Мультитенантность — автоматическая вставка
WHERE tenant_id = ?в каждый TableScan - Кэширование — замена повторяющихся подзапросов на чтение из кэша
Пример: автоматический tenant filter:
#[derive(Debug)]
struct TenantIsolationRule {
tenant_id: String,
}
impl OptimizerRule for TenantIsolationRule {
fn name(&self) -> &str { "tenant_isolation" }
fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
let tenant_id = self.tenant_id.clone();
plan.transform_down(|node| {
match node {
LogicalPlan::TableScan(scan) => {
// Добавить фильтр tenant_id к каждому scan
let filter = col("tenant_id").eq(lit(tenant_id.clone()));
let filtered = LogicalPlanBuilder::from(
LogicalPlan::TableScan(scan)
)
.filter(filter)?
.build()?;
Ok(Transformed::yes(filtered))
}
_ => Ok(Transformed::no(node)),
}
})
}
}
Итоги
-
OptimizerRule::rewrite()принимает план по значению и возвращаетTransformed<LogicalPlan> - TreeNode API:
transform_down(сверху вниз),transform_up(снизу вверх),transform(оба направления) - Выражения (
Expr) тоже реализуютTreeNode— переписываются теми же методами -
Optimizer::with_rules()для изолированного тестирования,SessionStateBuilderдля production - Observer callback в тестах позволяет убедиться, что правило применилось
- Domain-specific правила — мультитенантность, временные ряды, кэширование — главная причина писать кастомные правила