Learning Platform
Глоссарий Troubleshooting
Урок 02.06 · 14 мин
Средний
DataFusion Arrow IntegrationRecordBatchArrayRefCompute KernelsSchemaRef

Arrow в DataFusion: RecordBatch, ArrayRef и compute kernels

DataFusion = Arrow Native Query Engine

Всё, что мы рассматривали в модуле — колоночный формат, буферы, типовая система, IPC — это не абстрактная теория. DataFusion построен на этих примитивах: каждый оператор принимает Arrow RecordBatch и возвращает Arrow RecordBatch. Промежуточных форматов нет.

В этом уроке мы свяжем Arrow-концепции с конкретными точками в архитектуре DataFusion.

RecordBatch: единица потоковой обработки

Когда DataFusion выполняет запрос, данные движутся через pipeline операторов как поток RecordBatch:

RecordBatch Pipeline в DataFusion
TableScan (Parquet/CSV/Memory)Leaf-оператор: читает данные из Parquet, CSV или MemTable и создаёт начальный поток RecordBatch
RecordBatch StreamАсинхронный поток данных: каждый batch содержит до 8192 строк для баланса SIMD и cache
FilterExec (WHERE clause)Применяет предикат WHERE к каждому RecordBatch через arrow compute kernels
RecordBatch Stream (filtered)Поток после фильтрации: количество строк ≤ исходного batch_size
ProjectionExec (SELECT columns)Вычисляет выражения SELECT: выбирает колонки, применяет функции, создаёт новые колонки
Результат: поток RecordBatchФинальный поток RecordBatch, готовый для потребления клиентом

Каждый ExecutionPlan реализует метод execute(), возвращающий SendableRecordBatchStream:

pub trait ExecutionPlan: Send + Sync {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;

    fn schema(&self) -> SchemaRef;
    // ...
}

SendableRecordBatchStream — это Stream<Item = Result<RecordBatch>> + SchemaRef. Операторы получают поток от дочернего плана, обрабатывают каждый batch и передают дальше. Это pull-модель: верхний оператор запрашивает следующий batch, запрос каскадом спускается до leaf-оператора (scan).

Batch Size

По умолчанию DataFusion использует batch_size = 8192. Это настраивается через SessionConfig:

let config = SessionConfig::new()
    .with_batch_size(4096);  // меньший batch — ниже латентность первого результата

Выбор 8192 — компромисс:

  • Достаточно для эффективной SIMD-обработки (сотни значений за инструкцию)
  • Помещается в L2 cache (8192 x 8 байт = 64 KB для одной колонки Float64)
  • Не задерживает первый результат в потоковом режиме

ArrayRef: колонки в выражениях

Внутри операторов DataFusion работает с отдельными колонками через ArrayRefArc<dyn Array>. Это тип-стёртая ссылка на Arrow-массив:

use arrow::array::{ArrayRef, Int64Array, Float64Array};
use arrow::datatypes::DataType;

fn process_column(col: &ArrayRef) {
    match col.data_type() {
        DataType::Int64 => {
            let int_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
            // Работа с типизированным массивом
        }
        DataType::Float64 => {
            let float_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
            // ...
        }
        _ => { /* другие типы */ }
    }
}

Выражения DataFusion (PhysicalExpr) вычисляются на уровне ArrayRef:

pub trait PhysicalExpr: Send + Sync {
    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
    fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
    // ...
}

ColumnarValue — это либо Array(ArrayRef) (колонка значений), либо Scalar(ScalarValue) (одно значение, broadcast на все строки batch). Скалярная оптимизация позволяет не создавать массив из одинаковых значений для выражений вроде x + 1.

Compute Kernels: вычисления на Arrow-массивах

DataFusion использует arrow compute kernels — оптимизированные функции для операций над Arrow-массивами. Это не обёртки поверх Rust итераторов, а специализированные реализации с учётом:

  • Null bitmap (пропуск NULL без ветвлений)
  • SIMD-векторизация (автоматическая через LLVM)
  • Batch-обработка (вся колонка за один вызов)

Примеры compute kernels

use arrow::compute;
use arrow::array::{Int64Array, BooleanArray};

let values = Int64Array::from(vec![Some(10), None, Some(30), Some(5)]);

// Фильтрация: WHERE value > 7
let predicate = compute::gt_scalar(&values, 7)?;
// predicate: BooleanArray [true, null, true, false]

let filtered = compute::filter(&values, &predicate)?;
// filtered: Int64Array [10, 30]

// Арифметика: value * 2
let doubled = compute::multiply_scalar(&values, 2)?;
// doubled: Int64Array [20, null, 60, 10]

// Агрегация: SUM(value)
let sum = compute::sum(&values);
// sum: Some(45) — NULL пропущен автоматически
NOTE

