Learning Platform
Глоссарий Troubleshooting
Урок 03.05 · 14 мин
Средний
SessionContextSessionConfigRuntimeEnvMemoryPoolObjectStoreTaskContext

SessionContext: точка входа в DataFusion

Весь pipeline DataFusion — парсинг, планирование, оптимизация, выполнение — координируется через один объект: SessionContext. Это главная точка входа для пользователя.

Что содержит SessionContext

SessionContext объединяет три компонента:

Структура SessionContext
SessionContextГлавная точка входа: координирует парсинг, планирование, оптимизацию и выполнение SQL
SessionConfigНастройки поведения: batch_size, target_partitions, правила оптимизатора
SessionStateСостояние сессии: каталог таблиц, реестр UDF, оптимизатор и планировщик
RuntimeEnvРесурсы выполнения: пул памяти, disk manager для spill, object store для S3/GCS
  • SessionConfig — настройки поведения: размер batch, количество партиций, правила оптимизатора
  • SessionState — состояние сессии: каталог, реестр функций, оптимизатор, планировщик
  • RuntimeEnv — ресурсы выполнения: пул памяти, менеджер диска, object store

Создание и конфигурация

Минимальная сессия

use datafusion::prelude::*;

let ctx = SessionContext::new();  // Всё по умолчанию
let df = ctx.sql("SELECT 1 + 2").await?;
let results = df.collect().await?;

Настройка через SessionConfig

let config = SessionConfig::new()
    .with_batch_size(4096)              // Размер RecordBatch (по умолчанию 8192)
    .with_target_partitions(16)         // Целевое количество партиций
    .with_information_schema(true)      // Включить INFORMATION_SCHEMA
    .with_create_default_catalog_and_schema(true)
    .set_bool("datafusion.optimizer.enable_round_robin_repartition", false)
    .set_u64("datafusion.execution.sort_spill_reservation_bytes", 10 * 1024 * 1024);

let ctx = SessionContext::new_with_config(config);

Ключевые параметры SessionConfig

ПараметрПо умолчаниюОписание
batch_size8192Строк в одном RecordBatch
target_partitionsкол-во CPUЦелевой параллелизм
repartition_joinstrueВставлять repartition перед join
repartition_aggregationstrueВставлять repartition перед агрегацией
parquet_pruningtrueRow group pruning по статистике
collect_statisticsfalseСобирать статистику при сканировании
NOTE

Все параметры доступны через SQL: SET datafusion.execution.batch_size = 4096;. Это позволяет менять настройки внутри SQL-скрипта без перекомпиляции.

RuntimeEnv: ресурсы выполнения

RuntimeEnv управляет ресурсами, которые разделяются между запросами:

use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::memory_pool::FairSpillPool;

let runtime_config = RuntimeEnvBuilder::new()
    .with_memory_pool(Arc::new(FairSpillPool::new(1024 * 1024 * 1024)))  // 1 GB
    .with_disk_manager(DiskManagerConfig::new_specified(vec!["/tmp/datafusion-spill"]))
    .with_temp_file_path("/tmp/datafusion");

let runtime = Arc::new(runtime_config.build()?);
let config = SessionConfig::new();
let ctx = SessionContext::new_with_config_rt(config, runtime);

Memory Pool

DataFusion контролирует использование памяти через пул:

  • UnboundedMemoryPool (по умолчанию) — без ограничений, полагается на OOM killer
  • FairSpillPool — ограничивает память, при превышении сбрасывает данные на диск (spill)
  • GreedyMemoryPool — ограничивает память, при превышении возвращает ошибку
// FairSpillPool: операторы получают справедливую долю памяти
// Если HashJoin превышает лимит — промежуточные данные уходят на диск
let pool = FairSpillPool::new(2 * 1024 * 1024 * 1024); // 2 GB

Spill-to-disk критичен для production: без него большой GROUP BY или ORDER BY может исчерпать всю память процесса.

Object Store Registry

RuntimeEnv содержит реестр object stores для доступа к удалённому хранилищу:

use object_store::aws::AmazonS3Builder;
use url::Url;

let s3 = AmazonS3Builder::from_env()
    .with_bucket_name("my-data")
    .build()?;

runtime.register_object_store(
    &Url::parse("s3://my-data")?,
    Arc::new(s3),
);

