Learning Platform
Глоссарий Troubleshooting
Урок 03.01 · 15 мин
Средний
Query PipelineSQL ParsingLogicalPlanPhysicalPlanExecutionDataFusion Architecture

Query Pipeline: от SQL-текста до RecordBatch

В модуле Arrow Foundation мы видели, что DataFusion работает с RecordBatch на каждом этапе. Теперь разберём весь путь запроса — от строки SQL до потока результатов.

Шесть стадий pipeline

Каждый SQL-запрос проходит шесть трансформаций. Каждая стадия принимает результат предыдущей и производит более конкретное представление:

Query Pipeline DataFusion
SQL TextИсходный SQL-запрос в текстовом виде — ещё не проверен на корректность
sqlparser-rs
AST (Statement)Абстрактное синтаксическое дерево: структура запроса без привязки к реальным таблицам
SqlToRel
Unoptimized LogicalPlanДерево логических операций с проверкой таблиц, колонок и типов через каталог
Optimizer (правила)
Optimized LogicalPlanОптимизированный план: после pushdown фильтров, проекций и упрощения выражений
PhysicalPlanner
ExecutionPlan (дерево)Дерево конкретных алгоритмов: HashJoin, SortExec, DataSourceExec с партиционированием
execute(partition)
SendableRecordBatchStreamАсинхронный поток RecordBatch — финальный результат запроса

Рассмотрим каждую стадию на примере запроса:

SELECT region, SUM(amount) AS total
FROM orders
WHERE status = 'completed'
GROUP BY region
ORDER BY total DESC
LIMIT 10;

Стадия 1: Парсинг SQL

DataFusion использует библиотеку sqlparser-rs для преобразования SQL-текста в Abstract Syntax Tree (AST). Парсер не знает о таблицах, колонках или типах — он работает только с синтаксисом.

use datafusion::sql::sqlparser::parser::Parser;
use datafusion::sql::sqlparser::dialect::GenericDialect;

let sql = "SELECT region, SUM(amount) FROM orders WHERE status = 'completed' GROUP BY region";
let dialect = GenericDialect {};
let ast = Parser::parse_sql(&dialect, sql)?;
// ast: Vec<Statement> — синтаксическое дерево

AST содержит структуру запроса: какие колонки в SELECT, какая таблица в FROM, какое условие в WHERE. Но он не проверяет, существует ли таблица orders и есть ли в ней колонка region.

NOTE

sqlparser-rs поддерживает диалекты SQL: PostgreSQL, MySQL, Hive, Snowflake и другие. DataFusion по умолчанию использует GenericDialect, но позволяет переключить диалект через SessionConfig.

Стадия 2: Построение логического плана

Компонент SqlToRel превращает AST в LogicalPlan — дерево операций с привязкой к реальным таблицам и колонкам. На этом этапе DataFusion проверяет:

  • Существует ли таблица orders в каталоге
  • Есть ли колонки region, amount, status в схеме таблицы
  • Корректны ли типы (сравнение status с текстовым литералом)
  • Разрешимы ли функции (SUM для числового типа)

Результат — дерево LogicalPlan:

LogicalPlan Tree (до оптимизации)
Limit (count=10)Ограничивает количество строк результата — применяется последним
Sort (total DESC)Сортирует данные по указанному выражению — здесь по сумме по убыванию
Projection (region, total)Выбирает и переименовывает колонки для финального результата
Aggregate (GROUP BY region, SUM(amount))Группирует строки по ключу и вычисляет агрегатные функции (SUM, COUNT)
Filter (status = ‘completed’)Оставляет только строки, удовлетворяющие предикату WHERE
TableScan (orders)Leaf-узел: чтение данных из зарегистрированного источника

Дерево читается снизу вверх: сначала сканируем таблицу, фильтруем строки, группируем, проецируем нужные колонки, сортируем и ограничиваем результат.

Стадия 3: Оптимизация логического плана

Оптимизатор применяет цепочку правил (rules) к LogicalPlan. Каждое правило — это трансформация дерева, которая сохраняет семантику, но улучшает эффективность.

Основные правила оптимизатора DataFusion:

ПравилоЧто делает
Filter PushdownПеремещает фильтры ближе к источнику данных
Projection PushdownУдаляет колонки, которые не нужны выше по дереву
Constant FoldingВычисляет константные выражения на этапе планирования
Join ReorderingПереставляет JOIN для минимизации промежуточных данных
Common Subexpression EliminationВычисляет повторяющиеся выражения один раз
Simplify ExpressionsУпрощает выражения (x AND truex)

