Тестирование расширений DataFusion
В предыдущих уроках мы создавали UDF, TableProvider, OptimizerRule. Теперь разберём, как всё это тестировать: от unit-тестов отдельных функций до SQL-level проверок через sqllogictest. DataFusion предоставляет набор тестовых макросов и паттернов, значительно упрощающих процесс.
Тестовый harness: SessionContext + tokio::test
Базовый паттерн тестирования DataFusion-расширений: создать SessionContext, зарегистрировать расширение, выполнить SQL, проверить результат:
#[cfg(test)]
mod tests {
use datafusion::prelude::*;
use datafusion::arrow::array::{Int64Array, StringArray};
use datafusion::arrow::record_batch::RecordBatch;
use std::sync::Arc;
#[tokio::test]
async fn test_basic_query() {
// SessionContext::new() — дешёвый in-memory контекст
let ctx = SessionContext::new();
// Регистрируем данные
ctx.register_batch("test_table", create_test_batch())
.unwrap();
// Выполняем запрос
let df = ctx.sql("SELECT * FROM test_table WHERE id > 1")
.await
.unwrap();
let batches = df.collect().await.unwrap();
// ... проверяем результат
}
fn create_test_batch() -> RecordBatch {
let id = Arc::new(Int64Array::from(vec![1, 2, 3]));
let name = Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol"]));
RecordBatch::try_from_iter(vec![
("id", id as _),
("name", name as _),
]).unwrap()
}
}
SessionContext::new() — не singleton. Каждый тест создаёт независимый контекст со своим каталогом, конфигурацией и памятью. Никакого shared state между тестами.
assert_batches_eq! — golden-file стиль
Главный инструмент проверки результатов — макрос assert_batches_eq!. Он сравнивает Vec<RecordBatch> с текстовым представлением таблицы:
use datafusion::assert_batches_eq;
#[tokio::test]
async fn test_select() {
let ctx = SessionContext::new();
ctx.register_batch("t", create_test_batch()).unwrap();
let batches = ctx
.sql("SELECT id, name FROM t ORDER BY id")
.await.unwrap()
.collect().await.unwrap();
// Golden-file стиль: ожидаемый результат как текстовая таблица
assert_batches_eq!(
&[
"+----+-------+",
"| id | name |",
"+----+-------+",
"| 1 | Alice |",
"| 2 | Bob |",
"| 3 | Carol |",
"+----+-------+",
],
&batches
);
}
assert_batches_sorted_eq! — для недетерминированного порядка
Если порядок строк не гарантирован (нет ORDER BY), используйте assert_batches_sorted_eq! — он сортирует строки перед сравнением:
use datafusion::assert_batches_sorted_eq;
#[tokio::test]
async fn test_aggregate() {
let ctx = SessionContext::new();
ctx.register_batch("sales", create_sales_batch()).unwrap();
let batches = ctx
.sql("SELECT region, SUM(amount) as total FROM sales GROUP BY region")
.await.unwrap()
.collect().await.unwrap();
// Порядок строк не гарантирован → sorted_eq
assert_batches_sorted_eq!(
&[
"+--------+-------+",
"| region | total |",
"+--------+-------+",
"| East | 300 |",
"| West | 500 |",
"+--------+-------+",
],
&batches
);
}
Используйте assert_batches_eq! (строгий порядок) только с ORDER BY в запросе. Без ORDER BY результат может меняться между запусками — тест станет flaky.
Тестирование скалярных UDF
Паттерн: зарегистрировать UDF → выполнить SQL → проверить assert_batches_eq:
use datafusion::logical_expr::{create_udf, Volatility};
use datafusion::arrow::datatypes::DataType;
use datafusion::physical_plan::functions::make_scalar_function;
#[tokio::test]
async fn test_double_udf() {
let ctx = SessionContext::new();
// Определяем UDF
let double_fn = make_scalar_function(|args: &[ArrayRef]| {
let input = args[0]
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
let result: Int64Array = input.iter()
.map(|v| v.map(|x| x * 2))
.collect();
Ok(Arc::new(result) as ArrayRef)
});
let udf = create_udf(
"double",
vec![DataType::Int64],
DataType::Int64,
Volatility::Immutable,
double_fn,
);
ctx.register_udf(udf);
// Регистрируем тестовые данные
ctx.register_batch("t", create_test_batch()).unwrap();
// Выполняем SQL с UDF
let batches = ctx
.sql("SELECT id, double(id) as doubled FROM t ORDER BY id")
.await.unwrap()
.collect().await.unwrap();
assert_batches_eq!(
&[
"+----+---------+",
"| id | doubled |",
"+----+---------+",
"| 1 | 2 |",
"| 2 | 4 |",
"| 3 | 6 |",
"+----+---------+",
],
&batches
);
}
Тестирование граничных случаев UDF
#[tokio::test]
async fn test_double_udf_with_nulls() {
let ctx = SessionContext::new();
// ... регистрация UDF ...
let batch = RecordBatch::try_from_iter(vec![(
"id",
Arc::new(Int64Array::from(vec![Some(1), None, Some(3)])) as _,
)]).unwrap();
ctx.register_batch("t", batch).unwrap();
let batches = ctx
.sql("SELECT double(id) as result FROM t ORDER BY id")
.await.unwrap()
.collect().await.unwrap();
assert_batches_eq!(
&[
"+--------+",
"| result |",
"+--------+",
"| 2 |",
"| |", // NULL → NULL (корректная обработка)
"| 6 |",
"+--------+",
],
&batches
);
}
Тестирование агрегатных UDF
use datafusion::logical_expr::{
AggregateUDFImpl, Signature, Volatility,
};
#[tokio::test]
async fn test_custom_sum_aggregate() {
let ctx = SessionContext::new();
// Регистрируем кастомный агрегат
ctx.register_udaf(my_sum_udaf());
ctx.register_batch("data", create_grouped_batch()).unwrap();
let batches = ctx
.sql("SELECT grp, my_sum(value) as total FROM data GROUP BY grp ORDER BY grp")
.await.unwrap()
.collect().await.unwrap();
assert_batches_sorted_eq!(
&[
"+-----+-------+",
"| grp | total |",
"+-----+-------+",
"| A | 30 |",
"| B | 70 |",
"+-----+-------+",
],
&batches
);
}
Для агрегатных функций с GROUP BY используйте assert_batches_sorted_eq! — порядок групп не детерминирован без ORDER BY.
Тестирование TableProvider
Зарегистрируйте провайдер и проверьте результат через SQL:
use datafusion::catalog::TableProvider;
use datafusion::datasource::MemTable;
use datafusion::arrow::datatypes::{Schema, Field, DataType};
#[tokio::test]
async fn test_custom_table_provider() {
let ctx = SessionContext::new();
// Создаём MemTable — встроенный TableProvider для тестов
let schema = Arc::new(Schema::new(vec![
Field::new("city", DataType::Utf8, false),
Field::new("population", DataType::Int64, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["Moscow", "Berlin", "Tokyo"])),
Arc::new(Int64Array::from(vec![12_000_000, 3_600_000, 14_000_000])),
],
).unwrap();
let table = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
ctx.register_table("cities", Arc::new(table)).unwrap();
// Проверяем, что filter pushdown работает
let batches = ctx
.sql("SELECT city FROM cities WHERE population > 10000000 ORDER BY city")
.await.unwrap()
.collect().await.unwrap();
assert_batches_eq!(
&[
"+--------+",
"| city |",
"+--------+",
"| Moscow |",
"| Tokyo |",
"+--------+",
],
&batches
);
}
Проверка метаданных scan
Для более детального тестирования — проверяйте план выполнения:
#[tokio::test]
async fn test_table_provider_filter_pushdown() {
let ctx = SessionContext::new();
// ... регистрация таблицы ...
// Проверяем EXPLAIN — pushdown фильтра в провайдер
let explain = ctx
.sql("EXPLAIN SELECT * FROM my_table WHERE id > 100")
.await.unwrap()
.collect().await.unwrap();
let plan_str = format!("{:?}", explain);
assert!(
plan_str.contains("FilterPushdown"),
"Expected filter pushdown in plan: {plan_str}"
);
}
Тестирование OptimizerRule
Паттерн: создать логический план → применить правило → проверить трансформированный план:
use datafusion::optimizer::{OptimizerRule, OptimizerConfig};
use datafusion::logical_expr::LogicalPlanBuilder;
#[tokio::test]
async fn test_optimizer_rule() {
let ctx = SessionContext::new();
ctx.register_batch("t", create_test_batch()).unwrap();
// Создаём план через SQL
let plan = ctx
.sql("SELECT * FROM t WHERE id > 1 AND id > 1")
.await.unwrap()
.into_unoptimized_plan();
// Применяем наше правило
let rule = MyDeduplicateFiltersRule::new();
let config = ctx.state().config_options().clone();
let optimized = rule
.rewrite(plan, &config)
.unwrap()
.data;
// Проверяем, что дублирующий фильтр убран
let plan_str = format!("{optimized}");
// Ожидаем один фильтр, не два
let filter_count = plan_str.matches("Filter").count();
assert_eq!(
filter_count, 1,
"Expected 1 filter after dedup, got {filter_count}: {plan_str}"
);
}
into_unoptimized_plan() — возвращает логический план до применения оптимизатора. Это позволяет тестировать ваше правило изолированно, без влияния встроенных правил DataFusion.
sqllogictest: SQL-level тестирование
DataFusion использует sqllogictest для тестирования SQL-поведения. Это формат, где каждый тест — SQL-запрос и ожидаемый результат:
# test_basic.slt
# Создание таблицы
statement ok
CREATE TABLE test AS VALUES (1, 'a'), (2, 'b'), (3, 'c');
# Проверка SELECT
query IT
SELECT * FROM test ORDER BY column1
----
1 a
2 b
3 c
# Проверка агрегации
query I
SELECT COUNT(*) FROM test
----
3
# Ожидаем ошибку
statement error DataFusion error
SELECT * FROM nonexistent_table
Формат sqllogictest
| Директива | Описание |
|---|---|
statement ok | SQL должен выполниться без ошибки |
statement error <pattern> | SQL должен завершиться ошибкой, содержащей pattern |
query <types> | SQL возвращает результат; типы: I (int), T (text), R (real), B (bool) |
---- | Разделитель между запросом и ожидаемым результатом |
Интеграция в проект
use datafusion::test_util::TestContextProvider;
use sqllogictest::Runner;
#[tokio::test]
async fn run_sql_logic_tests() {
let ctx = SessionContext::new();
// Регистрируем расширения
register_my_extensions(&ctx).await;
let mut runner = Runner::new(|| async {
// Провайдер, исполняющий SQL через наш ctx
Ok(MyTestContext::new(ctx.clone()))
});
// Запускаем все .slt файлы
runner.run_file("tests/sqllogictest/my_extension.slt")
.await
.unwrap();
}
sqllogictest лучше всего подходит для тестирования SQL-совместимости и регрессий. Для тестирования внутренних деталей (структура плана, метаданные) — используйте unit-тесты с assert_batches_eq.
Визуализация результатов: pretty_format_batches
Для отладки и диагностики — человекочитаемый вывод RecordBatch:
use datafusion::arrow::util::pretty::pretty_format_batches;
#[tokio::test]
async fn test_with_debug_output() {
let ctx = SessionContext::new();
// ...
let batches = ctx
.sql("SELECT * FROM t")
.await.unwrap()
.collect().await.unwrap();
// Диагностический вывод в случае неудачи
if batches.is_empty() {
panic!("No results returned");
}
// Красивый вывод для отладки
let display = pretty_format_batches(&batches).unwrap();
println!("Results:\n{display}");
// ... assert_batches_eq! ...
}
Вывод pretty_format_batches:
+----+-------+--------+
| id | name | active |
+----+-------+--------+
| 1 | Alice | true |
| 2 | Bob | false |
+----+-------+--------+
CI-паттерны
Async-тесты
// Стандартный — однопоточный tokio runtime
#[tokio::test]
async fn test_single_thread() {
// Подходит для большинства тестов
}
// Многопоточный — для тестирования параллелизма
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_multi_thread() {
// Используйте для тестирования concurrent execution
// Например, параллельные scan операции
}
Паттерн: общий test setup
/// Создаёт SessionContext с зарегистрированными расширениями
async fn test_ctx() -> SessionContext {
let ctx = SessionContext::new();
ctx.register_udf(my_udf());
ctx.register_udaf(my_udaf());
ctx.register_table(
"test_data",
Arc::new(create_test_table()),
).unwrap();
ctx
}
#[tokio::test]
async fn test_feature_a() {
let ctx = test_ctx().await;
// ...
}
#[tokio::test]
async fn test_feature_b() {
let ctx = test_ctx().await;
// ...
}
Каждый вызов test_ctx() создаёт изолированный контекст. Нет shared state — тесты можно запускать параллельно через cargo test.
Паттерн: snapshot-тесты планов
#[tokio::test]
async fn test_plan_snapshot() {
let ctx = test_ctx().await;
let plan = ctx
.sql("SELECT * FROM t WHERE id > 1")
.await.unwrap()
.into_optimized_plan()
.unwrap();
// Используем Display для сравнения текстового представления
let plan_text = format!("{plan}");
insta::assert_snapshot!(plan_text);
// insta создаст .snap файл при первом запуске
// При изменениях — cargo insta review
}
Резюме
| Инструмент | Когда использовать |
|---|---|
assert_batches_eq! | Проверка результата с фиксированным порядком (с ORDER BY) |
assert_batches_sorted_eq! | Проверка результата без гарантии порядка |
SessionContext::new() | Создание изолированного тестового контекста |
register_batch / register_table | Загрузка тестовых данных |
into_unoptimized_plan() | Тестирование OptimizerRule изолированно |
pretty_format_batches | Отладочный вывод RecordBatch |
sqllogictest | SQL-level regression-тесты |
#[tokio::test] | Стандартный async-тест |
#[tokio::test(flavor = "multi_thread")] | Тестирование параллельного выполнения |
insta | Snapshot-тесты планов выполнения |
Хорошо протестированные расширения — основа стабильной системы на DataFusion. assert_batches_eq! + SessionContext::new() — ваш повседневный рабочий инструмент.