Learning Platform
Глоссарий Troubleshooting
Урок 03.03 · 17 мин
Средний
ExecutionPlanPhysicalPlanPartitioningSendableRecordBatchStreamHashJoinSortMergeJoin

PhysicalPlan: от логики к алгоритмам выполнения

Логический план описывает что вычислить. Физический план описывает как: конкретные алгоритмы, порядок операций, параллелизм. PhysicalPlanner преобразует каждый узел LogicalPlan в реализацию ExecutionPlan.

Trait ExecutionPlan

Все физические операторы реализуют trait ExecutionPlan:

pub trait ExecutionPlan: Send + Sync + Debug {
    /// Выходная схема (Arrow SchemaRef)
    fn schema(&self) -> SchemaRef;

    /// Как данные распределены по партициям
    fn output_partitioning(&self) -> Partitioning;

    /// Требования к порядку входных данных
    fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortExpr>>>;

    /// Дочерние операторы
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;

    /// Запуск выполнения для одной партиции
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;

    /// Статистика (кардинальность, размер) для оптимизатора
    fn statistics(&self) -> Result<Statistics>;
}

Метод execute(partition, context) — точка входа: для каждой партиции возвращается асинхронный поток RecordBatch. Это позволяет обрабатывать партиции параллельно — tokio runtime распределяет их по потокам.

Модель партиционирования

Каждый ExecutionPlan объявляет свою стратегию партиционирования:

pub enum Partitioning {
    RoundRobinBatch(usize),        // N партиций, строки распределены равномерно
    Hash(Vec<Arc<dyn PhysicalExpr>>, usize), // Партиционирование по hash ключа
    UnknownPartitioning(usize),    // N партиций, распределение неизвестно
}
Партиционирование и параллелизм
CoalescePartitionsExecСводит N партиций в одну для глобальных операций (ORDER BY, финальный LIMIT)
1 партиция
SortExec (ORDER BY total DESC)Глобальная сортировка по ключу — требует единственной входной партиции
AggregateExec (final merge)Финальное объединение промежуточных аккумуляторов из всех партиций
N партиций
AggregateExec (partial)Частичная агрегация: вычисляет промежуточные аккумуляторы в своей партиции
DataSourceExec (part 0)Чтение одного файла или row group Parquet — отдельная партиция данных
AggregateExec (partial)Частичная агрегация: вычисляет промежуточные аккумуляторы в своей партиции
DataSourceExec (part 1)Чтение второго файла Parquet параллельно с другими партициями
AggregateExec (partial)Частичная агрегация: вычисляет промежуточные аккумуляторы в своей партиции
DataSourceExec (part 2)Чтение третьего файла Parquet — все партиции исполняются параллельно через tokio

В этой схеме:

  • DataSourceExec создаёт по одной партиции на файл (или row group)
  • AggregateExec (partial) вычисляет промежуточные агрегаты в каждой партиции параллельно
  • AggregateExec (final) объединяет промежуточные результаты
  • CoalescePartitionsExec сводит N партиций в одну для глобальной сортировки

Repartition

Когда операция требует данные, распределённые по определённому ключу (например, Hash Join), планировщик вставляет RepartitionExec:

// Перераспределяет данные по Hash(user_id) на 8 партиций
RepartitionExec::try_new(
    input,
    Partitioning::Hash(vec![col("user_id")], 8),
)?

Repartition — дорогая операция (перемешивание данных между партициями), поэтому оптимизатор старается минимизировать количество repartition.

SendableRecordBatchStream

Результат execute() — это SendableRecordBatchStream:

pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;

pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
    fn schema(&self) -> SchemaRef;
}

Это асинхронный поток с гарантированной схемой. Каждый вызов .next().await возвращает следующий RecordBatch или None (конец потока). Ошибки передаются inline — каждый элемент потока это Result<RecordBatch>.

Pull-модель выполнения

DataFusion использует pull-модель (volcano model): потребитель запрашивает данные у производителя:

SortExec.next()          →  AggregateExec.next()    →  FilterExec.next()   →  DataSourceExec.next()
         ← RecordBatch   ←            RecordBatch    ←          RecordBatch ←          RecordBatch

Преимущества pull-модели:

  • Ленивость — данные обрабатываются по мере запроса
  • Ограничение памяти — в любой момент в памяти только текущие batch-и каждого оператора
  • Back-pressure — медленный потребитель автоматически замедляет производителя

Физические выражения: PhysicalExpr

Аналог Expr из логического плана, но работающий с реальными Arrow-данными:

pub trait PhysicalExpr: Send + Sync {
    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
    fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
    fn nullable(&self, input_schema: &Schema) -> Result<bool>;
}

evaluate() принимает RecordBatch и возвращает ColumnarValue:

  • ColumnarValue::Array(ArrayRef) — массив значений (по одному на строку)
  • ColumnarValue::Scalar(ScalarValue) — одно значение, broadcast на все строки

Скалярная оптимизация существенна: выражение price * 1.2 не создаёт массив из одинаковых 1.2 — скаляр применяется поэлементно без аллокации.

Выбор алгоритмов: Join

Физический планировщик выбирает алгоритм join на основе характеристик входов:

АлгоритмExecutionPlanКогда используется
Hash JoinHashJoinExecОдин вход помещается в память. Build-сторона создаёт hash table, probe-сторона ищет совпадения
Sort-Merge JoinSortMergeJoinExecОба входа большие. Требует сортировки по ключу join, но не держит всё в памяти
Nested Loop JoinNestedLoopJoinExecДля non-equi joins (ON a.x > b.y) или cross join. O(n*m), используется редко
Symmetric Hash JoinSymmetricHashJoinExecДля streaming joins, когда оба входа — бесконечные потоки
// Hash Join: build hash table из правого входа, probe левым
// Подходит когда правый вход маленький (dimension table)
HashJoinExec::try_new(
    left,           // probe side (большая таблица)
    right,          // build side (маленькая таблица, hash table в памяти)
    on,             // пары ключей
    join_type,      // Inner, Left, Right, Full, Semi, Anti
)?
NOTE

DataFusion автоматически выбирает build-сторону Hash Join на основе статистики. Если статистика недоступна, по умолчанию правый вход — build-сторона. Можно переопределить через SessionConfig::with_prefer_existing_sort(true).

Выбор алгоритмов: Aggregate

Агрегация в DataFusion — двухфазная:

  1. Partial — каждая партиция вычисляет промежуточные аккумуляторы
  2. Final — аккумуляторы объединяются

Для GROUP BY используется hash-агрегация: DataFusion строит hash table по ключам группировки. Если данные уже отсортированы по ключу — используется streaming агрегация (без hash table).

// Partial: в каждой партиции
AggregateExec::try_new(
    AggregateMode::Partial,         // Промежуточные результаты
    group_by,
    aggr_expr,
    input,
)?

// Final: объединение
AggregateExec::try_new(
    AggregateMode::Final,           // Финальное объединение
    group_by,
    aggr_expr,
    partial_aggregate,
)?

Обменные операторы

Планировщик вставляет специальные операторы для управления партиционированием:

ОператорНазначение
RepartitionExecПерераспределяет данные (Hash, RoundRobin)
CoalescePartitionsExecСводит N партиций в одну
CoalesceBatchesExecОбъединяет мелкие batch в target_batch_size
SortPreservingMergeExecMerge-сортировка из N отсортированных партиций

Эти операторы не вычисляют новые данные — они перемещают и объединяют RecordBatch между партициями.

Инспекция физического плана

let ctx = SessionContext::new();
let df = ctx.sql("EXPLAIN SELECT region, SUM(amount) FROM orders GROUP BY region").await?;
// Logical plan:
//   Aggregate: groupBy=[[orders.region]], aggr=[[SUM(orders.amount)]]
//     TableScan: orders
//
// Physical plan:
//   AggregateExec: mode=Final, gby=[region@0 as region], aggr=[SUM(orders.amount)]
//     CoalescePartitionsExec
//       AggregateExec: mode=Partial, gby=[region@0 as region], aggr=[SUM(orders.amount)]
//         DataSourceExec: file_groups={...}, format=parquet, projection=[region, amount]

Обратите внимание: физический план содержит два AggregateExec (Partial + Final) и CoalescePartitionsExec между ними — деталей, которых нет в логическом плане.

Итоги

  • ExecutionPlan trait определяет физический оператор: schema(), execute(), output_partitioning()
  • execute(partition) возвращает SendableRecordBatchStream — асинхронный поток RecordBatch
  • Партиционирование определяет параллелизм: больше партиций = больше потоков
  • Join: HashJoin (по умолчанию), SortMergeJoin (для больших входов), NestedLoop (для non-equi)
  • Агрегация: двухфазная (Partial → Final) с hash table или streaming
  • Обменные операторы (Repartition, Coalesce, Merge) управляют потоком данных между партициями

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Trait ExecutionPlan определяет физический оператор. Что возвращает метод execute(partition, context)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7