Learning Platform
Глоссарий Troubleshooting
Урок 07.01 · 18 мин
Продвинутый
OptimizerAnalyzerRuleOptimizerRulePhysicalOptimizerRuleTransformedOptimizerConfigObserverSubstraitdatafusion-substraitplan portabilitycross-engine

Архитектура оптимизатора

В модуле 03 мы научились читать планы через EXPLAIN — видеть, какие операторы DataFusion выбрал и в каком порядке. Теперь разберём, как оптимизатор строит эти планы: из чего состоит конвейер оптимизации, как правила применяются к логическому и физическому планам, и как наблюдать за каждым шагом трансформации.

Три фазы оптимизации

Путь запроса от SQL-текста до физического плана проходит три фазы оптимизации. Каждая фаза — цепочка правил, применяемых к определённому представлению плана.

Трёхфазный конвейер оптимизации
SQL / DataFrame APISQL-запрос или DataFrame API — входная точка конвейера оптимизации
Парсинг → Unresolved LogicalPlan
Фаза 1: Analyzer (AnalyzerRules)Analyzer разрешает типы, подставляет CAST, разворачивает wildcard и валидирует семантику
Resolved LogicalPlan
Фаза 2: Logical Optimizer (OptimizerRules)Логический оптимизатор применяет multi-pass правила: pushdown фильтров, устранение проекций, упрощение выражений
Optimized LogicalPlan
Фаза 3: Physical Optimizer (PhysicalOptimizerRules)Физический оптимизатор выбирает алгоритмы JOIN, вставляет Repartition/Sort и оптимизирует pipeline
Optimized ExecutionPlan
Выполнение (RecordBatch stream)Финальный физический план выполняется как потоковый конвейер RecordBatch

Фаза 1: Analyzer

Analyzer работает с «сырым» логическим планом сразу после парсинга SQL. На этом этапе план ещё не полностью разрешён: типы могут быть неизвестны, неявные приведения не вставлены, wildcard * не развёрнут.

Задачи Analyzer:

  • Type coercion — вставка неявных приведений типов (Int32 + Float64 → оба к Float64)
  • Разрешение * — замена SELECT * на конкретный список колонок
  • Inline TableAlias — подстановка алиасов таблиц
  • Семантическая валидация — проверка существования колонок, корректности агрегаций

Каждое правило реализует trait AnalyzerRule:

pub trait AnalyzerRule {
    fn name(&self) -> &str;
    fn analyze(
        &self,
        plan: LogicalPlan,
        config: &ConfigOptions,
    ) -> Result<LogicalPlan>;
}

Фаза 2: Logical Optimizer

После анализа план полностью разрешён — типы известны, ссылки валидны. Logical Optimizer трансформирует план, сохраняя семантику, но улучшая эффективность: спускает фильтры ближе к источнику, удаляет ненужные проекции, упрощает выражения.

Каждое правило реализует trait OptimizerRule:

pub trait OptimizerRule: Send + Sync {
    fn name(&self) -> &str;

    fn rewrite(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>>;
}

Ключевое отличие от AnalyzerRule: метод rewrite принимает план по значению (ownership) и возвращает Transformed<LogicalPlan>. Enum Transformed сигнализирует оптимизатору, было ли изменение:

pub enum Transformed<T> {
    /// Данные изменены — оптимизатор может повторить проход
    yes(T),
    /// Данные не изменены — пропускаем повторный проход
    no(T),
}

Фаза 3: Physical Optimizer

Physical Optimizer работает с ExecutionPlan — конкретными операторами (HashJoinExec, SortExec, DataSourceExec). Его задачи:

  • Выбор алгоритма соединения (Hash vs SortMerge vs NestedLoop)
  • Вставка операторов перераспределения (RepartitionExec)
  • Вставка операторов сортировки (SortExec) где требуется порядок
  • Оптимизация pipeline (объединение последовательных проекций)
pub trait PhysicalOptimizerRule: Send + Sync {
    fn name(&self) -> &str;

    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>>;
}

Multi-pass выполнение

Logical Optimizer не применяет каждое правило однократно. Он выполняет несколько проходов (passes) по всем правилам, пока план не стабилизируется или не будет достигнут лимит.

// Конфигурация максимального числа проходов
let config = OptimizerContext::new()
    .with_max_passes(3);  // По умолчанию: 3

let optimizer = Optimizer::with_rules(rules);
let optimized = optimizer.optimize(plan, &config, observe)?;

Логика multi-pass:

