Learning Platform
Глоссарий Troubleshooting
Урок 06.07 · 14 мин
Продвинутый
DataFusionErrorexec_err!plan_err!internal_err!error contexterror collectionthiserrorerror macros

Обработка ошибок в DataFusion

В предыдущих уроках мы создавали UDF, TableProvider, OptimizerRule. Каждый из них возвращает Result<T, DataFusionError>. Пора разобрать систему ошибок целиком: какие варианты существуют, какие макросы использовать, и как правильно оборачивать ошибки в расширениях.

Таксономия: Expected vs Unexpected

DataFusion делит ошибки на две категории. Это ключевое архитектурное решение, определяющее, какой вариант использовать:

Таксономия DataFusionError
Expected (ожидаемые)Пользователь сделал что-то неправильно: некорректный SQL, неверная конфигурация, ошибка в данных. Показываются пользователю — должны быть понятными
Unexpected (неожиданные)Баг в системе: нарушение инварианта, невозможное состояние. Не должны происходить — указывают на ошибку в коде

Это различие критично для авторов расширений. Типичная ошибка — использовать Internal для пользовательских ошибок валидации. Подробнее об этом — в конце урока.

Все варианты DataFusionError

pub enum DataFusionError {
    // Expected — пользователь может исправить
    Plan(String),              // Ошибка планирования: "Table 'x' not found"
    Execution(String),         // Ошибка выполнения: "Division by zero"
    Configuration(String),     // Неверная конфигурация: "Unknown option 'x'"
    SchemaError(SchemaError),  // Проблемы со схемой данных
    SQL(ParserError, Option<String>), // Синтаксическая ошибка SQL

    // Unexpected — баг в системе
    Internal(String),          // Нарушение инварианта

    // Обёртки внешних ошибок
    ArrowError(ArrowError, Option<String>), // Ошибка Apache Arrow
    IoError(io::Error),        // Ошибка ввода/вывода
    External(GenericError),    // Box<dyn Error + Send + Sync>
    Substrait(String),         // Ошибка Substrait-сериализации

    // Структурные
    Context(String, Box<DataFusionError>), // Обёртка с контекстом
    Collection(Vec<DataFusionError>),      // Набор ошибок
}
TIP

Размер DataFusionError на стеке — 40 байт (size_of::<DataFusionError>() == 40). Это осознанное ограничение: крупные данные (строки, вложенные ошибки) хранятся в куче через Box и String. Результат: Result<T, DataFusionError> не раздувает стек-фреймы в горячих путях выполнения.

Макросы ошибок

DataFusion предоставляет макросы для создания ошибок — они возвращают Err(DataFusionError::Variant(...)) с форматированием строки:

use datafusion::common::{exec_err, plan_err, internal_err, not_impl_err, substrait_err};

// Plan — ошибка планирования
fn validate_column(name: &str, schema: &Schema) -> Result<()> {
    if schema.field_with_name(name).is_err() {
        return plan_err!("Column '{}' not found in schema {:?}", name, schema);
    }
    Ok(())
}

// Execution — ошибка выполнения
fn safe_divide(a: f64, b: f64) -> Result<f64> {
    if b == 0.0 {
        return exec_err!("Division by zero: {} / {}", a, b);
    }
    Ok(a / b)
}

// Internal — баг, не должно произойти
fn get_required_field(batch: &RecordBatch, idx: usize) -> Result<&ArrayRef> {
    batch.column_opt(idx).ok_or_else(|| {
        internal_err!(
            "Column index {} out of bounds (batch has {} columns)",
            idx, batch.num_columns()
        )
    })
}

// NotImplemented — функциональность не реализована
fn handle_window_frame(frame: &WindowFrame) -> Result<()> {
    match frame.units {
        WindowFrameUnits::Rows => Ok(()),
        WindowFrameUnits::Groups => {
            not_impl_err!("GROUPS window frame is not yet supported")
        }
        WindowFrameUnits::Range => Ok(()),
    }
}

// Substrait — ошибка сериализации Substrait
fn decode_substrait(buf: &[u8]) -> Result<LogicalPlan> {
    let plan = substrait::proto::Plan::decode(buf)
        .map_err(|e| substrait_err!("Failed to decode Substrait plan: {e}"))?;
    // ...
}

Шпаргалка: какой макрос использовать

