Learning Platform
Глоссарий Troubleshooting
Урок 04.06 · 15 мин
Средний
SQL vs DataFrameQuery BuildingTestingExpression ReuseDataFusion API

SQL vs DataFrame: когда что использовать

DataFusion предоставляет два эквивалентных интерфейса: SQL и DataFrame API. Оба строят один и тот же LogicalPlan, проходят одинаковую оптимизацию и выполняются идентично. Разница — в удобстве для конкретных задач.

Одинаковый результат, разный синтаксис

Один и тот же запрос двумя способами:

-- SQL
SELECT region, SUM(amount) AS total
FROM orders
WHERE status = 'completed'
GROUP BY region
ORDER BY total DESC
LIMIT 10;
// DataFrame API
use datafusion::prelude::*;
use datafusion::functions_aggregate::expr_fn::sum;

let result = ctx.table("orders").await?
    .filter(col("status").eq(lit("completed")))?
    .aggregate(vec![col("region")], vec![sum(col("amount")).alias("total")])?
    .sort(vec![col("total").sort(false, true)])?
    .limit(0, Some(10))?
    .collect().await?;

Оба варианта порождают идентичный LogicalPlan. Оптимизатор применяет одни и те же правила, физический планировщик выбирает те же алгоритмы.

Когда выбирать SQL

SQL удобнее для:

  • Ad-hoc аналитики — быстрые запросы к данным без компиляции
  • Сложные выражения — оконные функции, CTE, подзапросы компактнее в SQL
  • Знакомый интерфейс — аналитики и data engineers знают SQL
  • Динамические запросы от пользователей — принимать SQL из внешних источников
-- CTE + оконные функции — в SQL это компактно и читаемо
WITH monthly AS (
    SELECT
        DATE_TRUNC('month', order_date) AS month,
        region,
        SUM(amount) AS revenue
    FROM orders
    GROUP BY 1, 2
)
SELECT
    month, region, revenue,
    LAG(revenue) OVER (PARTITION BY region ORDER BY month) AS prev_month,
    RANK() OVER (PARTITION BY month ORDER BY revenue DESC) AS revenue_rank
FROM monthly;
NOTE

DataFusion поддерживает параметризованные запросы для защиты от SQL-инъекций:

let df = ctx.sql("SELECT * FROM orders WHERE region = $1 AND amount > $2")
    .await?
    .with_param_values(vec![
        ScalarValue::Utf8(Some("EU".into())),
        ScalarValue::Int64(Some(1000)),
    ])?;

Когда выбирать DataFrame API

DataFrame API удобнее для:

  • Программного построения запросов — условия и колонки определяются в runtime
  • Компиляция и типизация — Rust-компилятор проверяет структуру до запуска
  • Переиспользование выражений — Expr можно хранить, комбинировать, передавать в функции
  • Интеграция в приложения — DataFrame встраивается в Rust-код естественно
  • Тестирование — проще создать fixture, построить DataFrame и проверить результат
// Динамическое построение фильтров
fn build_filter(conditions: &[(String, String)]) -> Option<Expr> {
    conditions.iter()
        .map(|(column, value)| col(column).eq(lit(value.clone())))
        .reduce(|a, b| a.and(b))
}

// Использование
let filters = vec![
    ("region".into(), "EU".into()),
    ("status".into(), "completed".into()),
];

let mut df = ctx.table("orders").await?;
if let Some(filter_expr) = build_filter(&filters) {
    df = df.filter(filter_expr)?;
}
let results = df.collect().await?;

Переиспользование выражений

// Общие выражения, используемые в разных запросах
fn revenue_metrics() -> Vec<Expr> {
    vec![
        sum(col("amount")).alias("total_revenue"),
        avg(col("amount")).alias("avg_order"),
        count(col("order_id")).alias("order_count"),
    ]
}

// В разных контекстах
let by_region = ctx.table("orders").await?
    .aggregate(vec![col("region")], revenue_metrics())?;

let by_product = ctx.table("orders").await?
    .aggregate(vec![col("product_category")], revenue_metrics())?;

Мост: SQL возвращает DataFrame

Ключевое свойство DataFusion — ctx.sql() возвращает DataFrame. Это позволяет начать с SQL и продолжить программно:

