Learning Platform
Глоссарий Troubleshooting
Урок 13.02 · 15 мин
Продвинутый
Multi-TenancySessionContextCatalogProviderMemoryPoolResource IsolationQuery TimeoutTenant Management

Multi-tenant выполнение: изоляция сессий и ресурсов

В предыдущем уроке мы рассмотрели архитектуру embedded analytics engine. Но если сервис обслуживает несколько клиентов (tenants), появляется новая проблема: изоляция. Один tenant не должен видеть данные другого, один тяжёлый запрос не должен убивать производительность остальных. Этот урок показывает, как DataFusion решает каждый аспект мультитенантности.

Три уровня изоляции

Мультитенантность требует изоляции на нескольких уровнях:

Три уровня tenant-изоляции
КаталогКаждый tenant видит только свои таблицы — через отдельные каталоги
Механизм
Защищает от
ПамятьКаждый tenant имеет квоту памяти — через отдельные MemoryPool
Механизм
Защищает от
ВремяКаждый запрос имеет deadline — через tokio::time::timeout
Механизм
Защищает от

Изоляция каталога: per-tenant CatalogProvider

Самый фундаментальный уровень — каждый tenant видит только свои таблицы. DataFusion поддерживает кастомные каталоги через trait CatalogProvider.

Архитектура: один SessionContext на tenant

use datafusion::catalog::{CatalogProvider, SchemaProvider};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

struct TenantManager {
    /// tenant_id → SessionContext с per-tenant каталогом
    contexts: RwLock<HashMap<String, SessionContext>>,
    /// Общий RuntimeEnv — чтобы не дублировать DiskManager
    shared_runtime: Arc<RuntimeEnv>,
}

impl TenantManager {
    async fn get_or_create(&self, tenant_id: &str) -> SessionContext {
        // Быстрый путь: контекст уже существует
        if let Some(ctx) = self.contexts.read().await.get(tenant_id) {
            return ctx.clone();
        }

        // Создаём новый контекст с per-tenant каталогом
        let mut write = self.contexts.write().await;
        // Double-check после получения write-lock
        if let Some(ctx) = write.get(tenant_id) {
            return ctx.clone();
        }

        let ctx = self.create_tenant_context(tenant_id).await;
        write.insert(tenant_id.to_string(), ctx.clone());
        ctx
    }

    async fn create_tenant_context(&self, tenant_id: &str) -> SessionContext {
        let config = SessionConfig::new()
            .with_target_partitions(4)  // Ограничиваем параллелизм на tenant
            .with_information_schema(true);

        let ctx = SessionContext::new_with_config_rt(
            config,
            self.shared_runtime.clone(),
        );

        // Регистрируем только таблицы этого tenant-а
        let tenant_path = format!("s3://data-lake/{}/", tenant_id);
        // ctx.register_listing_table(...)
        // Каждый tenant видит только свой prefix в object store

        ctx
    }
}
WARNING

Общий RuntimeEnv означает общий MemoryPool. Если нужна изоляция памяти — каждому tenant нужен свой RuntimeEnv с собственным пулом. Компромисс: общий runtime экономит ресурсы, отдельный runtime даёт гарантии. Выбор зависит от SLA.

Альтернатива: один SessionContext, per-tenant схемы

Если у всех tenants одинаковая структура таблиц (SaaS-паттерн), можно использовать один SessionContext с per-tenant схемами:

use datafusion::catalog::SchemaProvider;

struct TenantSchema {
    tenant_id: String,
    tables: HashMap<String, Arc<dyn TableProvider>>,
}

#[async_trait]
impl SchemaProvider for TenantSchema {
    fn table_names(&self) -> Vec<String> {
        self.tables.keys().cloned().collect()
    }

    async fn table(
        &self,
        name: &str,
    ) -> Result<Option<Arc<dyn TableProvider>>> {
        Ok(self.tables.get(name).cloned())
    }

    fn table_exist(&self, name: &str) -> bool {
        self.tables.contains_key(name)
    }
}