СитуацияМакросПример
Некорректный SQL, отсутствующая таблица/колонкаplan_err!"Column 'x' not found"
Ошибка в данных при выполненииexec_err!"Division by zero"
Невалидный параметр конфигурацииDataFusionError::Configuration(...)"Unknown option"
Нарушение инварианта (баг)internal_err!"Expected 3 columns, got 5"
Функция не реализованаnot_impl_err!"GROUPS frame not supported"
Ошибка Substraitsubstrait_err!"Failed to decode plan"

Assertion-макросы

Для проверки инвариантов DataFusion предоставляет assertion-макросы, возвращающие Result вместо паники:

use datafusion::common::{
    assert_or_internal_err,
    assert_eq_or_internal_err,
    assert_ne_or_internal_err,
};

fn validate_partition(partition: usize, total: usize) -> Result<()> {
    // Вместо: assert!(partition < total);  // паникует!
    // Используем:
    assert_or_internal_err!(
        partition < total,
        "Partition index {partition} >= total partitions {total}"
    );
    Ok(())
}

fn verify_schema_match(expected: &Schema, actual: &Schema) -> Result<()> {
    assert_eq_or_internal_err!(
        expected.fields().len(),
        actual.fields().len(),
        "Schema field count mismatch"
    );
    Ok(())
}
WARNING

Assertion-макросы генерируют Internal ошибки — это для проверки инвариантов, не для валидации пользовательского ввода. Если пользователь может вызвать эту ситуацию (передав неверные данные), используйте exec_err! или plan_err!.

Контекст: DataFusionError::context()

Метод .context() оборачивает ошибку дополнительной информацией, не теряя оригинальную ошибку:

use datafusion::common::DataFusionError;

fn read_parquet_file(path: &str) -> Result<RecordBatch> {
    let file = File::open(path)
        .map_err(|e| DataFusionError::from(e))
        .map_err(|e| e.context(format!("reading Parquet file '{path}'")))?;

    let reader = ParquetRecordBatchReader::try_new(file, 1024)
        .map_err(|e| DataFusionError::from(e))
        .map_err(|e| e.context("building Parquet reader"))?;

    // ...
}

Результат при ошибке — цепочка контекста:

reading Parquet file '/data/sales.parquet'
caused by
  building Parquet reader
caused by
  Arrow error: Parquet error: invalid magic number

Когда использовать context() vs новую ошибку

context() vs новая ошибка
context() — добавить контекстСохраняет оригинальную ошибку, добавляет описание 'где произошло'. Используйте для трассировки
Новая ошибка — переклассификацияСоздаёт новую ошибку другого типа. Используйте, когда нужно изменить категорию ошибки

Правило: если ошибка уже правильного типа — context(). Если нужно изменить тип (например, Arrow-ошибка → пользовательское сообщение) — создайте новую ошибку.

Error Collection: пакетные ошибки

Иногда нужно собрать все ошибки вместо остановки на первой. DataFusionError::Collection и builder-паттерн:

use datafusion::common::DataFusionError;

fn validate_all_columns(
    columns: &[String],
    schema: &Schema,
) -> Result<()> {
    let mut errors = vec![];

    for col in columns {
        if schema.field_with_name(col).is_err() {
            errors.push(DataFusionError::Plan(
                format!("Column '{}' not found", col)
            ));
        }
    }

    if errors.is_empty() {
        Ok(())
    } else {
        Err(DataFusionError::Collection(errors))
    }
}

Итерация по Collection

fn report_errors(error: &DataFusionError) {
    match error {
        DataFusionError::Collection(errors) => {
            eprintln!("Found {} errors:", errors.len());
            for (i, e) in errors.iter().enumerate() {
                eprintln!("  [{}] {}", i + 1, e);
            }
        }
        other => eprintln!("Error: {}", other),
    }
}
NOTE

Collection — для случаев, когда пользователю полезно увидеть все ошибки сразу (валидация схемы, проверка конфигурации). Для обычных операций используйте стандартный ? — «остановиться на первой ошибке».

Интеграция с thiserror

Если вы пишете crate поверх DataFusion, определите собственный тип ошибки и используйте thiserror для конвертации:

