Learning Platform
Глоссарий Troubleshooting
Урок 11.03 · 14 мин
Продвинутый
Parquetrow groupspredicate pushdownbloom filtersstatisticsfile compactionDataSourceExecsorting

Оптимизация файловой раскладки

DataFusion умеет читать данные быстро — но скорость напрямую зависит от того, как организованы файлы на диске. В этом уроке разберём, как layout Parquet-файлов влияет на predicate pushdown, I/O-паттерны и параллелизм DataFusion.

Анатомия Parquet-файла для DataFusion

DataFusion читает Parquet через DataSourceExec (с ParquetSource). Оптимизации зависят от структуры файла:

Структура Parquet и точки оптимизации
Parquet FileФайл разбит на row groups — каждый row group может быть пропущен или прочитан независимо
Row Group 0
Статистики
Row Group 1
Bloom Filter
Footer (metadata)Footer содержит schema, статистики всех row groups — читается первым

Row Group Sizing: баланс между параллелизмом и I/O

Размер row group — критический параметр:

Влияние размера Row Group
Мелкие RG (1-10 MB)Мелкие row groups: больше параллелизм, больше overhead, хуже сжатие
Плюсы
Минусы
Крупные RG (128-256 MB)Крупные row groups: лучше сжатие, меньше overhead, но грубее pruning
Плюсы
Минусы

Рекомендации по размеру:

СредаРазмер Row GroupПочему
Локальный SSD64-128 MBБаланс: хороший параллелизм, быстрый I/O
Облачное хранилище (S3/GCS)128-256 MBОдин HTTP GET на RG — уменьшаем количество запросов
Memory-constrained (< 2 GB)32-64 MBОдин RG должен помещаться в доступную память
Аналитический сервер128 MBСтандартный Parquet default, хорошо работает

Создание файлов с заданным размером RG

use datafusion::prelude::*;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::common::config::TableParquetOptions;

let ctx = SessionContext::new();
let df = ctx.read_parquet("raw_data/", Default::default()).await?;

// Записываем с контролем row group size
let mut parquet_options = TableParquetOptions::default();
parquet_options.global.max_row_group_size = 500_000;  // 500K строк = ~128 MB
parquet_options.global.write_batch_size = 1024;

df.write_parquet(
    "optimized_data/",
    DataFrameWriteOptions::default(),
    Some(parquet_options),
).await?;

Сортировка данных для Predicate Pushdown

Predicate pushdown в DataFusion работает через row group statistics: если WHERE date = '2024-03-15', а min(date)=‘2024-07’, max(date)=‘2024-09’ в row group — весь RG пропускается.

Проблема: Если данные не отсортированы, min/max каждого row group перекрываются, и pushdown не работает:

Сортировка и эффективность Predicate Pushdown
Без сортировкиБез сортировки: row groups содержат смешанные данные, min/max перекрываются
RG 0
RG 1
Результат
Отсортировано по dateС сортировкой по date: каждый RG покрывает непересекающийся диапазон
RG 0
RG 1
Результат

Создание отсортированных Parquet-файлов

let ctx = SessionContext::new();
let df = ctx.read_parquet("raw_data/", Default::default()).await?;

// Сортируем перед записью
let sorted = df.sort(vec![
    col("date").sort(true, true),  // primary sort: date ASC
    col("region").sort(true, true), // secondary sort: region ASC
])?;

sorted.write_parquet("sorted_data/", Default::default(), None).await?;
-- Или через SQL
COPY (
    SELECT * FROM raw_data ORDER BY date, region
) TO 'sorted_data/' STORED AS PARQUET;
TIP

Выбор колонки для сортировки: Сортируйте по колонке, которая чаще всего появляется в WHERE. Для time-series данных — по timestamp. Для мультитенантных данных — по tenant_id, затем по timestamp. Сортировка по нескольким колонкам помогает при composite-предикатах.

Bloom Filters: точные lookup-предикаты

Для equality-предикатов (WHERE user_id = 'abc123') min/max статистика бесполезна — user_id по всему диапазону. Bloom filters решают эту проблему:

use datafusion::common::config::TableParquetOptions;

let mut parquet_options = TableParquetOptions::default();
// Включаем bloom filter для колонки user_id
parquet_options.global.bloom_filter_on_write = true;
parquet_options.global.bloom_filter_fpp = 0.01;     // false positive rate 1%
parquet_options.global.bloom_filter_ndv = Some(1_000_000); // ~1M уникальных значений

df.write_parquet(
    "bloom_data/",
    DataFrameWriteOptions::default(),
    Some(parquet_options),
).await?;

Как DataFusion использует bloom filters:

  1. Читает footer файла → находит bloom filter для нужной колонки
  2. Проверяет user_id = 'abc123' против bloom filter каждого row group
  3. Если bloom filter говорит «нет» — row group пропускается (guaranteed correct)
  4. Если bloom filter говорит «возможно да» — row group читается и фильтруется
WARNING

Bloom filters увеличивают размер файла (примерно 1-2% при fpp=0.01). Включайте их только для колонок с equality-предикатами на high-cardinality данных (user_id, session_id, order_id). Для колонок с range-предикатами (date, amount) используйте сортировку + min/max statistics.

