Основы распределённого выполнения
DataFusion спроектирован как встраиваемый query engine для одного процесса. Но данные растут — один узел упирается в RAM, CPU, пропускную способность диска. В этом модуле мы разберём три подхода к распределению DataFusion-запросов: Ballista, DataFusion Ray и кастомные решения. Начнём с фундамента: почему нужна распределённость и как партиционная модель DataFusion создаёт для неё естественные точки.
Когда одного узла недостаточно
Однопроцессный DataFusion с target_partitions = 16 параллельно обрабатывает данные в 16 потоках. Это покрывает большинство аналитических сценариев — TPC-H SF100 (100 GB) выполняется за секунды. Но три класса задач требуют нескольких узлов:
Объём данных превышает память одного узла. DataFusion поддерживает spill-to-disk для сортировки и hash join, но при объёмах в терабайты один SSD становится узким местом. Параллельное чтение с нескольких узлов даёт линейный прирост пропускной способности I/O.
Вычислительная сложность не помещается в допустимое время. Сложные аналитические запросы (multi-way join, оконные функции по миллиардам строк) могут занимать минуты на одном узле. Распределение по N узлам теоретически сокращает время в N раз, на практике — с учётом overhead сети и координации.
Данные изначально распределены. Архитектура с шардированием (InfluxDB IOx, GreptimeDB) хранит данные на разных узлах. Перемещение всего к одному движку дороже, чем выполнение фрагментов плана локально и объединение результатов.
Партиционная модель DataFusion
DataFusion разбивает выполнение на партиции — независимые потоки данных, обрабатываемые параллельно внутри одного процесса. Эту модель управляют два механизма.
target_partitions
Параметр target_partitions определяет степень параллелизма:
use datafusion::prelude::SessionContext;
let ctx = SessionContext::new();
// По умолчанию — количество CPU-ядер
ctx.state().config().target_partitions(); // e.g. 16
// Можно задать вручную
let config = SessionConfig::new()
.with_target_partitions(32);
let ctx = SessionContext::new_with_config(config);
Каждый ExecutionPlan сообщает, сколько партиций он производит, через метод output_partitioning(). Физический оптимизатор вставляет RepartitionExec, когда требуется изменить распределение данных.
RepartitionExec
RepartitionExec перераспределяет данные между партициями. Три стратегии:
В однопроцессном DataFusion RepartitionExec перемещает RecordBatch между потоками через crossbeam-каналы. Это быстро — данные остаются в памяти одного процесса.
Pipeline breakers как точки распределения
Физический оптимизатор DataFusion вставляет RepartitionExec через правило EnforceDistribution (мы рассматривали его в М06 урок 5). Эти точки — pipeline breakers — прерывают потоковое выполнение: данные из одной партиции должны быть перенаправлены в несколько целевых партиций по ключу.
-- До EnforceDistribution
HashJoinExec: join_expr=customer_id
DataSourceExec: orders.parquet, format=parquet (8 партиций)
DataSourceExec: customers.parquet, format=parquet (4 партиции)
-- После EnforceDistribution
HashJoinExec: join_expr=customer_id
RepartitionExec: Hash([customer_id], 16)
DataSourceExec: orders.parquet, format=parquet (8 партиций)
RepartitionExec: Hash([id], 16)
DataSourceExec: customers.parquet, format=parquet (4 партиции)
Каждый RepartitionExec в распределённой системе становится точкой сетевого shuffle — вместо перемещения между потоками данные отправляются по сети между узлами.
От потоков к сети: что меняется
Переход от однопроцессного к распределённому выполнению затрагивает три аспекта:
1. Координация
В однопроцессном DataFusion координация тривиальна — один SessionContext управляет всеми потоками. В распределённой системе нужен планировщик (scheduler): он разбивает план на этапы (query stages), распределяет задачи по узлам и отслеживает прогресс.
2. Shuffle
RepartitionExec в DataFusion перемещает RecordBatch между потоками через каналы в памяти. В распределённой системе данные сериализуются (обычно в Arrow IPC), передаются по сети и десериализуются. Это на порядки медленнее — сеть становится главным bottleneck.
3. Отказоустойчивость
Поток внутри процесса не «теряется» — если произошла ошибка, весь процесс падает. В распределённой системе отдельный executor может упасть, и планировщик должен решить: перезапустить задачу на другом узле или вернуть ошибку.
Query stages: единица распределения
Распределённые системы на основе DataFusion (Ballista, DataFusion Ray) разбивают физический план на query stages — фрагменты плана между pipeline breakers. Каждый query stage может выполняться независимо, как только доступны его входные данные.
-- Физический план (HashJoin с двумя источниками)
HashJoinExec: customer_id = id
├── RepartitionExec: Hash([customer_id], N) ← граница stage
│ └── DataSourceExec: orders.parquet, format=parquet
└── RepartitionExec: Hash([id], N) ← граница stage
└── DataSourceExec: customers.parquet, format=parquet
-- Разбивка на query stages
Stage 1: DataSourceExec(orders) → ShuffleWriter(customer_id)
Stage 2: DataSourceExec(customers) → ShuffleWriter(id)
Stage 3: ShuffleReader(orders) + ShuffleReader(customers) → HashJoinExec
Stage 1 и Stage 2 выполняются параллельно на разных executor-ах. Stage 3 начинается только после завершения обоих shuffle. Эта модель — DAG зависимостей между stage — лежит в основе и Ballista, и DataFusion Ray.
Что дальше
В следующих уроках мы разберём три подхода к распределению DataFusion:
- Ballista (уроки 2–3) — Rust-нативная система с архитектурой scheduler/executor, gRPC API и Arrow IPC shuffle
- DataFusion Ray (урок 4) — Python-first подход, использующий Ray для распределения query stages
- Кастомные решения (урок 5) — как InfluxDB IOx, GreptimeDB и другие строят собственное распределение поверх DataFusion как библиотеки
Все три подхода используют одну и ту же концепцию: pipeline breakers определяют границы stage, а DataFusion ExecutionPlan — единицу выполнения внутри каждого stage.
Ключевые выводы
- DataFusion — однопроцессный query engine. Распределение требует внешней координации, сетевого shuffle и отказоустойчивости
- Партиционная модель DataFusion (
target_partitions,RepartitionExec,output_partitioning) создаёт естественные точки для распределения EnforceDistributionвставляетRepartitionExec— в распределённых системах эти точки становятся сетевым shuffle между executor-ами- Физический план разбивается на query stages между pipeline breakers. Каждый stage — независимая единица выполнения
- Три подхода к распределению: Ballista (Rust-нативный, scheduler/executor), DataFusion Ray (Python + Ray), кастомные решения (DataFusion как библиотека)