Learning Platform
Глоссарий Troubleshooting
Урок 06.08 · 13 мин
Продвинутый
assert_batches_eq!assert_batches_sorted_eq!SessionContext testingUDF testingTableProvider testingOptimizerRule testingsqllogictesttokio::test

Тестирование расширений DataFusion

В предыдущих уроках мы создавали UDF, TableProvider, OptimizerRule. Теперь разберём, как всё это тестировать: от unit-тестов отдельных функций до SQL-level проверок через sqllogictest. DataFusion предоставляет набор тестовых макросов и паттернов, значительно упрощающих процесс.

Тестовый harness: SessionContext + tokio::test

Базовый паттерн тестирования DataFusion-расширений: создать SessionContext, зарегистрировать расширение, выполнить SQL, проверить результат:

#[cfg(test)]
mod tests {
    use datafusion::prelude::*;
    use datafusion::arrow::array::{Int64Array, StringArray};
    use datafusion::arrow::record_batch::RecordBatch;
    use std::sync::Arc;

    #[tokio::test]
    async fn test_basic_query() {
        // SessionContext::new() — дешёвый in-memory контекст
        let ctx = SessionContext::new();

        // Регистрируем данные
        ctx.register_batch("test_table", create_test_batch())
            .unwrap();

        // Выполняем запрос
        let df = ctx.sql("SELECT * FROM test_table WHERE id > 1")
            .await
            .unwrap();

        let batches = df.collect().await.unwrap();
        // ... проверяем результат
    }

    fn create_test_batch() -> RecordBatch {
        let id = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let name = Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol"]));
        RecordBatch::try_from_iter(vec![
            ("id", id as _),
            ("name", name as _),
        ]).unwrap()
    }
}
TIP

SessionContext::new() — не singleton. Каждый тест создаёт независимый контекст со своим каталогом, конфигурацией и памятью. Никакого shared state между тестами.

assert_batches_eq! — golden-file стиль

Главный инструмент проверки результатов — макрос assert_batches_eq!. Он сравнивает Vec<RecordBatch> с текстовым представлением таблицы:

use datafusion::assert_batches_eq;

#[tokio::test]
async fn test_select() {
    let ctx = SessionContext::new();
    ctx.register_batch("t", create_test_batch()).unwrap();

    let batches = ctx
        .sql("SELECT id, name FROM t ORDER BY id")
        .await.unwrap()
        .collect().await.unwrap();

    // Golden-file стиль: ожидаемый результат как текстовая таблица
    assert_batches_eq!(
        &[
            "+----+-------+",
            "| id | name  |",
            "+----+-------+",
            "| 1  | Alice |",
            "| 2  | Bob   |",
            "| 3  | Carol |",
            "+----+-------+",
        ],
        &batches
    );
}

assert_batches_sorted_eq! — для недетерминированного порядка

Если порядок строк не гарантирован (нет ORDER BY), используйте assert_batches_sorted_eq! — он сортирует строки перед сравнением:

use datafusion::assert_batches_sorted_eq;

#[tokio::test]
async fn test_aggregate() {
    let ctx = SessionContext::new();
    ctx.register_batch("sales", create_sales_batch()).unwrap();

    let batches = ctx
        .sql("SELECT region, SUM(amount) as total FROM sales GROUP BY region")
        .await.unwrap()
        .collect().await.unwrap();

    // Порядок строк не гарантирован → sorted_eq
    assert_batches_sorted_eq!(
        &[
            "+--------+-------+",
            "| region | total |",
            "+--------+-------+",
            "| East   | 300   |",
            "| West   | 500   |",
            "+--------+-------+",
        ],
        &batches
    );
}
WARNING

Используйте assert_batches_eq! (строгий порядок) только с ORDER BY в запросе. Без ORDER BY результат может меняться между запусками — тест станет flaky.

Тестирование скалярных UDF