Проверка качества статистик

DataFusion использует статистики row group для pushdown. Если статистики отсутствуют — pushdown не работает:

-- Проверяем, работает ли pushdown
EXPLAIN ANALYZE
SELECT * FROM events WHERE date = '2024-03-15';

Ищите в метриках DataSourceExec:

  • row_groups_matched_statistics — сколько RG прошли проверку статистик
  • row_groups_pruned_statistics — сколько RG пропущено через min/max
  • row_groups_pruned_bloom_filter — сколько RG пропущено через bloom filter
  • bytes_scanned — реальный объём прочитанных данных
DataSourceExec: 
  row_groups_matched_statistics=2
  row_groups_pruned_statistics=18    ← 18 из 20 RG пропущено!
  bytes_scanned=128_000_000          ← вместо 1.2 GB
NOTE

Если row_groups_pruned_statistics = 0 при наличии WHERE — скорее всего данные не отсортированы по колонке из предиката. Пересоздайте файлы с сортировкой.

File Compaction: оптимизация мелких файлов

Мелкие файлы (< 10 MB) — бич аналитических систем. Каждый файл = overhead на открытие, чтение footer, создание RowGroup reader:

// Compaction: объединяем мелкие файлы в крупные
let ctx = SessionContext::new();

// Читаем директорию с мелкими файлами
let df = ctx.read_parquet(
    "fragmented_data/", 
    Default::default()
).await?;

// Пересортировываем и записываем крупными файлами
let mut parquet_options = TableParquetOptions::default();
parquet_options.global.max_row_group_size = 500_000;

df.sort(vec![col("date").sort(true, true)])?
  .write_parquet(
      "compacted_data/",
      DataFrameWriteOptions::default()
          .with_single_file_output(false),  // несколько файлов для параллелизма
      Some(parquet_options),
  ).await?;

Оптимальное количество файлов

Правило: num_files ≈ target_partitions * 2-4

Пример: 8 ядер → target_partitions = 8 → 16-32 файлов
Каждый файл: 128-512 MB → суммарно 2-16 GB на каталог
TIP

Для streaming-инъекции (данные приходят непрерывно) используйте двухуровневую стратегию: мелкие файлы пишутся в «staging» директорию, а фоновый процесс периодически компактирует их в «production» директорию с оптимальным размером и сортировкой.

Projection Pushdown: читаем только нужные колонки

DataFusion автоматически применяет projection pushdown — читает только колонки, упомянутые в SELECT, WHERE, JOIN, GROUP BY:

-- DataFusion читает только 3 колонки из 50
EXPLAIN ANALYZE
SELECT region, SUM(amount) 
FROM wide_table  -- 50 колонок
WHERE status = 'paid'
GROUP BY region;

-- DataSourceExec: projection=[region, amount, status]
-- bytes_scanned гораздо меньше, чем при SELECT *
WARNING

Антипаттерн: SELECT * в подзапросах. Если подзапрос использует SELECT *, DataFusion читает все колонки на этом уровне, даже если внешний запрос использует только 2. Всегда указывайте конкретные колонки:

-- Плохо: читаем все 50 колонок
SELECT region, total FROM (SELECT * FROM wide_table WHERE status = 'paid') sub;

-- Хорошо: читаем только 3 колонки
SELECT region, total FROM (SELECT region, amount AS total FROM wide_table WHERE status = 'paid') sub;

Чек-лист оптимизации файлов

Чек-лист: оптимальная файловая раскладка
1. Отсортируйте по основной колонке фильтрацииОтсортируйте данные по колонке из самого частого WHERE-предиката
2. Задайте размер row group: 64-256 MB64-256 MB для локального SSD, 128-256 MB для облака
3. Включите bloom filters для lookup-колонокДля high-cardinality equality-предикатов (user_id, session_id)
4. Компактируйте мелкие файлыСкомпактируйте файлы < 10 MB, цель: target_partitions × 2-4 файлов
5. Проверьте эффективность через EXPLAIN ANALYZEEXPLAIN ANALYZE → row_groups_pruned_statistics, bytes_scanned

Итоги

  • Row group size — баланс между параллелизмом и I/O overhead. Для облака: 128-256 MB
  • Сортировка по колонке из WHERE — включает predicate pushdown через min/max статистики
  • Bloom filters — для equality на high-cardinality колонках (user_id, session_id)
  • File compaction — объединяйте мелкие файлы, цель: target_partitions × 2-4
  • Projection pushdown — автоматический, но SELECT * в подзапросах отключает его
  • Проверяйте через EXPLAIN ANALYZE: row_groups_pruned_statistics, bytes_scanned
  • Следующий урок: анализ планов запросов — EXPLAIN ANALYZE deep patterns и идентификация bottlenecks

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. Parquet-файл содержит 20 row groups, данные не отсортированы. Запрос: WHERE date = '2024-03-15'. EXPLAIN ANALYZE показывает row_groups_pruned_statistics = 0. Почему pushdown не работает?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4