Ballista на практике
В предыдущем уроке мы разобрали архитектуру Ballista — scheduler, executor-ы, query stages. Теперь перейдём к практике: как запустить кластер, выполнить запросы, подключить кастомные расширения и развернуть в production. Ключевое изменение в 43.0.0: устаревший BallistaContext полностью заменён стандартным SessionContext из DataFusion.
Режимы работы
Ballista предлагает два режима: standalone (для разработки) и remote (для кластера).
Standalone
SessionContext::standalone() запускает scheduler и executor в одном процессе. Полезно для тестирования и разработки — не нужен внешний кластер:
use datafusion::prelude::SessionContext;
// Запуск встроенного scheduler + executor
let ctx = SessionContext::standalone().await?;
// Далее — обычный DataFusion API
ctx.register_parquet("orders", "/data/orders/").await?;
let df = ctx.sql(
"SELECT status, COUNT(*) as cnt, SUM(amount) as total
FROM orders
GROUP BY status
ORDER BY total DESC"
).await?;
df.show().await?;
Standalone-режим использует те же code paths, что и кластерный: план разбивается на query stages, shuffle идёт через Arrow IPC файлы на диске. Разница — всё в одном процессе.
Remote
SessionContext::remote() подключается к существующему scheduler:
use datafusion::prelude::SessionContext;
// Подключение к кластеру Ballista
let ctx = SessionContext::remote("df://scheduler:50050").await?;
// API полностью идентичен standalone
ctx.register_parquet("orders", "s3://analytics/orders/").await?;
let results = ctx.sql("SELECT * FROM orders LIMIT 10")
.await?
.collect()
.await?;
Переключение между standalone и remote — одна строка. Логика запросов не меняется.
Регистрация источников данных
Ballista наследует все механизмы регистрации DataFusion:
// Parquet
ctx.register_parquet("orders", "s3://bucket/orders/").await?;
// CSV
ctx.register_csv("customers", "/data/customers.csv").await?;
// Кастомный TableProvider
let provider = MyCustomProvider::new("postgres://...");
ctx.register_table("pg_users", Arc::new(provider))?;
Для кластерного режима важно: все executor-ы должны иметь доступ к зарегистрированным путям. Локальные пути (/data/...) работают только если файлы доступны на всех узлах. Для production используйте объектное хранилище (S3, GCS, HDFS).
DataFrame и SQL API
Запросы выполняются теми же API, что и в обычном DataFusion:
// SQL
let df = ctx.sql("SELECT * FROM orders WHERE amount > 100").await?;
// DataFrame API
use datafusion::prelude::*;
let df = ctx.table("orders").await?
.filter(col("amount").gt(lit(100)))?
.aggregate(
vec![col("status")],
vec![sum(col("amount")).alias("total")]
)?
.sort(vec![col("total").sort(false, true)])?;
// Выполнение и сбор результатов
let batches = df.collect().await?;
Под капотом Ballista преобразует DataFrame API в логический план, оптимизирует его, строит execution graph и распределяет по кластеру. Для клиента процесс прозрачен.
Кастомные расширения
Ballista поддерживает расширение через механизмы DataFusion — с учётом того, что расширения должны быть доступны на всех executor-ах.
Кастомный TableProvider
use datafusion::catalog::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::logical_expr::TableType;
#[derive(Debug)]
struct ClickHouseProvider {
connection: String,
table: String,
}
#[async_trait]
impl TableProvider for ClickHouseProvider {
fn as_any(&self) -> &dyn Any { self }
fn schema(&self) -> SchemaRef { /* ... */ }
fn table_type(&self) -> TableType { TableType::Base }
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Pushdown фильтров в ClickHouse
let pushdown_sql = self.build_query(filters, projection, limit);
Ok(Arc::new(ClickHouseScanExec::new(
&self.connection,
&pushdown_sql,
self.schema(),
)))
}
}
ExtensionPlanner
Для кастомных ExecutionPlan в распределённом контексте используйте ExtensionPlanner:
use datafusion::physical_planner::ExtensionPlanner;
use datafusion::physical_planner::PhysicalPlanner;
struct MyExtensionPlanner;
#[async_trait]
impl ExtensionPlanner for MyExtensionPlanner {
async fn plan_extension(
&self,
planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
session_state: &SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
// Преобразование кастомного LogicalPlan
// в ExecutionPlan для распределённого выполнения
if let Some(my_node) = node.as_any().downcast_ref::<MyNode>() {
return Ok(Some(Arc::new(MyExec::new(
my_node.config(),
physical_inputs,
))));
}
Ok(None)
}
}
Кастомные ExecutionPlan должны быть сериализуемы в protobuf для передачи между scheduler и executor. Это требует реализации PhysicalExtensionCodec.
Развёртывание
Docker
Ballista публикует Docker-образы в GitHub Container Registry:
# Запуск scheduler
docker run -p 50050:50050 \
ghcr.io/apache/datafusion-ballista-scheduler:43.0.0
# Запуск executor (подключение к scheduler)
docker run \
-e BALLISTA_SCHEDULER_HOST=scheduler \
-e BALLISTA_SCHEDULER_PORT=50050 \
ghcr.io/apache/datafusion-ballista-executor:43.0.0
Docker Compose
Минимальный кластер для локальной разработки:
version: "3.8"
services:
scheduler:
image: ghcr.io/apache/datafusion-ballista-scheduler:43.0.0
ports:
- "50050:50050"
executor-1:
image: ghcr.io/apache/datafusion-ballista-executor:43.0.0
environment:
BALLISTA_SCHEDULER_HOST: scheduler
BALLISTA_SCHEDULER_PORT: 50050
executor-2:
image: ghcr.io/apache/datafusion-ballista-executor:43.0.0
environment:
BALLISTA_SCHEDULER_HOST: scheduler
BALLISTA_SCHEDULER_PORT: 50050
Kubernetes и KEDA
Для production Ballista разворачивается в Kubernetes. Scheduler — Deployment с одной репликой, executor-ы — Deployment с автоскейлингом:
# scheduler-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: ballista-scheduler
spec:
replicas: 1
template:
spec:
containers:
- name: scheduler
image: ghcr.io/apache/datafusion-ballista-scheduler:43.0.0
ports:
- containerPort: 50050
---
# executor-deployment.yaml (с KEDA autoscaling)
apiVersion: apps/v1
kind: Deployment
metadata:
name: ballista-executor
spec:
replicas: 2
template:
spec:
containers:
- name: executor
image: ghcr.io/apache/datafusion-ballista-executor:43.0.0
env:
- name: BALLISTA_SCHEDULER_HOST
value: ballista-scheduler
KEDA (Kubernetes Event-Driven Autoscaling) позволяет масштабировать executor-ы по метрикам — например, по количеству pending задач в scheduler.
Cargo features
Ballista поддерживает feature flags при сборке из исходников:
[dependencies]
ballista = { version = "43.0.0", features = ["standalone"] }
Основные features:
standalone— встроенный scheduler + executor дляSessionContext::standalone()substrait— поддержка Substrait планов (альтернатива protobuf для кросс-платформенного обмена)
Что было удалено в 43.0.0
Ballista 43.0.0 убрала несколько экспериментальных компонентов, чтобы сфокусироваться на ядре:
Это осознанное решение: вместо встроенных, но недоработанных компонентов — расширяемость через стандартные механизмы DataFusion (TableProvider, UDF, OptimizerRule). Пользователь подключает нужные компоненты сам, а не борется с внутренними abstractions.
Миграция с BallistaContext
Если вы использовали BallistaContext из ранних версий Ballista — замените его на SessionContext:
// Было (deprecated)
// let ctx = BallistaContext::remote("df://host:50050", &config).await?;
// Стало (43.0.0+)
let ctx = SessionContext::remote("df://host:50050").await?;
API запросов не изменился — sql(), table(), register_parquet(), collect() работают одинаково.
Производительность
Ballista демонстрирует 2.9x ускорение по сравнению с Apache Spark на бенчмарке TPC-H SF100 (100 GB) при одинаковых ресурсах кластера. Основные факторы:
- Arrow columnar — данные остаются в columnar формате на всём пути, без row→column→row конвертаций
- Zero-copy shuffle — Arrow IPC сохраняет layout буферов, минимизируя overhead десериализации
- Rust runtime — отсутствие JVM GC, предсказуемая задержка, низкий memory overhead
- DataFusion оптимизатор — те же правила оптимизации, что и в однопроцессном DataFusion
Ключевые выводы
SessionContext::standalone()— scheduler + executor в одном процессе для разработки.SessionContext::remote()— подключение к кластеру- API запросов идентичен однопроцессному DataFusion: SQL, DataFrame, регистрация источников
- Кастомные расширения (TableProvider, ExtensionPlanner, PhysicalExtensionCodec) работают в распределённом контексте
- Docker-образы и Kubernetes с KEDA autoscaling для production-развёртывания
- В 43.0.0 удалены UI, кеширование, plugin subsystem и KV stores — фокус на расширяемом ядре
BallistaContextустарел — используйтеSessionContextдля всех новых проектов