Arrow в DataFusion: RecordBatch, ArrayRef и compute kernels
DataFusion = Arrow Native Query Engine
Всё, что мы рассматривали в модуле — колоночный формат, буферы, типовая система, IPC — это не абстрактная теория. DataFusion построен на этих примитивах: каждый оператор принимает Arrow RecordBatch и возвращает Arrow RecordBatch. Промежуточных форматов нет.
В этом уроке мы свяжем Arrow-концепции с конкретными точками в архитектуре DataFusion.
RecordBatch: единица потоковой обработки
Когда DataFusion выполняет запрос, данные движутся через pipeline операторов как поток RecordBatch:
Каждый ExecutionPlan реализует метод execute(), возвращающий SendableRecordBatchStream:
pub trait ExecutionPlan: Send + Sync {
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>;
fn schema(&self) -> SchemaRef;
// ...
}
SendableRecordBatchStream — это Stream<Item = Result<RecordBatch>> + SchemaRef. Операторы получают поток от дочернего плана, обрабатывают каждый batch и передают дальше. Это pull-модель: верхний оператор запрашивает следующий batch, запрос каскадом спускается до leaf-оператора (scan).
Batch Size
По умолчанию DataFusion использует batch_size = 8192. Это настраивается через SessionConfig:
let config = SessionConfig::new()
.with_batch_size(4096); // меньший batch — ниже латентность первого результата
Выбор 8192 — компромисс:
- Достаточно для эффективной SIMD-обработки (сотни значений за инструкцию)
- Помещается в L2 cache (8192 x 8 байт = 64 KB для одной колонки Float64)
- Не задерживает первый результат в потоковом режиме
ArrayRef: колонки в выражениях
Внутри операторов DataFusion работает с отдельными колонками через ArrayRef — Arc<dyn Array>. Это тип-стёртая ссылка на Arrow-массив:
use arrow::array::{ArrayRef, Int64Array, Float64Array};
use arrow::datatypes::DataType;
fn process_column(col: &ArrayRef) {
match col.data_type() {
DataType::Int64 => {
let int_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
// Работа с типизированным массивом
}
DataType::Float64 => {
let float_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
// ...
}
_ => { /* другие типы */ }
}
}
Выражения DataFusion (PhysicalExpr) вычисляются на уровне ArrayRef:
pub trait PhysicalExpr: Send + Sync {
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
// ...
}
ColumnarValue — это либо Array(ArrayRef) (колонка значений), либо Scalar(ScalarValue) (одно значение, broadcast на все строки batch). Скалярная оптимизация позволяет не создавать массив из одинаковых значений для выражений вроде x + 1.
Compute Kernels: вычисления на Arrow-массивах
DataFusion использует arrow compute kernels — оптимизированные функции для операций над Arrow-массивами. Это не обёртки поверх Rust итераторов, а специализированные реализации с учётом:
- Null bitmap (пропуск NULL без ветвлений)
- SIMD-векторизация (автоматическая через LLVM)
- Batch-обработка (вся колонка за один вызов)
Примеры compute kernels
use arrow::compute;
use arrow::array::{Int64Array, BooleanArray};
let values = Int64Array::from(vec![Some(10), None, Some(30), Some(5)]);
// Фильтрация: WHERE value > 7
let predicate = compute::gt_scalar(&values, 7)?;
// predicate: BooleanArray [true, null, true, false]
let filtered = compute::filter(&values, &predicate)?;
// filtered: Int64Array [10, 30]
// Арифметика: value * 2
let doubled = compute::multiply_scalar(&values, 2)?;
// doubled: Int64Array [20, null, 60, 10]
// Агрегация: SUM(value)
let sum = compute::sum(&values);
// sum: Some(45) — NULL пропущен автоматически
Compute kernels корректно обрабатывают NULL: арифметика с NULL даёт NULL, сравнение с NULL даёт NULL, агрегации пропускают NULL. Это реализовано через побитовые операции на validity bitmap, без ветвлений per-element.
Какие kernels использует DataFusion
| SQL операция | Compute kernel | Описание |
|---|---|---|
WHERE x > 10 | gt_scalar, filter | Сравнение + фильтрация |
SELECT x + y | add | Поэлементное сложение |
CAST(x AS BIGINT) | cast | Конвертация типов |
x IS NULL | is_null | Проверка NULL |
COALESCE(x, 0) | coalesce | Замена NULL |
LIKE '%pattern%' | like_utf8 | Строковый поиск |
ORDER BY x | sort | Сортировка массива |
x IN (1, 2, 3) | in_list | Проверка принадлежности |
DataFusion также реализует собственные kernels для операций, не покрытых arrow-compute: hash joins, hash aggregation, window functions. Эти kernels работают напрямую с Arrow-буферами.
SchemaRef: типовая безопасность pipeline
Каждый оператор в DataFusion знает свою выходную схему до начала выполнения:
let plan: Arc<dyn ExecutionPlan> = /* ... */;
// Схема известна без выполнения запроса
let output_schema: SchemaRef = plan.schema();
Это позволяет:
- Ловить ошибки типов на этапе планирования (до сканирования данных)
- Аллоцировать буферы правильного размера заранее
- Выбирать оптимальный kernel по типу (Int32 vs Float64 vs Utf8)
Projection сужает схему (3 колонки до 2), filter сохраняет (все колонки проходят, только строки фильтруются). Optimizer использует эту информацию для projection pushdown: если верхний оператор запрашивает 2 из 10 колонок, scan не будет читать остальные 8.
Arrow в пользовательских расширениях DataFusion
Когда вы пишете UDF (User Defined Function) для DataFusion, вы работаете непосредственно с Arrow типами:
use datafusion::arrow::array::{ArrayRef, Float64Array};
use datafusion::logical_expr::{ScalarUDF, Volatility};
// UDF: celsius_to_fahrenheit(temp) -> temp * 9/5 + 32
fn celsius_to_fahrenheit(args: &[ArrayRef]) -> Result<ArrayRef> {
let temps = args[0].as_any().downcast_ref::<Float64Array>()
.ok_or_else(|| DataFusionError::Internal("Expected Float64".into()))?;
let result: Float64Array = temps.iter()
.map(|v| v.map(|t| t * 9.0 / 5.0 + 32.0))
.collect();
Ok(Arc::new(result))
}
Вход — ArrayRef (Arrow-массив), выход — ArrayRef. Нет DataFrame, нет Row, нет Object — только Arrow. Это означает, что UDF автоматически получает преимущества колоночной обработки: SIMD, cache locality, null propagation.
Модуль расширяемости DataFusion (Module 05 в этом курсе) подробно покрывает создание UDF, UDAF (агрегатные функции) и TableProvider. Все они работают с Arrow типами напрямую.
Связь с остальным курсом
Arrow Foundation — это фундамент для всего, что следует:
| Модуль | Как использует Arrow |
|---|---|
| 02 Architecture | LogicalPlan и PhysicalPlan оперируют SchemaRef и DataType |
| 03 SQL и DataFrame | Результаты — RecordBatch; DataFrame — обёртка над LogicalPlan |
| 04 Python | PyArrow как мост: DataFusion Python возвращает PyArrow таблицы |
| 05 Extensibility | UDF/UDAF принимают и возвращают ArrayRef |
| 06 Optimization | Predicate pushdown работает с Arrow compute kernels |
| 07 Comet | Spark RecordBatch → DataFusion RecordBatch через JNI |
| 08 Distributed | Ballista обменивается данными через Arrow Flight |
Итоги
- DataFusion — Arrow-native: каждый оператор работает с RecordBatch
- Потоковая модель:
SendableRecordBatchStreamс batch_size 8192 - ArrayRef — тип-стёртая ссылка на Arrow-массив, основной тип в выражениях
- Compute kernels: SIMD-оптимизированные операции на Arrow-буферах (filter, cast, math)
- SchemaRef обеспечивает типовую безопасность pipeline до выполнения
- UDF и расширения работают напрямую с Arrow типами — нет промежуточных форматов