Конфигурация runtime: SessionConfig, RuntimeEnv и лимиты
В модуле 02 мы изучили управление памятью — MemoryPool, FairSpillPool и DiskManager. Теперь переходим к системной настройке всего runtime: какие параметры критичны для производительности, как их задавать и как строить профили конфигурации для разных нагрузок.
Архитектура конфигурации DataFusion
DataFusion разделяет конфигурацию на два уровня:
Правило: SessionConfig меняется «дёшево» (можно на каждый запрос), RuntimeEnv создаётся «дорого» (один раз на приложение). Не пересоздавайте RuntimeEnv для каждого запроса.
SessionConfig: ключевые параметры производительности
batch_size — размер RecordBatch
batch_size определяет, сколько строк обрабатывается в одном RecordBatch за раз. Значение по умолчанию — 8192.
use datafusion::prelude::*;
// Программно
let config = SessionConfig::new()
.with_batch_size(4096);
let ctx = SessionContext::new_with_config(config);
-- Через SQL
SET datafusion.execution.batch_size = 4096;
Влияние на производительность:
Для memory-limited сред (контейнеры с 512 MB — 2 GB) уменьшение batch_size до 1024-2048 уменьшает вероятность spill. Для аналитических серверов с 16+ GB — значение по умолчанию (8192) оптимально.
target_partitions — параллелизм выполнения
target_partitions определяет целевое количество партиций при выполнении запросов. По умолчанию равен количеству CPU-ядер.
let config = SessionConfig::new()
.with_target_partitions(8);
SET datafusion.execution.target_partitions = 8;
Как target_partitions влияет на план:
-- При target_partitions = 16:
-- DataSourceExec (1 partition)
-- → RepartitionExec: RoundRobinBatch(16) -- расширяет до 16
-- → AggregateExec: mode=Partial -- 16 параллельных partial
-- → CoalesceBatchesExec
-- → RepartitionExec: Hash([key], 16)
-- → AggregateExec: mode=FinalPartitioned
Увеличение target_partitions не всегда ускоряет запросы. Каждая партиция потребляет память для промежуточных буферов. При memory_limit = 2 GB и target_partitions = 32 каждая партиция получит ~62 MB — крупные Hash Join могут не поместиться и упасть с ResourcesExhausted. Формула: memory_per_partition ≈ memory_limit / target_partitions.
Управление repartition-стратегиями
DataFusion предлагает несколько флагов для контроля автоматического repartitioning:
let config = SessionConfig::new()
.with_repartition_joins(true) // repartition для join keys
.with_repartition_aggregations(true) // repartition для GROUP BY
.with_repartition_windows(true) // repartition для PARTITION BY в window
.with_repartition_sorts(true) // параллельная сортировка
.with_repartition_file_scans(true); // разбивка крупных файлов на партиции
SET datafusion.optimizer.repartition_joins = true;
SET datafusion.optimizer.repartition_aggregations = true;
SET datafusion.optimizer.repartition_windows = true;
SET datafusion.optimizer.repartition_sorts = true;
SET datafusion.optimizer.repartition_file_scans = true;
Все флаги включены по умолчанию. Отключайте их только при диагностике — например, repartition_joins = false убирает Hash-repartition перед Join, что полезно если данные уже разложены по ключу join.
RuntimeEnvBuilder: настройка инфраструктуры
RuntimeEnv определяет инфраструктуру выполнения. Создаётся через builder:
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::disk_manager::DiskManagerConfig;
use std::sync::Arc;
let runtime = RuntimeEnvBuilder::new()
// Пул памяти: 4 GB с fair-распределением и spill
.with_memory_pool(Arc::new(FairSpillPool::new(4_000_000_000)))
// Менеджер диска для spill-файлов
.with_disk_manager(DiskManagerConfig::NewOs)
.build_arc()?;
// Комбинируем с SessionConfig
let config = SessionConfig::new()
.with_batch_size(8192)
.with_target_partitions(8);
let ctx = SessionContext::new_with_config_rt(config, runtime);
DiskManagerConfig::NewOs использует системную temp-директорию для spill-файлов. Для production укажите конкретный путь: DiskManagerConfig::NewSpecified(vec!["/data/spill".into()]) — это даёт контроль над дисковым пространством и позволяет разместить spill на быстром SSD.
Сжатие spill-файлов
DataFusion v44+ поддерживает сжатие spill-файлов для уменьшения I/O:
use datafusion::common::config::SpillCompression;
let config = SessionConfig::new()
.with_spill_compression(SpillCompression::Zstd);
SET datafusion.execution.spill_compression = 'zstd';
Выбор алгоритма сжатия:
Конфигурация через SQL SET
Для интерактивной работы все параметры SessionConfig доступны через SQL:
-- Посмотреть все параметры
SHOW ALL;
-- Посмотреть конкретный параметр
SHOW datafusion.execution.batch_size;
-- Посмотреть группу параметров
SHOW ALL LIKE '%partition%';
-- Изменить параметр для текущей сессии
SET datafusion.execution.target_partitions = 4;
SET datafusion.execution.batch_size = 1024;
SET влияет только на текущий SessionContext. Если вы создаёте новый SessionContext с тем же RuntimeEnv, он получит значения по умолчанию. Для «глобальных» настроек используйте SessionConfig при создании контекста.
Профили конфигурации для типичных нагрузок
Профиль: Аналитический сервер (16+ GB RAM, 8+ ядер)
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(12_000_000_000))) // 12 GB
.with_disk_manager(DiskManagerConfig::NewSpecified(
vec!["/data/spill".into()]
))
.build_arc()?;
let config = SessionConfig::new()
.with_batch_size(8192)
.with_target_partitions(8)
.with_spill_compression(SpillCompression::Zstd)
.with_repartition_joins(true)
.with_repartition_aggregations(true);
let ctx = SessionContext::new_with_config_rt(config, runtime);
Профиль: Контейнер с лимитом 2 GB
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(1_500_000_000))) // 1.5 GB
.with_disk_manager(DiskManagerConfig::NewOs)
.build_arc()?;
let config = SessionConfig::new()
.with_batch_size(1024) // уменьшаем — меньше памяти на batch
.with_target_partitions(4) // меньше партиций — больше памяти на каждую
.with_spill_compression(SpillCompression::Zstd);
let ctx = SessionContext::new_with_config_rt(config, runtime);
Оставляйте 25-30% памяти контейнера для OS, GC (если Python), и overhead самого DataFusion. Для контейнера с 2 GB лимитом MemoryPool не должен превышать ~1.5 GB.
Профиль: Embedded / Edge (512 MB RAM)
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(350_000_000))) // 350 MB
.with_disk_manager(DiskManagerConfig::NewOs)
.build_arc()?;
let config = SessionConfig::new()
.with_batch_size(512)
.with_target_partitions(2)
.with_spill_compression(SpillCompression::Lz4) // быстрее Zstd
.with_repartition_joins(false) // экономим память
.with_repartition_windows(false);
let ctx = SessionContext::new_with_config_rt(config, runtime);
Best practices конфигурации
Антипаттерны
| Антипаттерн | Почему плохо | Что делать |
|---|---|---|
target_partitions = 128 на 8-ядерной машине | 128 задач конкурируют за 8 ядер, overhead на scheduling и память | target_partitions = num_cores или num_cores * 2 |
batch_size = 1 для «минимизации памяти» | Уничтожает SIMD-vectorization, каждый batch имеет fixed overhead | Минимум 256, рекомендуется 1024+ |
| Без MemoryPool в production | OOM вместо контролируемой ошибки | FairSpillPool + memory_limit < 80% доступной RAM |
DiskManagerConfig::Disabled при FairSpillPool | Spill невозможен — FairSpillPool вернёт ResourcesExhausted | Всегда включайте DiskManager с FairSpillPool |
| Пересоздание RuntimeEnv на каждый запрос | Потеря кешей ObjectStore, пересоздание пулов потоков | Один RuntimeEnv → много SessionContext |
Мониторинг конфигурации в runtime
// Прочитать текущие значения конфигурации
let config = ctx.copied_config();
println!("batch_size: {}", config.batch_size());
println!("target_partitions: {}", config.target_partitions());
// Через SQL
let df = ctx.sql("SHOW ALL LIKE '%batch%'").await?;
df.show().await?;
Cooperative cancellation — async-aware прерывание запросов (DataFusion 49+)
Long-running queries в production требуют способа надёжно отменить выполнение — по timeout, на запрос пользователя, при перегрузке. До 49.x cancellation в DataFusion работал через tokio::select! на верхнем уровне — асинхронные операторы реагировали на abort, но синхронные tight loops внутри RecordBatchStream::poll_next могли удерживать тред несколько секунд после “отмены”.
DataFusion 49+ ввёл cooperative cancellation — protocol, при котором операторы периодически проверяют флаг отмены и останавливаются на yield-points.
Проблема runaway queries
// Старый код (упрощённо): tight CPU loop без yield points
impl Stream for OldOperator {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<...> {
let batch = self.input.next(); // dependency
let result = expensive_compute(batch); // 30 секунд CPU work
Poll::Ready(Some(result))
}
}
При cancellation tokio пытается отменить future, но poll_next уже вошёл в expensive_compute и не возвращает control. Result — query “висит” на 30 секунд после abort, держит память и CPU.
EnsureCooperative wrapper
DataFusion 49+ добавил EnsureCooperativeExec — physical optimizer rule, которая wrapает операторы с потенциально long-running computations:
use datafusion::physical_optimizer::ensure_coop::EnsureCoopExec;
use datafusion::execution::TaskContext;
// Optimizer вставляет EnsureCoopExec вокруг "тяжёлых" операторов:
// AggregateExec / SortExec / WindowExec / HashJoinExec
//
// EnsureCoopExec оборачивает каждый poll_next вызов так,
// что после N strerstdiff'нных rows проверяется TaskBudget
// и task yields cooperatively.
Поведение:
Original plan: Optimized plan:
HashJoinExec EnsureCoopExec
└── input └── HashJoinExec
└── input
EnsureCoopExec мониторит сколько rows / времени потратил inner оператор и вставляет yield, давая tokio runtime возможность обработать cancellation signal.
Task budget API
DataFusion вводит Task Budget — структура, ограничивающая объём работы, который операция может выполнить без yield:
use datafusion::execution::task_budget::TaskBudget;
// Budget настраивается через SessionConfig:
let config = SessionConfig::new()
.with_task_budget_rows(8192) // yield после 8K rows
.with_task_budget_duration_ms(50); // или после 50ms — что раньше
let ctx = SessionContext::new_with_config(config);
Когда оператор обрабатывает данные:
impl AggregateExec {
async fn process_batch(&self, batch: RecordBatch, ctx: &TaskContext) -> Result<()> {
for row in batch.rows() {
self.aggregate(row);
// Проверка budget — если исчерпан, yield
if ctx.task_budget().consume(1).await {
tokio::task::yield_now().await; // дать runtime обработать abort
}
}
Ok(())
}
}
TaskBudget::consume(rows) returns true когда budget исчерпан — оператор yields, что даёт runtime возможность доставить abort signal.
Cancellation example
use datafusion::prelude::SessionContext;
use tokio::time::{timeout, Duration};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = SessionContext::new();
ctx.register_parquet("events", "events_huge.parquet", Default::default()).await?;
// Запустить long-running query с timeout 5 секунд
let result = timeout(Duration::from_secs(5), async {
let df = ctx.sql("
SELECT user_id, COUNT(*), AVG(amount)
FROM events
GROUP BY user_id
ORDER BY 2 DESC
LIMIT 100
").await?;
df.collect().await
}).await;
match result {
Ok(Ok(_)) => println!("Query completed"),
Ok(Err(e)) => println!("Query error: {}", e),
Err(_) => println!("Query cancelled by timeout"), // в 49+ — actually cancels быстро
}
Ok(())
}
В DataFusion 49+ cancellation отрабатывает обычно в пределах сотен миллисекунд — task budget гарантирует regular yield-points. До 49 могло занимать секунды или весь backlog operator.
Применение для production
| Сценарий | Решение через cooperative cancellation |
|---|---|
| Query timeout (SLA) | tokio::time::timeout вокруг collect; гарантия abort в пределах task_budget_duration |
| User-initiated cancel | CancellationToken пробрасывается в TaskContext — checked в EnsureCoopExec |
| Memory pressure abort | MemoryPool возвращает ResourcesExhausted; runaway loop теперь не блокирует cleanup |
| Service shutdown | Graceful shutdown — runtime отменяет все queries, cooperative ensures fast termination |
Custom operators — добавление cooperative checks
Если вы пишете кастомный ExecutionPlan с tight CPU loop:
impl Stream for MyCustomOperator {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<...> {
let mut processed = 0;
loop {
let row = self.next_row();
// ... обработка ...
processed += 1;
// Cooperative check — каждые batch_size rows
if processed >= self.context.session_config().batch_size() {
// Yield: пересоздаём future, runtime может обработать cancellation
cx.waker().wake_by_ref();
return Poll::Pending;
}
}
}
}
Альтернатива — wrap кастомный оператор в EnsureCoopExec через physical optimizer rule.
Если у вас в production случаются “висящие” queries, которые не реагируют на abort signal — обновитесь до DataFusion 49+ и убедитесь, что EnsureCooperative rule включён (по умолчанию). Замерьте latency cancellation через tokio::time::timeout — до 49 могла быть секундная latency на абсолютно тяжёлых aggregations, в 49+ — миллисекунды.
Cite DataFusion 49 cooperative cancellation issue + tokio cooperative scheduling.
Итоги
- SessionConfig — «дешёвая» конфигурация, меняется на каждый запрос: batch_size, target_partitions, repartition-флаги
- RuntimeEnv — «дорогая» инфраструктура, создаётся один раз: MemoryPool, DiskManager, ObjectStore
- batch_size по умолчанию (8192) оптимален для большинства случаев — уменьшайте только для memory-constrained сред
- target_partitions должен соответствовать количеству ядер — больше не значит быстрее
- FairSpillPool + DiskManager + spill_compression — обязательная комбинация для production
- Следующий урок: партиционирование и параллелизм — как DataFusion распараллеливает работу