Learning Platform
Глоссарий Troubleshooting
Урок 09.02 · 16 мин
Продвинутый
Ballistaschedulerexecutorquery stagesArrow IPCFlight SQLgRPCexecution graph

Архитектура Ballista

Ballista — распределённый query engine, построенный поверх DataFusion. Его архитектура следует модели scheduler/executor: централизованный планировщик разбивает запросы на задачи и распределяет их по пулу исполнителей. Версия 43.0.0 (февраль 2025) выровняла нумерацию с DataFusion, хотя между внутренними зависимостями сохраняется разрыв — Ballista 43.0.0 базируется на DataFusion ~43.x, а не на текущей ветке 53.x (релиз 53.0.0 от 2026-04-02).

Обзор архитектуры

Ballista состоит из трёх компонентов: клиенты, scheduler и executor-ы.

Архитектура Ballista
Rust SessionContextНативный Rust-клиент — стандартный DataFusion SessionContext с remote-подключением
PyBallistaPython-клиент для Ballista — DataFrame и SQL API через BallistaBuilder
Flight SQL / JDBCСтандартный Flight SQL протокол — подключение BI-инструментов (Tableau, Superset) без спецдрайверов
gRPC / Flight SQL
SchedulerЦентрализованный планировщик: принимает запросы, строит Execution Graph, распределяет задачи по executor-ам
gRPC: задачи (protobuf физ. планы)
Executor 1Рабочий процесс с DataFusion runtime — выполняет задачи параллельно в N потоков
Executor 2Каждый executor — отдельный процесс с полноценным DataFusion SessionContext
Executor NКластер масштабируется добавлением executor-ов — scheduler автоматически распределяет задачи
Arrow IPC
Shuffle-файлы (Arrow IPC на локальном диске executor-а)Промежуточные данные между stage хранятся как Arrow IPC файлы на локальном диске каждого executor-а

Scheduler

Scheduler — единственная точка координации. Он принимает запросы, строит план выполнения, разбивает его на задачи и распределяет по executor-ам.

Job lifecycle

  1. Приём запроса. Клиент отправляет SQL или LogicalPlan через gRPC. Scheduler создаёт Job с уникальным job_id
  2. Создание физического плана. Scheduler использует DataFusion SessionContext для парсинга, оптимизации и создания физического плана — те же фазы, что и в однопроцессном DataFusion
  3. Построение Execution Graph. Физический план разбивается на query stages (подробнее ниже). Результат — DAG зависимостей между stages
  4. Планирование задач. Scheduler назначает задачи (partitions внутри stage) свободным executor-ам, учитывая locality и доступные ресурсы
  5. Мониторинг. Scheduler отслеживает статус каждой задачи. При сбое executor-а задачи перепланируются

Execution Graph

Execution Graph — это DAG, в котором узлы — query stages, а рёбра — зависимости по данным (shuffle). Scheduler строит его из физического плана, разрезая по RepartitionExec (pipeline breakers).

-- Исходный запрос
SELECT c.name, SUM(o.amount)
FROM orders o
JOIN customers c ON o.customer_id = c.id
GROUP BY c.name

-- Execution Graph (3 stage)
Stage 1: DataSourceExec(orders) → ShuffleWriterExec(Hash(customer_id))
Stage 2: DataSourceExec(customers) → ShuffleWriterExec(Hash(id))
Stage 3: ShuffleReaderExec + ShuffleReaderExec
         → HashJoinExec(customer_id = id)
         → AggregateExec(GROUP BY name, SUM(amount))
         → ShuffleWriterExec(результат)

Stage 1 и Stage 2 не зависят друг от друга — scheduler запускает их параллельно. Stage 3 ждёт завершения обоих.

Протокол планирования задач

Каждый query stage содержит N задач (по числу партиций). Scheduler сериализует физический план stage в protobuf и отправляет executor-у:

-- Scheduler → Executor (gRPC)
TaskDefinition {
    task_id: 1,
    job_id: "job-1234",
    stage_id: 1,
    partition_id: 0,
    plan: <protobuf PhysicalPlanNode>,
    session_config: <SessionConfig>,
}

Executor десериализует protobuf в DataFusion ExecutionPlan, выполняет его и записывает результат как Arrow IPC файл.

Executor

Executor — рабочий процесс, выполняющий задачи. Каждый executor — отдельный процесс с полноценным DataFusion runtime.

Цикл работы

  1. Регистрация. Executor подключается к scheduler и сообщает свои ресурсы (CPU, память)
  2. Получение задач. Executor запрашивает (pull) или получает (push) задачи от scheduler
  3. Выполнение. Задача — это DataFusion ExecutionPlan для одной партиции одного stage. Executor выполняет его параллельно для нескольких партиций
  4. Запись shuffle. Результат записывается как Arrow IPC файл на локальный диск
  5. Отчёт. Executor сообщает scheduler о завершении задачи и расположении shuffle-файлов

Arrow IPC shuffle

Данные между stage передаются через Arrow IPC — бинарный формат Arrow, сохраняющий columnar layout без сериализации/десериализации:

Arrow IPC shuffle между stage
Stage 1, partition 0
→ Arrow IPC файлExecutor записывает Arrow IPC файл для партиции 0 целевого stage
Stage 1, partition 1
→ Arrow IPC файлExecutor записывает Arrow IPC файл для партиции 1 целевого stage
Stage 1, partition 2
→ Arrow IPC файлExecutor записывает Arrow IPC файл для партиции 2 целевого stage
сеть (executor → executor)
Stage 2, partition 0
ShuffleReaderExecShuffleReaderExec собирает Arrow IPC файлы от всех executor-ов stage 1 для партиции 0
Stage 2, partition 1
ShuffleReaderExecShuffleReaderExec собирает файлы для партиции 1 — zero-copy маппинг буферов без построчного декодирования

Каждый executor stage 1 записывает N Arrow IPC файлов (по одному на каждую целевую партицию). Executor stage 2 читает файлы со всех executor-ов stage 1, собирая данные для своей партиции. Поскольку Arrow IPC сохраняет columnar формат — десериализация сводится к маппингу буферов, без построчного декодирования.

Query stages подробно

Разбивка на query stages — ключевое отличие распределённого от однопроцессного выполнения. В DataFusion EnforceDistribution вставляет RepartitionExec для co-location данных. Ballista заменяет каждый RepartitionExec парой ShuffleWriterExec / ShuffleReaderExec:

-- DataFusion (однопроцессный план)
HashJoinExec
├── RepartitionExec: Hash([customer_id], 16)
│   └── DataSourceExec: orders, format=parquet
└── RepartitionExec: Hash([id], 16)
    └── DataSourceExec: customers, format=parquet

-- Ballista (распределённый план)
HashJoinExec
├── ShuffleReaderExec: [stage 1, partitions 0..16]
└── ShuffleReaderExec: [stage 2, partitions 0..16]

Stage 1: DataSourceExec(orders) → ShuffleWriterExec(Hash(customer_id), 16)
Stage 2: DataSourceExec(customers) → ShuffleWriterExec(Hash(id), 16)

Каждый ShuffleWriterExec вычисляет хеш-партицию для каждой строки и записывает Arrow IPC файл для каждой целевой партиции. ShuffleReaderExec собирает файлы от всех writer-ов для своей партиции.

Клиентские интерфейсы

Ballista предоставляет несколько способов отправки запросов:

Rust SessionContext

Основной интерфейс — стандартный DataFusion SessionContext. С Ballista 43.0.0 устаревший BallistaContext заменён на расширения SessionContext:

use datafusion::prelude::SessionContext;

// Подключение к кластеру
let ctx = SessionContext::remote("df://scheduler-host:50050").await?;

// API идентичен однопроцессному DataFusion
ctx.register_parquet("orders", "s3://bucket/orders/").await?;
let df = ctx.sql("SELECT count(*) FROM orders").await?;
df.show().await?;

Flight SQL и JDBC

Ballista scheduler реализует Flight SQL протокол. Любой Flight SQL-совместимый клиент (включая JDBC-драйвер Arrow Flight SQL) может подключиться:

-- JDBC connection string
jdbc:arrow-flight-sql://scheduler-host:50050

-- После подключения — стандартный SQL
SELECT c.name, SUM(o.amount)
FROM orders o
JOIN customers c ON o.customer_id = c.id
GROUP BY c.name;

Это позволяет подключить BI-инструменты (Tableau, Superset) к кластеру Ballista без специализированных драйверов.

PyBallista

Python-клиент для Ballista:

from pyballista import BallistaBuilder

# Подключение к кластеру
ctx = BallistaBuilder() \
    .remote("df://scheduler-host:50050") \
    .build()

# DataFrame API
df = ctx.sql("SELECT * FROM orders WHERE amount > 100")
result = df.collect()

Версионирование и совместимость

Ballista 43.0.0 синхронизировала нумерацию версий с DataFusion. Однако это не означает полную совместимость:

  • Ballista 43.0.0 базируется на DataFusion ~43.x
  • Текущая DataFusion — версия 53.0.0 (релиз 2026-04-02)
  • Сообщество работает над сокращением разрыва, но не все API DataFusion 53.x доступны в Ballista

Для production-использования проверяйте совместимость конкретной версии Ballista с нужными вам возможностями DataFusion.

Ключевые выводы

  • Ballista следует архитектуре scheduler/executor: централизованный scheduler координирует распределённое выполнение, executor-ы выполняют задачи
  • Scheduler строит Execution Graph — DAG query stages, разрезая план по pipeline breakers (RepartitionExec)
  • Каждый query stage — независимая единица: executor получает сериализованный (protobuf) физический план и выполняет его как обычный DataFusion ExecutionPlan
  • Shuffle между stages использует Arrow IPC — zero-copy columnar формат, минимизирующий overhead сериализации
  • Клиенты: Rust SessionContext, PyBallista, Flight SQL/JDBC — единый API для подключения к кластеру
  • Ballista 43.0.0 выровняла нумерацию с DataFusion, но сохраняется разрыв совместимости (~43.x vs 53.x)

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Ballista строит Execution Graph из физического плана. Как определяются границы query stages?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5