Паттерн: зарегистрировать UDF → выполнить SQL → проверить assert_batches_eq:

use datafusion::logical_expr::{create_udf, Volatility};
use datafusion::arrow::datatypes::DataType;
use datafusion::physical_plan::functions::make_scalar_function;

#[tokio::test]
async fn test_double_udf() {
    let ctx = SessionContext::new();

    // Определяем UDF
    let double_fn = make_scalar_function(|args: &[ArrayRef]| {
        let input = args[0]
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        let result: Int64Array = input.iter()
            .map(|v| v.map(|x| x * 2))
            .collect();
        Ok(Arc::new(result) as ArrayRef)
    });

    let udf = create_udf(
        "double",
        vec![DataType::Int64],
        DataType::Int64,
        Volatility::Immutable,
        double_fn,
    );
    ctx.register_udf(udf);

    // Регистрируем тестовые данные
    ctx.register_batch("t", create_test_batch()).unwrap();

    // Выполняем SQL с UDF
    let batches = ctx
        .sql("SELECT id, double(id) as doubled FROM t ORDER BY id")
        .await.unwrap()
        .collect().await.unwrap();

    assert_batches_eq!(
        &[
            "+----+---------+",
            "| id | doubled |",
            "+----+---------+",
            "| 1  | 2       |",
            "| 2  | 4       |",
            "| 3  | 6       |",
            "+----+---------+",
        ],
        &batches
    );
}

Тестирование граничных случаев UDF

#[tokio::test]
async fn test_double_udf_with_nulls() {
    let ctx = SessionContext::new();
    // ... регистрация UDF ...

    let batch = RecordBatch::try_from_iter(vec![(
        "id",
        Arc::new(Int64Array::from(vec![Some(1), None, Some(3)])) as _,
    )]).unwrap();
    ctx.register_batch("t", batch).unwrap();

    let batches = ctx
        .sql("SELECT double(id) as result FROM t ORDER BY id")
        .await.unwrap()
        .collect().await.unwrap();

    assert_batches_eq!(
        &[
            "+--------+",
            "| result |",
            "+--------+",
            "| 2      |",
            "|        |",  // NULL → NULL (корректная обработка)
            "| 6      |",
            "+--------+",
        ],
        &batches
    );
}

Тестирование агрегатных UDF

use datafusion::logical_expr::{
    AggregateUDFImpl, Signature, Volatility,
};

#[tokio::test]
async fn test_custom_sum_aggregate() {
    let ctx = SessionContext::new();

    // Регистрируем кастомный агрегат
    ctx.register_udaf(my_sum_udaf());

    ctx.register_batch("data", create_grouped_batch()).unwrap();

    let batches = ctx
        .sql("SELECT grp, my_sum(value) as total FROM data GROUP BY grp ORDER BY grp")
        .await.unwrap()
        .collect().await.unwrap();

    assert_batches_sorted_eq!(
        &[
            "+-----+-------+",
            "| grp | total |",
            "+-----+-------+",
            "| A   | 30    |",
            "| B   | 70    |",
            "+-----+-------+",
        ],
        &batches
    );
}
NOTE

Для агрегатных функций с GROUP BY используйте assert_batches_sorted_eq! — порядок групп не детерминирован без ORDER BY.

Тестирование TableProvider

Зарегистрируйте провайдер и проверьте результат через SQL:

use datafusion::catalog::TableProvider;
use datafusion::datasource::MemTable;
use datafusion::arrow::datatypes::{Schema, Field, DataType};

