SQL vs DataFrame: когда что использовать
DataFusion предоставляет два эквивалентных интерфейса: SQL и DataFrame API. Оба строят один и тот же LogicalPlan, проходят одинаковую оптимизацию и выполняются идентично. Разница — в удобстве для конкретных задач.
Одинаковый результат, разный синтаксис
Один и тот же запрос двумя способами:
-- SQL
SELECT region, SUM(amount) AS total
FROM orders
WHERE status = 'completed'
GROUP BY region
ORDER BY total DESC
LIMIT 10;
// DataFrame API
use datafusion::prelude::*;
use datafusion::functions_aggregate::expr_fn::sum;
let result = ctx.table("orders").await?
.filter(col("status").eq(lit("completed")))?
.aggregate(vec![col("region")], vec![sum(col("amount")).alias("total")])?
.sort(vec![col("total").sort(false, true)])?
.limit(0, Some(10))?
.collect().await?;
Оба варианта порождают идентичный LogicalPlan. Оптимизатор применяет одни и те же правила, физический планировщик выбирает те же алгоритмы.
Когда выбирать SQL
SQL удобнее для:
- Ad-hoc аналитики — быстрые запросы к данным без компиляции
- Сложные выражения — оконные функции, CTE, подзапросы компактнее в SQL
- Знакомый интерфейс — аналитики и data engineers знают SQL
- Динамические запросы от пользователей — принимать SQL из внешних источников
-- CTE + оконные функции — в SQL это компактно и читаемо
WITH monthly AS (
SELECT
DATE_TRUNC('month', order_date) AS month,
region,
SUM(amount) AS revenue
FROM orders
GROUP BY 1, 2
)
SELECT
month, region, revenue,
LAG(revenue) OVER (PARTITION BY region ORDER BY month) AS prev_month,
RANK() OVER (PARTITION BY month ORDER BY revenue DESC) AS revenue_rank
FROM monthly;
DataFusion поддерживает параметризованные запросы для защиты от SQL-инъекций:
let df = ctx.sql("SELECT * FROM orders WHERE region = $1 AND amount > $2")
.await?
.with_param_values(vec![
ScalarValue::Utf8(Some("EU".into())),
ScalarValue::Int64(Some(1000)),
])?;Когда выбирать DataFrame API
DataFrame API удобнее для:
- Программного построения запросов — условия и колонки определяются в runtime
- Компиляция и типизация — Rust-компилятор проверяет структуру до запуска
- Переиспользование выражений — Expr можно хранить, комбинировать, передавать в функции
- Интеграция в приложения — DataFrame встраивается в Rust-код естественно
- Тестирование — проще создать fixture, построить DataFrame и проверить результат
// Динамическое построение фильтров
fn build_filter(conditions: &[(String, String)]) -> Option<Expr> {
conditions.iter()
.map(|(column, value)| col(column).eq(lit(value.clone())))
.reduce(|a, b| a.and(b))
}
// Использование
let filters = vec![
("region".into(), "EU".into()),
("status".into(), "completed".into()),
];
let mut df = ctx.table("orders").await?;
if let Some(filter_expr) = build_filter(&filters) {
df = df.filter(filter_expr)?;
}
let results = df.collect().await?;
Переиспользование выражений
// Общие выражения, используемые в разных запросах
fn revenue_metrics() -> Vec<Expr> {
vec![
sum(col("amount")).alias("total_revenue"),
avg(col("amount")).alias("avg_order"),
count(col("order_id")).alias("order_count"),
]
}
// В разных контекстах
let by_region = ctx.table("orders").await?
.aggregate(vec![col("region")], revenue_metrics())?;
let by_product = ctx.table("orders").await?
.aggregate(vec![col("product_category")], revenue_metrics())?;
Мост: SQL возвращает DataFrame
Ключевое свойство DataFusion — ctx.sql() возвращает DataFrame. Это позволяет начать с SQL и продолжить программно:
// SQL для сложной логики + DataFrame для программных доработок
let base = ctx.sql(
"WITH monthly AS (
SELECT DATE_TRUNC('month', order_date) AS month,
region, SUM(amount) AS revenue
FROM orders GROUP BY 1, 2
)
SELECT month, region, revenue,
LAG(revenue) OVER (PARTITION BY region ORDER BY month) AS prev
FROM monthly"
).await?;
// Программное добавление фильтра и лимита
let result = base
.filter(col("revenue").gt(lit(10000)))?
.limit(0, Some(100))?
.collect().await?;
Обратный мост тоже работает — DataFrame можно зарегистрировать как таблицу:
let df = ctx.read_parquet("data/orders.parquet", ParquetReadOptions::default()).await?
.filter(col("status").eq(lit("completed")))?;
// Зарегистрировать DataFrame как временную таблицу
ctx.register_table("completed_orders", df.into_view())?;
// Использовать в SQL
let result = ctx.sql("SELECT region, COUNT(*) FROM completed_orders GROUP BY region").await?;
Сравнительная таблица
Стратегии тестирования
Тестирование SQL-запросов
#[tokio::test]
async fn test_revenue_query() -> Result<()> {
let ctx = SessionContext::new();
// Fixture: in-memory данные
let batch = RecordBatch::try_from_iter(vec![
("region", Arc::new(StringArray::from(vec!["EU", "EU", "US"])) as ArrayRef),
("amount", Arc::new(Int64Array::from(vec![100, 200, 300]))),
("status", Arc::new(StringArray::from(vec!["completed", "completed", "completed"]))),
])?;
ctx.register_batch("orders", batch)?;
let result = ctx.sql(
"SELECT region, SUM(amount) AS total FROM orders GROUP BY region ORDER BY region"
).await?.collect().await?;
assert_eq!(result[0].num_rows(), 2);
// EU: 300, US: 300
Ok(())
}
Тестирование DataFrame pipeline
#[tokio::test]
async fn test_revenue_pipeline() -> Result<()> {
let ctx = SessionContext::new();
let batch = RecordBatch::try_from_iter(vec![
("region", Arc::new(StringArray::from(vec!["EU", "EU", "US"])) as ArrayRef),
("amount", Arc::new(Int64Array::from(vec![100, 200, 300]))),
])?;
ctx.register_batch("orders", batch)?;
let result = ctx.table("orders").await?
.aggregate(vec![col("region")], vec![sum(col("amount")).alias("total")])?
.filter(col("total").gt(lit(250)))?
.collect().await?;
// EU: 300, US: 300 — обе группы проходят фильтр > 250
assert_eq!(result[0].num_rows(), 2);
Ok(())
}
DataFrame API проще для unit-тестов: создаёте fixture через register_batch(), строите pipeline, проверяете результат. Тесты не зависят от файловой системы и работают без внешних данных.
Рекомендации
- Аналитика и исследование данных — SQL. Быстрее написать, проще читать.
- Приложения и библиотеки — DataFrame API. Типобезопасность, переиспользование, тестируемость.
- Сложные запросы в приложениях — SQL + DataFrame. Начните с SQL для CTE/окон, доработайте программно.
- Динамические запросы — DataFrame API. Строить Expr программно безопаснее, чем конкатенировать SQL-строки.
Итоги
- SQL и DataFrame API строят одинаковый LogicalPlan — разница только в синтаксисе
- SQL удобнее для ad-hoc, оконных функций, CTE
- DataFrame API удобнее для программных запросов, переиспользования Expr, тестирования
- ctx.sql() возвращает DataFrame — можно комбинировать оба подхода
- DataFrame как временная таблица: register_table() позволяет использовать в SQL
- Для тестов DataFrame API + register_batch() — простые, изолированные, быстрые тесты олированные, быстрые тесты �рые тесты