// Регистрация: каждый tenant — отдельная schema
// ctx.catalog("datafusion").register_schema("tenant_abc", tenant_schema);
// SQL: SELECT * FROM tenant_abc.orders
Один контекст, per-tenant схемы
SessionContextОдин SessionContext с общей конфигурацией
Catalog: datafusiondefault catalog, содержит per-tenant schema
Schema: tenant_abcКаждая schema = один tenant
Schema: tenant_xyzКаждая schema = один tenant
TIP

Per-schema подход проще, но не изолирует ресурсы. Tenant может выполнить SELECT * FROM information_schema.tables и увидеть имена чужих схем. Для строгой изоляции — используйте per-tenant SessionContext.

Изоляция памяти: per-tenant MemoryPool

В уроке по управлению памятью мы изучили FairSpillPool — он делит память поровну между запросами. Но между tenants нужна другая стратегия: один enterprise tenant может иметь квоту 16 GB, а free-tier — 512 MB.

Кастомный TenantMemoryPool

use datafusion::execution::memory_pool::{
    MemoryConsumer, MemoryPool, MemoryReservation,
};
use std::sync::atomic::{AtomicUsize, Ordering};

/// Пул памяти с per-tenant лимитом.
/// Оборачивает FairSpillPool для spill-поддержки внутри tenant-а.
struct TenantMemoryPool {
    tenant_id: String,
    limit: usize,
    used: AtomicUsize,
    inner: Arc<FairSpillPool>,
}

impl TenantMemoryPool {
    fn new(tenant_id: &str, limit: usize) -> Self {
        Self {
            tenant_id: tenant_id.to_string(),
            limit,
            used: AtomicUsize::new(0),
            inner: Arc::new(FairSpillPool::new(limit)),
        }
    }
}

impl MemoryPool for TenantMemoryPool {
    fn register(&self, consumer: &MemoryConsumer) {
        self.inner.register(consumer);
    }

    fn unregister(&self, consumer: &MemoryConsumer) {
        self.inner.unregister(consumer);
    }

    fn grow(
        &self,
        reservation: &MemoryReservation,
        additional: usize,
    ) {
        // Проверяем per-tenant лимит
        let current = self.used.load(Ordering::Relaxed);
        if current + additional > self.limit {
            return Err(DataFusionError::ResourcesExhausted(
                format!(
                    "Tenant '{}' превысил квоту памяти: {} / {} bytes",
                    self.tenant_id, current + additional, self.limit
                ),
            ));
        }

        // Делегируем внутреннему пулу (fair distribution между запросами)
        self.inner.grow(reservation, additional)?;
        self.used.fetch_add(additional, Ordering::Relaxed);
        Ok(())
    }

    fn shrink(
        &self,
        reservation: &MemoryReservation,
        shrink: usize,
    ) {
        self.inner.shrink(reservation, shrink);
        self.used.fetch_sub(shrink, Ordering::Relaxed);
    }

    fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }
}

Конфигурация по tier-ам

struct TenantConfig {
    memory_limit: usize,
    max_partitions: usize,
    query_timeout: Duration,
    max_concurrent_queries: usize,
}

fn tier_config(tier: &str) -> TenantConfig {
    match tier {
        "enterprise" => TenantConfig {
            memory_limit: 16_000_000_000,   // 16 GB
            max_partitions: 32,
            query_timeout: Duration::from_secs(600),
            max_concurrent_queries: 50,
        },
        "pro" => TenantConfig {
            memory_limit: 4_000_000_000,    // 4 GB
            max_partitions: 8,
            query_timeout: Duration::from_secs(120),
            max_concurrent_queries: 10,
        },
        _ => TenantConfig {  // free tier
            memory_limit: 512_000_000,       // 512 MB
            max_partitions: 2,
            query_timeout: Duration::from_secs(30),
            max_concurrent_queries: 2,
        },
    }
}

Таймауты запросов

Без таймаутов один tenant может выполнить бесконечный запрос (CROSS JOIN без условий) и заблокировать ресурсы навсегда.

Timeout через tokio::time

use tokio::time::{timeout, Duration};