use datafusion::common::DataFusionError;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum MyEngineError {
    #[error("DataFusion error: {0}")]
    DataFusion(#[from] DataFusionError),

    #[error("Storage error: {0}")]
    Storage(#[from] std::io::Error),

    #[error("Configuration error: {0}")]
    Config(String),

    #[error("Connection pool exhausted")]
    PoolExhausted,
}

// Конвертация обратно в DataFusionError (для trait-реализаций)
impl From<MyEngineError> for DataFusionError {
    fn from(e: MyEngineError) -> Self {
        match e {
            MyEngineError::DataFusion(df) => df,
            MyEngineError::Storage(io) => DataFusionError::IoError(io),
            other => DataFusionError::External(Box::new(other)),
        }
    }
}

Паттерн использования в trait-реализациях:

impl TableProvider for MyTable {
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Внутри — работаем с MyEngineError
        let data = self.storage.read()
            .map_err(MyEngineError::Storage)?;

        let plan = self.build_plan(data, projection, filters, limit)
            .map_err(|e: MyEngineError| DataFusionError::from(e))?;

        Ok(plan)
    }
}
TIP

DataFusionError::External(Box::new(e)) — универсальный способ обернуть любую ошибку, реализующую std::error::Error + Send + Sync. Используйте для ошибок, не имеющих прямого аналога среди вариантов DataFusionError.

Паттерн Result type alias

DataFusion определяет собственный Result:

// В datafusion::common
pub type Result<T, E = DataFusionError> = std::result::Result<T, E>;

Импортируйте и используйте:

use datafusion::common::Result;

fn my_function() -> Result<RecordBatch> {
    // Оператор ? автоматически конвертирует
    // ArrowError, io::Error и другие через From
    let batch = create_batch()?;
    Ok(batch)
}

Best Practices для авторов расширений

1. Expected vs Unexpected — правильная классификация

// ❌ НЕПРАВИЛЬНО: internal_err! для пользовательской ошибки
fn my_udf(args: &[ArrayRef]) -> Result<ArrayRef> {
    if args.len() != 2 {
        return internal_err!("Expected 2 arguments, got {}", args.len());
        // Internal = баг в системе. Но пользователь передал неверное кол-во аргументов!
    }
    // ...
}

// ✅ ПРАВИЛЬНО: exec_err! — пользователь может исправить
fn my_udf(args: &[ArrayRef]) -> Result<ArrayRef> {
    if args.len() != 2 {
        return exec_err!("my_udf expects 2 arguments, got {}", args.len());
    }
    // ...
}

2. Контекст вместо голых ошибок

// ❌ Голая ошибка — непонятно, где произошла
fn process(path: &str) -> Result<()> {
    let data = std::fs::read(path)?;  // IoError без контекста
    Ok(())
}

// ✅ С контекстом — видно, что и где
fn process(path: &str) -> Result<()> {
    let data = std::fs::read(path)
        .map_err(|e| DataFusionError::from(e))
        .map_err(|e| e.context(format!("processing file '{path}'")))?;
    Ok(())
}

3. Информативные сообщения

// ❌ Бесполезное сообщение
return exec_err!("invalid type");

// ✅ Информативное — пользователь знает, что исправить
return exec_err!(
    "my_udf argument 1: expected Int64, got {:?}",
    args[0].data_type()
);

Распространённая ошибка: internal_err! в расширениях

internal_err! — когда действительно использовать
Правило: internal_err! = БАГ в кодеinternal_err! означает, что произошло невозможное состояние — нарушение инварианта. Если пользователь может это вызвать, используйте exec_err! или plan_err!
WARNING

Исторически internal_err! использовался слишком широко в DataFusion. Если вы пишете расширение — всегда спрашивайте: «Может ли пользователь вызвать эту ситуацию?». Если да, это exec_err! или plan_err!, не internal_err!.

Резюме

КонцепцияКогда использовать
plan_err!Некорректный SQL, отсутствующая таблица/колонка, невалидный план
exec_err!Ошибка в данных при выполнении (деление на 0, overflow, невалидный тип)
internal_err!Нарушение инварианта — баг, невозможное состояние
not_impl_err!Функциональность не реализована
.context()Добавить «где произошло» к существующей ошибке
CollectionСобрать все ошибки (валидация, пакетные проверки)
ExternalОбернуть любую std::error::Error
thiserror + FromИнтеграция с собственным типом ошибки в downstream crate

Правильная классификация ошибок — не формальность. Она определяет, увидит ли пользователь понятное сообщение или «Internal error: …» без возможности разобраться.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. DataFusion делит ошибки на Expected и Unexpected. К какой категории относится DataFusionError::Plan?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8