// SQL для сложной логики + DataFrame для программных доработок
let base = ctx.sql(
    "WITH monthly AS (
        SELECT DATE_TRUNC('month', order_date) AS month,
               region, SUM(amount) AS revenue
        FROM orders GROUP BY 1, 2
    )
    SELECT month, region, revenue,
           LAG(revenue) OVER (PARTITION BY region ORDER BY month) AS prev
    FROM monthly"
).await?;

// Программное добавление фильтра и лимита
let result = base
    .filter(col("revenue").gt(lit(10000)))?
    .limit(0, Some(100))?
    .collect().await?;

Обратный мост тоже работает — DataFrame можно зарегистрировать как таблицу:

let df = ctx.read_parquet("data/orders.parquet", ParquetReadOptions::default()).await?
    .filter(col("status").eq(lit("completed")))?;

// Зарегистрировать DataFrame как временную таблицу
ctx.register_table("completed_orders", df.into_view())?;

// Использовать в SQL
let result = ctx.sql("SELECT region, COUNT(*) FROM completed_orders GROUP BY region").await?;

Сравнительная таблица

SQL vs DataFrame API
АспектКритерии сравнения SQL и DataFrame API по удобству и возможностям
SQLSQL-интерфейс — декларативный, удобен для ad-hoc аналитики и сложных выражений
DataFrameDataFrame API — программный, с проверкой типов на этапе компиляции и переиспользованием Expr

Стратегии тестирования

Тестирование SQL-запросов

#[tokio::test]
async fn test_revenue_query() -> Result<()> {
    let ctx = SessionContext::new();

    // Fixture: in-memory данные
    let batch = RecordBatch::try_from_iter(vec![
        ("region", Arc::new(StringArray::from(vec!["EU", "EU", "US"])) as ArrayRef),
        ("amount", Arc::new(Int64Array::from(vec![100, 200, 300]))),
        ("status", Arc::new(StringArray::from(vec!["completed", "completed", "completed"]))),
    ])?;
    ctx.register_batch("orders", batch)?;

    let result = ctx.sql(
        "SELECT region, SUM(amount) AS total FROM orders GROUP BY region ORDER BY region"
    ).await?.collect().await?;

    assert_eq!(result[0].num_rows(), 2);
    // EU: 300, US: 300
    Ok(())
}

Тестирование DataFrame pipeline

#[tokio::test]
async fn test_revenue_pipeline() -> Result<()> {
    let ctx = SessionContext::new();

    let batch = RecordBatch::try_from_iter(vec![
        ("region", Arc::new(StringArray::from(vec!["EU", "EU", "US"])) as ArrayRef),
        ("amount", Arc::new(Int64Array::from(vec![100, 200, 300]))),
    ])?;
    ctx.register_batch("orders", batch)?;

    let result = ctx.table("orders").await?
        .aggregate(vec![col("region")], vec![sum(col("amount")).alias("total")])?
        .filter(col("total").gt(lit(250)))?
        .collect().await?;

    // EU: 300, US: 300 — обе группы проходят фильтр > 250
    assert_eq!(result[0].num_rows(), 2);
    Ok(())
}
TIP

DataFrame API проще для unit-тестов: создаёте fixture через register_batch(), строите pipeline, проверяете результат. Тесты не зависят от файловой системы и работают без внешних данных.

Рекомендации

  • Аналитика и исследование данных — SQL. Быстрее написать, проще читать.
  • Приложения и библиотеки — DataFrame API. Типобезопасность, переиспользование, тестируемость.
  • Сложные запросы в приложениях — SQL + DataFrame. Начните с SQL для CTE/окон, доработайте программно.
  • Динамические запросы — DataFrame API. Строить Expr программно безопаснее, чем конкатенировать SQL-строки.

Итоги

  • SQL и DataFrame API строят одинаковый LogicalPlan — разница только в синтаксисе
  • SQL удобнее для ad-hoc, оконных функций, CTE
  • DataFrame API удобнее для программных запросов, переиспользования Expr, тестирования
  • ctx.sql() возвращает DataFrame — можно комбинировать оба подхода
  • DataFrame как временная таблица: register_table() позволяет использовать в SQL
  • Для тестов DataFrame API + register_batch() — простые, изолированные, быстрые тесты олированные, быстрые тесты �рые тесты

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. В каком сценарии DataFrame API предпочтительнее SQL?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7