Learning Platform
Глоссарий Troubleshooting
Урок 13.01 · 14 мин
Продвинутый
EmbeddingSessionContextREST APIgRPCQuery RoutingConnection PoolingAxumTonic

Встраивание DataFusion в аналитические сервисы

В предыдущих модулях мы работали с DataFusion как с библиотекой: создавали SessionContext, регистрировали таблицы, выполняли SQL. Но в production DataFusion не живёт изолированно — он встраивается в сервис, который принимает запросы по сети, маршрутизирует их, управляет жизненным циклом сессий и отдаёт результаты клиентам. Этот урок показывает архитектурные паттерны такого встраивания.

Архитектура embedded query engine

DataFusion спроектирован как embedded движок — у него нет собственного сетевого сервера. Вы оборачиваете SessionContext в свой сервис:

Архитектура embedded analytics engine
КлиентыHTTP-запросы от клиентов: REST API, gRPC, WebSocket
Протоколы
API LayerAxum/Tonic сервер: аутентификация, валидация, rate limiting
Задачи
Session PoolПул SessionContext с предзарегистрированными таблицами и конфигурацией
Состояние
РезультатРезультаты: RecordBatch → JSON, Arrow IPC, CSV
Форматы

Ключевое отличие от standalone СУБД: вы контролируете каждый слой — от парсинга HTTP до формата ответа. DataFusion даёт движок выполнения, всё остальное — ваш код.

REST API с Axum

Самый распространённый паттерн — HTTP API поверх Axum. Клиент отправляет SQL, сервер выполняет запрос и возвращает результат в JSON.

Минимальный сервер

use axum::{extract::State, routing::post, Json, Router};
use datafusion::prelude::*;
use std::sync::Arc;

struct AppState {
    ctx: SessionContext,
}

async fn query_handler(
    State(state): State<Arc<AppState>>,
    Json(request): Json<QueryRequest>,
) -> Result<Json<QueryResponse>, AppError> {
    // Валидация: только SELECT, запрет DDL/DML
    validate_read_only(&request.sql)?;

    let df = state.ctx.sql(&request.sql).await?;
    let batches = df.collect().await?;

    let rows = arrow_json::writer::record_batches_to_json_rows(&batches)?;
    Ok(Json(QueryResponse { rows, row_count: rows.len() }))
}
WARNING

Никогда не принимайте произвольный SQL без валидации. Минимум — запрет DDL (CREATE, DROP, ALTER) и DML (INSERT, DELETE, UPDATE). DataFusion поддерживает эти команды, и без проверки клиент может модифицировать каталог.

Валидация SQL

Вместо regex-проверки используйте парсер DataFusion:

use datafusion::sql::parser::DFParser;
use datafusion::sql::sqlparser::ast::Statement;

fn validate_read_only(sql: &str) -> Result<(), AppError> {
    let statements = DFParser::parse_sql(sql)?;
    for stmt in &statements {
        match stmt {
            // DFStatement::Statement содержит sqlparser::Statement
            datafusion::sql::parser::Statement::Statement(s) => {
                match s.as_ref() {
                    Statement::Query(_) => {} // SELECT — ok
                    Statement::Explain { .. } => {} // EXPLAIN — ok
                    other => return Err(AppError::Forbidden(
                        format!("Запрещён тип запроса: {}", other)
                    )),
                }
            }
            // COPY, CREATE EXTERNAL TABLE и др. DataFusion-специфичные
            _ => return Err(AppError::Forbidden(
                "DataFusion DDL запрещён".into()
            )),
        }
    }
    Ok(())
}

Такой подход надёжнее regex: парсер корректно обрабатывает комментарии, строковые литералы и вложенные выражения.

gRPC с Arrow Flight

Для высокопроизводительных клиентов REST с JSON-сериализацией — узкое место. Apache Arrow Flight передаёт данные в нативном Arrow IPC формате, исключая сериализацию/десериализацию:

REST vs Arrow Flight: путь данных
REST PathREST: RecordBatch → JSON → HTTP → parse JSON → строки
Шаги
Overhead
Flight PathFlight: RecordBatch → Arrow IPC → gRPC → zero-copy read
Шаги
Overhead

Flight SQL сервер

DataFusion предоставляет FlightSqlService trait для реализации Flight SQL протокола:

use arrow_flight::sql::server::FlightSqlService;
use datafusion::prelude::*;

struct MyFlightSql {
    ctx: SessionContext,
}

#[tonic::async_trait]
impl FlightSqlService for MyFlightSql {
    type FlightService = MyFlightSql;

    async fn do_get_statement(
        &self,
        _ticket: TicketStatementQuery,
        _request: Request<Ticket>,
    ) -> Result<Response<Self::DoGetStream>, Status> {
        // Выполняем SQL, стримим RecordBatch как Arrow IPC
        let df = self.ctx.sql(&ticket.statement_handle).await
            .map_err(|e| Status::internal(e.to_string()))?;

        let stream = df.execute_stream().await
            .map_err(|e| Status::internal(e.to_string()))?;

        // Конвертируем SendableRecordBatchStream → FlightDataStream
        let flight_stream = FlightDataEncoderBuilder::new()
            .with_schema(stream.schema())
            .build(stream.map(|batch| batch.map_err(Into::into)));

        Ok(Response::new(Box::pin(flight_stream)))
    }
}
TIP

Arrow Flight SQL — стандартный протокол. Клиенты (JDBC/ODBC драйверы, DBeaver, pandas) подключаются без кастомного кода. Если ваш сервис обслуживает BI-инструменты — это предпочтительный протокол.

Маршрутизация запросов

В production не все запросы одинаковы. Точечные lookup-запросы и тяжёлые аналитические scan-ы требуют разных ресурсов:

Query routing по типу нагрузки
Входящий SQLSQL запрос от клиента
Query ClassifierКлассификатор анализирует план запроса
Сигналы
Fast LaneЛёгкие запросы: малая memory, быстрый timeout
Конфигурация
Heavy LaneТяжёлые запросы: большая memory, spill, длинный timeout
Конфигурация

Классификация по логическому плану

use datafusion::logical_expr::{LogicalPlan, JoinType};

enum QueryClass {
    Fast,   // lookup, simple filter + LIMIT
    Heavy,  // JOIN, GROUP BY, full scan
}

fn classify_query(plan: &LogicalPlan) -> QueryClass {
    let mut has_join = false;
    let mut has_aggregate = false;
    let mut has_limit = false;

    plan.apply(|node| {
        match node {
            LogicalPlan::Join(_) => has_join = true,
            LogicalPlan::Aggregate(_) => has_aggregate = true,
            LogicalPlan::Limit(_) => has_limit = true,
            _ => {}
        }
        Ok(datafusion::common::tree_node::TreeNodeRecursion::Continue)
    })?;

    if has_join || (has_aggregate && !has_limit) {
        QueryClass::Heavy
    } else {
        QueryClass::Fast
    }
}

Каждый класс запросов выполняется на SessionContext с соответствующей конфигурацией. Это предотвращает ситуацию, когда один тяжёлый scan блокирует все лёгкие запросы.

Пулинг сессий

Создание SessionContext — не бесплатная операция: регистрация каталога, загрузка метаданных таблиц, создание RuntimeEnv. Для сервиса с высоким RPS нужен пул предварительно настроенных сессий.

Паттерн: shared context с clone

use std::sync::Arc;

/// Базовый контекст с предзарегистрированными таблицами.
/// SessionContext внутри использует Arc — clone дёшев.
fn build_base_context() -> SessionContext {
    let config = SessionConfig::new()
        .with_target_partitions(num_cpus::get())
        .with_batch_size(8192);

    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(8_000_000_000)))
        .with_disk_manager(DiskManagerConfig::NewOs)
        .build_arc()
        .expect("RuntimeEnv build failed");

    let ctx = SessionContext::new_with_config_rt(config, runtime);

    // Регистрация таблиц — один раз
    // ctx.register_parquet(...).await;
    // ctx.register_listing_table(...).await;

    ctx
}