async fn execute_with_timeout(
    ctx: &SessionContext,
    sql: &str,
    max_duration: Duration,
) -> Result<Vec<RecordBatch>, AppError> {
    let df = ctx.sql(sql).await?;

    match timeout(max_duration, df.collect()).await {
        Ok(Ok(batches)) => Ok(batches),
        Ok(Err(e)) => Err(AppError::QueryFailed(e.to_string())),
        Err(_) => Err(AppError::QueryTimeout(format!(
            "Запрос превысил таймаут {} сек",
            max_duration.as_secs()
        ))),
    }
}
WARNING

tokio::time::timeout отменяет future, но DataFusion может не немедленно освободить память — buffering-операторы (SortExec, HashJoinExec) освобождают MemoryReservation при drop. Убедитесь, что все промежуточные структуры корректно drop-ятся при отмене запроса.

Concurrent query limiting

Ограничение числа одновременных запросов на tenant — через семафор:

use tokio::sync::Semaphore;

struct TenantRuntime {
    ctx: SessionContext,
    query_semaphore: Semaphore,
    config: TenantConfig,
}

impl TenantRuntime {
    async fn execute(&self, sql: &str) -> Result<Vec<RecordBatch>, AppError> {
        // Ограничиваем concurrent запросы
        let _permit = self.query_semaphore.acquire().await
            .map_err(|_| AppError::ServiceShuttingDown)?;

        // Таймаут на выполнение
        execute_with_timeout(
            &self.ctx,
            sql,
            self.config.query_timeout,
        ).await
    }
}

Мониторинг per-tenant потребления

Для billing и alerting нужно отслеживать метрики по каждому tenant-у:

use std::sync::atomic::{AtomicU64, Ordering};

struct TenantMetrics {
    queries_total: AtomicU64,
    queries_failed: AtomicU64,
    bytes_scanned: AtomicU64,
    peak_memory: AtomicU64,
    total_query_time_ms: AtomicU64,
}

impl TenantMetrics {
    fn record_query(&self, bytes: u64, duration_ms: u64, success: bool) {
        self.queries_total.fetch_add(1, Ordering::Relaxed);
        self.bytes_scanned.fetch_add(bytes, Ordering::Relaxed);
        self.total_query_time_ms.fetch_add(duration_ms, Ordering::Relaxed);
        if !success {
            self.queries_failed.fetch_add(1, Ordering::Relaxed);
        }
    }

    fn update_peak_memory(&self, current: u64) {
        self.peak_memory.fetch_max(current, Ordering::Relaxed);
    }
}
TIP

Экспортируйте tenant-метрики через Prometheus с label tenant_id. Это позволяет строить per-tenant dashboards и алерты: “Tenant X использует 90% квоты памяти”, “Tenant Y имеет 30% failed queries”.

Паттерны мультитенантности: сравнение

ПаттернИзоляцияOverheadКогда использовать
Per-tenant SessionContext + RuntimeEnvПолнаяВысокий: отдельный пул, каталогEnterprise SaaS с SLA
Per-tenant SessionContext, shared RuntimeEnvКаталогСредний: отдельный каталогB2B с shared infrastructure
Shared SessionContext, per-tenant schemaМинимальнаяНизкий: один контекстInternal multi-team analytics
Row-level security (фильтр по tenant_id)ЛогическаяМинимальныйПрототипы, low-security
DANGER

Row-level security (WHERE tenant_id = ?) без per-tenant каталога — слабая изоляция. Ошибка в фильтре = утечка данных. Используйте только для внутренних инструментов, не для production SaaS.

Резюме

  • Мультитенантность требует изоляции на трёх уровнях: каталог (видимость таблиц), память (ресурсные квоты), время (таймауты)
  • Per-tenant SessionContext — самая надёжная изоляция каталога; SessionContext::clone() экономит ресурсы для запросов внутри одного tenant-а
  • Per-tenant MemoryPool через обёртку FairSpillPool — изолирует потребление и позволяет tier-based квоты
  • tokio::time::timeout + Semaphore — контроль длительности и concurrency запросов
  • Метрики per-tenant (queries, bytes_scanned, peak_memory) — основа для billing и alerting

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какие три уровня изоляции нужны для мультитенантного DataFusion-сервиса?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4