Learning Platform
Глоссарий Troubleshooting
Урок 11.02 · 15 мин
Продвинутый
RepartitionExecRoundRobinBatchHashRepartitionSortPreservingRepartitionpartition pruningtarget_partitionsDataSourceExecObjectStore

Партиционирование и параллелизм

В предыдущем уроке мы настроили target_partitions и repartition-флаги. Теперь разберём, как DataFusion фактически распараллеливает выполнение: какие стратегии repartition существуют, когда они применяются, и как оптимизировать partition layout данных.

Модель параллелизма DataFusion

DataFusion использует partition-based параллелизм: каждая партиция обрабатывается независимо в своём потоке Tokio. Физический план — это дерево операторов, где каждый оператор может иметь N выходных партиций.

Partition-based параллелизм
DataSourceExec: 1 partitionОдин файл Parquet → одна партиция на входе
RepartitionExec: RoundRobinBatch(8)Физический планировщик добавляет RepartitionExec для расширения параллелизма
P0
P1
P2
...
P7
AggregateExec: mode=Partial (8 partitions)8 параллельных потоков обрабатывают partial aggregation

Стратегии Repartition

DataFusion использует три стратегии repartitioning, каждая с конкретным назначением:

RoundRobinBatch — расширение параллелизма

Когда: Входных партиций меньше, чем target_partitions. Типично: один Parquet-файл → 1 партиция.

-- Один файл → RoundRobin расширяет до target_partitions
EXPLAIN FORMAT INDENT 
SELECT region, SUM(amount) 
FROM 'orders.parquet' 
GROUP BY region;
DataSourceExec: partitions=1
  RepartitionExec: RoundRobinBatch(8)    ← расширение с 1 до 8
    AggregateExec: mode=Partial
      ...

Как работает: Batches распределяются по партициям по кругу: batch 0 → P0, batch 1 → P1, …, batch 8 → P0. Без перемешивания строк внутри batch — просто чередование целых RecordBatch.

Hash Repartition — перегруппировка по ключу

Когда: Операция требует, чтобы все строки с одним ключом были в одной партиции (JOIN, GROUP BY, WINDOW PARTITION BY).

-- Hash repartition перед FinalPartitioned aggregation
EXPLAIN FORMAT INDENT 
SELECT region, SUM(amount) 
FROM orders 
GROUP BY region;
AggregateExec: mode=Partial, gby=[region]
  CoalesceBatchesExec: target_batch_size=8192
    RepartitionExec: Hash([region@0], 8)    ← все "EU" → P3, все "US" → P1
      AggregateExec: mode=FinalPartitioned
Hash Repartition для GROUP BY
Partial P0
Partial P1
Hash([region], 8)hash(region) % 8 определяет целевую партицию
Final P1 (US)
Final P3 (EU)
NOTE

Hash repartition — самая дорогая стратегия: данные сериализуются, передаются через канал, десериализуются в целевой партиции. Если данные уже разложены по нужному ключу, отключите: SET datafusion.optimizer.repartition_aggregations = false.

SortPreserving Repartition — параллельная сортировка

Когда: Данные уже отсортированы (или будут сортироваться), и нужно разделить на партиции с сохранением порядка.

-- При repartition_sorts = true
EXPLAIN FORMAT INDENT
SELECT * FROM orders ORDER BY created_at;
SortPreservingMergeExec: [created_at@0 ASC]
  SortExec: [created_at@0 ASC]           ← параллельная сортировка
    RepartitionExec: RoundRobinBatch(8)   ← каждая партиция сортируется отдельно
      DataSourceExec: partitions=1

Как работает: SortPreservingMergeExec на финальном этапе выполняет K-way merge отсортированных потоков. Это O(N log K) вместо O(N log N) — значительное ускорение при большом N.

Partition Pruning: пропуск ненужных партиций

Если данные организованы по партициям (Hive-стиль: data/year=2024/month=01/), DataFusion может пропускать целые партиции на этапе планирования:

-- При Hive-partitioned данных DataFusion читает только нужные директории
SELECT * FROM events WHERE year = 2024 AND month = 3;
-- DataSourceExec: partitions=1 (только data/year=2024/month=03/)
-- вместо partitions=36 (все месяцы за 3 года)

Настройка Hive-partitioned чтения

use datafusion::prelude::*;
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use std::sync::Arc;

let format = Arc::new(ParquetFormat::default());

