Learning Platform
Глоссарий Troubleshooting
Урок 04.04 · 15 мин
Средний
DataFrame APILazy EvaluationExprcollitfilterselectaggregatejoin

DataFrame API (Rust)

SQL удобен для ad-hoc запросов, но для программного построения запросов DataFusion предоставляет DataFrame API — типизированный Rust-интерфейс с ленивым выполнением. DataFrame не выполняет вычисления до вызова terminal-операции (collect(), show()).

Ленивая модель

Каждый вызов метода DataFrame (filter, select, aggregate) не обрабатывает данные, а строит LogicalPlan. Выполнение запускается только при terminal-операции:

DataFrame: ленивое построение плана
ctx.table(“orders”)Точка входа — получение DataFrame из зарегистрированной таблицы
.filter()
DataFrame (LogicalPlan: Filter)Ленивый шаг — добавляет узел Filter в LogicalPlan без чтения данных
.aggregate()
DataFrame (LogicalPlan: Aggregate)Ленивый шаг — добавляет узел Aggregate (GROUP BY) в план
.sort()
DataFrame (LogicalPlan: Sort)Ленивый шаг — добавляет узел Sort в план выполнения
.collect() — ВЫПОЛНЕНИЕ
Vec<RecordBatch>Терминальная операция — запускает выполнение плана и возвращает результат в память

До вызова collect() DataFusion не читает ни одного байта данных. Это позволяет оптимизатору трансформировать весь план целиком.

use datafusion::prelude::*;

let ctx = SessionContext::new();
ctx.register_parquet("orders", "data/orders.parquet", ParquetReadOptions::default()).await?;

// Строим план (ленивый — данные не читаются)
let df = ctx.table("orders").await?
    .filter(col("status").eq(lit("completed")))?
    .select(vec![col("region"), col("amount")])?;

// Выполняем (данные читаются и обрабатываются)
let results: Vec<RecordBatch> = df.collect().await?;

Построение выражений: col() и lit()

Выражения — основа DataFrame API. Два базовых строительных блока:

  • col("name") — ссылка на колонку таблицы
  • lit(value) — литеральное значение (число, строка, bool)
use datafusion::prelude::*;

// Ссылка на колонку
let region = col("region");

// Литералы
let threshold = lit(1000);
let status_completed = lit("completed");
let is_active = lit(true);

// Комбинирование в выражения
let filter_expr = col("amount").gt(lit(1000))
    .and(col("status").eq(lit("completed")));

// Алиасы
let total_expr = col("amount").alias("total_amount");

Типизированные операции

// Сравнение
col("amount").gt(lit(100))          // amount > 100
col("amount").gt_eq(lit(100))       // amount >= 100
col("amount").lt(lit(100))          // amount < 100
col("status").eq(lit("active"))     // status = 'active'
col("status").not_eq(lit("cancel")) // status != 'cancel'

// Логика
col("a").gt(lit(1)).and(col("b").lt(lit(10)))  // a > 1 AND b < 10
col("x").eq(lit(1)).or(col("x").eq(lit(2)))    // x = 1 OR x = 2
col("flag").is_null()                           // flag IS NULL
col("flag").is_not_null()                       // flag IS NOT NULL

// IN
col("region").in_list(vec![lit("EU"), lit("US")], false)  // region IN ('EU', 'US')

// BETWEEN
col("amount").between(lit(100), lit(1000))  // amount BETWEEN 100 AND 1000

// LIKE
col("name").like(lit("%Smith%"))  // name LIKE '%Smith%'

// Сортировка
col("amount").sort(false, true)  // DESC NULLS FIRST

Основные операции DataFrame

filter — WHERE

let df = ctx.table("orders").await?
    .filter(
        col("status").eq(lit("completed"))
            .and(col("amount").gt(lit(500)))
    )?;

select — проекция колонок

// По именам
let df = ctx.table("orders").await?
    .select_columns(&["order_id", "region", "amount"])?;

// С выражениями
let df = ctx.table("orders").await?
    .select(vec![
        col("region"),
        col("amount"),
        (col("amount") * lit(0.2)).alias("tax"),
    ])?;

aggregate — GROUP BY

use datafusion::functions_aggregate::expr_fn::{sum, avg, count, min, max};

let df = ctx.table("orders").await?
    .filter(col("status").eq(lit("completed")))?
    .aggregate(
        vec![col("region")],           // GROUP BY
        vec![                          // Агрегаты
            count(col("order_id")).alias("order_count"),
            sum(col("amount")).alias("total"),
            avg(col("amount")).alias("average"),
            min(col("amount")).alias("min_amount"),
            max(col("amount")).alias("max_amount"),
        ],
    )?;

join — соединение таблиц

let orders = ctx.table("orders").await?;
let customers = ctx.table("customers").await?;

