Learning Platform
Глоссарий Troubleshooting
Урок 03.07 · 14 мин
Продвинутый
MemoryPoolMemoryReservationMemoryConsumerGreedyMemoryPoolFairSpillPoolDiskManagerSpill-to-DiskTrackConsumersPool

Управление памятью в DataFusion

В уроке 05 мы видели, что RuntimeEnv принимает пул памяти. Теперь разберём подсистему управления памятью целиком: какие операторы регистрируются в пуле, как работает spill-to-disk, и как мониторить потребление.

Философия: streaming vs buffering

DataFusion делит операторы на две категории по потреблению памяти:

Категории операторов по потреблению памяти
Streaming-операторыПотребляют O(1) памяти — обрабатывают по одному batch за раз, не накапливают данные
Примеры
Память
Buffering-операторыПотребляют O(N) памяти — накапливают данные перед выдачей результата
Примеры
Память

Это прагматичный подход: отслеживать каждый байт дорого, а streaming-операторы потребляют фиксированное количество памяти (один-два batch). Нет смысла добавлять overhead учёта для них.

TIP

Планируя memory_limit, зарезервируйте ~10% сверх расчётного — на неотслеживаемые аллокации streaming-операторов, буферы Arrow и служебные структуры DataFusion.

MemoryPool trait

Все buffering-операторы взаимодействуют с памятью через единый trait:

pub trait MemoryPool: Send + Sync + std::fmt::Debug {
    /// Регистрирует новый MemoryConsumer
    fn register(&self, consumer: &MemoryConsumer);

    /// Отменяет регистрацию consumer-а
    fn unregister(&self, consumer: &MemoryConsumer);

    /// Увеличивает резервацию. Возвращает ошибку при нехватке памяти.
    fn grow(&self, reservation: &MemoryReservation, additional: usize);

    /// Уменьшает резервацию, возвращая память в пул
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize);

    /// Пытается увеличить резервацию без ошибки — возвращает фактически выделенное
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize)
        -> Result<usize>;

    /// Суммарное зарезервированное количество байт
    fn reserved(&self) -> usize;
}

Операторы не вызывают grow/shrink напрямую — они работают через MemoryReservation.

MemoryReservation и MemoryConsumer

MemoryConsumer — именованная сущность, регистрирующаяся в пуле (например, “HashJoinStream[0]”). MemoryReservation — RAII-обёртка, которая при drop() автоматически освобождает память:

MemoryConsumer → MemoryReservation → MemoryPool
Оператор (SortExec)Физический оператор — SortExec, HashJoinExec, AggregateExec
создаёт
MemoryConsumer(“SortExec[partition=2]“)Именованный потребитель: содержит имя и ссылку на пул
register() → reservation
MemoryReservation { size: 0 }RAII-обёртка: grow()/shrink() при накоплении данных, drop() возвращает всё в пул
grow() / shrink()
MemoryPool (FairSpillPool / GreedyMemoryPool)Глобальный пул: отслеживает суммарное потребление, применяет лимит
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};

// Оператор создаёт consumer
let consumer = MemoryConsumer::new("MyOperator[partition=0]");

// Регистрирует и получает резервацию
let mut reservation = consumer.register(pool.as_ref());

// При получении каждого batch — grow
reservation.try_grow(batch_size_in_bytes)?;

// При обработке batch — shrink
reservation.shrink(freed_bytes);

// При drop() MemoryReservation — вся оставшаяся память возвращается в пул
drop(reservation);
WARNING

Не вызывайте grow() если не готовы к ошибке ResourcesExhausted. Для graceful degradation используйте try_grow() — он возвращает Result, позволяя оператору решить что делать (spill на диск, уменьшить batch size, вернуть частичный результат).

Встроенные пулы

DataFusion предоставляет четыре реализации MemoryPool:

UnboundedMemoryPool

Пул по умолчанию — не ограничивает память. Все вызовы grow() успешны. Используется, когда контроль памяти не нужен (разработка, тесты):

use datafusion::execution::memory_pool::UnboundedMemoryPool;

let pool = Arc::new(UnboundedMemoryPool::default());

GreedyMemoryPool

First-come-first-serve: первый оператор, запросивший память, получает всё до лимита. Остальные получают ResourcesExhausted:

use datafusion::execution::memory_pool::GreedyMemoryPool;

// Лимит 2 GB
let pool = Arc::new(GreedyMemoryPool::new(2 * 1024 * 1024 * 1024));

Характеристики:

  • Простая реализация, минимальный overhead
  • Не поддерживает spill: при превышении лимита возвращает ошибку, а не сигнал к spill
  • Подходит для workloads с предсказуемым потреблением

FairSpillPool

Равномерное распределение: делит доступную память между всеми активными spillable-резервациями. Когда одна резервация запрашивает слишком много, пул сигнализирует другим spillable-резервациям освободить память:

use datafusion::execution::memory_pool::FairSpillPool;

// Лимит 4 GB
let pool = Arc::new(FairSpillPool::new(4 * 1024 * 1024 * 1024));

Характеристики:

  • Поддерживает spill-to-disk — операторы с поддержкой spill получают сигнал сбросить данные на диск
  • Равномерное распределение предотвращает ситуацию, когда один тяжёлый оператор захватывает всю память
  • Рекомендуется для продакшен-нагрузок с несколькими concurrent запросами

TrackConsumersPool

Не самостоятельный пул, а обёртка над любым другим пулом, добавляющая мониторинг. Об этом — в разделе о мониторинге ниже.

Сравнение пулов

СвойствоUnboundedMemoryPoolGreedyMemoryPoolFairSpillPool
Лимит памятиНетДаДа
Поддержка spillНетНетДа
Стратегия при нехваткеОшибкаSpill + равномерное перераспределение
OverheadНулевойМинимальныйУмеренный
ПродакшенПростые случаи✅ Рекомендуется
NOTE

FairSpillPool делит память между spillable-резервациями. Если оператор не поддерживает spill (CrossJoin, NestedLoopJoin), его резервация считается non-spillable — пул не будет просить его сбросить данные на диск.

Spill-to-disk

Когда FairSpillPool сигнализирует оператору сбросить данные, оператор использует DiskManager для создания временных файлов:

Механизм spill-to-disk
FairSpillPool: reservation.grow() → need_spillFairSpillPool определяет, что consumer превысил fair-share
сигнал к spill
Оператор: serialize batches → Arrow IPCОператор сериализует накопленные данные в Arrow IPC формат
create_tmp_file()
DiskManager → RefCountedTempFileDiskManager создаёт временный файл, RefCountedTempFile удалит его при drop
write + shrink reservation
Данные на диске (Arrow IPC), память возвращена в пулДанные на диске, память освобождена. При необходимости — read back

Какие операторы поддерживают spill

ОператорSpillПримечание
SortExecВнешняя сортировка: сортирует chunks, сбрасывает на диск, merge при чтении
AggregateExec (hash)Сбрасывает hash-таблицу, перечитывает при merge
HashJoinExecСбрасывает build-сторону
SymmetricHashJoinExecСбрасывает обе стороны
CrossJoinExecНет поддержки spill — при нехватке памяти возвращает ошибку
NestedLoopJoinExecНет поддержки spill — при нехватке памяти возвращает ошибку
DANGER

CrossJoin и NestedLoopJoin не поддерживают spill. Если данные не помещаются в память — запрос завершится с ошибкой ResourcesExhausted. Для больших данных замените CROSS JOIN на INNER JOIN с условием, или увеличьте memory_limit.

Формат spill-файлов

Данные сбрасываются в формате Arrow IPC (Feather v2). Это нативный бинарный формат Arrow — десериализация не требует конвертации типов, что делает read-back быстрым. DiskManager создаёт файлы через RefCountedTempFile, который автоматически удаляет файл при drop().

Конфигурация

Через RuntimeEnvBuilder (Rust API)

use std::sync::Arc;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::prelude::SessionContext;

// Пул с лимитом 4 GB + DiskManager для spill
let runtime_env = RuntimeEnvBuilder::new()
    .with_memory_pool(Arc::new(FairSpillPool::new(4_000_000_000)))
    .with_disk_manager(DiskManagerConfig::NewOs)  // OS temp directory
    .build_arc()?;

let ctx = SessionContext::new_with_config_rt(
    Default::default(),
    runtime_env,
);

DiskManagerConfig варианты:

  • NewOs — временные файлы в системной temp-директории
  • NewSpecified(paths) — конкретные директории для spill (полезно для выделенного SSD)
  • Disabled — spill запрещён, при нехватке памяти всегда ошибка

Через SQL SET

-- Установить лимит памяти для текущей сессии
SET datafusion.execution.memory_limit = '4GB';

-- Настроить количество партиций (влияет на распределение памяти в FairSpillPool)
SET datafusion.execution.target_partitions = 4;

-- Уменьшить batch size для снижения пикового потребления
SET datafusion.execution.batch_size = 1024;
TIP

При использовании FairSpillPool, память делится между партициями. Чем больше target_partitions, тем меньше памяти на каждую партицию, и тем чаще будет происходить spill. Найдите баланс: больше партиций = больше параллелизма, но меньше памяти на каждую.

Мониторинг: TrackConsumersPool

TrackConsumersPool — обёртка, которая записывает, какой consumer сколько памяти потребляет. При ошибке ResourcesExhausted включает в сообщение об ошибке информацию о top-потребителях:

use std::sync::Arc;
use datafusion::execution::memory_pool::{FairSpillPool, TrackConsumersPool};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;

