DataFrame API (Rust)
SQL удобен для ad-hoc запросов, но для программного построения запросов DataFusion предоставляет DataFrame API — типизированный Rust-интерфейс с ленивым выполнением. DataFrame не выполняет вычисления до вызова terminal-операции (collect(), show()).
Ленивая модель
Каждый вызов метода DataFrame (filter, select, aggregate) не обрабатывает данные, а строит LogicalPlan. Выполнение запускается только при terminal-операции:
До вызова collect() DataFusion не читает ни одного байта данных. Это позволяет оптимизатору трансформировать весь план целиком.
use datafusion::prelude::*;
let ctx = SessionContext::new();
ctx.register_parquet("orders", "data/orders.parquet", ParquetReadOptions::default()).await?;
// Строим план (ленивый — данные не читаются)
let df = ctx.table("orders").await?
.filter(col("status").eq(lit("completed")))?
.select(vec![col("region"), col("amount")])?;
// Выполняем (данные читаются и обрабатываются)
let results: Vec<RecordBatch> = df.collect().await?;
Построение выражений: col() и lit()
Выражения — основа DataFrame API. Два базовых строительных блока:
col("name")— ссылка на колонку таблицыlit(value)— литеральное значение (число, строка, bool)
use datafusion::prelude::*;
// Ссылка на колонку
let region = col("region");
// Литералы
let threshold = lit(1000);
let status_completed = lit("completed");
let is_active = lit(true);
// Комбинирование в выражения
let filter_expr = col("amount").gt(lit(1000))
.and(col("status").eq(lit("completed")));
// Алиасы
let total_expr = col("amount").alias("total_amount");
Типизированные операции
// Сравнение
col("amount").gt(lit(100)) // amount > 100
col("amount").gt_eq(lit(100)) // amount >= 100
col("amount").lt(lit(100)) // amount < 100
col("status").eq(lit("active")) // status = 'active'
col("status").not_eq(lit("cancel")) // status != 'cancel'
// Логика
col("a").gt(lit(1)).and(col("b").lt(lit(10))) // a > 1 AND b < 10
col("x").eq(lit(1)).or(col("x").eq(lit(2))) // x = 1 OR x = 2
col("flag").is_null() // flag IS NULL
col("flag").is_not_null() // flag IS NOT NULL
// IN
col("region").in_list(vec![lit("EU"), lit("US")], false) // region IN ('EU', 'US')
// BETWEEN
col("amount").between(lit(100), lit(1000)) // amount BETWEEN 100 AND 1000
// LIKE
col("name").like(lit("%Smith%")) // name LIKE '%Smith%'
// Сортировка
col("amount").sort(false, true) // DESC NULLS FIRST
Основные операции DataFrame
filter — WHERE
let df = ctx.table("orders").await?
.filter(
col("status").eq(lit("completed"))
.and(col("amount").gt(lit(500)))
)?;
select — проекция колонок
// По именам
let df = ctx.table("orders").await?
.select_columns(&["order_id", "region", "amount"])?;
// С выражениями
let df = ctx.table("orders").await?
.select(vec![
col("region"),
col("amount"),
(col("amount") * lit(0.2)).alias("tax"),
])?;
aggregate — GROUP BY
use datafusion::functions_aggregate::expr_fn::{sum, avg, count, min, max};
let df = ctx.table("orders").await?
.filter(col("status").eq(lit("completed")))?
.aggregate(
vec![col("region")], // GROUP BY
vec![ // Агрегаты
count(col("order_id")).alias("order_count"),
sum(col("amount")).alias("total"),
avg(col("amount")).alias("average"),
min(col("amount")).alias("min_amount"),
max(col("amount")).alias("max_amount"),
],
)?;
join — соединение таблиц
let orders = ctx.table("orders").await?;
let customers = ctx.table("customers").await?;
// INNER JOIN
let joined = orders.join(
customers,
JoinType::Inner,
&["customer_id"], // Колонки левой таблицы
&["customer_id"], // Колонки правой таблицы
None, // Дополнительное условие
)?;
// LEFT JOIN с выборкой
let result = orders
.join(customers, JoinType::Left, &["customer_id"], &["customer_id"], None)?
.select(vec![
col("orders.order_id"),
col("customers.name"),
col("orders.amount"),
])?;
sort — сортировка
let df = ctx.table("orders").await?
.sort(vec![
col("region").sort(true, false), // ASC NULLS LAST
col("amount").sort(false, true), // DESC NULLS FIRST
])?;
limit — ограничение
// LIMIT 10
let df = ctx.table("orders").await?
.limit(0, Some(10))?;
// OFFSET 20 LIMIT 10
let df = ctx.table("orders").await?
.limit(20, Some(10))?;
distinct — уникальные строки
let df = ctx.table("orders").await?
.select_columns(&["region"])?
.distinct()?;
Быстрое создание DataFrame: dataframe!
Макрос dataframe! создаёт DataFrame из литеральных данных без регистрации таблиц:
use datafusion::prelude::*;
let df = dataframe!(
"product" => ["Laptop", "Phone", "Tablet", "Monitor"],
"price" => [1200, 800, 400, 600],
"category" => ["electronics", "electronics", "electronics", "peripherals"]
)?;
let result = df
.filter(col("price").gt(lit(500)))?
.select(vec![col("product"), col("price")])?;
result.show().await?;
// +----------+-------+
// | product | price |
// +----------+-------+
// | Laptop | 1200 |
// | Phone | 800 |
// | Monitor | 600 |
// +----------+-------+
Terminal-операции
Эти методы запускают выполнение:
| Метод | Возвращает | Описание |
|---|---|---|
collect().await? | Vec<RecordBatch> | Все результаты в памяти |
show().await? | () | Печатает таблицу в stdout |
count().await? | usize | Количество строк |
write_parquet(path, options).await? | () | Запись в Parquet |
write_csv(path, options).await? | () | Запись в CSV |
execute_stream().await? | SendableRecordBatchStream | Потоковое чтение |
// collect — все данные в память
let batches = df.collect().await?;
// show — быстрый просмотр
df.show().await?;
// count — количество строк без передачи данных
let row_count = df.count().await?;
// Потоковое чтение для больших данных
let mut stream = df.execute_stream().await?;
while let Some(batch) = stream.next().await {
let batch = batch?;
process(batch);
}
Используйте execute_stream() вместо collect() для больших данных. collect() загружает все результаты в память, а execute_stream() обрабатывает их порциями (RecordBatch).
Enum Expr
Под капотом каждый метод DataFrame строит дерево Expr — перечисление всех возможных выражений:
use datafusion::prelude::*;
// col("amount") создаёт Expr::Column
// lit(100) создаёт Expr::Literal
// .gt() создаёт Expr::BinaryExpr { op: Operator::Gt, ... }
let expr = col("amount").gt(lit(100));
// Это эквивалентно ручному построению:
use datafusion::logical_expr::{Expr, Operator, BinaryExpr};
use datafusion::common::ScalarValue;
let manual_expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column("amount".into())),
op: Operator::Gt,
right: Box::new(Expr::Literal(ScalarValue::Int32(Some(100)))),
});
Знание структуры Expr полезно для построения динамических запросов — когда набор фильтров определяется в runtime.
Создание DataFrame из разных источников
let ctx = SessionContext::new();
// Из SQL-запроса (sql() возвращает DataFrame)
let df = ctx.sql("SELECT * FROM orders WHERE amount > 100").await?;
// Из зарегистрированной таблицы
let df = ctx.table("orders").await?;
// Из Parquet без регистрации
let df = ctx.read_parquet("data/orders.parquet", ParquetReadOptions::default()).await?;
// Из CSV
let df = ctx.read_csv("data/events.csv", CsvReadOptions::new().has_header(true)).await?;
// Из RecordBatch
let df = ctx.read_batch(batch)?;
ctx.sql() возвращает DataFrame — результат SQL-запроса можно обрабатывать через DataFrame API. Это позволяет комбинировать SQL и программный подход в одном pipeline.
Итоги
- DataFrame строит LogicalPlan лениво — данные не читаются до collect()/show()
- col() и lit() — базовые строительные блоки выражений
- Основные операции: filter, select, aggregate, join, sort, limit, distinct
- dataframe! макрос создаёт DataFrame из литеральных данных
- Terminal-операции: collect (все данные), execute_stream (потоково), show (вывод)
- Expr enum — программное представление любого выражения для динамических запросов