  1. Применить все правила по порядку
  2. Если хотя бы одно правило вернуло Transformed::yes — повторить проход
  3. Если все правила вернули Transformed::no или достигнут max_passes — остановиться
NOTE

Три прохода по умолчанию достаточны для большинства планов. Правила взаимно зависимы: PushDownFilter может создать новые возможности для SimplifyExpressions, которое в свою очередь позволит EliminateFilter удалить тривиальный фильтр WHERE true. Multi-pass гарантирует, что такие цепочки отработают полностью.

Observer callback

DataFusion предоставляет механизм observer для наблюдения за работой оптимизатора. Callback вызывается после каждого применения правила — это бесценный инструмент для отладки трансформаций.

use datafusion::optimizer::{Optimizer, OptimizerContext};
use datafusion::logical_expr::LogicalPlan;
use datafusion::common::Result;

fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) {
    println!(
        "After rule '{}': \n{}",
        rule.name(),
        plan.display_indent()
    );
}

let optimizer = Optimizer::with_rules(rules);
let config = OptimizerContext::new();

// optimize с observer callback
let optimized = optimizer.optimize(
    logical_plan,
    &config,
    observe,  // fn(&LogicalPlan, &dyn OptimizerRule)
)?;

Пример вывода observer при оптимизации SELECT * FROM t WHERE a > 5 AND true:

After rule 'simplify_expressions':
  Filter: t.a > Int64(5)          -- AND true убрано
    TableScan: t

After rule 'push_down_filter':
  TableScan: t, filter=[a > 5]   -- фильтр спущен в scan
TIP

Observer — лучший способ понять, почему оптимизатор принял то или иное решение. Вместо сравнения начального и конечного планов, вы видите каждый шаг трансформации с именем правила.

Порядок правил

Порядок правил в цепочке имеет значение. DataFusion определяет порядок встроенных правил так, чтобы максимизировать возможности каждого следующего правила:

1. SimplifyExpressions     — упростить выражения (WHERE true AND x > 5 → WHERE x > 5)
2. UnwrapCastInComparison  — убрать лишние CAST
3. ReplaceDistinctWithAggregate — DISTINCT → GROUP BY
4. DecorrelatePredicateSubquery — декорреляция подзапросов
5. ScalarSubqueryToJoin    — скалярный подзапрос → JOIN
6. ExtractEquijoinPredicate — выделить equi-join предикаты
7. PushDownFilter          — спустить фильтры к источнику
8. OptimizeProjections     — убрать ненужные проекции
9. CommonSubexprEliminate  — устранить повторные вычисления
10. EliminateLimit         — убрать LIMIT без эффекта
WARNING

При добавлении кастомных правил важно правильно выбрать позицию в цепочке. Правило, зависящее от упрощённых выражений, должно идти после SimplifyExpressions. Правило, работающее с фильтрами, — после PushDownFilter, чтобы фильтры уже были на месте.

Создание Optimizer вручную

Для тестирования или нестандартных сценариев можно создать оптимизатор с произвольным набором правил:

use datafusion::optimizer::{Optimizer, OptimizerContext, OptimizerRule};
use std::sync::Arc;

// Только два правила — для тестирования
let optimizer = Optimizer::with_rules(vec![
    Arc::new(MyCustomRule::default()),
    Arc::new(AnotherRule::default()),
]);

let config = OptimizerContext::new()
    .with_max_passes(5)      // Больше проходов для сложных планов
    .with_skip_failing_rules(true); // Не останавливаться при ошибке правила

let result = optimizer.optimize(plan, &config, |_, _| {})?;

Интеграция через SessionStateBuilder

В production правила обычно добавляются через SessionStateBuilder, который подключает их в соответствующую фазу конвейера:

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use std::sync::Arc;

let state = SessionStateBuilder::new()
    // Добавить кастомное правило анализа
    .with_analyzer_rules(vec![
        Arc::new(MyAnalyzerRule::default()),
    ])
    // Добавить кастомное правило логической оптимизации
    .with_optimizer_rules(vec![
        Arc::new(MyOptimizerRule::default()),
    ])
    // Добавить кастомное правило физической оптимизации
    .with_physical_optimizer_rules(vec![
        Arc::new(MyPhysicalRule::default()),
    ])
    .build();

let ctx = SessionContext::new_with_state(state);
NOTE

SessionStateBuilder заменяет устаревшие паттерны SessionState::new() и SessionConfig::with_*. Он позволяет полностью контролировать набор правил для каждой фазы конвейера.

Substrait — сериализованный план для cross-engine portability

Substrait — открытая спецификация для сериализации логических и физических query-планов между разными engine. Идея: план, построенный одним engine (DataFusion), может быть выполнен другим engine (Spark, Trino, Velox), при условии что оба понимают Substrait.

Зачем нужен Substrait

DataFusion имеет свой in-memory LogicalPlan (Rust struct) — он непереносим: нельзя сериализовать и отправить план в Spark JVM или в Trino. Substrait вводит универсальный язык для планов на основе protobuf:

