Learning Platform
Глоссарий Troubleshooting
Урок 07.05 · 18 мин
Продвинутый
AnalyzerRulePhysicalOptimizerRuleEnforceDistributionEnforceSortingTypeCoercionJoinSelectionSessionStateBuilder

Analyzer и физические правила

Мы разобрали логические правила оптимизации (фаза 2) и научились писать кастомные OptimizerRule. Теперь рассмотрим две оставшиеся фазы: Analyzer (фаза 1, до логической оптимизации) и Physical Optimizer (фаза 3, после создания физического плана). Каждая фаза имеет свой trait, свои задачи и свой API регистрации.

Фаза 1: AnalyzerRule

Analyzer работает с «сырым» LogicalPlan сразу после парсинга. Его задача — разрешить неоднозначности и подготовить план для оптимизации.

Trait AnalyzerRule

use datafusion::common::config::ConfigOptions;
use datafusion::common::Result;
use datafusion::logical_expr::LogicalPlan;

pub trait AnalyzerRule {
    /// Имя правила (для диагностики)
    fn name(&self) -> &str;

    /// Трансформация плана
    fn analyze(
        &self,
        plan: LogicalPlan,
        config: &ConfigOptions,
    ) -> Result<LogicalPlan>;
}

Отличия от OptimizerRule:

  • Метод analyze вместо rewrite
  • Возвращает Result<LogicalPlan>, а не Transformed<LogicalPlan> — нет multi-pass, каждое правило применяется однократно
  • Конфигурация через ConfigOptions напрямую, а не через OptimizerConfig trait

Встроенные правила Analyzer

ПравилоЗадача
TypeCoercionВставка неявных CAST: Int32 + Float64CAST(Int32 AS Float64) + Float64
InlineTableScanРазворачивание SELECT * и inline view
CountWildcardRuleЗамена COUNT(*) на COUNT(Utf8("*"))
ApplyFunctionDefaultsПодстановка значений по умолчанию для аргументов функций

Пример: кастомное правило анализа

Допустим, мы хотим запретить запросы без фильтров на больших таблицах (защита от full scan):

use datafusion::common::config::ConfigOptions;
use datafusion::common::Result;
use datafusion::common::DataFusionError;
use datafusion::logical_expr::LogicalPlan;

#[derive(Debug, Default)]
struct RequireFilterRule;

impl AnalyzerRule for RequireFilterRule {
    fn name(&self) -> &str {
        "require_filter"
    }

    fn analyze(
        &self,
        plan: LogicalPlan,
        _config: &ConfigOptions,
    ) -> Result<LogicalPlan> {
        check_has_filter(&plan)?;
        Ok(plan)
    }
}

fn check_has_filter(plan: &LogicalPlan) -> Result<()> {
    match plan {
        LogicalPlan::TableScan(scan) => {
            // Проверить, есть ли фильтр в родительских узлах
            // (упрощённая проверка — в production нужна рекурсия)
            if scan.filters.is_empty() {
                // Разрешить только для маленьких таблиц
                return Err(DataFusionError::Plan(format!(
                    "Query on table '{}' requires a WHERE clause. \
                     Add a filter to avoid full scan.",
                    scan.table_name
                )));
            }
            Ok(())
        }
        _ => {
            for input in plan.inputs() {
                check_has_filter(input)?;
            }
            Ok(())
        }
    }
}
NOTE

AnalyzerRule — правильное место для семантических проверок и policy enforcement. Логические правила (OptimizerRule) не должны отклонять запросы — они только трансформируют план, сохраняя семантику. Analyzer может вернуть ошибку, запретив выполнение.

Регистрация AnalyzerRule

use datafusion::execution::SessionStateBuilder;
use std::sync::Arc;

let state = SessionStateBuilder::new()
    .with_analyzer_rules(vec![
        Arc::new(RequireFilterRule::default()),
    ])
    .build();

let ctx = SessionContext::new_with_state(state);

Фаза 3: PhysicalOptimizerRule