Compute kernels корректно обрабатывают NULL: арифметика с NULL даёт NULL, сравнение с NULL даёт NULL, агрегации пропускают NULL. Это реализовано через побитовые операции на validity bitmap, без ветвлений per-element.

Какие kernels использует DataFusion

SQL операцияCompute kernelОписание
WHERE x > 10gt_scalar, filterСравнение + фильтрация
SELECT x + yaddПоэлементное сложение
CAST(x AS BIGINT)castКонвертация типов
x IS NULLis_nullПроверка NULL
COALESCE(x, 0)coalesceЗамена NULL
LIKE '%pattern%'like_utf8Строковый поиск
ORDER BY xsortСортировка массива
x IN (1, 2, 3)in_listПроверка принадлежности

DataFusion также реализует собственные kernels для операций, не покрытых arrow-compute: hash joins, hash aggregation, window functions. Эти kernels работают напрямую с Arrow-буферами.

SchemaRef: типовая безопасность pipeline

Каждый оператор в DataFusion знает свою выходную схему до начала выполнения:

let plan: Arc<dyn ExecutionPlan> = /* ... */;

// Схема известна без выполнения запроса
let output_schema: SchemaRef = plan.schema();

Это позволяет:

  • Ловить ошибки типов на этапе планирования (до сканирования данных)
  • Аллоцировать буферы правильного размера заранее
  • Выбирать оптимальный kernel по типу (Int32 vs Float64 vs Utf8)
Schema Propagation через план
TableScanСхема определяется из TableProvider: все колонки таблицы доступны
FilterExec (salary > 50000)Filter сохраняет схему — фильтрует строки, не колонки
ProjectionExec (SELECT name, salary)Projection сужает схему: из 3 колонок выбираются 2
РезультатФинальный RecordBatch содержит только запрошенные колонки

Projection сужает схему (3 колонки до 2), filter сохраняет (все колонки проходят, только строки фильтруются). Optimizer использует эту информацию для projection pushdown: если верхний оператор запрашивает 2 из 10 колонок, scan не будет читать остальные 8.

Arrow в пользовательских расширениях DataFusion

Когда вы пишете UDF (User Defined Function) для DataFusion, вы работаете непосредственно с Arrow типами:

use datafusion::arrow::array::{ArrayRef, Float64Array};
use datafusion::logical_expr::{ScalarUDF, Volatility};

// UDF: celsius_to_fahrenheit(temp) -> temp * 9/5 + 32
fn celsius_to_fahrenheit(args: &[ArrayRef]) -> Result<ArrayRef> {
    let temps = args[0].as_any().downcast_ref::<Float64Array>()
        .ok_or_else(|| DataFusionError::Internal("Expected Float64".into()))?;

    let result: Float64Array = temps.iter()
        .map(|v| v.map(|t| t * 9.0 / 5.0 + 32.0))
        .collect();

    Ok(Arc::new(result))
}

Вход — ArrayRef (Arrow-массив), выход — ArrayRef. Нет DataFrame, нет Row, нет Object — только Arrow. Это означает, что UDF автоматически получает преимущества колоночной обработки: SIMD, cache locality, null propagation.

TIP

Модуль расширяемости DataFusion (Module 05 в этом курсе) подробно покрывает создание UDF, UDAF (агрегатные функции) и TableProvider. Все они работают с Arrow типами напрямую.

Связь с остальным курсом

Arrow Foundation — это фундамент для всего, что следует:

МодульКак использует Arrow
02 ArchitectureLogicalPlan и PhysicalPlan оперируют SchemaRef и DataType
03 SQL и DataFrameРезультаты — RecordBatch; DataFrame — обёртка над LogicalPlan
04 PythonPyArrow как мост: DataFusion Python возвращает PyArrow таблицы
05 ExtensibilityUDF/UDAF принимают и возвращают ArrayRef
06 OptimizationPredicate pushdown работает с Arrow compute kernels
07 CometSpark RecordBatch → DataFusion RecordBatch через JNI
08 DistributedBallista обменивается данными через Arrow Flight

Итоги

  • DataFusion — Arrow-native: каждый оператор работает с RecordBatch
  • Потоковая модель: SendableRecordBatchStream с batch_size 8192
  • ArrayRef — тип-стёртая ссылка на Arrow-массив, основной тип в выражениях
  • Compute kernels: SIMD-оптимизированные операции на Arrow-буферах (filter, cast, math)
  • SchemaRef обеспечивает типовую безопасность pipeline до выполнения
  • UDF и расширения работают напрямую с Arrow типами — нет промежуточных форматов

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Что возвращает метод execute() у ExecutionPlan в DataFusion?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6