#[tokio::test]
async fn test_custom_table_provider() {
    let ctx = SessionContext::new();

    // Создаём MemTable — встроенный TableProvider для тестов
    let schema = Arc::new(Schema::new(vec![
        Field::new("city", DataType::Utf8, false),
        Field::new("population", DataType::Int64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(StringArray::from(vec!["Moscow", "Berlin", "Tokyo"])),
            Arc::new(Int64Array::from(vec![12_000_000, 3_600_000, 14_000_000])),
        ],
    ).unwrap();

    let table = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
    ctx.register_table("cities", Arc::new(table)).unwrap();

    // Проверяем, что filter pushdown работает
    let batches = ctx
        .sql("SELECT city FROM cities WHERE population > 10000000 ORDER BY city")
        .await.unwrap()
        .collect().await.unwrap();

    assert_batches_eq!(
        &[
            "+--------+",
            "| city   |",
            "+--------+",
            "| Moscow |",
            "| Tokyo  |",
            "+--------+",
        ],
        &batches
    );
}

Проверка метаданных scan

Для более детального тестирования — проверяйте план выполнения:

#[tokio::test]
async fn test_table_provider_filter_pushdown() {
    let ctx = SessionContext::new();
    // ... регистрация таблицы ...

    // Проверяем EXPLAIN — pushdown фильтра в провайдер
    let explain = ctx
        .sql("EXPLAIN SELECT * FROM my_table WHERE id > 100")
        .await.unwrap()
        .collect().await.unwrap();

    let plan_str = format!("{:?}", explain);
    assert!(
        plan_str.contains("FilterPushdown"),
        "Expected filter pushdown in plan: {plan_str}"
    );
}

Тестирование OptimizerRule

Паттерн: создать логический план → применить правило → проверить трансформированный план:

use datafusion::optimizer::{OptimizerRule, OptimizerConfig};
use datafusion::logical_expr::LogicalPlanBuilder;

#[tokio::test]
async fn test_optimizer_rule() {
    let ctx = SessionContext::new();
    ctx.register_batch("t", create_test_batch()).unwrap();

    // Создаём план через SQL
    let plan = ctx
        .sql("SELECT * FROM t WHERE id > 1 AND id > 1")
        .await.unwrap()
        .into_unoptimized_plan();

    // Применяем наше правило
    let rule = MyDeduplicateFiltersRule::new();
    let config = ctx.state().config_options().clone();
    let optimized = rule
        .rewrite(plan, &config)
        .unwrap()
        .data;

    // Проверяем, что дублирующий фильтр убран
    let plan_str = format!("{optimized}");
    // Ожидаем один фильтр, не два
    let filter_count = plan_str.matches("Filter").count();
    assert_eq!(
        filter_count, 1,
        "Expected 1 filter after dedup, got {filter_count}: {plan_str}"
    );
}
TIP

into_unoptimized_plan() — возвращает логический план до применения оптимизатора. Это позволяет тестировать ваше правило изолированно, без влияния встроенных правил DataFusion.

sqllogictest: SQL-level тестирование

DataFusion использует sqllogictest для тестирования SQL-поведения. Это формат, где каждый тест — SQL-запрос и ожидаемый результат:

# test_basic.slt

# Создание таблицы
statement ok
CREATE TABLE test AS VALUES (1, 'a'), (2, 'b'), (3, 'c');

# Проверка SELECT
query IT
SELECT * FROM test ORDER BY column1
----
1 a
2 b
3 c

# Проверка агрегации
query I
SELECT COUNT(*) FROM test
----
3

# Ожидаем ошибку
statement error DataFusion error
SELECT * FROM nonexistent_table

Формат sqllogictest

ДирективаОписание
statement okSQL должен выполниться без ошибки
statement error <pattern>SQL должен завершиться ошибкой, содержащей pattern
query <types>SQL возвращает результат; типы: I (int), T (text), R (real), B (bool)
----Разделитель между запросом и ожидаемым результатом

Интеграция в проект

use datafusion::test_util::TestContextProvider;
use sqllogictest::Runner;

#[tokio::test]
async fn run_sql_logic_tests() {
    let ctx = SessionContext::new();
    // Регистрируем расширения
    register_my_extensions(&ctx).await;

    let mut runner = Runner::new(|| async {
        // Провайдер, исполняющий SQL через наш ctx
        Ok(MyTestContext::new(ctx.clone()))
    });

    // Запускаем все .slt файлы
    runner.run_file("tests/sqllogictest/my_extension.slt")
        .await
        .unwrap();
}
NOTE

sqllogictest лучше всего подходит для тестирования SQL-совместимости и регрессий. Для тестирования внутренних деталей (структура плана, метаданные) — используйте unit-тесты с assert_batches_eq.

Визуализация результатов: pretty_format_batches

Для отладки и диагностики — человекочитаемый вывод RecordBatch:

use datafusion::arrow::util::pretty::pretty_format_batches;

#[tokio::test]
async fn test_with_debug_output() {
    let ctx = SessionContext::new();
    // ...

    let batches = ctx
        .sql("SELECT * FROM t")
        .await.unwrap()
        .collect().await.unwrap();

    // Диагностический вывод в случае неудачи
    if batches.is_empty() {
        panic!("No results returned");
    }

    // Красивый вывод для отладки
    let display = pretty_format_batches(&batches).unwrap();
    println!("Results:\n{display}");

    // ... assert_batches_eq! ...
}

Вывод pretty_format_batches:

+----+-------+--------+
| id | name  | active |
+----+-------+--------+
| 1  | Alice | true   |
| 2  | Bob   | false  |
+----+-------+--------+

CI-паттерны

Async-тесты

// Стандартный — однопоточный tokio runtime
#[tokio::test]
async fn test_single_thread() {
    // Подходит для большинства тестов
}

// Многопоточный — для тестирования параллелизма
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_multi_thread() {
    // Используйте для тестирования concurrent execution
    // Например, параллельные scan операции
}

Паттерн: общий test setup

/// Создаёт SessionContext с зарегистрированными расширениями
async fn test_ctx() -> SessionContext {
    let ctx = SessionContext::new();
    ctx.register_udf(my_udf());
    ctx.register_udaf(my_udaf());
    ctx.register_table(
        "test_data",
        Arc::new(create_test_table()),
    ).unwrap();
    ctx
}

#[tokio::test]
async fn test_feature_a() {
    let ctx = test_ctx().await;
    // ...
}

#[tokio::test]
async fn test_feature_b() {
    let ctx = test_ctx().await;
    // ...
}
TIP

Каждый вызов test_ctx() создаёт изолированный контекст. Нет shared state — тесты можно запускать параллельно через cargo test.

Паттерн: snapshot-тесты планов

#[tokio::test]
async fn test_plan_snapshot() {
    let ctx = test_ctx().await;
    let plan = ctx
        .sql("SELECT * FROM t WHERE id > 1")
        .await.unwrap()
        .into_optimized_plan()
        .unwrap();

    // Используем Display для сравнения текстового представления
    let plan_text = format!("{plan}");
    insta::assert_snapshot!(plan_text);
    // insta создаст .snap файл при первом запуске
    // При изменениях — cargo insta review
}

Резюме

ИнструментКогда использовать
assert_batches_eq!Проверка результата с фиксированным порядком (с ORDER BY)
assert_batches_sorted_eq!Проверка результата без гарантии порядка
SessionContext::new()Создание изолированного тестового контекста
register_batch / register_tableЗагрузка тестовых данных
into_unoptimized_plan()Тестирование OptimizerRule изолированно
pretty_format_batchesОтладочный вывод RecordBatch
sqllogictestSQL-level regression-тесты
#[tokio::test]Стандартный async-тест
#[tokio::test(flavor = "multi_thread")]Тестирование параллельного выполнения
instaSnapshot-тесты планов выполнения

Хорошо протестированные расширения — основа стабильной системы на DataFusion. assert_batches_eq! + SessionContext::new() — ваш повседневный рабочий инструмент.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Что проверяет макрос assert_batches_eq! в тестах DataFusion?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8