Physical Optimizer работает с ExecutionPlan — конкретными операторами, которые будут выполняться. Его задачи: выбрать алгоритмы, вставить операторы перераспределения и сортировки, оптимизировать pipeline.

Trait PhysicalOptimizerRule

use datafusion::common::config::ConfigOptions;
use datafusion::common::Result;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

pub trait PhysicalOptimizerRule: Send + Sync {
    fn name(&self) -> &str;

    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>>;
}

Отличия от OptimizerRule:

  • Работает с Arc<dyn ExecutionPlan>, не с LogicalPlan
  • Метод optimize вместо rewrite
  • Не использует Transformed — каждое правило применяется однократно

Встроенные физические правила

DataFusion содержит десятки физических правил. Ключевые:

EnforceDistribution

Вставляет RepartitionExec операторы, чтобы данные были распределены по нужным ключам для каждого оператора.

-- До EnforceDistribution
HashJoinExec: on=[(customer_id, id)]
  DataSourceExec: orders, format=parquet  -- партиционирован по file_groups
  DataSourceExec: customers, format=parquet  -- партиционирован по file_groups

-- После EnforceDistribution
HashJoinExec: mode=Partitioned, on=[(customer_id, id)]
  CoalesceBatchesExec: target_batch_size=8192
    RepartitionExec: partitioning=Hash([customer_id], 16)
      DataSourceExec: orders, format=parquet
  CoalesceBatchesExec: target_batch_size=8192
    RepartitionExec: partitioning=Hash([id], 16)
      DataSourceExec: customers, format=parquet

RepartitionExec перераспределяет данные по хешу ключа join — строки с одинаковым customer_id попадут в одну партицию, что позволяет параллельный HashJoin.

TIP

Стратегии перераспределения:

  • Hash — по хешу ключа (для join, group by)
  • RoundRobin — равномерно по партициям (для параллелизма без привязки к ключу)
  • Отключение: SET datafusion.optimizer.enable_round_robin_repartition = false

EnforceSorting

Вставляет SortExec операторы, где требуется определённый порядок данных, и удаляет лишние сортировки.

-- ORDER BY требует сортировки
-- Если данные уже отсортированы (Parquet sorted by timestamp), SortExec не вставляется
-- Если нет — EnforceSorting добавит SortExec

-- До
ProjectionExec: expr=[timestamp, value]
  DataSourceExec: ... (не отсортирован)

-- После EnforceSorting (для ORDER BY timestamp)
SortExec: expr=[timestamp@0 ASC]
  ProjectionExec: expr=[timestamp, value]
    DataSourceExec: ...

EnforceSorting также оптимизирует существующие сортировки:

  • Удаляет дублирующие SortExec (если данные уже в нужном порядке)
  • Заменяет полную сортировку на частичную (top-K) при наличии LIMIT
  • Учитывает порядок, обеспечиваемый источниками (SortedExec, отсортированные Parquet файлы)

JoinSelection

Выбирает алгоритм join на основе размера данных и доступной статистики:

Правила выбора (упрощённо):
1. Есть equi-предикат + обе стороны большие → HashJoinExec (Partitioned)
2. Есть equi-предикат + одна сторона мала → HashJoinExec (CollectLeft)
3. Данные отсортированы по ключу join → SortMergeJoinExec
4. Нет equi-предиката → NestedLoopJoinExec
Выбор алгоритма JOIN
JOIN predicateФизический оптимизатор анализирует предикат JOIN для выбора алгоритма
Есть equi-предикат?
Да: одна сторона мала?При наличии equi-предиката (ON a.id = b.id) проверяется размер сторон для выбора между режимами
Да: HashJoin CollectLeftCollectLeft: малая сторона собирается в одну партицию — быстрее при асимметрии размеров
Нет: HashJoin PartitionedPartitioned: обе стороны перераспределяются по хешу ключа — масштабируется на большие данные
Нет equi-предикатаБез equi-предиката (BETWEEN, <, >) невозможен Hash или SortMerge — только полный перебор
NestedLoopJoinNestedLoopJoin — O(N*M), избегайте на больших таблицах

