Learning Platform
Глоссарий Troubleshooting
Урок 11.01 · 14 мин
Продвинутый
SessionConfigRuntimeEnvBuilderbatch_sizetarget_partitionsmemory_limitspill_compressionconfigurationcooperative cancellationEnsureCooperativetask budgettokio yield

Конфигурация runtime: SessionConfig, RuntimeEnv и лимиты

В модуле 02 мы изучили управление памятью — MemoryPool, FairSpillPool и DiskManager. Теперь переходим к системной настройке всего runtime: какие параметры критичны для производительности, как их задавать и как строить профили конфигурации для разных нагрузок.

Архитектура конфигурации DataFusion

DataFusion разделяет конфигурацию на два уровня:

Два уровня конфигурации DataFusion
SessionConfigПараметры запросов: batch_size, target_partitions, оптимизации планировщика
Отвечает за
Область
RuntimeEnvИнфраструктура: пул памяти, диск для spill, object store
Отвечает за
Область
TIP

Правило: SessionConfig меняется «дёшево» (можно на каждый запрос), RuntimeEnv создаётся «дорого» (один раз на приложение). Не пересоздавайте RuntimeEnv для каждого запроса.

SessionConfig: ключевые параметры производительности

batch_size — размер RecordBatch

batch_size определяет, сколько строк обрабатывается в одном RecordBatch за раз. Значение по умолчанию — 8192.

use datafusion::prelude::*;

// Программно
let config = SessionConfig::new()
    .with_batch_size(4096);

let ctx = SessionContext::new_with_config(config);
-- Через SQL
SET datafusion.execution.batch_size = 4096;

Влияние на производительность:

Влияние batch_size на производительность
Маленький batch (512-2048)Меньше памяти на batch, больше вызовов poll_next(), больше overhead на SIMD-прогрев
Плюсы
Минусы
Большой batch (8192-65536)Больше памяти на batch, меньше вызовов, лучше SIMD-утилизация
Плюсы
Минусы
NOTE

Для memory-limited сред (контейнеры с 512 MB — 2 GB) уменьшение batch_size до 1024-2048 уменьшает вероятность spill. Для аналитических серверов с 16+ GB — значение по умолчанию (8192) оптимально.

target_partitions — параллелизм выполнения

target_partitions определяет целевое количество партиций при выполнении запросов. По умолчанию равен количеству CPU-ядер.

let config = SessionConfig::new()
    .with_target_partitions(8);
SET datafusion.execution.target_partitions = 8;

Как target_partitions влияет на план:

-- При target_partitions = 16:
-- DataSourceExec (1 partition) 
--   → RepartitionExec: RoundRobinBatch(16)  -- расширяет до 16
--     → AggregateExec: mode=Partial          -- 16 параллельных partial
--       → CoalesceBatchesExec
--         → RepartitionExec: Hash([key], 16)
--           → AggregateExec: mode=FinalPartitioned
WARNING

Увеличение target_partitions не всегда ускоряет запросы. Каждая партиция потребляет память для промежуточных буферов. При memory_limit = 2 GB и target_partitions = 32 каждая партиция получит ~62 MB — крупные Hash Join могут не поместиться и упасть с ResourcesExhausted. Формула: memory_per_partition ≈ memory_limit / target_partitions.

Управление repartition-стратегиями

DataFusion предлагает несколько флагов для контроля автоматического repartitioning:

let config = SessionConfig::new()
    .with_repartition_joins(true)        // repartition для join keys
    .with_repartition_aggregations(true)  // repartition для GROUP BY
    .with_repartition_windows(true)       // repartition для PARTITION BY в window
    .with_repartition_sorts(true)         // параллельная сортировка
    .with_repartition_file_scans(true);   // разбивка крупных файлов на партиции
SET datafusion.optimizer.repartition_joins = true;
SET datafusion.optimizer.repartition_aggregations = true;
SET datafusion.optimizer.repartition_windows = true;
SET datafusion.optimizer.repartition_sorts = true;
SET datafusion.optimizer.repartition_file_scans = true;
TIP

Все флаги включены по умолчанию. Отключайте их только при диагностике — например, repartition_joins = false убирает Hash-repartition перед Join, что полезно если данные уже разложены по ключу join.

