Партиционирование и параллелизм
В предыдущем уроке мы настроили target_partitions и repartition-флаги. Теперь разберём, как DataFusion фактически распараллеливает выполнение: какие стратегии repartition существуют, когда они применяются, и как оптимизировать partition layout данных.
Модель параллелизма DataFusion
DataFusion использует partition-based параллелизм: каждая партиция обрабатывается независимо в своём потоке Tokio. Физический план — это дерево операторов, где каждый оператор может иметь N выходных партиций.
Стратегии Repartition
DataFusion использует три стратегии repartitioning, каждая с конкретным назначением:
RoundRobinBatch — расширение параллелизма
Когда: Входных партиций меньше, чем target_partitions. Типично: один Parquet-файл → 1 партиция.
-- Один файл → RoundRobin расширяет до target_partitions
EXPLAIN FORMAT INDENT
SELECT region, SUM(amount)
FROM 'orders.parquet'
GROUP BY region;
DataSourceExec: partitions=1
RepartitionExec: RoundRobinBatch(8) ← расширение с 1 до 8
AggregateExec: mode=Partial
...
Как работает: Batches распределяются по партициям по кругу: batch 0 → P0, batch 1 → P1, …, batch 8 → P0. Без перемешивания строк внутри batch — просто чередование целых RecordBatch.
Hash Repartition — перегруппировка по ключу
Когда: Операция требует, чтобы все строки с одним ключом были в одной партиции (JOIN, GROUP BY, WINDOW PARTITION BY).
-- Hash repartition перед FinalPartitioned aggregation
EXPLAIN FORMAT INDENT
SELECT region, SUM(amount)
FROM orders
GROUP BY region;
AggregateExec: mode=Partial, gby=[region]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: Hash([region@0], 8) ← все "EU" → P3, все "US" → P1
AggregateExec: mode=FinalPartitioned
Hash repartition — самая дорогая стратегия: данные сериализуются, передаются через канал, десериализуются в целевой партиции. Если данные уже разложены по нужному ключу, отключите: SET datafusion.optimizer.repartition_aggregations = false.
SortPreserving Repartition — параллельная сортировка
Когда: Данные уже отсортированы (или будут сортироваться), и нужно разделить на партиции с сохранением порядка.
-- При repartition_sorts = true
EXPLAIN FORMAT INDENT
SELECT * FROM orders ORDER BY created_at;
SortPreservingMergeExec: [created_at@0 ASC]
SortExec: [created_at@0 ASC] ← параллельная сортировка
RepartitionExec: RoundRobinBatch(8) ← каждая партиция сортируется отдельно
DataSourceExec: partitions=1
Как работает: SortPreservingMergeExec на финальном этапе выполняет K-way merge отсортированных потоков. Это O(N log K) вместо O(N log N) — значительное ускорение при большом N.
Partition Pruning: пропуск ненужных партиций
Если данные организованы по партициям (Hive-стиль: data/year=2024/month=01/), DataFusion может пропускать целые партиции на этапе планирования:
-- При Hive-partitioned данных DataFusion читает только нужные директории
SELECT * FROM events WHERE year = 2024 AND month = 3;
-- DataSourceExec: partitions=1 (только data/year=2024/month=03/)
-- вместо partitions=36 (все месяцы за 3 года)
Настройка Hive-partitioned чтения
use datafusion::prelude::*;
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use std::sync::Arc;
let format = Arc::new(ParquetFormat::default());
let options = ListingOptions::new(format)
.with_table_partition_cols(vec![
("year".to_string(), datafusion::arrow::datatypes::DataType::Int32),
("month".to_string(), datafusion::arrow::datatypes::DataType::Int32),
]);
Partition pruning — одна из самых эффективных оптимизаций. Для таблиц с временными данными разбивайте по year/month или date. Это сокращает объём I/O на порядки — DataFusion даже не перечисляет файлы в ненужных директориях.
DataSourceExec и concurrent I/O
DataSourceExec — оператор чтения данных. Его поведение зависит от количества и размера файлов:
ObjectStore и параллельный I/O
DataFusion читает файлы через ObjectStore trait, который поддерживает range-based чтение:
use datafusion::datasource::object_store::ObjectStoreUrl;
use object_store::aws::AmazonS3Builder;
use std::sync::Arc;
// Регистрация S3 object store
let s3 = AmazonS3Builder::from_env()
.with_bucket_name("my-data-lake")
.with_region("eu-west-1")
.build()?;
ctx.register_object_store(
&ObjectStoreUrl::parse("s3://my-data-lake")?,
Arc::new(s3),
);
// DataFusion параллельно читает row groups из S3
let df = ctx.read_parquet("s3://my-data-lake/orders/", Default::default()).await?;
При чтении из S3/GCS каждый row group — отдельный HTTP GET request. Если файлы содержат 100+ мелких row groups (по 1-5 MB), overhead от HTTP-запросов может превысить время чтения данных. Оптимальный размер row group для облачного хранилища: 64-256 MB.
Оптимизация target_partitions
Диагностика: определяем текущий параллелизм
EXPLAIN ANALYZE
SELECT region, SUM(amount) FROM orders GROUP BY region;
Ищите в выводе:
partitions=Nв DataSourceExec — сколько файлов/групп на входеinput_partitions=Nв RepartitionExec — откуда расширяемoutput_rowsна каждом уровне — равномерно ли распределены данные
Когда уменьшать target_partitions
-- Если видите:
-- AggregateExec: mode=Partial, output_rows=10 across 16 partitions
-- → каждая партиция обрабатывает < 1 строку — overhead > полезная работа
SET datafusion.execution.target_partitions = 4;
-- Теперь: каждая партиция обрабатывает 2-3 строки — лучше амортизация
Правило: Если output_rows / target_partitions < 1000, партиций слишком много.
Когда увеличивать target_partitions
-- Если видите:
-- DataSourceExec: partitions=1, bytes_scanned=50_000_000_000
-- AggregateExec: elapsed_compute=120s
-- → один поток обрабатывает 50 GB, другие ядра простаивают
SET datafusion.execution.target_partitions = 16;
-- DataFusion добавит RepartitionExec для параллелизации
Data skew и борьба с ним
Неравномерное распределение данных по ключу (data skew) — частая проблема при Hash repartitioning:
-- Workaround для data skew:
-- Разбиваем "горячий" регион на подгруппы
SELECT region, SUM(sub_total) as total
FROM (
SELECT region, SUM(amount) as sub_total
FROM orders
GROUP BY region, (id % 16) -- salt: делим каждый region на 16 групп
) sub
GROUP BY region;
Итоги
- DataFusion параллелизирует работу через партиции — каждая партиция выполняется в отдельном потоке
- RoundRobinBatch — расширение параллелизма для одиночных источников
- Hash Repartition — перегруппировка по ключу для JOIN/GROUP BY/WINDOW
- SortPreserving Repartition — параллельная сортировка с K-way merge
- Partition Pruning — пропуск ненужных Hive-партиций (одна из самых эффективных оптимизаций)
- target_partitions оптимален при
≈ num_cores— больше не значит быстрее - Data skew решается через salting или предварительное разбиение данных
- Следующий урок: оптимизация файловой раскладки — как layout Parquet-файлов влияет на производительность запросов