DataFusion Ray
Ballista — Rust-нативная система для распределения DataFusion. Альтернативный подход — DataFusion Ray (apache/datafusion-ray): Python-first интеграция, использующая Ray для оркестрации распределённого выполнения. Проект находится на ранней стадии (~1700 строк кода), но демонстрирует жизнеспособность подхода «DataFusion + существующая распределённая платформа».
Контекст: от ray-sql к Apache
DataFusion Ray эволюционировал из исследовательского проекта ray-sql (datafusion-contrib/ray-sql). В 2024 году проект был передан в Apache Software Foundation как apache/datafusion-ray. Мотивация: Ray — зрелая платформа для распределённых вычислений в Python-экосистеме (ML-пайплайны, inference, обработка данных). Интеграция DataFusion с Ray позволяет использовать SQL-аналитику внутри существующих Ray-кластеров без развёртывания отдельной инфраструктуры.
Архитектура
DataFusion Ray следует той же концепции, что и Ballista: физический план разбивается на query stages, каждый stage выполняется как распределённая задача. Разница — в механизме оркестрации.
Компоненты
- Python Driver — процесс, в котором создаётся DataFusion
SessionContextчерез Python-биндинги (datafusion-python). Driver планирует запрос, разбивает физический план на query stages и запускает их как Ray remote-функции - Ray Workers — процессы в Ray-кластере, выполняющие query stages. Каждый worker содержит DataFusion runtime (через Rust/Python биндинги) и выполняет
ExecutionPlanдля своих партиций - Shared Filesystem — shuffle-данные записываются как Arrow IPC файлы на общую файловую систему. Все worker-ы должны иметь доступ к одному пути
Два режима выполнения
DataFusion Ray поддерживает два режима:
Streaming (default)
Streaming-режим — текущий рабочий режим. Он воспроизводит пайплайнное выполнение DataFusion: каждый query stage обрабатывает данные потоково, без полной материализации промежуточных результатов в памяти.
-- Streaming execution
Stage 1 (partition 0): читает → фильтрует → записывает shuffle
Stage 1 (partition 1): читает → фильтрует → записывает shuffle
↓ (по готовности каждой партиции)
Stage 2: читает shuffle → join → aggregation → результат
Streaming-режим эффективен по памяти — каждый stage обрабатывает данные в виде потока RecordBatch, не накапливая весь dataset.
Batch (не реализован)
Batch-режим — запланированная альтернатива, аналогичная модели Spark: каждый stage полностью завершается перед запуском следующего. Все партиции stage записывают shuffle, и только после завершения всех — запускается следующий stage.
Batch-режим проще для реализации отказоустойчивости (stage можно перезапустить целиком), но менее эффективен по latency и памяти. На текущий момент batch-режим не реализован.
ShuffleWriterExec и ShuffleReaderExec
DataFusion Ray заменяет RepartitionExec на пару кастомных операторов:
ShuffleWriterExec
ShuffleWriterExec — замена выходной части RepartitionExec. Он получает поток RecordBatch, вычисляет хеш-партицию для каждой строки и записывает Arrow IPC файлы:
ShuffleWriterExec(partition_count=16)
└── FilterExec(amount > 100)
└── DataSourceExec(orders.parquet, format=parquet)
-- Результат: 16 файлов на shared filesystem
/shuffle/stage-1/part-0.arrow
/shuffle/stage-1/part-1.arrow
...
/shuffle/stage-1/part-15.arrow
ShuffleReaderExec
ShuffleReaderExec — замена входной части RepartitionExec. Он читает Arrow IPC файлы, записанные предыдущим stage:
HashJoinExec
├── ShuffleReaderExec(stage-1, partition=0)
│ -- читает /shuffle/stage-1/part-0.arrow от всех writer-ов
└── ShuffleReaderExec(stage-2, partition=0)
-- читает /shuffle/stage-2/part-0.arrow от всех writer-ов
Ключевое отличие от Ballista: shuffle-данные хранятся на общей файловой системе, а не на локальных дисках executor-ов. Это упрощает архитектуру (не нужен протокол для передачи данных между executor-ами), но добавляет зависимость от общего хранилища.
execute_query_stage: распределение через Ray
Распределение происходит через Ray remote-функцию execute_query_stage:
import ray
@ray.remote
def execute_query_stage(
stage_id: int,
plan: bytes, # сериализованный ExecutionPlan
partition_id: int,
shuffle_dir: str,
) -> str:
"""Выполняет один stage для одной партиции."""
# Десериализация плана (Rust через PyO3)
ctx = SessionContext()
execution_plan = ctx.deserialize_execution_plan(plan)
# Выполнение DataFusion ExecutionPlan
results = execution_plan.execute(partition_id)
# Запись shuffle-файлов
output_path = f"{shuffle_dir}/stage-{stage_id}/part-{partition_id}.arrow"
write_arrow_ipc(results, output_path)
return output_path
Driver строит DAG зависимостей между stage и запускает execute_query_stage.remote() для каждой (stage, partition) пары. Ray управляет планированием, retry и ресурсами:
# Рекурсивный DAG futures
async def execute_plan(plan, shuffle_dir):
stages = split_into_stages(plan)
# Stage без зависимостей запускаются параллельно
leaf_futures = {}
for stage in stages.leaves():
futures = [
execute_query_stage.remote(
stage.id, stage.plan_bytes,
partition_id, shuffle_dir
)
for partition_id in range(stage.partition_count)
]
leaf_futures[stage.id] = futures
# Зависимые stage ждут завершения входных
for stage in stages.topological_order():
if stage.id in leaf_futures:
continue
# Ждём завершения зависимостей
dep_results = await asyncio.gather(*[
ray.get(leaf_futures[dep_id])
for dep_id in stage.dependencies
])
futures = [
execute_query_stage.remote(
stage.id, stage.plan_bytes,
partition_id, shuffle_dir
)
for partition_id in range(stage.partition_count)
]
leaf_futures[stage.id] = futures
# Финальный stage — результат запроса
return await ray.get(leaf_futures[stages.root().id])
Пример использования
from datafusion_ray import DatafusionRayContext
# Создание контекста (подключается к запущенному Ray-кластеру)
ctx = DatafusionRayContext(num_workers=4)
# Регистрация данных
ctx.register_parquet("orders", "/shared/data/orders/")
ctx.register_parquet("customers", "/shared/data/customers/")
# Выполнение запроса — распределяется автоматически
df = ctx.sql("""
SELECT c.name, SUM(o.amount) as total
FROM orders o
JOIN customers c ON o.customer_id = c.id
GROUP BY c.name
ORDER BY total DESC
LIMIT 10
""")
result = df.collect()
print(result)
API максимально близок к DataFusion Python — разница только в создании контекста.
Ограничения
DataFusion Ray — проект на ранней стадии. Текущие ограничения:
Зачем DataFusion Ray, если есть Ballista?
Два проекта решают одну задачу разными средствами:
- Ballista — Rust-нативная система, пишет собственный scheduler, executor, протоколы. Полный контроль, но отдельная инфраструктура
- DataFusion Ray — использует Ray как готовую платформу для оркестрации. Не нужно писать scheduler и управление ресурсами — Ray это обеспечивает
DataFusion Ray имеет смысл, когда:
- В организации уже развёрнут Ray-кластер (для ML-пайплайнов, inference)
- Команда работает в Python-экосистеме
- Нужна SQL-аналитика рядом с ML-нагрузками на тех же ресурсах
Если Ray-кластера нет и команда работает в Rust — Ballista предпочтительнее.
Ключевые выводы
- DataFusion Ray (apache/datafusion-ray) — ранняя стадия (~1700 LOC), Python-first интеграция DataFusion с Ray
- Эволюционировал из ray-sql, передан Apache в 2024
- Streaming-режим работает (default, пайплайнное выполнение). Batch-режим не реализован
ShuffleWriterExec/ShuffleReaderExecзаменяютRepartitionExec, записывая Arrow IPC на общую файловую системуexecute_query_stage— Ray remote-функция, выполняющая один stage для одной партиции- Требует shared filesystem для shuffle (NFS, HDFS, S3-FUSE). Интеграция с Ray object store — в планах
- Имеет смысл при наличии Ray-кластера и Python-стека. Для Rust-проектов — Ballista предпочтительнее