Multi-tenant выполнение: изоляция сессий и ресурсов
В предыдущем уроке мы рассмотрели архитектуру embedded analytics engine. Но если сервис обслуживает несколько клиентов (tenants), появляется новая проблема: изоляция. Один tenant не должен видеть данные другого, один тяжёлый запрос не должен убивать производительность остальных. Этот урок показывает, как DataFusion решает каждый аспект мультитенантности.
Три уровня изоляции
Мультитенантность требует изоляции на нескольких уровнях:
Изоляция каталога: per-tenant CatalogProvider
Самый фундаментальный уровень — каждый tenant видит только свои таблицы. DataFusion поддерживает кастомные каталоги через trait CatalogProvider.
Архитектура: один SessionContext на tenant
use datafusion::catalog::{CatalogProvider, SchemaProvider};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
struct TenantManager {
/// tenant_id → SessionContext с per-tenant каталогом
contexts: RwLock<HashMap<String, SessionContext>>,
/// Общий RuntimeEnv — чтобы не дублировать DiskManager
shared_runtime: Arc<RuntimeEnv>,
}
impl TenantManager {
async fn get_or_create(&self, tenant_id: &str) -> SessionContext {
// Быстрый путь: контекст уже существует
if let Some(ctx) = self.contexts.read().await.get(tenant_id) {
return ctx.clone();
}
// Создаём новый контекст с per-tenant каталогом
let mut write = self.contexts.write().await;
// Double-check после получения write-lock
if let Some(ctx) = write.get(tenant_id) {
return ctx.clone();
}
let ctx = self.create_tenant_context(tenant_id).await;
write.insert(tenant_id.to_string(), ctx.clone());
ctx
}
async fn create_tenant_context(&self, tenant_id: &str) -> SessionContext {
let config = SessionConfig::new()
.with_target_partitions(4) // Ограничиваем параллелизм на tenant
.with_information_schema(true);
let ctx = SessionContext::new_with_config_rt(
config,
self.shared_runtime.clone(),
);
// Регистрируем только таблицы этого tenant-а
let tenant_path = format!("s3://data-lake/{}/", tenant_id);
// ctx.register_listing_table(...)
// Каждый tenant видит только свой prefix в object store
ctx
}
}
Общий RuntimeEnv означает общий MemoryPool. Если нужна изоляция памяти — каждому tenant нужен свой RuntimeEnv с собственным пулом. Компромисс: общий runtime экономит ресурсы, отдельный runtime даёт гарантии. Выбор зависит от SLA.
Альтернатива: один SessionContext, per-tenant схемы
Если у всех tenants одинаковая структура таблиц (SaaS-паттерн), можно использовать один SessionContext с per-tenant схемами:
use datafusion::catalog::SchemaProvider;
struct TenantSchema {
tenant_id: String,
tables: HashMap<String, Arc<dyn TableProvider>>,
}
#[async_trait]
impl SchemaProvider for TenantSchema {
fn table_names(&self) -> Vec<String> {
self.tables.keys().cloned().collect()
}
async fn table(
&self,
name: &str,
) -> Result<Option<Arc<dyn TableProvider>>> {
Ok(self.tables.get(name).cloned())
}
fn table_exist(&self, name: &str) -> bool {
self.tables.contains_key(name)
}
}
// Регистрация: каждый tenant — отдельная schema
// ctx.catalog("datafusion").register_schema("tenant_abc", tenant_schema);
// SQL: SELECT * FROM tenant_abc.orders
Per-schema подход проще, но не изолирует ресурсы. Tenant может выполнить SELECT * FROM information_schema.tables и увидеть имена чужих схем. Для строгой изоляции — используйте per-tenant SessionContext.
Изоляция памяти: per-tenant MemoryPool
В уроке по управлению памятью мы изучили FairSpillPool — он делит память поровну между запросами. Но между tenants нужна другая стратегия: один enterprise tenant может иметь квоту 16 GB, а free-tier — 512 MB.
Кастомный TenantMemoryPool
use datafusion::execution::memory_pool::{
MemoryConsumer, MemoryPool, MemoryReservation,
};
use std::sync::atomic::{AtomicUsize, Ordering};
/// Пул памяти с per-tenant лимитом.
/// Оборачивает FairSpillPool для spill-поддержки внутри tenant-а.
struct TenantMemoryPool {
tenant_id: String,
limit: usize,
used: AtomicUsize,
inner: Arc<FairSpillPool>,
}
impl TenantMemoryPool {
fn new(tenant_id: &str, limit: usize) -> Self {
Self {
tenant_id: tenant_id.to_string(),
limit,
used: AtomicUsize::new(0),
inner: Arc::new(FairSpillPool::new(limit)),
}
}
}
impl MemoryPool for TenantMemoryPool {
fn register(&self, consumer: &MemoryConsumer) {
self.inner.register(consumer);
}
fn unregister(&self, consumer: &MemoryConsumer) {
self.inner.unregister(consumer);
}
fn grow(
&self,
reservation: &MemoryReservation,
additional: usize,
) {
// Проверяем per-tenant лимит
let current = self.used.load(Ordering::Relaxed);
if current + additional > self.limit {
return Err(DataFusionError::ResourcesExhausted(
format!(
"Tenant '{}' превысил квоту памяти: {} / {} bytes",
self.tenant_id, current + additional, self.limit
),
));
}
// Делегируем внутреннему пулу (fair distribution между запросами)
self.inner.grow(reservation, additional)?;
self.used.fetch_add(additional, Ordering::Relaxed);
Ok(())
}
fn shrink(
&self,
reservation: &MemoryReservation,
shrink: usize,
) {
self.inner.shrink(reservation, shrink);
self.used.fetch_sub(shrink, Ordering::Relaxed);
}
fn reserved(&self) -> usize {
self.used.load(Ordering::Relaxed)
}
}
Конфигурация по tier-ам
struct TenantConfig {
memory_limit: usize,
max_partitions: usize,
query_timeout: Duration,
max_concurrent_queries: usize,
}
fn tier_config(tier: &str) -> TenantConfig {
match tier {
"enterprise" => TenantConfig {
memory_limit: 16_000_000_000, // 16 GB
max_partitions: 32,
query_timeout: Duration::from_secs(600),
max_concurrent_queries: 50,
},
"pro" => TenantConfig {
memory_limit: 4_000_000_000, // 4 GB
max_partitions: 8,
query_timeout: Duration::from_secs(120),
max_concurrent_queries: 10,
},
_ => TenantConfig { // free tier
memory_limit: 512_000_000, // 512 MB
max_partitions: 2,
query_timeout: Duration::from_secs(30),
max_concurrent_queries: 2,
},
}
}
Таймауты запросов
Без таймаутов один tenant может выполнить бесконечный запрос (CROSS JOIN без условий) и заблокировать ресурсы навсегда.
Timeout через tokio::time
use tokio::time::{timeout, Duration};
async fn execute_with_timeout(
ctx: &SessionContext,
sql: &str,
max_duration: Duration,
) -> Result<Vec<RecordBatch>, AppError> {
let df = ctx.sql(sql).await?;
match timeout(max_duration, df.collect()).await {
Ok(Ok(batches)) => Ok(batches),
Ok(Err(e)) => Err(AppError::QueryFailed(e.to_string())),
Err(_) => Err(AppError::QueryTimeout(format!(
"Запрос превысил таймаут {} сек",
max_duration.as_secs()
))),
}
}
tokio::time::timeout отменяет future, но DataFusion может не немедленно освободить память — buffering-операторы (SortExec, HashJoinExec) освобождают MemoryReservation при drop. Убедитесь, что все промежуточные структуры корректно drop-ятся при отмене запроса.
Concurrent query limiting
Ограничение числа одновременных запросов на tenant — через семафор:
use tokio::sync::Semaphore;
struct TenantRuntime {
ctx: SessionContext,
query_semaphore: Semaphore,
config: TenantConfig,
}
impl TenantRuntime {
async fn execute(&self, sql: &str) -> Result<Vec<RecordBatch>, AppError> {
// Ограничиваем concurrent запросы
let _permit = self.query_semaphore.acquire().await
.map_err(|_| AppError::ServiceShuttingDown)?;
// Таймаут на выполнение
execute_with_timeout(
&self.ctx,
sql,
self.config.query_timeout,
).await
}
}
Мониторинг per-tenant потребления
Для billing и alerting нужно отслеживать метрики по каждому tenant-у:
use std::sync::atomic::{AtomicU64, Ordering};
struct TenantMetrics {
queries_total: AtomicU64,
queries_failed: AtomicU64,
bytes_scanned: AtomicU64,
peak_memory: AtomicU64,
total_query_time_ms: AtomicU64,
}
impl TenantMetrics {
fn record_query(&self, bytes: u64, duration_ms: u64, success: bool) {
self.queries_total.fetch_add(1, Ordering::Relaxed);
self.bytes_scanned.fetch_add(bytes, Ordering::Relaxed);
self.total_query_time_ms.fetch_add(duration_ms, Ordering::Relaxed);
if !success {
self.queries_failed.fetch_add(1, Ordering::Relaxed);
}
}
fn update_peak_memory(&self, current: u64) {
self.peak_memory.fetch_max(current, Ordering::Relaxed);
}
}
Экспортируйте tenant-метрики через Prometheus с label tenant_id. Это позволяет строить per-tenant dashboards и алерты: “Tenant X использует 90% квоты памяти”, “Tenant Y имеет 30% failed queries”.
Паттерны мультитенантности: сравнение
| Паттерн | Изоляция | Overhead | Когда использовать |
|---|---|---|---|
| Per-tenant SessionContext + RuntimeEnv | Полная | Высокий: отдельный пул, каталог | Enterprise SaaS с SLA |
| Per-tenant SessionContext, shared RuntimeEnv | Каталог | Средний: отдельный каталог | B2B с shared infrastructure |
| Shared SessionContext, per-tenant schema | Минимальная | Низкий: один контекст | Internal multi-team analytics |
| Row-level security (фильтр по tenant_id) | Логическая | Минимальный | Прототипы, low-security |
Row-level security (WHERE tenant_id = ?) без per-tenant каталога — слабая изоляция. Ошибка в фильтре = утечка данных. Используйте только для внутренних инструментов, не для production SaaS.
Резюме
- Мультитенантность требует изоляции на трёх уровнях: каталог (видимость таблиц), память (ресурсные квоты), время (таймауты)
- Per-tenant
SessionContext— самая надёжная изоляция каталога;SessionContext::clone()экономит ресурсы для запросов внутри одного tenant-а - Per-tenant
MemoryPoolчерез обёртку FairSpillPool — изолирует потребление и позволяет tier-based квоты tokio::time::timeout+Semaphore— контроль длительности и concurrency запросов- Метрики per-tenant (queries, bytes_scanned, peak_memory) — основа для billing и alerting