Query Pipeline: от SQL-текста до RecordBatch
В модуле Arrow Foundation мы видели, что DataFusion работает с RecordBatch на каждом этапе. Теперь разберём весь путь запроса — от строки SQL до потока результатов.
Шесть стадий pipeline
Каждый SQL-запрос проходит шесть трансформаций. Каждая стадия принимает результат предыдущей и производит более конкретное представление:
Рассмотрим каждую стадию на примере запроса:
SELECT region, SUM(amount) AS total
FROM orders
WHERE status = 'completed'
GROUP BY region
ORDER BY total DESC
LIMIT 10;
Стадия 1: Парсинг SQL
DataFusion использует библиотеку sqlparser-rs для преобразования SQL-текста в Abstract Syntax Tree (AST). Парсер не знает о таблицах, колонках или типах — он работает только с синтаксисом.
use datafusion::sql::sqlparser::parser::Parser;
use datafusion::sql::sqlparser::dialect::GenericDialect;
let sql = "SELECT region, SUM(amount) FROM orders WHERE status = 'completed' GROUP BY region";
let dialect = GenericDialect {};
let ast = Parser::parse_sql(&dialect, sql)?;
// ast: Vec<Statement> — синтаксическое дерево
AST содержит структуру запроса: какие колонки в SELECT, какая таблица в FROM, какое условие в WHERE. Но он не проверяет, существует ли таблица orders и есть ли в ней колонка region.
sqlparser-rs поддерживает диалекты SQL: PostgreSQL, MySQL, Hive, Snowflake и другие. DataFusion по умолчанию использует GenericDialect, но позволяет переключить диалект через SessionConfig.
Стадия 2: Построение логического плана
Компонент SqlToRel превращает AST в LogicalPlan — дерево операций с привязкой к реальным таблицам и колонкам. На этом этапе DataFusion проверяет:
- Существует ли таблица
ordersв каталоге - Есть ли колонки
region,amount,statusв схеме таблицы - Корректны ли типы (сравнение
statusс текстовым литералом) - Разрешимы ли функции (
SUMдля числового типа)
Результат — дерево LogicalPlan:
Дерево читается снизу вверх: сначала сканируем таблицу, фильтруем строки, группируем, проецируем нужные колонки, сортируем и ограничиваем результат.
Стадия 3: Оптимизация логического плана
Оптимизатор применяет цепочку правил (rules) к LogicalPlan. Каждое правило — это трансформация дерева, которая сохраняет семантику, но улучшает эффективность.
Основные правила оптимизатора DataFusion:
| Правило | Что делает |
|---|---|
| Filter Pushdown | Перемещает фильтры ближе к источнику данных |
| Projection Pushdown | Удаляет колонки, которые не нужны выше по дереву |
| Constant Folding | Вычисляет константные выражения на этапе планирования |
| Join Reordering | Переставляет JOIN для минимизации промежуточных данных |
| Common Subexpression Elimination | Вычисляет повторяющиеся выражения один раз |
| Simplify Expressions | Упрощает выражения (x AND true → x) |
Для нашего запроса оптимизатор выполнит:
- Projection pushdown: TableScan будет читать только
region,amount,statusвместо всех колонок - Filter pushdown: если источник поддерживает (Parquet), фильтр
status = 'completed'уйдёт в predicate pushdown
// До оптимизации: TableScan читает все колонки
// После: TableScan читает только region, amount, status
// + передаёт фильтр status = 'completed' в data source
Стадия 4: Физическое планирование
PhysicalPlanner заменяет каждый узел логического плана конкретной реализацией — ExecutionPlan. Здесь абстрактные операции превращаются в алгоритмы:
| LogicalPlan | ExecutionPlan | Выбор |
|---|---|---|
| TableScan | DataSourceExec / MemoryExec | По типу источника |
| Filter | FilterExec | Всегда один вариант |
| Aggregate | AggregateExec (Hash или Sort) | По размеру данных и наличию сортировки |
| Sort | SortExec / SortPreservingMergeExec | По партиционированию |
| Join | HashJoinExec / SortMergeJoinExec / NestedLoopJoinExec | По размерам входов и доступным индексам |
| Limit | GlobalLimitExec / LocalLimitExec | По позиции в плане |
Физический планировщик также вставляет обменные операторы (RepartitionExec, CoalescePartitionsExec) для управления параллелизмом. Если данные нужно перераспределить по ключу группировки — планировщик добавляет Hash Repartition.
Стадия 5: Выполнение
Физический план исполняется в pull-модели: верхний оператор вызывает execute() у дочернего, запрос каскадом спускается до leaf-операторов (scan). Каждый execute() возвращает SendableRecordBatchStream — асинхронный поток RecordBatch.
// Упрощённый цикл выполнения
let plan: Arc<dyn ExecutionPlan> = physical_planner.create_physical_plan(&logical_plan)?;
let stream = plan.execute(0, task_ctx)?;
while let Some(batch) = stream.next().await.transpose()? {
// batch: RecordBatch — очередная порция результатов
process(batch);
}
Ключевые свойства потоковой модели:
- Ленивость: данные обрабатываются по мере запроса, не загружаются целиком в память
- Партиционирование: каждый
execute(partition)обрабатывает свой срез данных параллельно - Back-pressure: если потребитель медленно читает, производитель приостанавливается
Полная картина: API вызов
На практике SessionContext инкапсулирует весь pipeline:
use datafusion::prelude::*;
let ctx = SessionContext::new();
ctx.register_parquet("orders", "data/orders.parquet", Default::default()).await?;
// sql() проходит все 6 стадий внутри
let df = ctx.sql(
"SELECT region, SUM(amount) AS total FROM orders
WHERE status = 'completed'
GROUP BY region ORDER BY total DESC LIMIT 10"
).await?;
// collect() запускает выполнение и собирает все RecordBatch
let results: Vec<RecordBatch> = df.collect().await?;
Метод sql() выполняет стадии 1–4 (парсинг → логический план → оптимизация → физический план). Метод collect() запускает стадию 5 (выполнение) и собирает результаты.
Для отладки pipeline DataFusion предоставляет explain():
let df = ctx.sql("EXPLAIN SELECT ...").await?;
// Показывает логический и физический планы
let df = ctx.sql("EXPLAIN ANALYZE SELECT ...").await?;
// Показывает планы + реальные метрики выполнения (время, строки)Точки расширения на каждой стадии
DataFusion позволяет встраиваться в pipeline на любом этапе:
| Стадия | Точка расширения | Зачем |
|---|---|---|
| Парсинг | Custom SqlParser | Поддержка нестандартного SQL-синтаксиса |
| Логический план | TableProvider, UDF | Свои источники данных и функции |
| Оптимизация | OptimizerRule | Свои правила оптимизации |
| Физическое планирование | PhysicalPlanner | Свои физические операторы |
| Выполнение | ExecutionPlan | Свои алгоритмы обработки |
Эта расширяемость — главное отличие DataFusion от других query engines. В следующих уроках мы подробно разберём каждую стадию.
Итоги
- SQL-запрос проходит 6 стадий: парсинг → логический план → оптимизация → физический план → выполнение → RecordBatch stream
- Парсинг (
sqlparser-rs) — чисто синтаксический, без привязки к данным -
SqlToRel— привязка к каталогу: проверка таблиц, колонок, типов - Оптимизатор применяет цепочку правил: pushdown, folding, simplification
- Физический планировщик выбирает конкретные алгоритмы (HashJoin vs SortMergeJoin)
- Потоковая pull-модель: ленивое выполнение с back-pressure и параллелизмом