// INNER JOIN
let joined = orders.join(
    customers,
    JoinType::Inner,
    &["customer_id"],   // Колонки левой таблицы
    &["customer_id"],   // Колонки правой таблицы
    None,               // Дополнительное условие
)?;

// LEFT JOIN с выборкой
let result = orders
    .join(customers, JoinType::Left, &["customer_id"], &["customer_id"], None)?
    .select(vec![
        col("orders.order_id"),
        col("customers.name"),
        col("orders.amount"),
    ])?;

sort — сортировка

let df = ctx.table("orders").await?
    .sort(vec![
        col("region").sort(true, false),       // ASC NULLS LAST
        col("amount").sort(false, true),       // DESC NULLS FIRST
    ])?;

limit — ограничение

// LIMIT 10
let df = ctx.table("orders").await?
    .limit(0, Some(10))?;

// OFFSET 20 LIMIT 10
let df = ctx.table("orders").await?
    .limit(20, Some(10))?;

distinct — уникальные строки

let df = ctx.table("orders").await?
    .select_columns(&["region"])?
    .distinct()?;

Быстрое создание DataFrame: dataframe!

Макрос dataframe! создаёт DataFrame из литеральных данных без регистрации таблиц:

use datafusion::prelude::*;

let df = dataframe!(
    "product"  => ["Laptop", "Phone", "Tablet", "Monitor"],
    "price"    => [1200, 800, 400, 600],
    "category" => ["electronics", "electronics", "electronics", "peripherals"]
)?;

let result = df
    .filter(col("price").gt(lit(500)))?
    .select(vec![col("product"), col("price")])?;

result.show().await?;
// +----------+-------+
// | product  | price |
// +----------+-------+
// | Laptop   | 1200  |
// | Phone    | 800   |
// | Monitor  | 600   |
// +----------+-------+

Terminal-операции

Эти методы запускают выполнение:

МетодВозвращаетОписание
collect().await?Vec<RecordBatch>Все результаты в памяти
show().await?()Печатает таблицу в stdout
count().await?usizeКоличество строк
write_parquet(path, options).await?()Запись в Parquet
write_csv(path, options).await?()Запись в CSV
execute_stream().await?SendableRecordBatchStreamПотоковое чтение
// collect — все данные в память
let batches = df.collect().await?;

// show — быстрый просмотр
df.show().await?;

// count — количество строк без передачи данных
let row_count = df.count().await?;

// Потоковое чтение для больших данных
let mut stream = df.execute_stream().await?;
while let Some(batch) = stream.next().await {
    let batch = batch?;
    process(batch);
}
TIP

Используйте execute_stream() вместо collect() для больших данных. collect() загружает все результаты в память, а execute_stream() обрабатывает их порциями (RecordBatch).

Enum Expr

Под капотом каждый метод DataFrame строит дерево Expr — перечисление всех возможных выражений:

use datafusion::prelude::*;

// col("amount") создаёт Expr::Column
// lit(100) создаёт Expr::Literal
// .gt() создаёт Expr::BinaryExpr { op: Operator::Gt, ... }
let expr = col("amount").gt(lit(100));

// Это эквивалентно ручному построению:
use datafusion::logical_expr::{Expr, Operator, BinaryExpr};
use datafusion::common::ScalarValue;

let manual_expr = Expr::BinaryExpr(BinaryExpr {
    left: Box::new(Expr::Column("amount".into())),
    op: Operator::Gt,
    right: Box::new(Expr::Literal(ScalarValue::Int32(Some(100)))),
});

Знание структуры Expr полезно для построения динамических запросов — когда набор фильтров определяется в runtime.

Создание DataFrame из разных источников

let ctx = SessionContext::new();

// Из SQL-запроса (sql() возвращает DataFrame)
let df = ctx.sql("SELECT * FROM orders WHERE amount > 100").await?;

// Из зарегистрированной таблицы
let df = ctx.table("orders").await?;

// Из Parquet без регистрации
let df = ctx.read_parquet("data/orders.parquet", ParquetReadOptions::default()).await?;

// Из CSV
let df = ctx.read_csv("data/events.csv", CsvReadOptions::new().has_header(true)).await?;

// Из RecordBatch
let df = ctx.read_batch(batch)?;
NOTE

ctx.sql() возвращает DataFrame — результат SQL-запроса можно обрабатывать через DataFrame API. Это позволяет комбинировать SQL и программный подход в одном pipeline.

Итоги

  • DataFrame строит LogicalPlan лениво — данные не читаются до collect()/show()
  • col() и lit() — базовые строительные блоки выражений
  • Основные операции: filter, select, aggregate, join, sort, limit, distinct
  • dataframe! макрос создаёт DataFrame из литеральных данных
  • Terminal-операции: collect (все данные), execute_stream (потоково), show (вывод)
  • Expr enum — программное представление любого выражения для динамических запросов

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Почему DataFrame в DataFusion является ленивым (lazy)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7