// Теперь можно работать с S3
ctx.register_parquet("orders", "s3://my-data/orders/", Default::default()).await?;

DataFusion поддерживает object_store crate — абстракцию над S3, GCS, Azure Blob, HDFS и локальной файловой системой.

Disk Manager

DiskManager управляет временными файлами при spill-to-disk:

let dm = DiskManagerConfig::new_specified(vec![
    "/fast-ssd/tmp/datafusion",  // Предпочтительный быстрый диск
    "/slow-hdd/tmp/datafusion",  // Запасной
]);

Когда оператор (HashJoin, Sort, Aggregate) превышает лимит памяти, данные сбрасываются во временные файлы через DiskManager. После обработки файлы автоматически удаляются.

TaskContext: контекст выполнения партиции

Каждый вызов ExecutionPlan::execute() получает TaskContext — облегчённую ссылку на ресурсы сессии:

pub struct TaskContext {
    session_config: SessionConfig,
    scalar_functions: HashMap<String, Arc<ScalarUDF>>,
    aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
    window_functions: HashMap<String, Arc<WindowUDF>>,
    runtime: Arc<RuntimeEnv>,
}

TaskContext обеспечивает доступ к конфигурации и функциям без полного SessionContext. Это позволяет запускать ExecutionPlan в изолированных контекстах — например, в разных потоках с разными наборами UDF.

Шаблоны использования

Множественные сессии с общим runtime

// Один RuntimeEnv с пулом памяти
let runtime = Arc::new(
    RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(4_000_000_000)))
        .build()?
);

// Две сессии делят один пул памяти
let ctx_analytics = SessionContext::new_with_config_rt(
    SessionConfig::new().with_target_partitions(16),
    runtime.clone(),
);

let ctx_etl = SessionContext::new_with_config_rt(
    SessionConfig::new().with_batch_size(65536).with_target_partitions(4),
    runtime.clone(),
);

Общий RuntimeEnv позволяет контролировать суммарное потребление памяти при нескольких сессиях в одном процессе.

Регистрация UDF

use datafusion::logical_expr::{create_udf, Volatility};
use datafusion::arrow::datatypes::DataType;

let udf = create_udf(
    "to_upper_ru",
    vec![DataType::Utf8],
    DataType::Utf8,
    Volatility::Immutable,
    Arc::new(|args| { /* реализация */ }),
);

ctx.register_udf(udf);
ctx.sql("SELECT to_upper_ru(name) FROM users").await?;

UDF регистрируются в SessionState и доступны во всех SQL-запросах этой сессии.

WARNING

Функция create_udf считается устаревшей (deprecated) начиная с DataFusion v43+. Рекомендуется использовать ScalarUDF::new_from_impl — он даёт полный контроль над типами, nullable-поведением и документацией функции. Подробнее см. модуль UDF.

Жизненный цикл запроса через SessionContext

Pipeline: SQL → DataFrame → RecordBatch
ctx.sql(“SELECT …“)Точка входа: SQL текст. SessionContext.sql() ничего не исполняет — лишь стартует ленивый pipeline.
SessionState — построение планаLazy фаза: парсинг → логический план → оптимизация. Никаких I/O.
DataFrame (lazy)DataFrame обёртывает Optimized LogicalPlan. Терминальная операция (collect/show/write_parquet) запускает следующую стадию.
.collect() — физическое исполнениеEager фаза: физический план + исполнение. Здесь происходят все I/O и вычисления.

Каждый метод SessionContext (sql(), read_parquet(), read_csv()) проходит первые три стадии pipeline и возвращает DataFrame. Выполнение запускается только при вызове terminal-операции: collect(), show(), write_parquet().

Итоги

  • SessionContext = SessionConfig + SessionState + RuntimeEnv
  • SessionConfig управляет batch_size, target_partitions, правилами оптимизатора
  • RuntimeEnv содержит пул памяти, disk manager и object store registry
  • FairSpillPool ограничивает память и сбрасывает данные на диск при превышении
  • TaskContext — облегчённый контекст для выполнения партиций
  • Запрос ленив до collect()/show() — планирование и выполнение разделены ение разделены

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. SessionContext в DataFusion объединяет три компонента. Какие?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7