PhysicalPlan: от логики к алгоритмам выполнения
Логический план описывает что вычислить. Физический план описывает как: конкретные алгоритмы, порядок операций, параллелизм. PhysicalPlanner преобразует каждый узел LogicalPlan в реализацию ExecutionPlan.
Trait ExecutionPlan
Все физические операторы реализуют trait ExecutionPlan:
pub trait ExecutionPlan: Send + Sync + Debug {
/// Выходная схема (Arrow SchemaRef)
fn schema(&self) -> SchemaRef;
/// Как данные распределены по партициям
fn output_partitioning(&self) -> Partitioning;
/// Требования к порядку входных данных
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortExpr>>>;
/// Дочерние операторы
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
/// Запуск выполнения для одной партиции
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>;
/// Статистика (кардинальность, размер) для оптимизатора
fn statistics(&self) -> Result<Statistics>;
}
Метод execute(partition, context) — точка входа: для каждой партиции возвращается асинхронный поток RecordBatch. Это позволяет обрабатывать партиции параллельно — tokio runtime распределяет их по потокам.
Модель партиционирования
Каждый ExecutionPlan объявляет свою стратегию партиционирования:
pub enum Partitioning {
RoundRobinBatch(usize), // N партиций, строки распределены равномерно
Hash(Vec<Arc<dyn PhysicalExpr>>, usize), // Партиционирование по hash ключа
UnknownPartitioning(usize), // N партиций, распределение неизвестно
}
В этой схеме:
DataSourceExecсоздаёт по одной партиции на файл (или row group)AggregateExec (partial)вычисляет промежуточные агрегаты в каждой партиции параллельноAggregateExec (final)объединяет промежуточные результатыCoalescePartitionsExecсводит N партиций в одну для глобальной сортировки
Repartition
Когда операция требует данные, распределённые по определённому ключу (например, Hash Join), планировщик вставляет RepartitionExec:
// Перераспределяет данные по Hash(user_id) на 8 партиций
RepartitionExec::try_new(
input,
Partitioning::Hash(vec![col("user_id")], 8),
)?
Repartition — дорогая операция (перемешивание данных между партициями), поэтому оптимизатор старается минимизировать количество repartition.
SendableRecordBatchStream
Результат execute() — это SendableRecordBatchStream:
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
fn schema(&self) -> SchemaRef;
}
Это асинхронный поток с гарантированной схемой. Каждый вызов .next().await возвращает следующий RecordBatch или None (конец потока). Ошибки передаются inline — каждый элемент потока это Result<RecordBatch>.
Pull-модель выполнения
DataFusion использует pull-модель (volcano model): потребитель запрашивает данные у производителя:
SortExec.next() → AggregateExec.next() → FilterExec.next() → DataSourceExec.next()
← RecordBatch ← RecordBatch ← RecordBatch ← RecordBatch
Преимущества pull-модели:
- Ленивость — данные обрабатываются по мере запроса
- Ограничение памяти — в любой момент в памяти только текущие batch-и каждого оператора
- Back-pressure — медленный потребитель автоматически замедляет производителя
Физические выражения: PhysicalExpr
Аналог Expr из логического плана, но работающий с реальными Arrow-данными:
pub trait PhysicalExpr: Send + Sync {
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
fn nullable(&self, input_schema: &Schema) -> Result<bool>;
}
evaluate() принимает RecordBatch и возвращает ColumnarValue:
ColumnarValue::Array(ArrayRef)— массив значений (по одному на строку)ColumnarValue::Scalar(ScalarValue)— одно значение, broadcast на все строки
Скалярная оптимизация существенна: выражение price * 1.2 не создаёт массив из одинаковых 1.2 — скаляр применяется поэлементно без аллокации.
Выбор алгоритмов: Join
Физический планировщик выбирает алгоритм join на основе характеристик входов:
| Алгоритм | ExecutionPlan | Когда используется |
|---|---|---|
| Hash Join | HashJoinExec | Один вход помещается в память. Build-сторона создаёт hash table, probe-сторона ищет совпадения |
| Sort-Merge Join | SortMergeJoinExec | Оба входа большие. Требует сортировки по ключу join, но не держит всё в памяти |
| Nested Loop Join | NestedLoopJoinExec | Для non-equi joins (ON a.x > b.y) или cross join. O(n*m), используется редко |
| Symmetric Hash Join | SymmetricHashJoinExec | Для streaming joins, когда оба входа — бесконечные потоки |
// Hash Join: build hash table из правого входа, probe левым
// Подходит когда правый вход маленький (dimension table)
HashJoinExec::try_new(
left, // probe side (большая таблица)
right, // build side (маленькая таблица, hash table в памяти)
on, // пары ключей
join_type, // Inner, Left, Right, Full, Semi, Anti
)?
DataFusion автоматически выбирает build-сторону Hash Join на основе статистики. Если статистика недоступна, по умолчанию правый вход — build-сторона. Можно переопределить через SessionConfig::with_prefer_existing_sort(true).
Выбор алгоритмов: Aggregate
Агрегация в DataFusion — двухфазная:
- Partial — каждая партиция вычисляет промежуточные аккумуляторы
- Final — аккумуляторы объединяются
Для GROUP BY используется hash-агрегация: DataFusion строит hash table по ключам группировки. Если данные уже отсортированы по ключу — используется streaming агрегация (без hash table).
// Partial: в каждой партиции
AggregateExec::try_new(
AggregateMode::Partial, // Промежуточные результаты
group_by,
aggr_expr,
input,
)?
// Final: объединение
AggregateExec::try_new(
AggregateMode::Final, // Финальное объединение
group_by,
aggr_expr,
partial_aggregate,
)?
Обменные операторы
Планировщик вставляет специальные операторы для управления партиционированием:
| Оператор | Назначение |
|---|---|
RepartitionExec | Перераспределяет данные (Hash, RoundRobin) |
CoalescePartitionsExec | Сводит N партиций в одну |
CoalesceBatchesExec | Объединяет мелкие batch в target_batch_size |
SortPreservingMergeExec | Merge-сортировка из N отсортированных партиций |
Эти операторы не вычисляют новые данные — они перемещают и объединяют RecordBatch между партициями.
Инспекция физического плана
let ctx = SessionContext::new();
let df = ctx.sql("EXPLAIN SELECT region, SUM(amount) FROM orders GROUP BY region").await?;
// Logical plan:
// Aggregate: groupBy=[[orders.region]], aggr=[[SUM(orders.amount)]]
// TableScan: orders
//
// Physical plan:
// AggregateExec: mode=Final, gby=[region@0 as region], aggr=[SUM(orders.amount)]
// CoalescePartitionsExec
// AggregateExec: mode=Partial, gby=[region@0 as region], aggr=[SUM(orders.amount)]
// DataSourceExec: file_groups={...}, format=parquet, projection=[region, amount]
Обратите внимание: физический план содержит два AggregateExec (Partial + Final) и CoalescePartitionsExec между ними — деталей, которых нет в логическом плане.
Итоги
-
ExecutionPlantrait определяет физический оператор:schema(),execute(),output_partitioning() -
execute(partition)возвращаетSendableRecordBatchStream— асинхронный поток RecordBatch - Партиционирование определяет параллелизм: больше партиций = больше потоков
- Join: HashJoin (по умолчанию), SortMergeJoin (для больших входов), NestedLoop (для non-equi)
- Агрегация: двухфазная (Partial → Final) с hash table или streaming
- Обменные операторы (Repartition, Coalesce, Merge) управляют потоком данных между партициями