Learning Platform
Глоссарий Troubleshooting
Урок 09.04 · 15 мин
Продвинутый
DataFusion RayRayShuffleWriterExecShuffleReaderExecexecute_query_stagestreaming modePython

DataFusion Ray

Ballista — Rust-нативная система для распределения DataFusion. Альтернативный подход — DataFusion Ray (apache/datafusion-ray): Python-first интеграция, использующая Ray для оркестрации распределённого выполнения. Проект находится на ранней стадии (~1700 строк кода), но демонстрирует жизнеспособность подхода «DataFusion + существующая распределённая платформа».

Контекст: от ray-sql к Apache

DataFusion Ray эволюционировал из исследовательского проекта ray-sql (datafusion-contrib/ray-sql). В 2024 году проект был передан в Apache Software Foundation как apache/datafusion-ray. Мотивация: Ray — зрелая платформа для распределённых вычислений в Python-экосистеме (ML-пайплайны, inference, обработка данных). Интеграция DataFusion с Ray позволяет использовать SQL-аналитику внутри существующих Ray-кластеров без развёртывания отдельной инфраструктуры.

Архитектура

DataFusion Ray следует той же концепции, что и Ballista: физический план разбивается на query stages, каждый stage выполняется как распределённая задача. Разница — в механизме оркестрации.

DataFusion Ray: архитектура
Python DriverПроцесс-драйвер: создаёт DataFusion контекст, строит план, разбивает на stage и запускает Ray remote-функции
execute_query_stage.remote()
Ray Worker 1Ray-процесс с DataFusion runtime — выполняет stage для назначенных партиций
Ray Worker 2Каждый worker десериализует ExecutionPlan, выполняет и записывает Arrow IPC файлы
Ray Worker NRay управляет планированием, retry и ресурсами — DataFusion Ray не пишет собственный scheduler
Arrow IPC на общей файловой системе
Shared filesystem (NFS / объектное хранилище)Общая файловая система (NFS, HDFS, S3-FUSE) — все worker-ы читают и пишут shuffle-данные сюда

Компоненты

  • Python Driver — процесс, в котором создаётся DataFusion SessionContext через Python-биндинги (datafusion-python). Driver планирует запрос, разбивает физический план на query stages и запускает их как Ray remote-функции
  • Ray Workers — процессы в Ray-кластере, выполняющие query stages. Каждый worker содержит DataFusion runtime (через Rust/Python биндинги) и выполняет ExecutionPlan для своих партиций
  • Shared Filesystem — shuffle-данные записываются как Arrow IPC файлы на общую файловую систему. Все worker-ы должны иметь доступ к одному пути

Два режима выполнения

DataFusion Ray поддерживает два режима:

Streaming (default)

Streaming-режим — текущий рабочий режим. Он воспроизводит пайплайнное выполнение DataFusion: каждый query stage обрабатывает данные потоково, без полной материализации промежуточных результатов в памяти.

-- Streaming execution
Stage 1 (partition 0): читает → фильтрует → записывает shuffle
Stage 1 (partition 1): читает → фильтрует → записывает shuffle
  ↓ (по готовности каждой партиции)
Stage 2: читает shuffle → join → aggregation → результат

Streaming-режим эффективен по памяти — каждый stage обрабатывает данные в виде потока RecordBatch, не накапливая весь dataset.

Batch (не реализован)

Batch-режим — запланированная альтернатива, аналогичная модели Spark: каждый stage полностью завершается перед запуском следующего. Все партиции stage записывают shuffle, и только после завершения всех — запускается следующий stage.

Batch-режим проще для реализации отказоустойчивости (stage можно перезапустить целиком), но менее эффективен по latency и памяти. На текущий момент batch-режим не реализован.

ShuffleWriterExec и ShuffleReaderExec

DataFusion Ray заменяет RepartitionExec на пару кастомных операторов:

ShuffleWriterExec

ShuffleWriterExec — замена выходной части RepartitionExec. Он получает поток RecordBatch, вычисляет хеш-партицию для каждой строки и записывает Arrow IPC файлы:

ShuffleWriterExec(partition_count=16)
└── FilterExec(amount > 100)
    └── DataSourceExec(orders.parquet, format=parquet)

-- Результат: 16 файлов на shared filesystem
/shuffle/stage-1/part-0.arrow
/shuffle/stage-1/part-1.arrow
...
/shuffle/stage-1/part-15.arrow

ShuffleReaderExec

ShuffleReaderExec — замена входной части RepartitionExec. Он читает Arrow IPC файлы, записанные предыдущим stage:

HashJoinExec
├── ShuffleReaderExec(stage-1, partition=0)
│   -- читает /shuffle/stage-1/part-0.arrow от всех writer-ов
└── ShuffleReaderExec(stage-2, partition=0)
    -- читает /shuffle/stage-2/part-0.arrow от всех writer-ов

Ключевое отличие от Ballista: shuffle-данные хранятся на общей файловой системе, а не на локальных дисках executor-ов. Это упрощает архитектуру (не нужен протокол для передачи данных между executor-ами), но добавляет зависимость от общего хранилища.

