Обработка ошибок в DataFusion
В предыдущих уроках мы создавали UDF, TableProvider, OptimizerRule. Каждый из них возвращает Result<T, DataFusionError>. Пора разобрать систему ошибок целиком: какие варианты существуют, какие макросы использовать, и как правильно оборачивать ошибки в расширениях.
Таксономия: Expected vs Unexpected
DataFusion делит ошибки на две категории. Это ключевое архитектурное решение, определяющее, какой вариант использовать:
Это различие критично для авторов расширений. Типичная ошибка — использовать Internal для пользовательских ошибок валидации. Подробнее об этом — в конце урока.
Все варианты DataFusionError
pub enum DataFusionError {
// Expected — пользователь может исправить
Plan(String), // Ошибка планирования: "Table 'x' not found"
Execution(String), // Ошибка выполнения: "Division by zero"
Configuration(String), // Неверная конфигурация: "Unknown option 'x'"
SchemaError(SchemaError), // Проблемы со схемой данных
SQL(ParserError, Option<String>), // Синтаксическая ошибка SQL
// Unexpected — баг в системе
Internal(String), // Нарушение инварианта
// Обёртки внешних ошибок
ArrowError(ArrowError, Option<String>), // Ошибка Apache Arrow
IoError(io::Error), // Ошибка ввода/вывода
External(GenericError), // Box<dyn Error + Send + Sync>
Substrait(String), // Ошибка Substrait-сериализации
// Структурные
Context(String, Box<DataFusionError>), // Обёртка с контекстом
Collection(Vec<DataFusionError>), // Набор ошибок
}
Размер DataFusionError на стеке — 40 байт (size_of::<DataFusionError>() == 40). Это осознанное ограничение: крупные данные (строки, вложенные ошибки) хранятся в куче через Box и String. Результат: Result<T, DataFusionError> не раздувает стек-фреймы в горячих путях выполнения.
Макросы ошибок
DataFusion предоставляет макросы для создания ошибок — они возвращают Err(DataFusionError::Variant(...)) с форматированием строки:
use datafusion::common::{exec_err, plan_err, internal_err, not_impl_err, substrait_err};
// Plan — ошибка планирования
fn validate_column(name: &str, schema: &Schema) -> Result<()> {
if schema.field_with_name(name).is_err() {
return plan_err!("Column '{}' not found in schema {:?}", name, schema);
}
Ok(())
}
// Execution — ошибка выполнения
fn safe_divide(a: f64, b: f64) -> Result<f64> {
if b == 0.0 {
return exec_err!("Division by zero: {} / {}", a, b);
}
Ok(a / b)
}
// Internal — баг, не должно произойти
fn get_required_field(batch: &RecordBatch, idx: usize) -> Result<&ArrayRef> {
batch.column_opt(idx).ok_or_else(|| {
internal_err!(
"Column index {} out of bounds (batch has {} columns)",
idx, batch.num_columns()
)
})
}
// NotImplemented — функциональность не реализована
fn handle_window_frame(frame: &WindowFrame) -> Result<()> {
match frame.units {
WindowFrameUnits::Rows => Ok(()),
WindowFrameUnits::Groups => {
not_impl_err!("GROUPS window frame is not yet supported")
}
WindowFrameUnits::Range => Ok(()),
}
}
// Substrait — ошибка сериализации Substrait
fn decode_substrait(buf: &[u8]) -> Result<LogicalPlan> {
let plan = substrait::proto::Plan::decode(buf)
.map_err(|e| substrait_err!("Failed to decode Substrait plan: {e}"))?;
// ...
}
Шпаргалка: какой макрос использовать
| Ситуация | Макрос | Пример |
|---|---|---|
| Некорректный SQL, отсутствующая таблица/колонка | plan_err! | "Column 'x' not found" |
| Ошибка в данных при выполнении | exec_err! | "Division by zero" |
| Невалидный параметр конфигурации | DataFusionError::Configuration(...) | "Unknown option" |
| Нарушение инварианта (баг) | internal_err! | "Expected 3 columns, got 5" |
| Функция не реализована | not_impl_err! | "GROUPS frame not supported" |
| Ошибка Substrait | substrait_err! | "Failed to decode plan" |
Assertion-макросы
Для проверки инвариантов DataFusion предоставляет assertion-макросы, возвращающие Result вместо паники:
use datafusion::common::{
assert_or_internal_err,
assert_eq_or_internal_err,
assert_ne_or_internal_err,
};
fn validate_partition(partition: usize, total: usize) -> Result<()> {
// Вместо: assert!(partition < total); // паникует!
// Используем:
assert_or_internal_err!(
partition < total,
"Partition index {partition} >= total partitions {total}"
);
Ok(())
}
fn verify_schema_match(expected: &Schema, actual: &Schema) -> Result<()> {
assert_eq_or_internal_err!(
expected.fields().len(),
actual.fields().len(),
"Schema field count mismatch"
);
Ok(())
}
Assertion-макросы генерируют Internal ошибки — это для проверки инвариантов, не для валидации пользовательского ввода. Если пользователь может вызвать эту ситуацию (передав неверные данные), используйте exec_err! или plan_err!.
Контекст: DataFusionError::context()
Метод .context() оборачивает ошибку дополнительной информацией, не теряя оригинальную ошибку:
use datafusion::common::DataFusionError;
fn read_parquet_file(path: &str) -> Result<RecordBatch> {
let file = File::open(path)
.map_err(|e| DataFusionError::from(e))
.map_err(|e| e.context(format!("reading Parquet file '{path}'")))?;
let reader = ParquetRecordBatchReader::try_new(file, 1024)
.map_err(|e| DataFusionError::from(e))
.map_err(|e| e.context("building Parquet reader"))?;
// ...
}
Результат при ошибке — цепочка контекста:
reading Parquet file '/data/sales.parquet'
caused by
building Parquet reader
caused by
Arrow error: Parquet error: invalid magic number
Когда использовать context() vs новую ошибку
Правило: если ошибка уже правильного типа — context(). Если нужно изменить тип (например, Arrow-ошибка → пользовательское сообщение) — создайте новую ошибку.
Error Collection: пакетные ошибки
Иногда нужно собрать все ошибки вместо остановки на первой. DataFusionError::Collection и builder-паттерн:
use datafusion::common::DataFusionError;
fn validate_all_columns(
columns: &[String],
schema: &Schema,
) -> Result<()> {
let mut errors = vec![];
for col in columns {
if schema.field_with_name(col).is_err() {
errors.push(DataFusionError::Plan(
format!("Column '{}' not found", col)
));
}
}
if errors.is_empty() {
Ok(())
} else {
Err(DataFusionError::Collection(errors))
}
}
Итерация по Collection
fn report_errors(error: &DataFusionError) {
match error {
DataFusionError::Collection(errors) => {
eprintln!("Found {} errors:", errors.len());
for (i, e) in errors.iter().enumerate() {
eprintln!(" [{}] {}", i + 1, e);
}
}
other => eprintln!("Error: {}", other),
}
}
Collection — для случаев, когда пользователю полезно увидеть все ошибки сразу (валидация схемы, проверка конфигурации). Для обычных операций используйте стандартный ? — «остановиться на первой ошибке».
Интеграция с thiserror
Если вы пишете crate поверх DataFusion, определите собственный тип ошибки и используйте thiserror для конвертации:
use datafusion::common::DataFusionError;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum MyEngineError {
#[error("DataFusion error: {0}")]
DataFusion(#[from] DataFusionError),
#[error("Storage error: {0}")]
Storage(#[from] std::io::Error),
#[error("Configuration error: {0}")]
Config(String),
#[error("Connection pool exhausted")]
PoolExhausted,
}
// Конвертация обратно в DataFusionError (для trait-реализаций)
impl From<MyEngineError> for DataFusionError {
fn from(e: MyEngineError) -> Self {
match e {
MyEngineError::DataFusion(df) => df,
MyEngineError::Storage(io) => DataFusionError::IoError(io),
other => DataFusionError::External(Box::new(other)),
}
}
}
Паттерн использования в trait-реализациях:
impl TableProvider for MyTable {
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Внутри — работаем с MyEngineError
let data = self.storage.read()
.map_err(MyEngineError::Storage)?;
let plan = self.build_plan(data, projection, filters, limit)
.map_err(|e: MyEngineError| DataFusionError::from(e))?;
Ok(plan)
}
}
DataFusionError::External(Box::new(e)) — универсальный способ обернуть любую ошибку, реализующую std::error::Error + Send + Sync. Используйте для ошибок, не имеющих прямого аналога среди вариантов DataFusionError.
Паттерн Result type alias
DataFusion определяет собственный Result:
// В datafusion::common
pub type Result<T, E = DataFusionError> = std::result::Result<T, E>;
Импортируйте и используйте:
use datafusion::common::Result;
fn my_function() -> Result<RecordBatch> {
// Оператор ? автоматически конвертирует
// ArrowError, io::Error и другие через From
let batch = create_batch()?;
Ok(batch)
}
Best Practices для авторов расширений
1. Expected vs Unexpected — правильная классификация
// ❌ НЕПРАВИЛЬНО: internal_err! для пользовательской ошибки
fn my_udf(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() != 2 {
return internal_err!("Expected 2 arguments, got {}", args.len());
// Internal = баг в системе. Но пользователь передал неверное кол-во аргументов!
}
// ...
}
// ✅ ПРАВИЛЬНО: exec_err! — пользователь может исправить
fn my_udf(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() != 2 {
return exec_err!("my_udf expects 2 arguments, got {}", args.len());
}
// ...
}
2. Контекст вместо голых ошибок
// ❌ Голая ошибка — непонятно, где произошла
fn process(path: &str) -> Result<()> {
let data = std::fs::read(path)?; // IoError без контекста
Ok(())
}
// ✅ С контекстом — видно, что и где
fn process(path: &str) -> Result<()> {
let data = std::fs::read(path)
.map_err(|e| DataFusionError::from(e))
.map_err(|e| e.context(format!("processing file '{path}'")))?;
Ok(())
}
3. Информативные сообщения
// ❌ Бесполезное сообщение
return exec_err!("invalid type");
// ✅ Информативное — пользователь знает, что исправить
return exec_err!(
"my_udf argument 1: expected Int64, got {:?}",
args[0].data_type()
);
Распространённая ошибка: internal_err! в расширениях
Исторически internal_err! использовался слишком широко в DataFusion. Если вы пишете расширение — всегда спрашивайте: «Может ли пользователь вызвать эту ситуацию?». Если да, это exec_err! или plan_err!, не internal_err!.
Резюме
| Концепция | Когда использовать |
|---|---|
plan_err! | Некорректный SQL, отсутствующая таблица/колонка, невалидный план |
exec_err! | Ошибка в данных при выполнении (деление на 0, overflow, невалидный тип) |
internal_err! | Нарушение инварианта — баг, невозможное состояние |
not_impl_err! | Функциональность не реализована |
.context() | Добавить «где произошло» к существующей ошибке |
Collection | Собрать все ошибки (валидация, пакетные проверки) |
External | Обернуть любую std::error::Error |
thiserror + From | Интеграция с собственным типом ошибки в downstream crate |
Правильная классификация ошибок — не формальность. Она определяет, увидит ли пользователь понятное сообщение или «Internal error: …» без возможности разобраться.