/// Для каждого запроса — clone базового контекста.
/// Каталог общий (Arc), но состояние запроса изолировано.
async fn handle_query(base_ctx: &SessionContext, sql: &str) {
    let ctx = base_ctx.clone();
    // ctx наследует каталог, но имеет свой TaskContext при выполнении
    let df = ctx.sql(sql).await?;
    let batches = df.collect().await?;
}
NOTE

SessionContext::clone() клонирует Arc внутри — это O(1) операция. Каталог таблиц, RuntimeEnv и конфигурация разделяются между клонами. Каждый запрос получает свой TaskContext при выполнении, обеспечивая изоляцию runtime-состояния.

Паттерн: пул с per-query конфигурацией

Если разным запросам нужны разные параметры (timeout, memory limit), создайте пул контекстов с разными конфигурациями:

struct ContextPool {
    fast: SessionContext,   // lookup запросы
    heavy: SessionContext,  // аналитика
    admin: SessionContext,  // DDL, метаданные
}

impl ContextPool {
    fn get(&self, class: QueryClass) -> &SessionContext {
        match class {
            QueryClass::Fast => &self.fast,
            QueryClass::Heavy => &self.heavy,
        }
    }
}

Graceful shutdown

При остановке сервиса нужно дождаться завершения текущих запросов:

use tokio::signal;
use tokio_util::sync::CancellationToken;

let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();

// Обработчик сигналов
tokio::spawn(async move {
    signal::ctrl_c().await.expect("signal handler failed");
    tracing::info!("Shutdown signal received, draining queries...");
    cancel_clone.cancel();
});

// В handler-е
async fn query_handler(
    cancel: CancellationToken,
    // ...
) -> Result<Json<QueryResponse>, AppError> {
    if cancel.is_cancelled() {
        return Err(AppError::ServiceShuttingDown);
    }

    tokio::select! {
        result = execute_query(&state.ctx, &request.sql) => result,
        _ = cancel.cancelled() => {
            Err(AppError::QueryCancelled("Service shutting down".into()))
        }
    }
}
WARNING

Без graceful shutdown активные запросы обрываются на полпути. Если запрос писал spill-файлы через DiskManager — они останутся на диске. Добавьте cleanup spill-директории при старте сервиса.

Выбор протокола

КритерийREST (JSON)Arrow FlightWebSocket
КлиентыБраузер, curl, любой HTTP-клиентJDBC/ODBC, BI-инструменты, PythonDashboard с live-обновлением
Overhead сериализацииВысокий (JSON)Минимальный (Arrow IPC)Средний (зависит от формата)
StreamingНет (весь ответ сразу)Да (RecordBatch stream)Да (WebSocket frames)
ТипизацияСлабая (всё строки в JSON)Сильная (Arrow schema)Зависит от формата
Порог входаНизкийСреднийСредний
TIP

Для MVP начните с REST. Когда JSON-сериализация станет bottleneck (видно по CPU профилю) — добавьте Flight endpoint параллельно. Два протокола на одном сервисе — нормальная практика: REST для дашбордов, Flight для ETL и BI.

Резюме

  • DataFusion — embedded движок: оборачивайте SessionContext в свой сетевой сервис (Axum, Tonic)
  • Валидируйте SQL через парсер, не regex — блокируйте DDL/DML на API-уровне
  • Arrow Flight SQL исключает JSON-сериализацию — предпочтительный протокол для BI и ETL
  • Классифицируйте запросы и маршрутизируйте на контексты с разной конфигурацией
  • SessionContext::clone() — O(1), каталог общий, выполнение изолировано
  • Graceful shutdown: CancellationToken + drain текущих запросов + cleanup spill-файлов

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Почему DataFusion называется embedded query engine, а не standalone СУБД?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4