execute_query_stage: распределение через Ray

Распределение происходит через Ray remote-функцию execute_query_stage:

import ray

@ray.remote
def execute_query_stage(
    stage_id: int,
    plan: bytes,       # сериализованный ExecutionPlan
    partition_id: int,
    shuffle_dir: str,
) -> str:
    """Выполняет один stage для одной партиции."""
    # Десериализация плана (Rust через PyO3)
    ctx = SessionContext()
    execution_plan = ctx.deserialize_execution_plan(plan)

    # Выполнение DataFusion ExecutionPlan
    results = execution_plan.execute(partition_id)

    # Запись shuffle-файлов
    output_path = f"{shuffle_dir}/stage-{stage_id}/part-{partition_id}.arrow"
    write_arrow_ipc(results, output_path)
    return output_path

Driver строит DAG зависимостей между stage и запускает execute_query_stage.remote() для каждой (stage, partition) пары. Ray управляет планированием, retry и ресурсами:

# Рекурсивный DAG futures
async def execute_plan(plan, shuffle_dir):
    stages = split_into_stages(plan)

    # Stage без зависимостей запускаются параллельно
    leaf_futures = {}
    for stage in stages.leaves():
        futures = [
            execute_query_stage.remote(
                stage.id, stage.plan_bytes,
                partition_id, shuffle_dir
            )
            for partition_id in range(stage.partition_count)
        ]
        leaf_futures[stage.id] = futures

    # Зависимые stage ждут завершения входных
    for stage in stages.topological_order():
        if stage.id in leaf_futures:
            continue
        # Ждём завершения зависимостей
        dep_results = await asyncio.gather(*[
            ray.get(leaf_futures[dep_id])
            for dep_id in stage.dependencies
        ])
        futures = [
            execute_query_stage.remote(
                stage.id, stage.plan_bytes,
                partition_id, shuffle_dir
            )
            for partition_id in range(stage.partition_count)
        ]
        leaf_futures[stage.id] = futures

    # Финальный stage — результат запроса
    return await ray.get(leaf_futures[stages.root().id])

Пример использования

from datafusion_ray import DatafusionRayContext

# Создание контекста (подключается к запущенному Ray-кластеру)
ctx = DatafusionRayContext(num_workers=4)

# Регистрация данных
ctx.register_parquet("orders", "/shared/data/orders/")
ctx.register_parquet("customers", "/shared/data/customers/")

# Выполнение запроса — распределяется автоматически
df = ctx.sql("""
    SELECT c.name, SUM(o.amount) as total
    FROM orders o
    JOIN customers c ON o.customer_id = c.id
    GROUP BY c.name
    ORDER BY total DESC
    LIMIT 10
""")

result = df.collect()
print(result)

API максимально близок к DataFusion Python — разница только в создании контекста.

Ограничения

DataFusion Ray — проект на ранней стадии. Текущие ограничения:

Ограничения DataFusion Ray (март 2026)
Shared filesystemShuffle требует общую ФС — это зависимость от инфраструктуры, которой может не быть
Batch modeBatch-режим не реализован — ограничивает стратегии отказоустойчивости (нельзя перезапустить stage целиком)
~1700 LOCМинимальная кодовая база — не все DataFusion операторы покрыты, сложные запросы могут падать
Нет production-историйНет публичных production-deployment — используйте для экспериментов и прототипов, не для production

Зачем DataFusion Ray, если есть Ballista?

Два проекта решают одну задачу разными средствами:

  • Ballista — Rust-нативная система, пишет собственный scheduler, executor, протоколы. Полный контроль, но отдельная инфраструктура
  • DataFusion Ray — использует Ray как готовую платформу для оркестрации. Не нужно писать scheduler и управление ресурсами — Ray это обеспечивает

DataFusion Ray имеет смысл, когда:

  • В организации уже развёрнут Ray-кластер (для ML-пайплайнов, inference)
  • Команда работает в Python-экосистеме
  • Нужна SQL-аналитика рядом с ML-нагрузками на тех же ресурсах

Если Ray-кластера нет и команда работает в Rust — Ballista предпочтительнее.

Ключевые выводы

  • DataFusion Ray (apache/datafusion-ray) — ранняя стадия (~1700 LOC), Python-first интеграция DataFusion с Ray
  • Эволюционировал из ray-sql, передан Apache в 2024
  • Streaming-режим работает (default, пайплайнное выполнение). Batch-режим не реализован
  • ShuffleWriterExec / ShuffleReaderExec заменяют RepartitionExec, записывая Arrow IPC на общую файловую систему
  • execute_query_stage — Ray remote-функция, выполняющая один stage для одной партиции
  • Требует shared filesystem для shuffle (NFS, HDFS, S3-FUSE). Интеграция с Ray object store — в планах
  • Имеет смысл при наличии Ray-кластера и Python-стека. Для Rust-проектов — Ballista предпочтительнее

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. DataFusion Ray использует ShuffleWriterExec и ShuffleReaderExec для shuffle. Чем их механизм отличается от Ballista?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5