Архитектура Ballista
Ballista — распределённый query engine, построенный поверх DataFusion. Его архитектура следует модели scheduler/executor: централизованный планировщик разбивает запросы на задачи и распределяет их по пулу исполнителей. Версия 43.0.0 (февраль 2025) выровняла нумерацию с DataFusion, хотя между внутренними зависимостями сохраняется разрыв — Ballista 43.0.0 базируется на DataFusion ~43.x, а не на текущей ветке 53.x (релиз 53.0.0 от 2026-04-02).
Обзор архитектуры
Ballista состоит из трёх компонентов: клиенты, scheduler и executor-ы.
Scheduler
Scheduler — единственная точка координации. Он принимает запросы, строит план выполнения, разбивает его на задачи и распределяет по executor-ам.
Job lifecycle
- Приём запроса. Клиент отправляет SQL или LogicalPlan через gRPC. Scheduler создаёт
Jobс уникальнымjob_id - Создание физического плана. Scheduler использует DataFusion
SessionContextдля парсинга, оптимизации и создания физического плана — те же фазы, что и в однопроцессном DataFusion - Построение Execution Graph. Физический план разбивается на query stages (подробнее ниже). Результат — DAG зависимостей между stages
- Планирование задач. Scheduler назначает задачи (partitions внутри stage) свободным executor-ам, учитывая locality и доступные ресурсы
- Мониторинг. Scheduler отслеживает статус каждой задачи. При сбое executor-а задачи перепланируются
Execution Graph
Execution Graph — это DAG, в котором узлы — query stages, а рёбра — зависимости по данным (shuffle). Scheduler строит его из физического плана, разрезая по RepartitionExec (pipeline breakers).
-- Исходный запрос
SELECT c.name, SUM(o.amount)
FROM orders o
JOIN customers c ON o.customer_id = c.id
GROUP BY c.name
-- Execution Graph (3 stage)
Stage 1: DataSourceExec(orders) → ShuffleWriterExec(Hash(customer_id))
Stage 2: DataSourceExec(customers) → ShuffleWriterExec(Hash(id))
Stage 3: ShuffleReaderExec + ShuffleReaderExec
→ HashJoinExec(customer_id = id)
→ AggregateExec(GROUP BY name, SUM(amount))
→ ShuffleWriterExec(результат)
Stage 1 и Stage 2 не зависят друг от друга — scheduler запускает их параллельно. Stage 3 ждёт завершения обоих.
Протокол планирования задач
Каждый query stage содержит N задач (по числу партиций). Scheduler сериализует физический план stage в protobuf и отправляет executor-у:
-- Scheduler → Executor (gRPC)
TaskDefinition {
task_id: 1,
job_id: "job-1234",
stage_id: 1,
partition_id: 0,
plan: <protobuf PhysicalPlanNode>,
session_config: <SessionConfig>,
}
Executor десериализует protobuf в DataFusion ExecutionPlan, выполняет его и записывает результат как Arrow IPC файл.
Executor
Executor — рабочий процесс, выполняющий задачи. Каждый executor — отдельный процесс с полноценным DataFusion runtime.
Цикл работы
- Регистрация. Executor подключается к scheduler и сообщает свои ресурсы (CPU, память)
- Получение задач. Executor запрашивает (pull) или получает (push) задачи от scheduler
- Выполнение. Задача — это DataFusion
ExecutionPlanдля одной партиции одного stage. Executor выполняет его параллельно для нескольких партиций - Запись shuffle. Результат записывается как Arrow IPC файл на локальный диск
- Отчёт. Executor сообщает scheduler о завершении задачи и расположении shuffle-файлов
Arrow IPC shuffle
Данные между stage передаются через Arrow IPC — бинарный формат Arrow, сохраняющий columnar layout без сериализации/десериализации:
→ Arrow IPC файлExecutor записывает Arrow IPC файл для партиции 0 целевого stage
→ Arrow IPC файлExecutor записывает Arrow IPC файл для партиции 1 целевого stage
→ Arrow IPC файлExecutor записывает Arrow IPC файл для партиции 2 целевого stage
ShuffleReaderExecShuffleReaderExec собирает Arrow IPC файлы от всех executor-ов stage 1 для партиции 0
ShuffleReaderExecShuffleReaderExec собирает файлы для партиции 1 — zero-copy маппинг буферов без построчного декодирования
Каждый executor stage 1 записывает N Arrow IPC файлов (по одному на каждую целевую партицию). Executor stage 2 читает файлы со всех executor-ов stage 1, собирая данные для своей партиции. Поскольку Arrow IPC сохраняет columnar формат — десериализация сводится к маппингу буферов, без построчного декодирования.
Query stages подробно
Разбивка на query stages — ключевое отличие распределённого от однопроцессного выполнения. В DataFusion EnforceDistribution вставляет RepartitionExec для co-location данных. Ballista заменяет каждый RepartitionExec парой ShuffleWriterExec / ShuffleReaderExec:
-- DataFusion (однопроцессный план)
HashJoinExec
├── RepartitionExec: Hash([customer_id], 16)
│ └── DataSourceExec: orders, format=parquet
└── RepartitionExec: Hash([id], 16)
└── DataSourceExec: customers, format=parquet
-- Ballista (распределённый план)
HashJoinExec
├── ShuffleReaderExec: [stage 1, partitions 0..16]
└── ShuffleReaderExec: [stage 2, partitions 0..16]
Stage 1: DataSourceExec(orders) → ShuffleWriterExec(Hash(customer_id), 16)
Stage 2: DataSourceExec(customers) → ShuffleWriterExec(Hash(id), 16)
Каждый ShuffleWriterExec вычисляет хеш-партицию для каждой строки и записывает Arrow IPC файл для каждой целевой партиции. ShuffleReaderExec собирает файлы от всех writer-ов для своей партиции.
Клиентские интерфейсы
Ballista предоставляет несколько способов отправки запросов:
Rust SessionContext
Основной интерфейс — стандартный DataFusion SessionContext. С Ballista 43.0.0 устаревший BallistaContext заменён на расширения SessionContext:
use datafusion::prelude::SessionContext;
// Подключение к кластеру
let ctx = SessionContext::remote("df://scheduler-host:50050").await?;
// API идентичен однопроцессному DataFusion
ctx.register_parquet("orders", "s3://bucket/orders/").await?;
let df = ctx.sql("SELECT count(*) FROM orders").await?;
df.show().await?;
Flight SQL и JDBC
Ballista scheduler реализует Flight SQL протокол. Любой Flight SQL-совместимый клиент (включая JDBC-драйвер Arrow Flight SQL) может подключиться:
-- JDBC connection string
jdbc:arrow-flight-sql://scheduler-host:50050
-- После подключения — стандартный SQL
SELECT c.name, SUM(o.amount)
FROM orders o
JOIN customers c ON o.customer_id = c.id
GROUP BY c.name;
Это позволяет подключить BI-инструменты (Tableau, Superset) к кластеру Ballista без специализированных драйверов.
PyBallista
Python-клиент для Ballista:
from pyballista import BallistaBuilder
# Подключение к кластеру
ctx = BallistaBuilder() \
.remote("df://scheduler-host:50050") \
.build()
# DataFrame API
df = ctx.sql("SELECT * FROM orders WHERE amount > 100")
result = df.collect()
Версионирование и совместимость
Ballista 43.0.0 синхронизировала нумерацию версий с DataFusion. Однако это не означает полную совместимость:
- Ballista 43.0.0 базируется на DataFusion ~43.x
- Текущая DataFusion — версия 53.0.0 (релиз 2026-04-02)
- Сообщество работает над сокращением разрыва, но не все API DataFusion 53.x доступны в Ballista
Для production-использования проверяйте совместимость конкретной версии Ballista с нужными вам возможностями DataFusion.
Ключевые выводы
- Ballista следует архитектуре scheduler/executor: централизованный scheduler координирует распределённое выполнение, executor-ы выполняют задачи
- Scheduler строит Execution Graph — DAG query stages, разрезая план по pipeline breakers (
RepartitionExec) - Каждый query stage — независимая единица: executor получает сериализованный (protobuf) физический план и выполняет его как обычный DataFusion
ExecutionPlan - Shuffle между stages использует Arrow IPC — zero-copy columnar формат, минимизирующий overhead сериализации
- Клиенты: Rust
SessionContext, PyBallista, Flight SQL/JDBC — единый API для подключения к кластеру - Ballista 43.0.0 выровняла нумерацию с DataFusion, но сохраняется разрыв совместимости (~43.x vs 53.x)