Learning Platform
Глоссарий Troubleshooting
Урок 09.03 · 17 мин
Продвинутый
BallistaSessionContextstandaloneremoteDockerKubernetesKEDAExtensionPlanner

Ballista на практике

В предыдущем уроке мы разобрали архитектуру Ballista — scheduler, executor-ы, query stages. Теперь перейдём к практике: как запустить кластер, выполнить запросы, подключить кастомные расширения и развернуть в production. Ключевое изменение в 43.0.0: устаревший BallistaContext полностью заменён стандартным SessionContext из DataFusion.

Режимы работы

Ballista предлагает два режима: standalone (для разработки) и remote (для кластера).

Standalone

SessionContext::standalone() запускает scheduler и executor в одном процессе. Полезно для тестирования и разработки — не нужен внешний кластер:

use datafusion::prelude::SessionContext;

// Запуск встроенного scheduler + executor
let ctx = SessionContext::standalone().await?;

// Далее — обычный DataFusion API
ctx.register_parquet("orders", "/data/orders/").await?;

let df = ctx.sql(
    "SELECT status, COUNT(*) as cnt, SUM(amount) as total
     FROM orders
     GROUP BY status
     ORDER BY total DESC"
).await?;

df.show().await?;

Standalone-режим использует те же code paths, что и кластерный: план разбивается на query stages, shuffle идёт через Arrow IPC файлы на диске. Разница — всё в одном процессе.

Remote

SessionContext::remote() подключается к существующему scheduler:

use datafusion::prelude::SessionContext;

// Подключение к кластеру Ballista
let ctx = SessionContext::remote("df://scheduler:50050").await?;

// API полностью идентичен standalone
ctx.register_parquet("orders", "s3://analytics/orders/").await?;

let results = ctx.sql("SELECT * FROM orders LIMIT 10")
    .await?
    .collect()
    .await?;

Переключение между standalone и remote — одна строка. Логика запросов не меняется.

Регистрация источников данных

Ballista наследует все механизмы регистрации DataFusion:

// Parquet
ctx.register_parquet("orders", "s3://bucket/orders/").await?;

// CSV
ctx.register_csv("customers", "/data/customers.csv").await?;

// Кастомный TableProvider
let provider = MyCustomProvider::new("postgres://...");
ctx.register_table("pg_users", Arc::new(provider))?;

Для кластерного режима важно: все executor-ы должны иметь доступ к зарегистрированным путям. Локальные пути (/data/...) работают только если файлы доступны на всех узлах. Для production используйте объектное хранилище (S3, GCS, HDFS).

DataFrame и SQL API

Запросы выполняются теми же API, что и в обычном DataFusion:

// SQL
let df = ctx.sql("SELECT * FROM orders WHERE amount > 100").await?;

// DataFrame API
use datafusion::prelude::*;

let df = ctx.table("orders").await?
    .filter(col("amount").gt(lit(100)))?
    .aggregate(
        vec![col("status")],
        vec![sum(col("amount")).alias("total")]
    )?
    .sort(vec![col("total").sort(false, true)])?;

// Выполнение и сбор результатов
let batches = df.collect().await?;

Под капотом Ballista преобразует DataFrame API в логический план, оптимизирует его, строит execution graph и распределяет по кластеру. Для клиента процесс прозрачен.

Кастомные расширения

Ballista поддерживает расширение через механизмы DataFusion — с учётом того, что расширения должны быть доступны на всех executor-ах.

Кастомный TableProvider

use datafusion::catalog::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::logical_expr::TableType;

#[derive(Debug)]
struct ClickHouseProvider {
    connection: String,
    table: String,
}

#[async_trait]
impl TableProvider for ClickHouseProvider {
    fn as_any(&self) -> &dyn Any { self }
    fn schema(&self) -> SchemaRef { /* ... */ }
    fn table_type(&self) -> TableType { TableType::Base }

    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Pushdown фильтров в ClickHouse
        let pushdown_sql = self.build_query(filters, projection, limit);
        Ok(Arc::new(ClickHouseScanExec::new(
            &self.connection,
            &pushdown_sql,
            self.schema(),
        )))
    }
}

ExtensionPlanner

Для кастомных ExecutionPlan в распределённом контексте используйте ExtensionPlanner:

use datafusion::physical_planner::ExtensionPlanner;
use datafusion::physical_planner::PhysicalPlanner;

struct MyExtensionPlanner;

#[async_trait]
impl ExtensionPlanner for MyExtensionPlanner {
    async fn plan_extension(
        &self,
        planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Преобразование кастомного LogicalPlan
        // в ExecutionPlan для распределённого выполнения
        if let Some(my_node) = node.as_any().downcast_ref::<MyNode>() {
            return Ok(Some(Arc::new(MyExec::new(
                my_node.config(),
                physical_inputs,
            ))));
        }
        Ok(None)
    }
}

Кастомные ExecutionPlan должны быть сериализуемы в protobuf для передачи между scheduler и executor. Это требует реализации PhysicalExtensionCodec.

Развёртывание

Docker

