Analyzer и физические правила
Мы разобрали логические правила оптимизации (фаза 2) и научились писать кастомные OptimizerRule. Теперь рассмотрим две оставшиеся фазы: Analyzer (фаза 1, до логической оптимизации) и Physical Optimizer (фаза 3, после создания физического плана). Каждая фаза имеет свой trait, свои задачи и свой API регистрации.
Фаза 1: AnalyzerRule
Analyzer работает с «сырым» LogicalPlan сразу после парсинга. Его задача — разрешить неоднозначности и подготовить план для оптимизации.
Trait AnalyzerRule
use datafusion::common::config::ConfigOptions;
use datafusion::common::Result;
use datafusion::logical_expr::LogicalPlan;
pub trait AnalyzerRule {
/// Имя правила (для диагностики)
fn name(&self) -> &str;
/// Трансформация плана
fn analyze(
&self,
plan: LogicalPlan,
config: &ConfigOptions,
) -> Result<LogicalPlan>;
}
Отличия от OptimizerRule:
- Метод
analyzeвместоrewrite - Возвращает
Result<LogicalPlan>, а неTransformed<LogicalPlan>— нет multi-pass, каждое правило применяется однократно - Конфигурация через
ConfigOptionsнапрямую, а не черезOptimizerConfigtrait
Встроенные правила Analyzer
| Правило | Задача |
|---|---|
TypeCoercion | Вставка неявных CAST: Int32 + Float64 → CAST(Int32 AS Float64) + Float64 |
InlineTableScan | Разворачивание SELECT * и inline view |
CountWildcardRule | Замена COUNT(*) на COUNT(Utf8("*")) |
ApplyFunctionDefaults | Подстановка значений по умолчанию для аргументов функций |
Пример: кастомное правило анализа
Допустим, мы хотим запретить запросы без фильтров на больших таблицах (защита от full scan):
use datafusion::common::config::ConfigOptions;
use datafusion::common::Result;
use datafusion::common::DataFusionError;
use datafusion::logical_expr::LogicalPlan;
#[derive(Debug, Default)]
struct RequireFilterRule;
impl AnalyzerRule for RequireFilterRule {
fn name(&self) -> &str {
"require_filter"
}
fn analyze(
&self,
plan: LogicalPlan,
_config: &ConfigOptions,
) -> Result<LogicalPlan> {
check_has_filter(&plan)?;
Ok(plan)
}
}
fn check_has_filter(plan: &LogicalPlan) -> Result<()> {
match plan {
LogicalPlan::TableScan(scan) => {
// Проверить, есть ли фильтр в родительских узлах
// (упрощённая проверка — в production нужна рекурсия)
if scan.filters.is_empty() {
// Разрешить только для маленьких таблиц
return Err(DataFusionError::Plan(format!(
"Query on table '{}' requires a WHERE clause. \
Add a filter to avoid full scan.",
scan.table_name
)));
}
Ok(())
}
_ => {
for input in plan.inputs() {
check_has_filter(input)?;
}
Ok(())
}
}
}
AnalyzerRule — правильное место для семантических проверок и policy enforcement. Логические правила (OptimizerRule) не должны отклонять запросы — они только трансформируют план, сохраняя семантику. Analyzer может вернуть ошибку, запретив выполнение.
Регистрация AnalyzerRule
use datafusion::execution::SessionStateBuilder;
use std::sync::Arc;
let state = SessionStateBuilder::new()
.with_analyzer_rules(vec![
Arc::new(RequireFilterRule::default()),
])
.build();
let ctx = SessionContext::new_with_state(state);
Фаза 3: PhysicalOptimizerRule
Physical Optimizer работает с ExecutionPlan — конкретными операторами, которые будут выполняться. Его задачи: выбрать алгоритмы, вставить операторы перераспределения и сортировки, оптимизировать pipeline.
Trait PhysicalOptimizerRule
use datafusion::common::config::ConfigOptions;
use datafusion::common::Result;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;
pub trait PhysicalOptimizerRule: Send + Sync {
fn name(&self) -> &str;
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>>;
}
Отличия от OptimizerRule:
- Работает с
Arc<dyn ExecutionPlan>, не сLogicalPlan - Метод
optimizeвместоrewrite - Не использует
Transformed— каждое правило применяется однократно
Встроенные физические правила
DataFusion содержит десятки физических правил. Ключевые:
EnforceDistribution
Вставляет RepartitionExec операторы, чтобы данные были распределены по нужным ключам для каждого оператора.
-- До EnforceDistribution
HashJoinExec: on=[(customer_id, id)]
DataSourceExec: orders, format=parquet -- партиционирован по file_groups
DataSourceExec: customers, format=parquet -- партиционирован по file_groups
-- После EnforceDistribution
HashJoinExec: mode=Partitioned, on=[(customer_id, id)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([customer_id], 16)
DataSourceExec: orders, format=parquet
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([id], 16)
DataSourceExec: customers, format=parquet
RepartitionExec перераспределяет данные по хешу ключа join — строки с одинаковым customer_id попадут в одну партицию, что позволяет параллельный HashJoin.
Стратегии перераспределения:
- Hash — по хешу ключа (для join, group by)
- RoundRobin — равномерно по партициям (для параллелизма без привязки к ключу)
- Отключение:
SET datafusion.optimizer.enable_round_robin_repartition = false
EnforceSorting
Вставляет SortExec операторы, где требуется определённый порядок данных, и удаляет лишние сортировки.
-- ORDER BY требует сортировки
-- Если данные уже отсортированы (Parquet sorted by timestamp), SortExec не вставляется
-- Если нет — EnforceSorting добавит SortExec
-- До
ProjectionExec: expr=[timestamp, value]
DataSourceExec: ... (не отсортирован)
-- После EnforceSorting (для ORDER BY timestamp)
SortExec: expr=[timestamp@0 ASC]
ProjectionExec: expr=[timestamp, value]
DataSourceExec: ...
EnforceSorting также оптимизирует существующие сортировки:
- Удаляет дублирующие
SortExec(если данные уже в нужном порядке) - Заменяет полную сортировку на частичную (top-K) при наличии
LIMIT - Учитывает порядок, обеспечиваемый источниками (
SortedExec, отсортированные Parquet файлы)
JoinSelection
Выбирает алгоритм join на основе размера данных и доступной статистики:
Правила выбора (упрощённо):
1. Есть equi-предикат + обе стороны большие → HashJoinExec (Partitioned)
2. Есть equi-предикат + одна сторона мала → HashJoinExec (CollectLeft)
3. Данные отсортированы по ключу join → SortMergeJoinExec
4. Нет equi-предиката → NestedLoopJoinExec
Пример: кастомное физическое правило
Правило, которое заменяет SortExec с fetch на кастомный TopK оператор:
use datafusion::common::config::ConfigOptions;
use datafusion::common::Result;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;
#[derive(Debug, Default)]
struct CustomTopKRule;
impl PhysicalOptimizerRule for CustomTopKRule {
fn name(&self) -> &str {
"custom_topk"
}
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
// Рекурсивно обойти дерево
let children: Vec<Arc<dyn ExecutionPlan>> = plan
.children()
.iter()
.map(|child| self.optimize(Arc::clone(child), config))
.collect::<Result<Vec<_>>>()?;
let plan = plan.with_new_children(children)?;
// Проверить: это SortExec с fetch (LIMIT)?
if let Some(sort) = plan.as_any().downcast_ref::<SortExec>() {
if let Some(fetch) = sort.fetch() {
if fetch <= 100 {
// Заменить на кастомный TopK оператор
// return Ok(Arc::new(MyTopKExec::new(sort, fetch)));
}
}
}
Ok(plan)
}
}
Регистрация физического правила
let state = SessionStateBuilder::new()
.with_physical_optimizer_rules(vec![
Arc::new(CustomTopKRule::default()),
])
.build();
let ctx = SessionContext::new_with_state(state);
Pipeline-breaking detection
Физический оптимизатор анализирует, какие операторы блокируют конвейер (pipeline-breaking). Блокирующие операторы должны накопить все данные перед выдачей результата:
| Тип | Примеры | Pipeline |
|---|---|---|
| Streaming | FilterExec, ProjectionExec | Не блокирует — строки проходят по одной |
| Partially blocking | HashJoinExec (build side) | Блокирует одну сторону |
| Fully blocking | SortExec (без fetch), AggregateExec (Final) | Блокирует полностью |
Physical Optimizer минимизирует blocking операторы:
- Двухфазная агрегация (Partial → Repartition → Final) вместо однофазной
- TopK (SortExec с fetch) вместо полной сортировки
- SortMergeJoin вместо HashJoin когда данные уже отсортированы
При написании кастомных физических правил учитывайте pipeline semantics. Замена streaming оператора на blocking может увеличить потребление памяти и снизить latency первого результата. Проверяйте через EXPLAIN ANALYZE метрику elapsed_compute на каждом этапе.
Регистрация правил на всех уровнях
SessionStateBuilder позволяет настроить все три фазы одновременно:
let state = SessionStateBuilder::new()
// Фаза 1: семантические проверки
.with_analyzer_rules(vec![
Arc::new(RequireFilterRule::default()),
])
// Фаза 2: логические трансформации
.with_optimizer_rules(vec![
Arc::new(TenantIsolationRule { tenant_id: "t123".into() }),
Arc::new(AddOneInliner::default()),
])
// Фаза 3: физические оптимизации
.with_physical_optimizer_rules(vec![
Arc::new(CustomTopKRule::default()),
])
.build();
let ctx = SessionContext::new_with_state(state);
Итоги
-
AnalyzerRule::analyze()— семантическая валидация и подготовка плана (type coercion, wildcard expansion, policy enforcement) - AnalyzerRule может отклонить запрос (вернуть ошибку) — OptimizerRule только трансформирует
-
PhysicalOptimizerRule::optimize()— выбор алгоритмов, вставка Repartition/Sort, pipeline-оптимизация -
EnforceDistribution— вставляетRepartitionExecдля параллельного join и aggregation -
EnforceSorting— вставляет и удаляетSortExecна основе требований и имеющегося порядка -
SessionStateBuilderпозволяет регистрировать кастомные правила на каждом из трёх уровней