Пример: кастомное физическое правило

Правило, которое заменяет SortExec с fetch на кастомный TopK оператор:

use datafusion::common::config::ConfigOptions;
use datafusion::common::Result;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

#[derive(Debug, Default)]
struct CustomTopKRule;

impl PhysicalOptimizerRule for CustomTopKRule {
    fn name(&self) -> &str {
        "custom_topk"
    }

    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Рекурсивно обойти дерево
        let children: Vec<Arc<dyn ExecutionPlan>> = plan
            .children()
            .iter()
            .map(|child| self.optimize(Arc::clone(child), config))
            .collect::<Result<Vec<_>>>()?;

        let plan = plan.with_new_children(children)?;

        // Проверить: это SortExec с fetch (LIMIT)?
        if let Some(sort) = plan.as_any().downcast_ref::<SortExec>() {
            if let Some(fetch) = sort.fetch() {
                if fetch <= 100 {
                    // Заменить на кастомный TopK оператор
                    // return Ok(Arc::new(MyTopKExec::new(sort, fetch)));
                }
            }
        }

        Ok(plan)
    }
}

Регистрация физического правила

let state = SessionStateBuilder::new()
    .with_physical_optimizer_rules(vec![
        Arc::new(CustomTopKRule::default()),
    ])
    .build();

let ctx = SessionContext::new_with_state(state);

Pipeline-breaking detection

Физический оптимизатор анализирует, какие операторы блокируют конвейер (pipeline-breaking). Блокирующие операторы должны накопить все данные перед выдачей результата:

ТипПримерыPipeline
StreamingFilterExec, ProjectionExecНе блокирует — строки проходят по одной
Partially blockingHashJoinExec (build side)Блокирует одну сторону
Fully blockingSortExec (без fetch), AggregateExec (Final)Блокирует полностью

Physical Optimizer минимизирует blocking операторы:

  • Двухфазная агрегация (Partial → Repartition → Final) вместо однофазной
  • TopK (SortExec с fetch) вместо полной сортировки
  • SortMergeJoin вместо HashJoin когда данные уже отсортированы
WARNING

При написании кастомных физических правил учитывайте pipeline semantics. Замена streaming оператора на blocking может увеличить потребление памяти и снизить latency первого результата. Проверяйте через EXPLAIN ANALYZE метрику elapsed_compute на каждом этапе.

Регистрация правил на всех уровнях

SessionStateBuilder позволяет настроить все три фазы одновременно:

let state = SessionStateBuilder::new()
    // Фаза 1: семантические проверки
    .with_analyzer_rules(vec![
        Arc::new(RequireFilterRule::default()),
    ])
    // Фаза 2: логические трансформации
    .with_optimizer_rules(vec![
        Arc::new(TenantIsolationRule { tenant_id: "t123".into() }),
        Arc::new(AddOneInliner::default()),
    ])
    // Фаза 3: физические оптимизации
    .with_physical_optimizer_rules(vec![
        Arc::new(CustomTopKRule::default()),
    ])
    .build();

let ctx = SessionContext::new_with_state(state);

Итоги

  • AnalyzerRule::analyze() — семантическая валидация и подготовка плана (type coercion, wildcard expansion, policy enforcement)
  • AnalyzerRule может отклонить запрос (вернуть ошибку) — OptimizerRule только трансформирует
  • PhysicalOptimizerRule::optimize() — выбор алгоритмов, вставка Repartition/Sort, pipeline-оптимизация
  • EnforceDistribution — вставляет RepartitionExec для параллельного join и aggregation
  • EnforceSorting — вставляет и удаляет SortExec на основе требований и имеющегося порядка
  • SessionStateBuilder позволяет регистрировать кастомные правила на каждом из трёх уровней

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Чем AnalyzerRule::analyze() отличается от OptimizerRule::rewrite() по возвращаемому типу?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6