Управление памятью в DataFusion
В уроке 05 мы видели, что RuntimeEnv принимает пул памяти. Теперь разберём подсистему управления памятью целиком: какие операторы регистрируются в пуле, как работает spill-to-disk, и как мониторить потребление.
Философия: streaming vs buffering
DataFusion делит операторы на две категории по потреблению памяти:
Это прагматичный подход: отслеживать каждый байт дорого, а streaming-операторы потребляют фиксированное количество памяти (один-два batch). Нет смысла добавлять overhead учёта для них.
Планируя memory_limit, зарезервируйте ~10% сверх расчётного — на неотслеживаемые аллокации streaming-операторов, буферы Arrow и служебные структуры DataFusion.
MemoryPool trait
Все buffering-операторы взаимодействуют с памятью через единый trait:
pub trait MemoryPool: Send + Sync + std::fmt::Debug {
/// Регистрирует новый MemoryConsumer
fn register(&self, consumer: &MemoryConsumer);
/// Отменяет регистрацию consumer-а
fn unregister(&self, consumer: &MemoryConsumer);
/// Увеличивает резервацию. Возвращает ошибку при нехватке памяти.
fn grow(&self, reservation: &MemoryReservation, additional: usize);
/// Уменьшает резервацию, возвращая память в пул
fn shrink(&self, reservation: &MemoryReservation, shrink: usize);
/// Пытается увеличить резервацию без ошибки — возвращает фактически выделенное
fn try_grow(&self, reservation: &MemoryReservation, additional: usize)
-> Result<usize>;
/// Суммарное зарезервированное количество байт
fn reserved(&self) -> usize;
}
Операторы не вызывают grow/shrink напрямую — они работают через MemoryReservation.
MemoryReservation и MemoryConsumer
MemoryConsumer — именованная сущность, регистрирующаяся в пуле (например, “HashJoinStream[0]”). MemoryReservation — RAII-обёртка, которая при drop() автоматически освобождает память:
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
// Оператор создаёт consumer
let consumer = MemoryConsumer::new("MyOperator[partition=0]");
// Регистрирует и получает резервацию
let mut reservation = consumer.register(pool.as_ref());
// При получении каждого batch — grow
reservation.try_grow(batch_size_in_bytes)?;
// При обработке batch — shrink
reservation.shrink(freed_bytes);
// При drop() MemoryReservation — вся оставшаяся память возвращается в пул
drop(reservation);
Не вызывайте grow() если не готовы к ошибке ResourcesExhausted. Для graceful degradation используйте try_grow() — он возвращает Result, позволяя оператору решить что делать (spill на диск, уменьшить batch size, вернуть частичный результат).
Встроенные пулы
DataFusion предоставляет четыре реализации MemoryPool:
UnboundedMemoryPool
Пул по умолчанию — не ограничивает память. Все вызовы grow() успешны. Используется, когда контроль памяти не нужен (разработка, тесты):
use datafusion::execution::memory_pool::UnboundedMemoryPool;
let pool = Arc::new(UnboundedMemoryPool::default());
GreedyMemoryPool
First-come-first-serve: первый оператор, запросивший память, получает всё до лимита. Остальные получают ResourcesExhausted:
use datafusion::execution::memory_pool::GreedyMemoryPool;
// Лимит 2 GB
let pool = Arc::new(GreedyMemoryPool::new(2 * 1024 * 1024 * 1024));
Характеристики:
- Простая реализация, минимальный overhead
- Не поддерживает spill: при превышении лимита возвращает ошибку, а не сигнал к spill
- Подходит для workloads с предсказуемым потреблением
FairSpillPool
Равномерное распределение: делит доступную память между всеми активными spillable-резервациями. Когда одна резервация запрашивает слишком много, пул сигнализирует другим spillable-резервациям освободить память:
use datafusion::execution::memory_pool::FairSpillPool;
// Лимит 4 GB
let pool = Arc::new(FairSpillPool::new(4 * 1024 * 1024 * 1024));
Характеристики:
- Поддерживает spill-to-disk — операторы с поддержкой spill получают сигнал сбросить данные на диск
- Равномерное распределение предотвращает ситуацию, когда один тяжёлый оператор захватывает всю память
- Рекомендуется для продакшен-нагрузок с несколькими concurrent запросами
TrackConsumersPool
Не самостоятельный пул, а обёртка над любым другим пулом, добавляющая мониторинг. Об этом — в разделе о мониторинге ниже.
Сравнение пулов
| Свойство | UnboundedMemoryPool | GreedyMemoryPool | FairSpillPool |
|---|---|---|---|
| Лимит памяти | Нет | Да | Да |
| Поддержка spill | Нет | Нет | Да |
| Стратегия при нехватке | — | Ошибка | Spill + равномерное перераспределение |
| Overhead | Нулевой | Минимальный | Умеренный |
| Продакшен | ❌ | Простые случаи | ✅ Рекомендуется |
FairSpillPool делит память между spillable-резервациями. Если оператор не поддерживает spill (CrossJoin, NestedLoopJoin), его резервация считается non-spillable — пул не будет просить его сбросить данные на диск.
Spill-to-disk
Когда FairSpillPool сигнализирует оператору сбросить данные, оператор использует DiskManager для создания временных файлов:
Какие операторы поддерживают spill
| Оператор | Spill | Примечание |
|---|---|---|
| SortExec | ✅ | Внешняя сортировка: сортирует chunks, сбрасывает на диск, merge при чтении |
| AggregateExec (hash) | ✅ | Сбрасывает hash-таблицу, перечитывает при merge |
| HashJoinExec | ✅ | Сбрасывает build-сторону |
| SymmetricHashJoinExec | ✅ | Сбрасывает обе стороны |
| CrossJoinExec | ❌ | Нет поддержки spill — при нехватке памяти возвращает ошибку |
| NestedLoopJoinExec | ❌ | Нет поддержки spill — при нехватке памяти возвращает ошибку |
CrossJoin и NestedLoopJoin не поддерживают spill. Если данные не помещаются в память — запрос завершится с ошибкой ResourcesExhausted. Для больших данных замените CROSS JOIN на INNER JOIN с условием, или увеличьте memory_limit.
Формат spill-файлов
Данные сбрасываются в формате Arrow IPC (Feather v2). Это нативный бинарный формат Arrow — десериализация не требует конвертации типов, что делает read-back быстрым. DiskManager создаёт файлы через RefCountedTempFile, который автоматически удаляет файл при drop().
Конфигурация
Через RuntimeEnvBuilder (Rust API)
use std::sync::Arc;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::prelude::SessionContext;
// Пул с лимитом 4 GB + DiskManager для spill
let runtime_env = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(4_000_000_000)))
.with_disk_manager(DiskManagerConfig::NewOs) // OS temp directory
.build_arc()?;
let ctx = SessionContext::new_with_config_rt(
Default::default(),
runtime_env,
);
DiskManagerConfig варианты:
NewOs— временные файлы в системной temp-директорииNewSpecified(paths)— конкретные директории для spill (полезно для выделенного SSD)Disabled— spill запрещён, при нехватке памяти всегда ошибка
Через SQL SET
-- Установить лимит памяти для текущей сессии
SET datafusion.execution.memory_limit = '4GB';
-- Настроить количество партиций (влияет на распределение памяти в FairSpillPool)
SET datafusion.execution.target_partitions = 4;
-- Уменьшить batch size для снижения пикового потребления
SET datafusion.execution.batch_size = 1024;
При использовании FairSpillPool, память делится между партициями. Чем больше target_partitions, тем меньше памяти на каждую партицию, и тем чаще будет происходить spill. Найдите баланс: больше партиций = больше параллелизма, но меньше памяти на каждую.
Мониторинг: TrackConsumersPool
TrackConsumersPool — обёртка, которая записывает, какой consumer сколько памяти потребляет. При ошибке ResourcesExhausted включает в сообщение об ошибке информацию о top-потребителях:
use std::sync::Arc;
use datafusion::execution::memory_pool::{FairSpillPool, TrackConsumersPool};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
let inner_pool = FairSpillPool::new(4_000_000_000);
let tracking_pool = TrackConsumersPool::new(inner_pool, /* top_n */ 5);
let runtime_env = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(tracking_pool))
.build_arc()?;
Когда запрос превышает лимит, ошибка содержит диагностику:
Resources exhausted: Failed to allocate 256.0 MB for HashJoinStream[1].
Top memory consumers:
1. HashJoinStream[0]: 1.2 GB
2. SortExec[partition=3]: 800 MB
3. AggregateExec[partition=0]: 600 MB
4. HashJoinStream[1]: 400 MB
5. SortExec[partition=1]: 350 MB
Это ключевой инструмент диагностики: вместо загадочной “out of memory” вы видите, какой оператор и какая партиция потребляет больше всего.
Реализация кастомного MemoryPool
Для нестандартных требований (внешний оркестратор, per-query лимиты, метрики в Prometheus) можно реализовать свой пул:
use datafusion::execution::memory_pool::{
MemoryPool, MemoryConsumer, MemoryReservation,
};
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Debug)]
struct MetricsPool {
limit: usize,
used: AtomicUsize,
}
impl MetricsPool {
fn new(limit: usize) -> Self {
Self {
limit,
used: AtomicUsize::new(0),
}
}
}
impl MemoryPool for MetricsPool {
fn register(&self, _consumer: &MemoryConsumer) {
// Можно отправить метрику о новом consumer
}
fn unregister(&self, _consumer: &MemoryConsumer) {
// Можно отправить метрику об удалении consumer
}
fn grow(
&self,
_reservation: &MemoryReservation,
additional: usize,
) {
let new_total = self.used.fetch_add(additional, Ordering::Relaxed)
+ additional;
if new_total > self.limit {
self.used.fetch_sub(additional, Ordering::Relaxed);
// В реальном коде — вернуть DataFusionError::ResourcesExhausted
}
// Отправить метрику: gauge "datafusion_memory_used" = new_total
}
fn shrink(
&self,
_reservation: &MemoryReservation,
shrink: usize,
) {
self.used.fetch_sub(shrink, Ordering::Relaxed);
}
fn try_grow(
&self,
reservation: &MemoryReservation,
additional: usize,
) -> datafusion::common::Result<usize> {
let current = self.used.load(Ordering::Relaxed);
let available = self.limit.saturating_sub(current);
let granted = additional.min(available);
if granted > 0 {
self.grow(reservation, granted);
}
Ok(granted)
}
fn reserved(&self) -> usize {
self.used.load(Ordering::Relaxed)
}
}
В production-реализации кастомного пула обязательно обрабатывайте race conditions при concurrent grow/shrink — используйте compare_exchange вместо fetch_add, если нужна точная проверка лимита.
Практические рекомендации
Выбор пула
- Разработка и тесты →
UnboundedMemoryPool(по умолчанию) - Продакшен, один запрос →
GreedyMemoryPool— простой лимит - Продакшен, concurrent запросы →
FairSpillPool— справедливое распределение + spill - Диагностика →
TrackConsumersPoolобёртка вокруг любого пула
Расчёт memory_limit
memory_limit = available_ram
- OS overhead (~1-2 GB)
- other processes
- 10% reserve (untracked DataFusion allocations)
Для сервера с 32 GB RAM и dedicated DataFusion:
memory_limit = 32 GB - 2 GB (OS) - 3 GB (10% reserve) ≈ 27 GB
Оптимизация spill-производительности
- Используйте SSD для директории spill — Arrow IPC файлы генерируют значительный I/O
- Настройте
DiskManagerConfig::NewSpecifiedс путём к быстрому диску - Уменьшите
target_partitionsесли spill происходит слишком часто - Увеличьте
batch_sizeесли spill слишком частый и мелкий (больше данных за один spill)
Итоги
- Streaming-операторы (Filter, Projection) не регистрируются в MemoryPool — их потребление фиксировано
- Buffering-операторы (Sort, HashJoin, Aggregate) отслеживают память через MemoryConsumer → MemoryReservation
- GreedyMemoryPool — простой лимит, FairSpillPool — равномерное распределение + spill-to-disk
- CrossJoin и NestedLoopJoin не поддерживают spill — при нехватке памяти ошибка
- DiskManager сбрасывает данные в Arrow IPC, файлы удаляются автоматически
- TrackConsumersPool — обёртка для диагностики top-потребителей памяти
- Зарезервируйте ~10% памяти сверх лимита на неотслеживаемые аллокации