Learning Platform
Глоссарий Troubleshooting
Урок 09.01 · 15 мин
Продвинутый
distributed executiontarget_partitionsRepartitionExecpipeline breakerspartitioningshuffle

Основы распределённого выполнения

DataFusion спроектирован как встраиваемый query engine для одного процесса. Но данные растут — один узел упирается в RAM, CPU, пропускную способность диска. В этом модуле мы разберём три подхода к распределению DataFusion-запросов: Ballista, DataFusion Ray и кастомные решения. Начнём с фундамента: почему нужна распределённость и как партиционная модель DataFusion создаёт для неё естественные точки.

Когда одного узла недостаточно

Однопроцессный DataFusion с target_partitions = 16 параллельно обрабатывает данные в 16 потоках. Это покрывает большинство аналитических сценариев — TPC-H SF100 (100 GB) выполняется за секунды. Но три класса задач требуют нескольких узлов:

Объём данных превышает память одного узла. DataFusion поддерживает spill-to-disk для сортировки и hash join, но при объёмах в терабайты один SSD становится узким местом. Параллельное чтение с нескольких узлов даёт линейный прирост пропускной способности I/O.

Вычислительная сложность не помещается в допустимое время. Сложные аналитические запросы (multi-way join, оконные функции по миллиардам строк) могут занимать минуты на одном узле. Распределение по N узлам теоретически сокращает время в N раз, на практике — с учётом overhead сети и координации.

Данные изначально распределены. Архитектура с шардированием (InfluxDB IOx, GreptimeDB) хранит данные на разных узлах. Перемещение всего к одному движку дороже, чем выполнение фрагментов плана локально и объединение результатов.

Партиционная модель DataFusion

DataFusion разбивает выполнение на партиции — независимые потоки данных, обрабатываемые параллельно внутри одного процесса. Эту модель управляют два механизма.

target_partitions

Параметр target_partitions определяет степень параллелизма:

use datafusion::prelude::SessionContext;

let ctx = SessionContext::new();

// По умолчанию — количество CPU-ядер
ctx.state().config().target_partitions(); // e.g. 16

// Можно задать вручную
let config = SessionConfig::new()
    .with_target_partitions(32);
let ctx = SessionContext::new_with_config(config);

Каждый ExecutionPlan сообщает, сколько партиций он производит, через метод output_partitioning(). Физический оптимизатор вставляет RepartitionExec, когда требуется изменить распределение данных.

RepartitionExec

RepartitionExec перераспределяет данные между партициями. Три стратегии:

Стратегии RepartitionExec
RoundRobinЦиклическое распределение строк по партициям — для равномерной нагрузки без привязки к ключу
HashHash-партиционирование по ключу — гарантирует co-location строк с одинаковым ключом для join/group by
UnknownPartitioningФормальная обёртка — количество партиций известно, но распределение данных по ним произвольное

В однопроцессном DataFusion RepartitionExec перемещает RecordBatch между потоками через crossbeam-каналы. Это быстро — данные остаются в памяти одного процесса.

Pipeline breakers как точки распределения

Физический оптимизатор DataFusion вставляет RepartitionExec через правило EnforceDistribution (мы рассматривали его в М06 урок 5). Эти точки — pipeline breakers — прерывают потоковое выполнение: данные из одной партиции должны быть перенаправлены в несколько целевых партиций по ключу.

-- До EnforceDistribution
HashJoinExec: join_expr=customer_id
  DataSourceExec: orders.parquet, format=parquet (8 партиций)
  DataSourceExec: customers.parquet, format=parquet (4 партиции)

-- После EnforceDistribution
HashJoinExec: join_expr=customer_id
  RepartitionExec: Hash([customer_id], 16)
    DataSourceExec: orders.parquet, format=parquet (8 партиций)
  RepartitionExec: Hash([id], 16)
    DataSourceExec: customers.parquet, format=parquet (4 партиции)

Каждый RepartitionExec в распределённой системе становится точкой сетевого shuffle — вместо перемещения между потоками данные отправляются по сети между узлами.