let inner_pool = FairSpillPool::new(4_000_000_000);
let tracking_pool = TrackConsumersPool::new(inner_pool, /* top_n */ 5);

let runtime_env = RuntimeEnvBuilder::new()
    .with_memory_pool(Arc::new(tracking_pool))
    .build_arc()?;

Когда запрос превышает лимит, ошибка содержит диагностику:

Resources exhausted: Failed to allocate 256.0 MB for HashJoinStream[1].
Top memory consumers:
  1. HashJoinStream[0]: 1.2 GB
  2. SortExec[partition=3]: 800 MB
  3. AggregateExec[partition=0]: 600 MB
  4. HashJoinStream[1]: 400 MB
  5. SortExec[partition=1]: 350 MB

Это ключевой инструмент диагностики: вместо загадочной “out of memory” вы видите, какой оператор и какая партиция потребляет больше всего.

Реализация кастомного MemoryPool

Для нестандартных требований (внешний оркестратор, per-query лимиты, метрики в Prometheus) можно реализовать свой пул:

use datafusion::execution::memory_pool::{
    MemoryPool, MemoryConsumer, MemoryReservation,
};
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Debug)]
struct MetricsPool {
    limit: usize,
    used: AtomicUsize,
}

impl MetricsPool {
    fn new(limit: usize) -> Self {
        Self {
            limit,
            used: AtomicUsize::new(0),
        }
    }
}

impl MemoryPool for MetricsPool {
    fn register(&self, _consumer: &MemoryConsumer) {
        // Можно отправить метрику о новом consumer
    }

    fn unregister(&self, _consumer: &MemoryConsumer) {
        // Можно отправить метрику об удалении consumer
    }

    fn grow(
        &self,
        _reservation: &MemoryReservation,
        additional: usize,
    ) {
        let new_total = self.used.fetch_add(additional, Ordering::Relaxed)
            + additional;
        if new_total > self.limit {
            self.used.fetch_sub(additional, Ordering::Relaxed);
            // В реальном коде — вернуть DataFusionError::ResourcesExhausted
        }
        // Отправить метрику: gauge "datafusion_memory_used" = new_total
    }

    fn shrink(
        &self,
        _reservation: &MemoryReservation,
        shrink: usize,
    ) {
        self.used.fetch_sub(shrink, Ordering::Relaxed);
    }

    fn try_grow(
        &self,
        reservation: &MemoryReservation,
        additional: usize,
    ) -> datafusion::common::Result<usize> {
        let current = self.used.load(Ordering::Relaxed);
        let available = self.limit.saturating_sub(current);
        let granted = additional.min(available);
        if granted > 0 {
            self.grow(reservation, granted);
        }
        Ok(granted)
    }

    fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }
}
NOTE

В production-реализации кастомного пула обязательно обрабатывайте race conditions при concurrent grow/shrink — используйте compare_exchange вместо fetch_add, если нужна точная проверка лимита.

Практические рекомендации

Выбор пула

  1. Разработка и тестыUnboundedMemoryPool (по умолчанию)
  2. Продакшен, один запросGreedyMemoryPool — простой лимит
  3. Продакшен, concurrent запросыFairSpillPool — справедливое распределение + spill
  4. ДиагностикаTrackConsumersPool обёртка вокруг любого пула

Расчёт memory_limit

memory_limit = available_ram
    - OS overhead (~1-2 GB)
    - other processes
    - 10% reserve (untracked DataFusion allocations)

Для сервера с 32 GB RAM и dedicated DataFusion:

memory_limit = 32 GB - 2 GB (OS) - 3 GB (10% reserve) ≈ 27 GB

Оптимизация spill-производительности

  • Используйте SSD для директории spill — Arrow IPC файлы генерируют значительный I/O
  • Настройте DiskManagerConfig::NewSpecified с путём к быстрому диску
  • Уменьшите target_partitions если spill происходит слишком часто
  • Увеличьте batch_size если spill слишком частый и мелкий (больше данных за один spill)

Итоги

  • Streaming-операторы (Filter, Projection) не регистрируются в MemoryPool — их потребление фиксировано
  • Buffering-операторы (Sort, HashJoin, Aggregate) отслеживают память через MemoryConsumer → MemoryReservation
  • GreedyMemoryPool — простой лимит, FairSpillPool — равномерное распределение + spill-to-disk
  • CrossJoin и NestedLoopJoin не поддерживают spill — при нехватке памяти ошибка
  • DiskManager сбрасывает данные в Arrow IPC, файлы удаляются автоматически
  • TrackConsumersPool — обёртка для диагностики top-потребителей памяти
  • Зарезервируйте ~10% памяти сверх лимита на неотслеживаемые аллокации

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. DataFusion делит операторы на streaming и buffering. Какие из этих операторов НЕ регистрируются в MemoryPool?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7