let options = ListingOptions::new(format)
    .with_table_partition_cols(vec![
        ("year".to_string(), datafusion::arrow::datatypes::DataType::Int32),
        ("month".to_string(), datafusion::arrow::datatypes::DataType::Int32),
    ]);
TIP

Partition pruning — одна из самых эффективных оптимизаций. Для таблиц с временными данными разбивайте по year/month или date. Это сокращает объём I/O на порядки — DataFusion даже не перечисляет файлы в ненужных директориях.

DataSourceExec и concurrent I/O

DataSourceExec — оператор чтения данных. Его поведение зависит от количества и размера файлов:

DataSourceExec: стратегии чтения
Много файлов (> target_partitions)Много мелких файлов — каждый файл = партиция, параллельное чтение
Поведение
Один крупный файлОдин крупный файл — DataFusion разбивает на range-запросы, если repartition_file_scans = true
Поведение

ObjectStore и параллельный I/O

DataFusion читает файлы через ObjectStore trait, который поддерживает range-based чтение:

use datafusion::datasource::object_store::ObjectStoreUrl;
use object_store::aws::AmazonS3Builder;
use std::sync::Arc;

// Регистрация S3 object store
let s3 = AmazonS3Builder::from_env()
    .with_bucket_name("my-data-lake")
    .with_region("eu-west-1")
    .build()?;

ctx.register_object_store(
    &ObjectStoreUrl::parse("s3://my-data-lake")?,
    Arc::new(s3),
);

// DataFusion параллельно читает row groups из S3
let df = ctx.read_parquet("s3://my-data-lake/orders/", Default::default()).await?;
WARNING

При чтении из S3/GCS каждый row group — отдельный HTTP GET request. Если файлы содержат 100+ мелких row groups (по 1-5 MB), overhead от HTTP-запросов может превысить время чтения данных. Оптимальный размер row group для облачного хранилища: 64-256 MB.

Оптимизация target_partitions

Диагностика: определяем текущий параллелизм

EXPLAIN ANALYZE
SELECT region, SUM(amount) FROM orders GROUP BY region;

Ищите в выводе:

  • partitions=N в DataSourceExec — сколько файлов/групп на входе
  • input_partitions=N в RepartitionExec — откуда расширяем
  • output_rows на каждом уровне — равномерно ли распределены данные

Когда уменьшать target_partitions

-- Если видите:
-- AggregateExec: mode=Partial, output_rows=10 across 16 partitions
-- → каждая партиция обрабатывает < 1 строку — overhead > полезная работа

SET datafusion.execution.target_partitions = 4;
-- Теперь: каждая партиция обрабатывает 2-3 строки — лучше амортизация

Правило: Если output_rows / target_partitions < 1000, партиций слишком много.

Когда увеличивать target_partitions

-- Если видите:
-- DataSourceExec: partitions=1, bytes_scanned=50_000_000_000
-- AggregateExec: elapsed_compute=120s
-- → один поток обрабатывает 50 GB, другие ядра простаивают

SET datafusion.execution.target_partitions = 16;
-- DataFusion добавит RepartitionExec для параллелизации

Data skew и борьба с ним

Неравномерное распределение данных по ключу (data skew) — частая проблема при Hash repartitioning:

Data skew при Hash Repartition
Skewed90% данных попадают в одну партицию
P0 (US)
P1 (EU)
P2 (Asia)
Решение: SaltingДобавляем salt-column для равномерного распределения
Подход
-- Workaround для data skew:
-- Разбиваем "горячий" регион на подгруппы
SELECT region, SUM(sub_total) as total
FROM (
    SELECT region, SUM(amount) as sub_total
    FROM orders
    GROUP BY region, (id % 16)  -- salt: делим каждый region на 16 групп
) sub
GROUP BY region;

Итоги

  • DataFusion параллелизирует работу через партиции — каждая партиция выполняется в отдельном потоке
  • RoundRobinBatch — расширение параллелизма для одиночных источников
  • Hash Repartition — перегруппировка по ключу для JOIN/GROUP BY/WINDOW
  • SortPreserving Repartition — параллельная сортировка с K-way merge
  • Partition Pruning — пропуск ненужных Hive-партиций (одна из самых эффективных оптимизаций)
  • target_partitions оптимален при ≈ num_cores — больше не значит быстрее
  • Data skew решается через salting или предварительное разбиение данных
  • Следующий урок: оптимизация файловой раскладки — как layout Parquet-файлов влияет на производительность запросов

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. DataFusion выполняет GROUP BY region на данных из одного Parquet-файла при target_partitions = 8. Какие RepartitionExec появятся в физическом плане?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4