SessionContext: точка входа в DataFusion
Весь pipeline DataFusion — парсинг, планирование, оптимизация, выполнение — координируется через один объект: SessionContext. Это главная точка входа для пользователя.
Что содержит SessionContext
SessionContext объединяет три компонента:
- SessionConfig — настройки поведения: размер batch, количество партиций, правила оптимизатора
- SessionState — состояние сессии: каталог, реестр функций, оптимизатор, планировщик
- RuntimeEnv — ресурсы выполнения: пул памяти, менеджер диска, object store
Создание и конфигурация
Минимальная сессия
use datafusion::prelude::*;
let ctx = SessionContext::new(); // Всё по умолчанию
let df = ctx.sql("SELECT 1 + 2").await?;
let results = df.collect().await?;
Настройка через SessionConfig
let config = SessionConfig::new()
.with_batch_size(4096) // Размер RecordBatch (по умолчанию 8192)
.with_target_partitions(16) // Целевое количество партиций
.with_information_schema(true) // Включить INFORMATION_SCHEMA
.with_create_default_catalog_and_schema(true)
.set_bool("datafusion.optimizer.enable_round_robin_repartition", false)
.set_u64("datafusion.execution.sort_spill_reservation_bytes", 10 * 1024 * 1024);
let ctx = SessionContext::new_with_config(config);
Ключевые параметры SessionConfig
| Параметр | По умолчанию | Описание |
|---|---|---|
batch_size | 8192 | Строк в одном RecordBatch |
target_partitions | кол-во CPU | Целевой параллелизм |
repartition_joins | true | Вставлять repartition перед join |
repartition_aggregations | true | Вставлять repartition перед агрегацией |
parquet_pruning | true | Row group pruning по статистике |
collect_statistics | false | Собирать статистику при сканировании |
Все параметры доступны через SQL: SET datafusion.execution.batch_size = 4096;. Это позволяет менять настройки внутри SQL-скрипта без перекомпиляции.
RuntimeEnv: ресурсы выполнения
RuntimeEnv управляет ресурсами, которые разделяются между запросами:
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::memory_pool::FairSpillPool;
let runtime_config = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(1024 * 1024 * 1024))) // 1 GB
.with_disk_manager(DiskManagerConfig::new_specified(vec!["/tmp/datafusion-spill"]))
.with_temp_file_path("/tmp/datafusion");
let runtime = Arc::new(runtime_config.build()?);
let config = SessionConfig::new();
let ctx = SessionContext::new_with_config_rt(config, runtime);
Memory Pool
DataFusion контролирует использование памяти через пул:
UnboundedMemoryPool(по умолчанию) — без ограничений, полагается на OOM killerFairSpillPool— ограничивает память, при превышении сбрасывает данные на диск (spill)GreedyMemoryPool— ограничивает память, при превышении возвращает ошибку
// FairSpillPool: операторы получают справедливую долю памяти
// Если HashJoin превышает лимит — промежуточные данные уходят на диск
let pool = FairSpillPool::new(2 * 1024 * 1024 * 1024); // 2 GB
Spill-to-disk критичен для production: без него большой GROUP BY или ORDER BY может исчерпать всю память процесса.
Object Store Registry
RuntimeEnv содержит реестр object stores для доступа к удалённому хранилищу:
use object_store::aws::AmazonS3Builder;
use url::Url;
let s3 = AmazonS3Builder::from_env()
.with_bucket_name("my-data")
.build()?;
runtime.register_object_store(
&Url::parse("s3://my-data")?,
Arc::new(s3),
);
// Теперь можно работать с S3
ctx.register_parquet("orders", "s3://my-data/orders/", Default::default()).await?;
DataFusion поддерживает object_store crate — абстракцию над S3, GCS, Azure Blob, HDFS и локальной файловой системой.
Disk Manager
DiskManager управляет временными файлами при spill-to-disk:
let dm = DiskManagerConfig::new_specified(vec![
"/fast-ssd/tmp/datafusion", // Предпочтительный быстрый диск
"/slow-hdd/tmp/datafusion", // Запасной
]);
Когда оператор (HashJoin, Sort, Aggregate) превышает лимит памяти, данные сбрасываются во временные файлы через DiskManager. После обработки файлы автоматически удаляются.
TaskContext: контекст выполнения партиции
Каждый вызов ExecutionPlan::execute() получает TaskContext — облегчённую ссылку на ресурсы сессии:
pub struct TaskContext {
session_config: SessionConfig,
scalar_functions: HashMap<String, Arc<ScalarUDF>>,
aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
window_functions: HashMap<String, Arc<WindowUDF>>,
runtime: Arc<RuntimeEnv>,
}
TaskContext обеспечивает доступ к конфигурации и функциям без полного SessionContext. Это позволяет запускать ExecutionPlan в изолированных контекстах — например, в разных потоках с разными наборами UDF.
Шаблоны использования
Множественные сессии с общим runtime
// Один RuntimeEnv с пулом памяти
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(4_000_000_000)))
.build()?
);
// Две сессии делят один пул памяти
let ctx_analytics = SessionContext::new_with_config_rt(
SessionConfig::new().with_target_partitions(16),
runtime.clone(),
);
let ctx_etl = SessionContext::new_with_config_rt(
SessionConfig::new().with_batch_size(65536).with_target_partitions(4),
runtime.clone(),
);
Общий RuntimeEnv позволяет контролировать суммарное потребление памяти при нескольких сессиях в одном процессе.
Регистрация UDF
use datafusion::logical_expr::{create_udf, Volatility};
use datafusion::arrow::datatypes::DataType;
let udf = create_udf(
"to_upper_ru",
vec![DataType::Utf8],
DataType::Utf8,
Volatility::Immutable,
Arc::new(|args| { /* реализация */ }),
);
ctx.register_udf(udf);
ctx.sql("SELECT to_upper_ru(name) FROM users").await?;
UDF регистрируются в SessionState и доступны во всех SQL-запросах этой сессии.
Функция create_udf считается устаревшей (deprecated) начиная с DataFusion v43+. Рекомендуется использовать ScalarUDF::new_from_impl — он даёт полный контроль над типами, nullable-поведением и документацией функции. Подробнее см. модуль UDF.
Жизненный цикл запроса через SessionContext
Каждый метод SessionContext (sql(), read_parquet(), read_csv()) проходит первые три стадии pipeline и возвращает DataFrame. Выполнение запускается только при вызове terminal-операции: collect(), show(), write_parquet().
Итоги
-
SessionContext=SessionConfig+SessionState+RuntimeEnv -
SessionConfigуправляет batch_size, target_partitions, правилами оптимизатора -
RuntimeEnvсодержит пул памяти, disk manager и object store registry -
FairSpillPoolограничивает память и сбрасывает данные на диск при превышении -
TaskContext— облегчённый контекст для выполнения партиций - Запрос ленив до
collect()/show()— планирование и выполнение разделены ение разделены