RuntimeEnvBuilder: настройка инфраструктуры

RuntimeEnv определяет инфраструктуру выполнения. Создаётся через builder:

use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::disk_manager::DiskManagerConfig;
use std::sync::Arc;

let runtime = RuntimeEnvBuilder::new()
    // Пул памяти: 4 GB с fair-распределением и spill
    .with_memory_pool(Arc::new(FairSpillPool::new(4_000_000_000)))
    // Менеджер диска для spill-файлов
    .with_disk_manager(DiskManagerConfig::NewOs)
    .build_arc()?;

// Комбинируем с SessionConfig
let config = SessionConfig::new()
    .with_batch_size(8192)
    .with_target_partitions(8);

let ctx = SessionContext::new_with_config_rt(config, runtime);
NOTE

DiskManagerConfig::NewOs использует системную temp-директорию для spill-файлов. Для production укажите конкретный путь: DiskManagerConfig::NewSpecified(vec!["/data/spill".into()]) — это даёт контроль над дисковым пространством и позволяет разместить spill на быстром SSD.

Сжатие spill-файлов

DataFusion v44+ поддерживает сжатие spill-файлов для уменьшения I/O:

use datafusion::common::config::SpillCompression;

let config = SessionConfig::new()
    .with_spill_compression(SpillCompression::Zstd);
SET datafusion.execution.spill_compression = 'zstd';

Выбор алгоритма сжатия:

Алгоритмы сжатия spill-файлов
Zstd
LZ4
Snappy
None

Конфигурация через SQL SET

Для интерактивной работы все параметры SessionConfig доступны через SQL:

-- Посмотреть все параметры
SHOW ALL;

-- Посмотреть конкретный параметр
SHOW datafusion.execution.batch_size;

-- Посмотреть группу параметров
SHOW ALL LIKE '%partition%';

-- Изменить параметр для текущей сессии
SET datafusion.execution.target_partitions = 4;
SET datafusion.execution.batch_size = 1024;
TIP

SET влияет только на текущий SessionContext. Если вы создаёте новый SessionContext с тем же RuntimeEnv, он получит значения по умолчанию. Для «глобальных» настроек используйте SessionConfig при создании контекста.

Профили конфигурации для типичных нагрузок

Профиль: Аналитический сервер (16+ GB RAM, 8+ ядер)

let runtime = RuntimeEnvBuilder::new()
    .with_memory_pool(Arc::new(FairSpillPool::new(12_000_000_000))) // 12 GB
    .with_disk_manager(DiskManagerConfig::NewSpecified(
        vec!["/data/spill".into()]
    ))
    .build_arc()?;

let config = SessionConfig::new()
    .with_batch_size(8192)
    .with_target_partitions(8)
    .with_spill_compression(SpillCompression::Zstd)
    .with_repartition_joins(true)
    .with_repartition_aggregations(true);

let ctx = SessionContext::new_with_config_rt(config, runtime);

Профиль: Контейнер с лимитом 2 GB

let runtime = RuntimeEnvBuilder::new()
    .with_memory_pool(Arc::new(FairSpillPool::new(1_500_000_000))) // 1.5 GB
    .with_disk_manager(DiskManagerConfig::NewOs)
    .build_arc()?;

let config = SessionConfig::new()
    .with_batch_size(1024)       // уменьшаем — меньше памяти на batch
    .with_target_partitions(4)   // меньше партиций — больше памяти на каждую
    .with_spill_compression(SpillCompression::Zstd);

let ctx = SessionContext::new_with_config_rt(config, runtime);
WARNING

Оставляйте 25-30% памяти контейнера для OS, GC (если Python), и overhead самого DataFusion. Для контейнера с 2 GB лимитом MemoryPool не должен превышать ~1.5 GB.

Профиль: Embedded / Edge (512 MB RAM)

let runtime = RuntimeEnvBuilder::new()
    .with_memory_pool(Arc::new(FairSpillPool::new(350_000_000))) // 350 MB
    .with_disk_manager(DiskManagerConfig::NewOs)
    .build_arc()?;

let config = SessionConfig::new()
    .with_batch_size(512)
    .with_target_partitions(2)
    .with_spill_compression(SpillCompression::Lz4)  // быстрее Zstd
    .with_repartition_joins(false)    // экономим память
    .with_repartition_windows(false);