Producer (DataFusion) → LogicalPlan → Substrait protobuf bytes

                                 (передача по сети / диск)

Consumer (Spark / Trino / Velox) ← Substrait protobuf bytes → их native план

Use cases

  1. Plan portability — сохранить план, выполнить на другом engine без переписывания SQL.
  2. Compute pushdown через границы systems — DataFusion-driver отправляет Substrait-план в исполнитель (Velox, ClickHouse).
  3. Plan caching — закэшировать оптимизированный план как Substrait protobuf, переиспользовать без re-парсинга SQL.
  4. Multi-engine federation — координирующий engine (DataFusion) делегирует подпланы другим engine (Substrait как протокол).

datafusion-substrait crate

DataFusion поддерживает Substrait через datafusion-substrait crate (separate from main datafusion crate).

use datafusion::prelude::SessionContext;
use datafusion_substrait::logical_plan::{producer, consumer};
use datafusion_substrait::serializer;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();
    ctx.register_csv("orders", "data/orders.csv", Default::default()).await?;

    // 1. Построить LogicalPlan
    let df = ctx
        .sql("SELECT region, SUM(amount) FROM orders WHERE status = 'completed' GROUP BY 1")
        .await?;
    let logical_plan = df.into_optimized_plan()?;

    // 2. DataFusion LogicalPlan → Substrait protobuf
    let substrait_plan = producer::to_substrait_plan(&logical_plan, &ctx.state())?;

    // 3. Сериализовать в bytes (для передачи или storage)
    let bytes = serializer::serialize_bytes(&substrait_plan)?;
    println!("Substrait plan: {} bytes", bytes.len());

    // 4. Десериализовать в другом процессе / engine
    let restored = serializer::deserialize_bytes(&bytes)?;

    // 5. Substrait → DataFusion LogicalPlan (тот же или другой ctx)
    let new_logical_plan = consumer::from_substrait_plan(&ctx.state(), &restored).await?;

    // Выполняем восстановленный план
    let df2 = DataFrame::new(ctx.state(), new_logical_plan);
    df2.show().await?;
    Ok(())
}

Cross-engine pipeline

# Спарк-сторона (Java/Scala/PySpark): выполнить Substrait-план, построенный DataFusion
from pyspark.sql import SparkSession
import substrait

spark = SparkSession.builder.getOrCreate()

# Substrait-план получен от DataFusion (через файл, gRPC, etc.)
with open("plan.substrait", "rb") as f:
    plan_bytes = f.read()

# Spark Substrait extension (gluten / native-spark) выполняет план
# (точное API зависит от версии Spark Substrait support)
result = spark.execute_substrait(plan_bytes)
result.show()

Текущее состояние DataFusion Substrait support

FeatureDataFusion 49+ status
Logical plan: Project / Filter / Aggregate / Sort / LimitПолная поддержка
Joins (Inner / Left / Right / Full)Полная поддержка
Window functionsПоддерживается
Subqueries (correlated, scalar, EXISTS/IN)Частично
UDF / UDAFЧерез extension functions (требует registration на consumer)
Set operations (UNION / INTERSECT / EXCEPT)Поддерживается
Common table expressions (WITH)Поддерживается
Physical plans (RelRoot extensions)Экспериментальная поддержка
WARNING

Substrait coverage не 100%: некоторые DataFusion-specific operators (custom ExecutionPlan, специфические optimizer rules) не имеют representation в Substrait. Roundtrip LogicalPlan → Substrait → LogicalPlan теряет engine-specific детали — это спецификация-минимум для portability, не полный snapshot. Для cross-engine workflows закладывайте integration testing на каждый план.

TIP

Substrait — относительно молодой стандарт (1.0 в 2023). Он стабилизируется, но ecosystem coverage растёт постепенно: Spark поддержка через Gluten, Velox через Substrait-Velox plugin, Trino — экспериментальная. Для production cross-engine federation сейчас обычно используют либо Substrait для специфичных интеграций, либо raw SQL strings для простоты.

Cite substrait.io + datafusion-substrait crate + Substrait Specification.


Итоги

  • Оптимизация проходит три фазы: Analyzer (разрешение типов) → Logical Optimizer (логические трансформации) → Physical Optimizer (выбор алгоритмов)
  • OptimizerRule::rewrite() возвращает Transformed<LogicalPlan>yes или no сигнализирует, было ли изменение
  • Multi-pass: правила применяются повторно (до max_passes), пока план не стабилизируется
  • Observer callback показывает план после каждого правила — ключевой инструмент отладки
  • Порядок правил важен — каждое правило создаёт возможности для следующих
  • SessionStateBuilder — рекомендуемый способ регистрации кастомных правил на всех уровнях

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какой порядок фаз оптимизации проходит запрос от SQL-текста до выполнения в DataFusion?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6