Однопроцессное vs распределённое перераспределение
Один процесс (DataFusion)Однопроцессное перераспределение — потоки обмениваются RecordBatch через каналы в памяти
Распределённая системаРаспределённый shuffle — данные сериализуются в Arrow IPC и передаются по сети между executor-ами

От потоков к сети: что меняется

Переход от однопроцессного к распределённому выполнению затрагивает три аспекта:

1. Координация

В однопроцессном DataFusion координация тривиальна — один SessionContext управляет всеми потоками. В распределённой системе нужен планировщик (scheduler): он разбивает план на этапы (query stages), распределяет задачи по узлам и отслеживает прогресс.

2. Shuffle

RepartitionExec в DataFusion перемещает RecordBatch между потоками через каналы в памяти. В распределённой системе данные сериализуются (обычно в Arrow IPC), передаются по сети и десериализуются. Это на порядки медленнее — сеть становится главным bottleneck.

3. Отказоустойчивость

Поток внутри процесса не «теряется» — если произошла ошибка, весь процесс падает. В распределённой системе отдельный executor может упасть, и планировщик должен решить: перезапустить задачу на другом узле или вернуть ошибку.

Компоненты распределённого выполнения
КоординацияПланировщик разбивает план на query stages, назначает задачи executor-ам и отслеживает прогресс
ShuffleПерераспределение данных между узлами — главный bottleneck распределённых систем
ОтказоустойчивостьОбработка падения executor-ов: retry задач, перепланирование, checkpoint промежуточных результатов

Query stages: единица распределения

Распределённые системы на основе DataFusion (Ballista, DataFusion Ray) разбивают физический план на query stages — фрагменты плана между pipeline breakers. Каждый query stage может выполняться независимо, как только доступны его входные данные.

-- Физический план (HashJoin с двумя источниками)
HashJoinExec: customer_id = id
├── RepartitionExec: Hash([customer_id], N)  ← граница stage
│   └── DataSourceExec: orders.parquet, format=parquet
└── RepartitionExec: Hash([id], N)           ← граница stage
    └── DataSourceExec: customers.parquet, format=parquet

-- Разбивка на query stages
Stage 1: DataSourceExec(orders) → ShuffleWriter(customer_id)
Stage 2: DataSourceExec(customers) → ShuffleWriter(id)
Stage 3: ShuffleReader(orders) + ShuffleReader(customers) → HashJoinExec

Stage 1 и Stage 2 выполняются параллельно на разных executor-ах. Stage 3 начинается только после завершения обоих shuffle. Эта модель — DAG зависимостей между stage — лежит в основе и Ballista, и DataFusion Ray.

Что дальше

В следующих уроках мы разберём три подхода к распределению DataFusion:

  • Ballista (уроки 2–3) — Rust-нативная система с архитектурой scheduler/executor, gRPC API и Arrow IPC shuffle
  • DataFusion Ray (урок 4) — Python-first подход, использующий Ray для распределения query stages
  • Кастомные решения (урок 5) — как InfluxDB IOx, GreptimeDB и другие строят собственное распределение поверх DataFusion как библиотеки

Все три подхода используют одну и ту же концепцию: pipeline breakers определяют границы stage, а DataFusion ExecutionPlan — единицу выполнения внутри каждого stage.

Ключевые выводы

  • DataFusion — однопроцессный query engine. Распределение требует внешней координации, сетевого shuffle и отказоустойчивости
  • Партиционная модель DataFusion (target_partitions, RepartitionExec, output_partitioning) создаёт естественные точки для распределения
  • EnforceDistribution вставляет RepartitionExec — в распределённых системах эти точки становятся сетевым shuffle между executor-ами
  • Физический план разбивается на query stages между pipeline breakers. Каждый stage — независимая единица выполнения
  • Три подхода к распределению: Ballista (Rust-нативный, scheduler/executor), DataFusion Ray (Python + Ray), кастомные решения (DataFusion как библиотека)

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какие три класса задач требуют распределённого выполнения вместо однопроцессного DataFusion?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5