Оптимизация файловой раскладки
DataFusion умеет читать данные быстро — но скорость напрямую зависит от того, как организованы файлы на диске. В этом уроке разберём, как layout Parquet-файлов влияет на predicate pushdown, I/O-паттерны и параллелизм DataFusion.
Анатомия Parquet-файла для DataFusion
DataFusion читает Parquet через DataSourceExec (с ParquetSource). Оптимизации зависят от структуры файла:
Row Group Sizing: баланс между параллелизмом и I/O
Размер row group — критический параметр:
Рекомендации по размеру:
| Среда | Размер Row Group | Почему |
|---|---|---|
| Локальный SSD | 64-128 MB | Баланс: хороший параллелизм, быстрый I/O |
| Облачное хранилище (S3/GCS) | 128-256 MB | Один HTTP GET на RG — уменьшаем количество запросов |
| Memory-constrained (< 2 GB) | 32-64 MB | Один RG должен помещаться в доступную память |
| Аналитический сервер | 128 MB | Стандартный Parquet default, хорошо работает |
Создание файлов с заданным размером RG
use datafusion::prelude::*;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::common::config::TableParquetOptions;
let ctx = SessionContext::new();
let df = ctx.read_parquet("raw_data/", Default::default()).await?;
// Записываем с контролем row group size
let mut parquet_options = TableParquetOptions::default();
parquet_options.global.max_row_group_size = 500_000; // 500K строк = ~128 MB
parquet_options.global.write_batch_size = 1024;
df.write_parquet(
"optimized_data/",
DataFrameWriteOptions::default(),
Some(parquet_options),
).await?;
Сортировка данных для Predicate Pushdown
Predicate pushdown в DataFusion работает через row group statistics: если WHERE date = '2024-03-15', а min(date)=‘2024-07’, max(date)=‘2024-09’ в row group — весь RG пропускается.
Проблема: Если данные не отсортированы, min/max каждого row group перекрываются, и pushdown не работает:
Создание отсортированных Parquet-файлов
let ctx = SessionContext::new();
let df = ctx.read_parquet("raw_data/", Default::default()).await?;
// Сортируем перед записью
let sorted = df.sort(vec![
col("date").sort(true, true), // primary sort: date ASC
col("region").sort(true, true), // secondary sort: region ASC
])?;
sorted.write_parquet("sorted_data/", Default::default(), None).await?;
-- Или через SQL
COPY (
SELECT * FROM raw_data ORDER BY date, region
) TO 'sorted_data/' STORED AS PARQUET;
Выбор колонки для сортировки: Сортируйте по колонке, которая чаще всего появляется в WHERE. Для time-series данных — по timestamp. Для мультитенантных данных — по tenant_id, затем по timestamp. Сортировка по нескольким колонкам помогает при composite-предикатах.
Bloom Filters: точные lookup-предикаты
Для equality-предикатов (WHERE user_id = 'abc123') min/max статистика бесполезна — user_id по всему диапазону. Bloom filters решают эту проблему:
use datafusion::common::config::TableParquetOptions;
let mut parquet_options = TableParquetOptions::default();
// Включаем bloom filter для колонки user_id
parquet_options.global.bloom_filter_on_write = true;
parquet_options.global.bloom_filter_fpp = 0.01; // false positive rate 1%
parquet_options.global.bloom_filter_ndv = Some(1_000_000); // ~1M уникальных значений
df.write_parquet(
"bloom_data/",
DataFrameWriteOptions::default(),
Some(parquet_options),
).await?;
Как DataFusion использует bloom filters:
- Читает footer файла → находит bloom filter для нужной колонки
- Проверяет
user_id = 'abc123'против bloom filter каждого row group - Если bloom filter говорит «нет» — row group пропускается (guaranteed correct)
- Если bloom filter говорит «возможно да» — row group читается и фильтруется
Bloom filters увеличивают размер файла (примерно 1-2% при fpp=0.01). Включайте их только для колонок с equality-предикатами на high-cardinality данных (user_id, session_id, order_id). Для колонок с range-предикатами (date, amount) используйте сортировку + min/max statistics.
Проверка качества статистик
DataFusion использует статистики row group для pushdown. Если статистики отсутствуют — pushdown не работает:
-- Проверяем, работает ли pushdown
EXPLAIN ANALYZE
SELECT * FROM events WHERE date = '2024-03-15';
Ищите в метриках DataSourceExec:
row_groups_matched_statistics— сколько RG прошли проверку статистикrow_groups_pruned_statistics— сколько RG пропущено через min/maxrow_groups_pruned_bloom_filter— сколько RG пропущено через bloom filterbytes_scanned— реальный объём прочитанных данных
DataSourceExec:
row_groups_matched_statistics=2
row_groups_pruned_statistics=18 ← 18 из 20 RG пропущено!
bytes_scanned=128_000_000 ← вместо 1.2 GB
Если row_groups_pruned_statistics = 0 при наличии WHERE — скорее всего данные не отсортированы по колонке из предиката. Пересоздайте файлы с сортировкой.
File Compaction: оптимизация мелких файлов
Мелкие файлы (< 10 MB) — бич аналитических систем. Каждый файл = overhead на открытие, чтение footer, создание RowGroup reader:
// Compaction: объединяем мелкие файлы в крупные
let ctx = SessionContext::new();
// Читаем директорию с мелкими файлами
let df = ctx.read_parquet(
"fragmented_data/",
Default::default()
).await?;
// Пересортировываем и записываем крупными файлами
let mut parquet_options = TableParquetOptions::default();
parquet_options.global.max_row_group_size = 500_000;
df.sort(vec![col("date").sort(true, true)])?
.write_parquet(
"compacted_data/",
DataFrameWriteOptions::default()
.with_single_file_output(false), // несколько файлов для параллелизма
Some(parquet_options),
).await?;
Оптимальное количество файлов
Правило: num_files ≈ target_partitions * 2-4
Пример: 8 ядер → target_partitions = 8 → 16-32 файлов
Каждый файл: 128-512 MB → суммарно 2-16 GB на каталог
Для streaming-инъекции (данные приходят непрерывно) используйте двухуровневую стратегию: мелкие файлы пишутся в «staging» директорию, а фоновый процесс периодически компактирует их в «production» директорию с оптимальным размером и сортировкой.
Projection Pushdown: читаем только нужные колонки
DataFusion автоматически применяет projection pushdown — читает только колонки, упомянутые в SELECT, WHERE, JOIN, GROUP BY:
-- DataFusion читает только 3 колонки из 50
EXPLAIN ANALYZE
SELECT region, SUM(amount)
FROM wide_table -- 50 колонок
WHERE status = 'paid'
GROUP BY region;
-- DataSourceExec: projection=[region, amount, status]
-- bytes_scanned гораздо меньше, чем при SELECT *
Антипаттерн: SELECT * в подзапросах. Если подзапрос использует SELECT *, DataFusion читает все колонки на этом уровне, даже если внешний запрос использует только 2. Всегда указывайте конкретные колонки:
-- Плохо: читаем все 50 колонок
SELECT region, total FROM (SELECT * FROM wide_table WHERE status = 'paid') sub;
-- Хорошо: читаем только 3 колонки
SELECT region, total FROM (SELECT region, amount AS total FROM wide_table WHERE status = 'paid') sub;Чек-лист оптимизации файлов
Итоги
- Row group size — баланс между параллелизмом и I/O overhead. Для облака: 128-256 MB
- Сортировка по колонке из WHERE — включает predicate pushdown через min/max статистики
- Bloom filters — для equality на high-cardinality колонках (user_id, session_id)
- File compaction — объединяйте мелкие файлы, цель:
target_partitions × 2-4 - Projection pushdown — автоматический, но
SELECT *в подзапросах отключает его - Проверяйте через EXPLAIN ANALYZE:
row_groups_pruned_statistics,bytes_scanned - Следующий урок: анализ планов запросов — EXPLAIN ANALYZE deep patterns и идентификация bottlenecks