let ctx = SessionContext::new_with_config_rt(config, runtime);

Best practices конфигурации

Чек-лист настройки DataFusion
1. Начните с дефолтов — замерьте baseline через EXPLAIN ANALYZEВсегда начинайте с дефолтов и замеряйте baseline
2. Проверьте memory/partition ratiomemory_limit / target_partitions — если ratio < 100 MB, уменьшите партиции
3. Настройте spill: FairSpillPool + DiskManager + compressionFairSpillPool + DiskManager + compression — для production обязательно
4. Проверьте file layout — размер и количество файловОдин файл — мало партиций. Много мелких файлов — overhead на open. 64-256 MB / файл оптимально
5. Тюните batch_size только по результатам профилированияУменьшите batch_size только если видите чрезмерный spill или high peak memory

Антипаттерны

АнтипаттернПочему плохоЧто делать
target_partitions = 128 на 8-ядерной машине128 задач конкурируют за 8 ядер, overhead на scheduling и памятьtarget_partitions = num_cores или num_cores * 2
batch_size = 1 для «минимизации памяти»Уничтожает SIMD-vectorization, каждый batch имеет fixed overheadМинимум 256, рекомендуется 1024+
Без MemoryPool в productionOOM вместо контролируемой ошибкиFairSpillPool + memory_limit < 80% доступной RAM
DiskManagerConfig::Disabled при FairSpillPoolSpill невозможен — FairSpillPool вернёт ResourcesExhaustedВсегда включайте DiskManager с FairSpillPool
Пересоздание RuntimeEnv на каждый запросПотеря кешей ObjectStore, пересоздание пулов потоковОдин RuntimeEnv → много SessionContext

Мониторинг конфигурации в runtime

// Прочитать текущие значения конфигурации
let config = ctx.copied_config();
println!("batch_size: {}", config.batch_size());
println!("target_partitions: {}", config.target_partitions());

// Через SQL
let df = ctx.sql("SHOW ALL LIKE '%batch%'").await?;
df.show().await?;

Cooperative cancellation — async-aware прерывание запросов (DataFusion 49+)

Long-running queries в production требуют способа надёжно отменить выполнение — по timeout, на запрос пользователя, при перегрузке. До 49.x cancellation в DataFusion работал через tokio::select! на верхнем уровне — асинхронные операторы реагировали на abort, но синхронные tight loops внутри RecordBatchStream::poll_next могли удерживать тред несколько секунд после “отмены”.

DataFusion 49+ ввёл cooperative cancellation — protocol, при котором операторы периодически проверяют флаг отмены и останавливаются на yield-points.

Проблема runaway queries

// Старый код (упрощённо): tight CPU loop без yield points
impl Stream for OldOperator {
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<...> {
        let batch = self.input.next();           // dependency
        let result = expensive_compute(batch);    // 30 секунд CPU work
        Poll::Ready(Some(result))
    }
}

При cancellation tokio пытается отменить future, но poll_next уже вошёл в expensive_compute и не возвращает control. Result — query “висит” на 30 секунд после abort, держит память и CPU.

EnsureCooperative wrapper

DataFusion 49+ добавил EnsureCooperativeExec — physical optimizer rule, которая wrapает операторы с потенциально long-running computations:

use datafusion::physical_optimizer::ensure_coop::EnsureCoopExec;
use datafusion::execution::TaskContext;

// Optimizer вставляет EnsureCoopExec вокруг "тяжёлых" операторов:
//   AggregateExec / SortExec / WindowExec / HashJoinExec
//
// EnsureCoopExec оборачивает каждый poll_next вызов так,
// что после N strerstdiff'нных rows проверяется TaskBudget
// и task yields cooperatively.

Поведение:

Original plan:                   Optimized plan:
HashJoinExec                     EnsureCoopExec
  └── input                        └── HashJoinExec
                                         └── input

EnsureCoopExec мониторит сколько rows / времени потратил inner оператор и вставляет yield, давая tokio runtime возможность обработать cancellation signal.

Task budget API

DataFusion вводит Task Budget — структура, ограничивающая объём работы, который операция может выполнить без yield:

use datafusion::execution::task_budget::TaskBudget;

// Budget настраивается через SessionConfig:
let config = SessionConfig::new()
    .with_task_budget_rows(8192)            // yield после 8K rows
    .with_task_budget_duration_ms(50);      // или после 50ms — что раньше

let ctx = SessionContext::new_with_config(config);

Когда оператор обрабатывает данные:

impl AggregateExec {
    async fn process_batch(&self, batch: RecordBatch, ctx: &TaskContext) -> Result<()> {
        for row in batch.rows() {
            self.aggregate(row);

            // Проверка budget — если исчерпан, yield
            if ctx.task_budget().consume(1).await {
                tokio::task::yield_now().await;     // дать runtime обработать abort
            }
        }
        Ok(())
    }
}

TaskBudget::consume(rows) returns true когда budget исчерпан — оператор yields, что даёт runtime возможность доставить abort signal.

Cancellation example

use datafusion::prelude::SessionContext;
use tokio::time::{timeout, Duration};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();
    ctx.register_parquet("events", "events_huge.parquet", Default::default()).await?;

    // Запустить long-running query с timeout 5 секунд
    let result = timeout(Duration::from_secs(5), async {
        let df = ctx.sql("
            SELECT user_id, COUNT(*), AVG(amount)
            FROM events
            GROUP BY user_id
            ORDER BY 2 DESC
            LIMIT 100
        ").await?;
        df.collect().await
    }).await;

    match result {
        Ok(Ok(_)) => println!("Query completed"),
        Ok(Err(e)) => println!("Query error: {}", e),
        Err(_) => println!("Query cancelled by timeout"),  // в 49+ — actually cancels быстро
    }
    Ok(())
}

В DataFusion 49+ cancellation отрабатывает обычно в пределах сотен миллисекунд — task budget гарантирует regular yield-points. До 49 могло занимать секунды или весь backlog operator.

Применение для production

СценарийРешение через cooperative cancellation
Query timeout (SLA)tokio::time::timeout вокруг collect; гарантия abort в пределах task_budget_duration
User-initiated cancelCancellationToken пробрасывается в TaskContext — checked в EnsureCoopExec
Memory pressure abortMemoryPool возвращает ResourcesExhausted; runaway loop теперь не блокирует cleanup
Service shutdownGraceful shutdown — runtime отменяет все queries, cooperative ensures fast termination

Custom operators — добавление cooperative checks

Если вы пишете кастомный ExecutionPlan с tight CPU loop:

impl Stream for MyCustomOperator {
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<...> {
        let mut processed = 0;
        loop {
            let row = self.next_row();

            // ... обработка ...
            processed += 1;

            // Cooperative check — каждые batch_size rows
            if processed >= self.context.session_config().batch_size() {
                // Yield: пересоздаём future, runtime может обработать cancellation
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
        }
    }
}

Альтернатива — wrap кастомный оператор в EnsureCoopExec через physical optimizer rule.

TIP

Если у вас в production случаются “висящие” queries, которые не реагируют на abort signal — обновитесь до DataFusion 49+ и убедитесь, что EnsureCooperative rule включён (по умолчанию). Замерьте latency cancellation через tokio::time::timeout — до 49 могла быть секундная latency на абсолютно тяжёлых aggregations, в 49+ — миллисекунды.

Cite DataFusion 49 cooperative cancellation issue + tokio cooperative scheduling.


Итоги

  • SessionConfig — «дешёвая» конфигурация, меняется на каждый запрос: batch_size, target_partitions, repartition-флаги
  • RuntimeEnv — «дорогая» инфраструктура, создаётся один раз: MemoryPool, DiskManager, ObjectStore
  • batch_size по умолчанию (8192) оптимален для большинства случаев — уменьшайте только для memory-constrained сред
  • target_partitions должен соответствовать количеству ядер — больше не значит быстрее
  • FairSpillPool + DiskManager + spill_compression — обязательная комбинация для production
  • Следующий урок: партиционирование и параллелизм — как DataFusion распараллеливает работу
ClickHouse: memory tuning ClickHouse: ключевые настройки производительности

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. В чём принципиальная разница между SessionConfig и RuntimeEnv при настройке DataFusion?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4