Ballista публикует Docker-образы в GitHub Container Registry:

# Запуск scheduler
docker run -p 50050:50050 \
  ghcr.io/apache/datafusion-ballista-scheduler:43.0.0

# Запуск executor (подключение к scheduler)
docker run \
  -e BALLISTA_SCHEDULER_HOST=scheduler \
  -e BALLISTA_SCHEDULER_PORT=50050 \
  ghcr.io/apache/datafusion-ballista-executor:43.0.0

Docker Compose

Минимальный кластер для локальной разработки:

version: "3.8"
services:
  scheduler:
    image: ghcr.io/apache/datafusion-ballista-scheduler:43.0.0
    ports:
      - "50050:50050"

  executor-1:
    image: ghcr.io/apache/datafusion-ballista-executor:43.0.0
    environment:
      BALLISTA_SCHEDULER_HOST: scheduler
      BALLISTA_SCHEDULER_PORT: 50050

  executor-2:
    image: ghcr.io/apache/datafusion-ballista-executor:43.0.0
    environment:
      BALLISTA_SCHEDULER_HOST: scheduler
      BALLISTA_SCHEDULER_PORT: 50050

Kubernetes и KEDA

Для production Ballista разворачивается в Kubernetes. Scheduler — Deployment с одной репликой, executor-ы — Deployment с автоскейлингом:

# scheduler-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ballista-scheduler
spec:
  replicas: 1
  template:
    spec:
      containers:
        - name: scheduler
          image: ghcr.io/apache/datafusion-ballista-scheduler:43.0.0
          ports:
            - containerPort: 50050
---
# executor-deployment.yaml (с KEDA autoscaling)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ballista-executor
spec:
  replicas: 2
  template:
    spec:
      containers:
        - name: executor
          image: ghcr.io/apache/datafusion-ballista-executor:43.0.0
          env:
            - name: BALLISTA_SCHEDULER_HOST
              value: ballista-scheduler

KEDA (Kubernetes Event-Driven Autoscaling) позволяет масштабировать executor-ы по метрикам — например, по количеству pending задач в scheduler.

Cargo features

Ballista поддерживает feature flags при сборке из исходников:

[dependencies]
ballista = { version = "43.0.0", features = ["standalone"] }

Основные features:

  • standalone — встроенный scheduler + executor для SessionContext::standalone()
  • substrait — поддержка Substrait планов (альтернатива protobuf для кросс-платформенного обмена)

Что было удалено в 43.0.0

Ballista 43.0.0 убрала несколько экспериментальных компонентов, чтобы сфокусироваться на ядре:

Удалённые компоненты Ballista 43.0.0
Web UIВстроенный веб-дашборд планировщика удалён — используйте внешний мониторинг (Prometheus, Grafana)
Caching subsystemПодсистема кеширования промежуточных результатов удалена — реализуйте через кастомный TableProvider
Plugin subsystemДинамическая загрузка расширений удалена — компилируйте расширения статически для стабильности
KV storesВстроенные KV stores (etcd, sled) удалены — scheduler хранит состояние in-memory для простоты

Это осознанное решение: вместо встроенных, но недоработанных компонентов — расширяемость через стандартные механизмы DataFusion (TableProvider, UDF, OptimizerRule). Пользователь подключает нужные компоненты сам, а не борется с внутренними abstractions.

Миграция с BallistaContext

Если вы использовали BallistaContext из ранних версий Ballista — замените его на SessionContext:

// Было (deprecated)
// let ctx = BallistaContext::remote("df://host:50050", &config).await?;

// Стало (43.0.0+)
let ctx = SessionContext::remote("df://host:50050").await?;

API запросов не изменился — sql(), table(), register_parquet(), collect() работают одинаково.

Производительность

Ballista демонстрирует 2.9x ускорение по сравнению с Apache Spark на бенчмарке TPC-H SF100 (100 GB) при одинаковых ресурсах кластера. Основные факторы:

  • Arrow columnar — данные остаются в columnar формате на всём пути, без row→column→row конвертаций
  • Zero-copy shuffle — Arrow IPC сохраняет layout буферов, минимизируя overhead десериализации
  • Rust runtime — отсутствие JVM GC, предсказуемая задержка, низкий memory overhead
  • DataFusion оптимизатор — те же правила оптимизации, что и в однопроцессном DataFusion

Ключевые выводы

  • SessionContext::standalone() — scheduler + executor в одном процессе для разработки. SessionContext::remote() — подключение к кластеру
  • API запросов идентичен однопроцессному DataFusion: SQL, DataFrame, регистрация источников
  • Кастомные расширения (TableProvider, ExtensionPlanner, PhysicalExtensionCodec) работают в распределённом контексте
  • Docker-образы и Kubernetes с KEDA autoscaling для production-развёртывания
  • В 43.0.0 удалены UI, кеширование, plugin subsystem и KV stores — фокус на расширяемом ядре
  • BallistaContext устарел — используйте SessionContext для всех новых проектов

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Ballista предлагает два режима работы. Чем standalone отличается от remote?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5