Архитектура оптимизатора
В модуле 03 мы научились читать планы через EXPLAIN — видеть, какие операторы DataFusion выбрал и в каком порядке. Теперь разберём, как оптимизатор строит эти планы: из чего состоит конвейер оптимизации, как правила применяются к логическому и физическому планам, и как наблюдать за каждым шагом трансформации.
Три фазы оптимизации
Путь запроса от SQL-текста до физического плана проходит три фазы оптимизации. Каждая фаза — цепочка правил, применяемых к определённому представлению плана.
Фаза 1: Analyzer
Analyzer работает с «сырым» логическим планом сразу после парсинга SQL. На этом этапе план ещё не полностью разрешён: типы могут быть неизвестны, неявные приведения не вставлены, wildcard * не развёрнут.
Задачи Analyzer:
- Type coercion — вставка неявных приведений типов (
Int32 + Float64→ оба кFloat64) - Разрешение
*— заменаSELECT *на конкретный список колонок - Inline TableAlias — подстановка алиасов таблиц
- Семантическая валидация — проверка существования колонок, корректности агрегаций
Каждое правило реализует trait AnalyzerRule:
pub trait AnalyzerRule {
fn name(&self) -> &str;
fn analyze(
&self,
plan: LogicalPlan,
config: &ConfigOptions,
) -> Result<LogicalPlan>;
}
Фаза 2: Logical Optimizer
После анализа план полностью разрешён — типы известны, ссылки валидны. Logical Optimizer трансформирует план, сохраняя семантику, но улучшая эффективность: спускает фильтры ближе к источнику, удаляет ненужные проекции, упрощает выражения.
Каждое правило реализует trait OptimizerRule:
pub trait OptimizerRule: Send + Sync {
fn name(&self) -> &str;
fn rewrite(
&self,
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>>;
}
Ключевое отличие от AnalyzerRule: метод rewrite принимает план по значению (ownership) и возвращает Transformed<LogicalPlan>. Enum Transformed сигнализирует оптимизатору, было ли изменение:
pub enum Transformed<T> {
/// Данные изменены — оптимизатор может повторить проход
yes(T),
/// Данные не изменены — пропускаем повторный проход
no(T),
}
Фаза 3: Physical Optimizer
Physical Optimizer работает с ExecutionPlan — конкретными операторами (HashJoinExec, SortExec, DataSourceExec). Его задачи:
- Выбор алгоритма соединения (Hash vs SortMerge vs NestedLoop)
- Вставка операторов перераспределения (
RepartitionExec) - Вставка операторов сортировки (
SortExec) где требуется порядок - Оптимизация pipeline (объединение последовательных проекций)
pub trait PhysicalOptimizerRule: Send + Sync {
fn name(&self) -> &str;
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>>;
}
Multi-pass выполнение
Logical Optimizer не применяет каждое правило однократно. Он выполняет несколько проходов (passes) по всем правилам, пока план не стабилизируется или не будет достигнут лимит.
// Конфигурация максимального числа проходов
let config = OptimizerContext::new()
.with_max_passes(3); // По умолчанию: 3
let optimizer = Optimizer::with_rules(rules);
let optimized = optimizer.optimize(plan, &config, observe)?;
Логика multi-pass:
- Применить все правила по порядку
- Если хотя бы одно правило вернуло
Transformed::yes— повторить проход - Если все правила вернули
Transformed::noили достигнутmax_passes— остановиться
Три прохода по умолчанию достаточны для большинства планов. Правила взаимно зависимы: PushDownFilter может создать новые возможности для SimplifyExpressions, которое в свою очередь позволит EliminateFilter удалить тривиальный фильтр WHERE true. Multi-pass гарантирует, что такие цепочки отработают полностью.
Observer callback
DataFusion предоставляет механизм observer для наблюдения за работой оптимизатора. Callback вызывается после каждого применения правила — это бесценный инструмент для отладки трансформаций.
use datafusion::optimizer::{Optimizer, OptimizerContext};
use datafusion::logical_expr::LogicalPlan;
use datafusion::common::Result;
fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) {
println!(
"After rule '{}': \n{}",
rule.name(),
plan.display_indent()
);
}
let optimizer = Optimizer::with_rules(rules);
let config = OptimizerContext::new();
// optimize с observer callback
let optimized = optimizer.optimize(
logical_plan,
&config,
observe, // fn(&LogicalPlan, &dyn OptimizerRule)
)?;
Пример вывода observer при оптимизации SELECT * FROM t WHERE a > 5 AND true:
After rule 'simplify_expressions':
Filter: t.a > Int64(5) -- AND true убрано
TableScan: t
After rule 'push_down_filter':
TableScan: t, filter=[a > 5] -- фильтр спущен в scan
Observer — лучший способ понять, почему оптимизатор принял то или иное решение. Вместо сравнения начального и конечного планов, вы видите каждый шаг трансформации с именем правила.
Порядок правил
Порядок правил в цепочке имеет значение. DataFusion определяет порядок встроенных правил так, чтобы максимизировать возможности каждого следующего правила:
1. SimplifyExpressions — упростить выражения (WHERE true AND x > 5 → WHERE x > 5)
2. UnwrapCastInComparison — убрать лишние CAST
3. ReplaceDistinctWithAggregate — DISTINCT → GROUP BY
4. DecorrelatePredicateSubquery — декорреляция подзапросов
5. ScalarSubqueryToJoin — скалярный подзапрос → JOIN
6. ExtractEquijoinPredicate — выделить equi-join предикаты
7. PushDownFilter — спустить фильтры к источнику
8. OptimizeProjections — убрать ненужные проекции
9. CommonSubexprEliminate — устранить повторные вычисления
10. EliminateLimit — убрать LIMIT без эффекта
При добавлении кастомных правил важно правильно выбрать позицию в цепочке. Правило, зависящее от упрощённых выражений, должно идти после SimplifyExpressions. Правило, работающее с фильтрами, — после PushDownFilter, чтобы фильтры уже были на месте.
Создание Optimizer вручную
Для тестирования или нестандартных сценариев можно создать оптимизатор с произвольным набором правил:
use datafusion::optimizer::{Optimizer, OptimizerContext, OptimizerRule};
use std::sync::Arc;
// Только два правила — для тестирования
let optimizer = Optimizer::with_rules(vec![
Arc::new(MyCustomRule::default()),
Arc::new(AnotherRule::default()),
]);
let config = OptimizerContext::new()
.with_max_passes(5) // Больше проходов для сложных планов
.with_skip_failing_rules(true); // Не останавливаться при ошибке правила
let result = optimizer.optimize(plan, &config, |_, _| {})?;
Интеграция через SessionStateBuilder
В production правила обычно добавляются через SessionStateBuilder, который подключает их в соответствующую фазу конвейера:
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use std::sync::Arc;
let state = SessionStateBuilder::new()
// Добавить кастомное правило анализа
.with_analyzer_rules(vec![
Arc::new(MyAnalyzerRule::default()),
])
// Добавить кастомное правило логической оптимизации
.with_optimizer_rules(vec![
Arc::new(MyOptimizerRule::default()),
])
// Добавить кастомное правило физической оптимизации
.with_physical_optimizer_rules(vec![
Arc::new(MyPhysicalRule::default()),
])
.build();
let ctx = SessionContext::new_with_state(state);
SessionStateBuilder заменяет устаревшие паттерны SessionState::new() и SessionConfig::with_*. Он позволяет полностью контролировать набор правил для каждой фазы конвейера.
Substrait — сериализованный план для cross-engine portability
Substrait — открытая спецификация для сериализации логических и физических query-планов между разными engine. Идея: план, построенный одним engine (DataFusion), может быть выполнен другим engine (Spark, Trino, Velox), при условии что оба понимают Substrait.
Зачем нужен Substrait
DataFusion имеет свой in-memory LogicalPlan (Rust struct) — он непереносим: нельзя сериализовать и отправить план в Spark JVM или в Trino. Substrait вводит универсальный язык для планов на основе protobuf:
Producer (DataFusion) → LogicalPlan → Substrait protobuf bytes
↓
(передача по сети / диск)
↓
Consumer (Spark / Trino / Velox) ← Substrait protobuf bytes → их native план
Use cases
- Plan portability — сохранить план, выполнить на другом engine без переписывания SQL.
- Compute pushdown через границы systems — DataFusion-driver отправляет Substrait-план в исполнитель (Velox, ClickHouse).
- Plan caching — закэшировать оптимизированный план как Substrait protobuf, переиспользовать без re-парсинга SQL.
- Multi-engine federation — координирующий engine (DataFusion) делегирует подпланы другим engine (Substrait как протокол).
datafusion-substrait crate
DataFusion поддерживает Substrait через datafusion-substrait crate (separate from main datafusion crate).
use datafusion::prelude::SessionContext;
use datafusion_substrait::logical_plan::{producer, consumer};
use datafusion_substrait::serializer;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = SessionContext::new();
ctx.register_csv("orders", "data/orders.csv", Default::default()).await?;
// 1. Построить LogicalPlan
let df = ctx
.sql("SELECT region, SUM(amount) FROM orders WHERE status = 'completed' GROUP BY 1")
.await?;
let logical_plan = df.into_optimized_plan()?;
// 2. DataFusion LogicalPlan → Substrait protobuf
let substrait_plan = producer::to_substrait_plan(&logical_plan, &ctx.state())?;
// 3. Сериализовать в bytes (для передачи или storage)
let bytes = serializer::serialize_bytes(&substrait_plan)?;
println!("Substrait plan: {} bytes", bytes.len());
// 4. Десериализовать в другом процессе / engine
let restored = serializer::deserialize_bytes(&bytes)?;
// 5. Substrait → DataFusion LogicalPlan (тот же или другой ctx)
let new_logical_plan = consumer::from_substrait_plan(&ctx.state(), &restored).await?;
// Выполняем восстановленный план
let df2 = DataFrame::new(ctx.state(), new_logical_plan);
df2.show().await?;
Ok(())
}
Cross-engine pipeline
# Спарк-сторона (Java/Scala/PySpark): выполнить Substrait-план, построенный DataFusion
from pyspark.sql import SparkSession
import substrait
spark = SparkSession.builder.getOrCreate()
# Substrait-план получен от DataFusion (через файл, gRPC, etc.)
with open("plan.substrait", "rb") as f:
plan_bytes = f.read()
# Spark Substrait extension (gluten / native-spark) выполняет план
# (точное API зависит от версии Spark Substrait support)
result = spark.execute_substrait(plan_bytes)
result.show()
Текущее состояние DataFusion Substrait support
| Feature | DataFusion 49+ status |
|---|---|
| Logical plan: Project / Filter / Aggregate / Sort / Limit | Полная поддержка |
| Joins (Inner / Left / Right / Full) | Полная поддержка |
| Window functions | Поддерживается |
| Subqueries (correlated, scalar, EXISTS/IN) | Частично |
| UDF / UDAF | Через extension functions (требует registration на consumer) |
| Set operations (UNION / INTERSECT / EXCEPT) | Поддерживается |
| Common table expressions (WITH) | Поддерживается |
| Physical plans (RelRoot extensions) | Экспериментальная поддержка |
Substrait coverage не 100%: некоторые DataFusion-specific operators (custom ExecutionPlan, специфические optimizer rules) не имеют representation в Substrait. Roundtrip LogicalPlan → Substrait → LogicalPlan теряет engine-specific детали — это спецификация-минимум для portability, не полный snapshot. Для cross-engine workflows закладывайте integration testing на каждый план.
Substrait — относительно молодой стандарт (1.0 в 2023). Он стабилизируется, но ecosystem coverage растёт постепенно: Spark поддержка через Gluten, Velox через Substrait-Velox plugin, Trino — экспериментальная. Для production cross-engine federation сейчас обычно используют либо Substrait для специфичных интеграций, либо raw SQL strings для простоты.
Cite substrait.io + datafusion-substrait crate + Substrait Specification.
Итоги
- Оптимизация проходит три фазы: Analyzer (разрешение типов) → Logical Optimizer (логические трансформации) → Physical Optimizer (выбор алгоритмов)
-
OptimizerRule::rewrite()возвращаетTransformed<LogicalPlan>—yesилиnoсигнализирует, было ли изменение - Multi-pass: правила применяются повторно (до
max_passes), пока план не стабилизируется - Observer callback показывает план после каждого правила — ключевой инструмент отладки
- Порядок правил важен — каждое правило создаёт возможности для следующих
-
SessionStateBuilder— рекомендуемый способ регистрации кастомных правил на всех уровнях