Для нашего запроса оптимизатор выполнит:

  • Projection pushdown: TableScan будет читать только region, amount, status вместо всех колонок
  • Filter pushdown: если источник поддерживает (Parquet), фильтр status = 'completed' уйдёт в predicate pushdown
// До оптимизации: TableScan читает все колонки
// После: TableScan читает только region, amount, status
// + передаёт фильтр status = 'completed' в data source

Стадия 4: Физическое планирование

PhysicalPlanner заменяет каждый узел логического плана конкретной реализацией — ExecutionPlan. Здесь абстрактные операции превращаются в алгоритмы:

LogicalPlanExecutionPlanВыбор
TableScanDataSourceExec / MemoryExecПо типу источника
FilterFilterExecВсегда один вариант
AggregateAggregateExec (Hash или Sort)По размеру данных и наличию сортировки
SortSortExec / SortPreservingMergeExecПо партиционированию
JoinHashJoinExec / SortMergeJoinExec / NestedLoopJoinExecПо размерам входов и доступным индексам
LimitGlobalLimitExec / LocalLimitExecПо позиции в плане

Физический планировщик также вставляет обменные операторы (RepartitionExec, CoalescePartitionsExec) для управления параллелизмом. Если данные нужно перераспределить по ключу группировки — планировщик добавляет Hash Repartition.

Стадия 5: Выполнение

Физический план исполняется в pull-модели: верхний оператор вызывает execute() у дочернего, запрос каскадом спускается до leaf-операторов (scan). Каждый execute() возвращает SendableRecordBatchStream — асинхронный поток RecordBatch.

// Упрощённый цикл выполнения
let plan: Arc<dyn ExecutionPlan> = physical_planner.create_physical_plan(&logical_plan)?;
let stream = plan.execute(0, task_ctx)?;

while let Some(batch) = stream.next().await.transpose()? {
    // batch: RecordBatch — очередная порция результатов
    process(batch);
}

Ключевые свойства потоковой модели:

  • Ленивость: данные обрабатываются по мере запроса, не загружаются целиком в память
  • Партиционирование: каждый execute(partition) обрабатывает свой срез данных параллельно
  • Back-pressure: если потребитель медленно читает, производитель приостанавливается

Полная картина: API вызов

На практике SessionContext инкапсулирует весь pipeline:

use datafusion::prelude::*;

let ctx = SessionContext::new();
ctx.register_parquet("orders", "data/orders.parquet", Default::default()).await?;

// sql() проходит все 6 стадий внутри
let df = ctx.sql(
    "SELECT region, SUM(amount) AS total FROM orders
     WHERE status = 'completed'
     GROUP BY region ORDER BY total DESC LIMIT 10"
).await?;

// collect() запускает выполнение и собирает все RecordBatch
let results: Vec<RecordBatch> = df.collect().await?;

Метод sql() выполняет стадии 1–4 (парсинг → логический план → оптимизация → физический план). Метод collect() запускает стадию 5 (выполнение) и собирает результаты.

TIP

Для отладки pipeline DataFusion предоставляет explain():

let df = ctx.sql("EXPLAIN SELECT ...").await?;
// Показывает логический и физический планы

let df = ctx.sql("EXPLAIN ANALYZE SELECT ...").await?;
// Показывает планы + реальные метрики выполнения (время, строки)

Точки расширения на каждой стадии

DataFusion позволяет встраиваться в pipeline на любом этапе:

СтадияТочка расширенияЗачем
ПарсингCustom SqlParserПоддержка нестандартного SQL-синтаксиса
Логический планTableProvider, UDFСвои источники данных и функции
ОптимизацияOptimizerRuleСвои правила оптимизации
Физическое планированиеPhysicalPlannerСвои физические операторы
ВыполнениеExecutionPlanСвои алгоритмы обработки

Эта расширяемость — главное отличие DataFusion от других query engines. В следующих уроках мы подробно разберём каждую стадию.

Итоги

  • SQL-запрос проходит 6 стадий: парсинг → логический план → оптимизация → физический план → выполнение → RecordBatch stream
  • Парсинг (sqlparser-rs) — чисто синтаксический, без привязки к данным
  • SqlToRel — привязка к каталогу: проверка таблиц, колонок, типов
  • Оптимизатор применяет цепочку правил: pushdown, folding, simplification
  • Физический планировщик выбирает конкретные алгоритмы (HashJoin vs SortMergeJoin)
  • Потоковая pull-модель: ленивое выполнение с back-pressure и параллелизмом
Spark Catalyst: тот же pipeline в JVM ClickHouse: vectorized execution

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какие шесть стадий проходит SQL-запрос в DataFusion от текста до результата?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7