Встраивание DataFusion в аналитические сервисы
В предыдущих модулях мы работали с DataFusion как с библиотекой: создавали SessionContext, регистрировали таблицы, выполняли SQL. Но в production DataFusion не живёт изолированно — он встраивается в сервис, который принимает запросы по сети, маршрутизирует их, управляет жизненным циклом сессий и отдаёт результаты клиентам. Этот урок показывает архитектурные паттерны такого встраивания.
Архитектура embedded query engine
DataFusion спроектирован как embedded движок — у него нет собственного сетевого сервера. Вы оборачиваете SessionContext в свой сервис:
Ключевое отличие от standalone СУБД: вы контролируете каждый слой — от парсинга HTTP до формата ответа. DataFusion даёт движок выполнения, всё остальное — ваш код.
REST API с Axum
Самый распространённый паттерн — HTTP API поверх Axum. Клиент отправляет SQL, сервер выполняет запрос и возвращает результат в JSON.
Минимальный сервер
use axum::{extract::State, routing::post, Json, Router};
use datafusion::prelude::*;
use std::sync::Arc;
struct AppState {
ctx: SessionContext,
}
async fn query_handler(
State(state): State<Arc<AppState>>,
Json(request): Json<QueryRequest>,
) -> Result<Json<QueryResponse>, AppError> {
// Валидация: только SELECT, запрет DDL/DML
validate_read_only(&request.sql)?;
let df = state.ctx.sql(&request.sql).await?;
let batches = df.collect().await?;
let rows = arrow_json::writer::record_batches_to_json_rows(&batches)?;
Ok(Json(QueryResponse { rows, row_count: rows.len() }))
}
Никогда не принимайте произвольный SQL без валидации. Минимум — запрет DDL (CREATE, DROP, ALTER) и DML (INSERT, DELETE, UPDATE). DataFusion поддерживает эти команды, и без проверки клиент может модифицировать каталог.
Валидация SQL
Вместо regex-проверки используйте парсер DataFusion:
use datafusion::sql::parser::DFParser;
use datafusion::sql::sqlparser::ast::Statement;
fn validate_read_only(sql: &str) -> Result<(), AppError> {
let statements = DFParser::parse_sql(sql)?;
for stmt in &statements {
match stmt {
// DFStatement::Statement содержит sqlparser::Statement
datafusion::sql::parser::Statement::Statement(s) => {
match s.as_ref() {
Statement::Query(_) => {} // SELECT — ok
Statement::Explain { .. } => {} // EXPLAIN — ok
other => return Err(AppError::Forbidden(
format!("Запрещён тип запроса: {}", other)
)),
}
}
// COPY, CREATE EXTERNAL TABLE и др. DataFusion-специфичные
_ => return Err(AppError::Forbidden(
"DataFusion DDL запрещён".into()
)),
}
}
Ok(())
}
Такой подход надёжнее regex: парсер корректно обрабатывает комментарии, строковые литералы и вложенные выражения.
gRPC с Arrow Flight
Для высокопроизводительных клиентов REST с JSON-сериализацией — узкое место. Apache Arrow Flight передаёт данные в нативном Arrow IPC формате, исключая сериализацию/десериализацию:
Flight SQL сервер
DataFusion предоставляет FlightSqlService trait для реализации Flight SQL протокола:
use arrow_flight::sql::server::FlightSqlService;
use datafusion::prelude::*;
struct MyFlightSql {
ctx: SessionContext,
}
#[tonic::async_trait]
impl FlightSqlService for MyFlightSql {
type FlightService = MyFlightSql;
async fn do_get_statement(
&self,
_ticket: TicketStatementQuery,
_request: Request<Ticket>,
) -> Result<Response<Self::DoGetStream>, Status> {
// Выполняем SQL, стримим RecordBatch как Arrow IPC
let df = self.ctx.sql(&ticket.statement_handle).await
.map_err(|e| Status::internal(e.to_string()))?;
let stream = df.execute_stream().await
.map_err(|e| Status::internal(e.to_string()))?;
// Конвертируем SendableRecordBatchStream → FlightDataStream
let flight_stream = FlightDataEncoderBuilder::new()
.with_schema(stream.schema())
.build(stream.map(|batch| batch.map_err(Into::into)));
Ok(Response::new(Box::pin(flight_stream)))
}
}
Arrow Flight SQL — стандартный протокол. Клиенты (JDBC/ODBC драйверы, DBeaver, pandas) подключаются без кастомного кода. Если ваш сервис обслуживает BI-инструменты — это предпочтительный протокол.
Маршрутизация запросов
В production не все запросы одинаковы. Точечные lookup-запросы и тяжёлые аналитические scan-ы требуют разных ресурсов:
Классификация по логическому плану
use datafusion::logical_expr::{LogicalPlan, JoinType};
enum QueryClass {
Fast, // lookup, simple filter + LIMIT
Heavy, // JOIN, GROUP BY, full scan
}
fn classify_query(plan: &LogicalPlan) -> QueryClass {
let mut has_join = false;
let mut has_aggregate = false;
let mut has_limit = false;
plan.apply(|node| {
match node {
LogicalPlan::Join(_) => has_join = true,
LogicalPlan::Aggregate(_) => has_aggregate = true,
LogicalPlan::Limit(_) => has_limit = true,
_ => {}
}
Ok(datafusion::common::tree_node::TreeNodeRecursion::Continue)
})?;
if has_join || (has_aggregate && !has_limit) {
QueryClass::Heavy
} else {
QueryClass::Fast
}
}
Каждый класс запросов выполняется на SessionContext с соответствующей конфигурацией. Это предотвращает ситуацию, когда один тяжёлый scan блокирует все лёгкие запросы.
Пулинг сессий
Создание SessionContext — не бесплатная операция: регистрация каталога, загрузка метаданных таблиц, создание RuntimeEnv. Для сервиса с высоким RPS нужен пул предварительно настроенных сессий.
Паттерн: shared context с clone
use std::sync::Arc;
/// Базовый контекст с предзарегистрированными таблицами.
/// SessionContext внутри использует Arc — clone дёшев.
fn build_base_context() -> SessionContext {
let config = SessionConfig::new()
.with_target_partitions(num_cpus::get())
.with_batch_size(8192);
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(8_000_000_000)))
.with_disk_manager(DiskManagerConfig::NewOs)
.build_arc()
.expect("RuntimeEnv build failed");
let ctx = SessionContext::new_with_config_rt(config, runtime);
// Регистрация таблиц — один раз
// ctx.register_parquet(...).await;
// ctx.register_listing_table(...).await;
ctx
}
/// Для каждого запроса — clone базового контекста.
/// Каталог общий (Arc), но состояние запроса изолировано.
async fn handle_query(base_ctx: &SessionContext, sql: &str) {
let ctx = base_ctx.clone();
// ctx наследует каталог, но имеет свой TaskContext при выполнении
let df = ctx.sql(sql).await?;
let batches = df.collect().await?;
}
SessionContext::clone() клонирует Arc внутри — это O(1) операция. Каталог таблиц, RuntimeEnv и конфигурация разделяются между клонами. Каждый запрос получает свой TaskContext при выполнении, обеспечивая изоляцию runtime-состояния.
Паттерн: пул с per-query конфигурацией
Если разным запросам нужны разные параметры (timeout, memory limit), создайте пул контекстов с разными конфигурациями:
struct ContextPool {
fast: SessionContext, // lookup запросы
heavy: SessionContext, // аналитика
admin: SessionContext, // DDL, метаданные
}
impl ContextPool {
fn get(&self, class: QueryClass) -> &SessionContext {
match class {
QueryClass::Fast => &self.fast,
QueryClass::Heavy => &self.heavy,
}
}
}
Graceful shutdown
При остановке сервиса нужно дождаться завершения текущих запросов:
use tokio::signal;
use tokio_util::sync::CancellationToken;
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
// Обработчик сигналов
tokio::spawn(async move {
signal::ctrl_c().await.expect("signal handler failed");
tracing::info!("Shutdown signal received, draining queries...");
cancel_clone.cancel();
});
// В handler-е
async fn query_handler(
cancel: CancellationToken,
// ...
) -> Result<Json<QueryResponse>, AppError> {
if cancel.is_cancelled() {
return Err(AppError::ServiceShuttingDown);
}
tokio::select! {
result = execute_query(&state.ctx, &request.sql) => result,
_ = cancel.cancelled() => {
Err(AppError::QueryCancelled("Service shutting down".into()))
}
}
}
Без graceful shutdown активные запросы обрываются на полпути. Если запрос писал spill-файлы через DiskManager — они останутся на диске. Добавьте cleanup spill-директории при старте сервиса.
Выбор протокола
| Критерий | REST (JSON) | Arrow Flight | WebSocket |
|---|---|---|---|
| Клиенты | Браузер, curl, любой HTTP-клиент | JDBC/ODBC, BI-инструменты, Python | Dashboard с live-обновлением |
| Overhead сериализации | Высокий (JSON) | Минимальный (Arrow IPC) | Средний (зависит от формата) |
| Streaming | Нет (весь ответ сразу) | Да (RecordBatch stream) | Да (WebSocket frames) |
| Типизация | Слабая (всё строки в JSON) | Сильная (Arrow schema) | Зависит от формата |
| Порог входа | Низкий | Средний | Средний |
Для MVP начните с REST. Когда JSON-сериализация станет bottleneck (видно по CPU профилю) — добавьте Flight endpoint параллельно. Два протокола на одном сервисе — нормальная практика: REST для дашбордов, Flight для ETL и BI.
Резюме
- DataFusion — embedded движок: оборачивайте
SessionContextв свой сетевой сервис (Axum, Tonic) - Валидируйте SQL через парсер, не regex — блокируйте DDL/DML на API-уровне
- Arrow Flight SQL исключает JSON-сериализацию — предпочтительный протокол для BI и ETL
- Классифицируйте запросы и маршрутизируйте на контексты с разной конфигурацией
SessionContext::clone()— O(1), каталог общий, выполнение изолировано- Graceful shutdown: CancellationToken + drain текущих запросов + cleanup spill-файлов