LogicalPlan: дерево логических операций
Логический план — центральное представление запроса в DataFusion. После парсинга SQL и до физического планирования запрос существует как дерево узлов LogicalPlan. Оптимизатор трансформирует это дерево, сохраняя семантику.
Структура LogicalPlan
LogicalPlan — это enum, каждый вариант которого описывает одну логическую операцию:
pub enum LogicalPlan {
Projection(Projection), // SELECT: выбор колонок и выражений
Filter(Filter), // WHERE / HAVING: фильтрация строк
Aggregate(Aggregate), // GROUP BY + агрегатные функции
Sort(Sort), // ORDER BY
Join(Join), // JOIN (inner, left, right, full, cross)
CrossJoin(CrossJoin), // CROSS JOIN (без условия)
Limit(Limit), // LIMIT / OFFSET
SubqueryAlias(SubqueryAlias), // AS alias для подзапросов
TableScan(TableScan), // Чтение из источника данных
EmptyRelation(EmptyRelation), // Пустой набор (для VALUES, тестов)
Union(Union), // UNION ALL
Distinct(Distinct), // DISTINCT
Window(Window), // Window functions
// ... и другие (Extension, Explain, CreateTable, etc.)
}
Каждый узел содержит ссылки на дочерние планы (inputs) и свою схему (output_schema). Дерево всегда имеет leaf-узлы типа TableScan или EmptyRelation.
Пример: SQL в дерево
Запрос:
SELECT u.name, COUNT(o.id) AS order_count
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.active = true
GROUP BY u.name
HAVING COUNT(o.id) > 5
Преобразуется в дерево:
Обратите внимание: HAVING не является отдельным узлом — это Filter, размещённый над Aggregate. SQL-конструкции транслируются в унифицированные логические операции.
Система выражений: Expr
Внутри каждого узла вычисления описываются через Expr — дерево выражений:
pub enum Expr {
Column(Column), // Ссылка на колонку: users.name
Literal(ScalarValue), // Константа: 42, 'hello', true
BinaryExpr(BinaryExpr), // Бинарная операция: a + b, x > 10
Not(Box<Expr>), // Логическое NOT
IsNull(Box<Expr>), // IS NULL
IsNotNull(Box<Expr>), // IS NOT NULL
ScalarFunction(ScalarFunction), // Вызов функции: UPPER(name), ABS(x)
AggregateFunction(AggregateFunction), // Агрегат: SUM(x), COUNT(*)
Alias(Alias), // AS alias
Cast(Cast), // CAST(x AS type)
Case(Case), // CASE WHEN ... THEN ... END
InList(InList), // x IN (1, 2, 3)
Between(Between), // x BETWEEN 10 AND 20
Like(Like), // x LIKE '%pattern%'
Sort(Sort), // ORDER BY expression (with ASC/DESC/NULLS)
Wildcard { qualifier }, // SELECT * / SELECT t.*
// ... и другие
}
Пример разбора выражения
SQL-выражение price * quantity > 1000.0 становится деревом Expr:
BinaryExpr {
left: BinaryExpr {
left: Column("price"),
op: Multiply,
right: Column("quantity"),
},
op: Gt,
right: Literal(ScalarValue::Float64(1000.0)),
}
Каждый Expr знает свой выходной тип данных. Это позволяет проверять корректность выражений на этапе планирования: price * quantity допустим, если оба — числовые; name * quantity — ошибка.
Ключевые узлы LogicalPlan
TableScan
Leaf-узел, представляющий чтение из источника данных:
pub struct TableScan {
pub table_name: TableReference, // Имя таблицы (catalog.schema.table)
pub source: Arc<dyn TableProvider>, // Провайдер данных
pub projection: Option<Vec<usize>>, // Какие колонки читать (None = все)
pub filters: Vec<Expr>, // Предикаты для pushdown
pub fetch: Option<usize>, // LIMIT для pushdown
}
Поля projection, filters и fetch заполняются оптимизатором: projection pushdown сужает колонки, filter pushdown передаёт предикаты в источник.
Projection
Вычисляет выражения и формирует новую схему:
pub struct Projection {
pub expr: Vec<Expr>, // Список выражений для вычисления
pub input: Arc<LogicalPlan>, // Входной план
pub schema: DFSchemaRef, // Выходная схема
}
SELECT name, price * quantity AS total создаст Projection с двумя выражениями: Column("name") и BinaryExpr(price * quantity) с алиасом total.
Filter
Оставляет только строки, удовлетворяющие предикату:
pub struct Filter {
pub predicate: Expr, // Условие (должно вернуть Boolean)
pub input: Arc<LogicalPlan>, // Входной план
}
Aggregate
Группировка и агрегация:
pub struct Aggregate {
pub group_expr: Vec<Expr>, // GROUP BY выражения
pub aggr_expr: Vec<Expr>, // Агрегатные функции (SUM, COUNT, AVG, ...)
pub input: Arc<LogicalPlan>, // Входной план
pub schema: DFSchemaRef, // Выходная схема
}
Join
Соединение двух планов:
pub struct Join {
pub left: Arc<LogicalPlan>,
pub right: Arc<LogicalPlan>,
pub on: Vec<(Expr, Expr)>, // Пары равенств (left_key = right_key)
pub filter: Option<Expr>, // Дополнительный предикат
pub join_type: JoinType, // Inner, Left, Right, Full, Semi, Anti
pub schema: DFSchemaRef,
}
JoinType::Semi и JoinType::Anti представляют EXISTS и NOT EXISTS подзапросы — SQL-конструкции, у которых нет прямого аналога в тексте запроса, но есть эффективная реализация через join.
Программное построение планов
DataFusion предоставляет LogicalPlanBuilder для построения планов без SQL:
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::prelude::*;
let plan = LogicalPlanBuilder::scan("orders", source, None)?
.filter(col("status").eq(lit("completed")))?
.aggregate(
vec![col("region")],
vec![sum(col("amount")).alias("total")],
)?
.sort(vec![col("total").sort(false, true)])? // DESC, NULLS LAST
.limit(0, Some(10))?
.build()?;
Этот builder создаёт то же дерево, что и SQL-запрос. DataFrame API DataFusion — обёртка над LogicalPlanBuilder: каждый метод .filter(), .select(), .aggregate() добавляет узел в дерево.
Инвариант: схема propagation
Каждый узел LogicalPlan знает свою выходную схему (DFSchemaRef) до начала выполнения. Схема каскадно вычисляется снизу вверх:
TableScanопределяет схему изTableProviderFilterне меняет схему (только фильтрует строки)Aggregateсоздаёт новую схему из group-by ключей и агрегатовProjectionвыбирает и переименовывает колонки
Этот инвариант позволяет оптимизатору принимать решения на основе типов и кардинальности без выполнения запроса.
Отладка логических планов
DataFusion предоставляет форматированный вывод плана:
let ctx = SessionContext::new();
let df = ctx.sql("SELECT region, SUM(amount) FROM orders GROUP BY region").await?;
// Текстовое представление
println!("{}", df.logical_plan().display_indent());
// Projection: region, SUM(orders.amount)
// Aggregate: groupBy=[[orders.region]], aggr=[[SUM(orders.amount)]]
// TableScan: orders
// Графическое (graphviz dot)
println!("{}", df.logical_plan().display_graphviz());
EXPLAIN в SQL показывает оба плана — до и после оптимизации, что позволяет увидеть, какие правила сработали.
Итоги
-
LogicalPlan— enum с вариантами для каждой логической операции (Projection, Filter, Join, Aggregate и др.) -
Exprописывает вычисления внутри узлов: колонки, литералы, бинарные операции, функции - SQL-конструкции отображаются в унифицированные узлы (HAVING → Filter над Aggregate, EXISTS → Semi Join)
- Каждый узел знает свою выходную схему до выполнения
-
LogicalPlanBuilderи DataFrame API — программные способы построения планов без SQL -
EXPLAINпоказывает дерево до и после оптимизации