Бенчмаркинг и профилирование DataFusion
В уроке 05 мы разобрали EXPLAIN и EXPLAIN ANALYZE для чтения планов и метрик. Теперь идём глубже: систематическое профилирование, микробенчмарки, поиск горячих путей на CPU-flamegraph-ах, мониторинг памяти и стандартные бенчмарки для объективного сравнения.
EXPLAIN ANALYZE: глубокий разбор метрик
EXPLAIN ANALYZE выполняет запрос и возвращает метрики каждого оператора. Разберём, что именно показывают эти метрики и как их интерпретировать.
EXPLAIN ANALYZE
SELECT region, COUNT(*) AS cnt, SUM(amount) AS total
FROM orders
WHERE status = 'completed'
GROUP BY region
ORDER BY total DESC;
Типичный вывод (сокращённо):
+---+----------------------------------------------------------+
| plan_type | plan |
+---+----------------------------------------------------------+
| Plan with Metrics | |
| | SortExec: expr=[total@2 DESC] |
| | metrics: [ |
| | output_rows=5, |
| | elapsed_compute=120.5µs, |
| | spill_count=0, |
| | spilled_bytes=0 |
| | ] |
| | AggregateExec: groupBy=[region], |
| | aggr=[count(*), sum(amount)] |
| | metrics: [ |
| | output_rows=5, |
| | elapsed_compute=3.2ms, |
| | spill_count=0, |
| | spilled_bytes=0 |
| | ] |
| | CoalesceBatchesExec |
| | FilterExec: status = completed |
| | metrics: [ |
| | output_rows=48000, |
| | elapsed_compute=1.8ms |
| | ] |
| | DataSourceExec: orders.parquet |
| | metrics: [ |
| | output_rows=100000, |
| | elapsed_compute=12.4ms, |
| | bytes_scanned=4200000, |
| | row_groups_pruned=2 |
| | ] |
+---+----------------------------------------------------------+
Ключевые метрики по операторам
Интерпретация метрик
Что искать в выводе:
- Высокий
elapsed_computeотносительно соседей — оператор-бутылочное горлышко. Если SortExec занимает 80% времени, оптимизируйте сортировку (добавьте LIMIT, измените порядок колонок). output_rowsсильно больше, чем у следующего оператора — фильтрация неэффективна. Предикат pushdown не сработал или фильтр стоит слишком поздно в плане.spill_count > 0— оператор не поместился в память и сбросил данные на диск. Увеличьтеmemory_limitили оптимизируйте запрос, чтобы уменьшить объём промежуточных данных.row_groups_pruned > 0— Parquet predicate pushdown работает. Чем больше row groups пропущено, тем эффективнее фильтрация.bytes_scannedнепропорционально велик — читается больше данных, чем нужно. Проверьте, применяется ли projection pushdown (чтение только нужных колонок).
Для сложных планов используйте EXPLAIN ANALYZE FORMAT TREE — древовидный вывод с метриками. Подробнее о формате FORMAT TREE — в уроке 05 этого модуля.
Программный доступ к метрикам
Метрики доступны не только через SQL — DataFrame API позволяет получить ExecutionPlan с метриками:
use datafusion::prelude::*;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
let ctx = SessionContext::new();
// ... регистрация таблиц ...
let df = ctx.sql("SELECT region, SUM(amount) FROM orders GROUP BY region").await?;
// Получаем физический план
let (state, plan) = df.into_parts();
let plan = state.create_physical_plan(&plan).await?;
// Выполняем и собираем метрики
let task_ctx = state.task_ctx();
let stream = plan.execute(0, task_ctx)?;
let _batches: Vec<_> = stream.try_collect().await?;
// Выводим план с метриками
let displayable = DisplayableExecutionPlan::with_metrics(plan.as_ref());
println!("{}", displayable.indent(true));
Criterion.rs: микробенчмарки для DataFusion
Criterion.rs — стандарт для Rust-бенчмарков: статистический анализ, защита от случайных выбросов и отчёты с историей изменений.
Настройка
Добавьте зависимости в Cargo.toml:
[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
datafusion = "53"
[[bench]]
name = "my_benchmarks"
harness = false
Бенчмарк запроса
// benches/my_benchmarks.rs
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::prelude::*;
use tokio::runtime::Runtime;
fn bench_aggregation(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
// Подготовка: создаём контекст и загружаем данные один раз
let ctx = rt.block_on(async {
let ctx = SessionContext::new();
ctx.register_parquet("orders", "data/orders.parquet",
ParquetReadOptions::default()).await.unwrap();
ctx
});
c.bench_function("group_by_region_sum", |b| {
b.to_async(&rt).iter(|| async {
ctx.sql("SELECT region, SUM(amount) FROM orders GROUP BY region")
.await.unwrap()
.collect().await.unwrap()
});
});
}
criterion_group!(benches, bench_aggregation);
criterion_main!(benches);
Бенчмарк кастомного UDF
fn bench_custom_udf(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let ctx = rt.block_on(async {
let ctx = SessionContext::new();
// Регистрируем UDF
ctx.register_udf(my_string_udf());
// Загружаем данные
ctx.register_parquet("logs", "data/logs.parquet",
ParquetReadOptions::default()).await.unwrap();
ctx
});
let mut group = c.benchmark_group("udf_performance");
group.sample_size(50); // Для тяжёлых запросов уменьшаем sample_size
group.bench_function("my_udf_1m_rows", |b| {
b.to_async(&rt).iter(|| async {
ctx.sql("SELECT my_udf(message) FROM logs")
.await.unwrap()
.collect().await.unwrap()
});
});
group.finish();
}
Запуск и анализ
# Запуск всех бенчмарков
cargo bench
# Запуск конкретного бенчмарка
cargo bench -- group_by_region_sum
# HTML-отчёт появится в target/criterion/
open target/criterion/report/index.html
Criterion.rs автоматически сравнивает с предыдущим запуском. Если производительность деградировала — покажет процент регрессии. Используйте это в CI для обнаружения performance-регрессий.
Лучшие практики для бенчмарков DataFusion
- Изолируйте подготовку данных — загрузка Parquet-файлов не должна входить в замер. Вынесите
register_parquetв setup. - Фиксируйте seed — если данные генерируются, используйте фиксированный seed для воспроизводимости.
- Прогрев файловой системы — первый запрос на Parquet-файл включает IO. Запустите один “холостой” запрос перед бенчмарком или используйте
--warm-up-time. - Отключите turbo boost — для стабильных результатов на Linux:
echo 1 > /sys/devices/system/cpu/intel_pstate/no_turbo.
CPU-профилирование: flamegraph и samply
Criterion показывает сколько времени занимает запрос. Flamegraph показывает где именно тратится время.
cargo-flamegraph
# Установка
cargo install flamegraph
# macOS: требуется DTrace (встроен в Xcode)
# Linux: требуется perf (sudo apt install linux-tools-common)
# Профилирование бенчмарка
cargo flamegraph --bench my_benchmarks -- --bench group_by_region_sum
# Профилирование произвольной программы
cargo flamegraph -- --query "SELECT region, SUM(amount) FROM orders GROUP BY region"
Результат — SVG-файл flamegraph.svg, интерактивный в браузере.
Чтение flamegraph
Что искать на flamegraph DataFusion-приложений
Типичные горячие пути и что с ними делать:
| Горячий путь | Причина | Решение |
|---|---|---|
arrow::compute::sort | Сортировка большого объёма данных | Добавить LIMIT, изменить порядок операций в плане |
hash_utils::create_hashes | Построение хэш-таблицы для JOIN или GROUP BY | Уменьшить кардинальность группировки, предфильтровать данные |
parquet::arrow::reader | Чтение Parquet-файлов | Включить predicate pushdown, уменьшить число колонок |
arrow::cast::cast_with_options | Приведение типов | Привести типы на этапе записи данных, не при запросе |
| Ваш UDF | Кастомная функция — бутылочное горлышко | Оптимизировать реализацию UDF, использовать Arrow-kernels вместо поэлементной обработки |
samply: кросс-платформенный профайлер
Samply — удобная альтернатива flamegraph с интерактивным web-интерфейсом:
# Установка
cargo install --locked samply
# Профилирование
samply record cargo bench -- --bench group_by_region_sum
# Открывается Firefox Profiler с интерактивным стек-трейсом
Samply предоставляет:
- Таймлайн выполнения — видно, когда какая функция работала
- Call tree — иерархия вызовов с процентами
- Flame chart — аналог flamegraph, но с осью времени
- Markers — события из приложения (если инструментировано)
Samply работает одинаково на macOS (DTrace) и Linux (perf) — не нужно менять команды при смене платформы. Для DataFusion-разработки это удобнее, чем cargo-flamegraph, если нужна не просто картинка, а интерактивный анализ.
Профилирование памяти: TrackConsumersPool
CPU-профилирование не покажет, кто потребляет память. Для этого в DataFusion есть TrackConsumersPool — обёртка над любым MemoryPool, которая записывает, какие операторы сколько памяти занимают.
Подробно TrackConsumersPool разобран в уроке 07 модуля «Архитектура DataFusion» (Memory Management). Здесь — быстрый рецепт для профилирования:
use datafusion::execution::memory_pool::{FairSpillPool, TrackConsumersPool};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::*;
use std::sync::Arc;
// Оборачиваем основной пул в TrackConsumersPool
let inner_pool = FairSpillPool::new(512 * 1024 * 1024); // 512 MB
let tracking_pool = Arc::new(TrackConsumersPool::new(inner_pool, 10)); // top-10
let runtime = RuntimeEnvBuilder::default()
.with_memory_pool(tracking_pool.clone())
.build_arc()?;
let config = SessionConfig::new()
.with_target_partitions(4);
let ctx = SessionContext::new_with_config_rt(config, runtime);
// Выполняем запрос
ctx.sql("SELECT region, SUM(amount) FROM large_table GROUP BY region")
.await?.collect().await?;
// Смотрим, кто потребляет память
println!("{}", tracking_pool.report());
Вывод report():
Top memory consumers:
AggregateExec[0]: 128.5 MB (25.1%)
SortExec[0]: 96.3 MB (18.8%)
HashJoinExec[0]: 64.1 MB (12.5%)
...
Total tracked: 412.8 MB / 512.0 MB limit
TrackConsumersPool добавляет overhead для каждой операции grow()/shrink() — сохраняет имя consumer-а и размер. Не используйте в production при высокой нагрузке. Включайте для диагностики конкретных запросов.
Стандартные бенчмарки: ClickBench, TPC-H, TPC-DS
Для объективного сравнения движков и отслеживания регрессий DataFusion используют стандартные бенчмарки.
ClickBench
ClickBench — бенчмарк аналитических запросов от команды ClickHouse:
- Данные: один Parquet-файл (~15 GB) с событиями web-аналитики (hits)
- Запросы: 43 SQL-запроса — фильтрация, агрегация, GROUP BY с высокой кардинальностью, COUNT(DISTINCT), LIKE
- Фокус: производительность на одной широкой таблице без JOIN
- Результат: время выполнения каждого запроса (горячий / холодный кеш)
DataFusion стабильно входит в top-5 среди single-node движков на ClickBench.
TPC-H
TPC-H — стандарт для оценки OLAP-производительности:
- Данные: 8 таблиц, настраиваемый масштаб (SF1 = ~1 GB, SF10 = ~10 GB, SF100 = ~100 GB)
- Запросы: 22 аналитических запроса — JOIN нескольких таблиц, агрегация, подзапросы, оконные функции
- Фокус: оптимизатор запросов, эффективность JOIN, predicate pushdown
- Когда использовать: тестирование оптимизатора на реалистичных бизнес-запросах
# Запуск TPC-H бенчмарков DataFusion (встроены в репозиторий)
git clone https://github.com/apache/datafusion.git
cd datafusion/benchmarks
# Генерация данных (SF1)
./bench.sh data tpch
# Запуск
./bench.sh run tpch
TPC-DS
TPC-DS — более сложный бенчмарк с 99 запросами:
- Данные: 24 таблицы, модель розничных продаж
- Запросы: 99 SQL-запросов — сложные JOIN, вложенные подзапросы, оконные функции
- Фокус: сложность оптимизатора, обработка NULL, полиморфные запросы
- Когда использовать: стресс-тест оптимизатора на пределе SQL-возможностей
DataFusion Built-in Benchmark Suite
В репозитории DataFusion — набор микробенчмарков для конкретных подсистем:
cd datafusion/datafusion
# Бенчмарки физических выражений
cargo bench --bench physical_expr
# Бенчмарки SQL-парсера
cargo bench --bench sql_planner
# Бенчмарки сортировки
cargo bench --bench sort
# Бенчмарки Parquet-чтения
cargo bench --bench parquet_query
Эти бенчмарки запускаются в DataFusion CI для отслеживания performance-регрессий при каждом PR.
Цикл оптимизации
Инструменты выше складываются в итеративный процесс:
Пошаговый пример
Проблема: запрос SELECT category, AVG(price) FROM products GROUP BY category медленно работает на 10M строк.
Шаг 1 — Benchmark: измеряем baseline через Criterion.
c.bench_function("group_by_category", |b| {
b.to_async(&rt).iter(|| async {
ctx.sql("SELECT category, AVG(price) FROM products GROUP BY category")
.await.unwrap().collect().await.unwrap()
});
});
// Результат: 245ms ± 12ms
Шаг 2 — Profile: EXPLAIN ANALYZE показывает, что DataSourceExec читает все колонки.
EXPLAIN ANALYZE
SELECT category, AVG(price) FROM products GROUP BY category;
-- DataSourceExec: bytes_scanned=800MB (все 20 колонок)
Шаг 3 — Fix: Проблема в projection pushdown — запрос читает только 2 колонки, но сканирует все 20. Проверяем конфигурацию:
SET datafusion.execution.parquet.pushdown_filters = true;
SET datafusion.execution.parquet.reorder_filters = true;
Шаг 4 — Verify: перезапускаем бенчмарк.
group_by_category time: [82ms ± 5ms]
change: [-66.531% -65.714% -64.897%] (p < 0.05)
Performance has improved.
Criterion.rs автоматически сравнивает с предыдущим baseline и показывает процент изменения с p-value. Если регрессия значима статистически — пометит красным.
Рекомендации по инструментам
| Задача | Инструмент | Когда использовать |
|---|---|---|
| Быстрый анализ плана | EXPLAIN ANALYZE | Первый шаг для любого медленного запроса |
| Метрики без SQL | DisplayableExecutionPlan::with_metrics | Когда нужен программный доступ к метрикам из Rust-кода |
| Регрессионное тестирование | Criterion.rs | CI, отслеживание производительности между релизами |
| CPU hotspot | cargo-flamegraph | Локальная отладка, поиск горячих путей |
| Интерактивный CPU-анализ | samply | Глубокий анализ с таймлайном и call tree |
| Утечки памяти / пики | TrackConsumersPool | Диагностика конкретных запросов на предмет потребления памяти |
| Сравнение с другими движками | ClickBench / TPC-H | Объективная оценка для выбора движка или публикации |
| Стресс-тест оптимизатора | TPC-DS | Проверка обработки сложных SQL-конструкций |
Итоги
-
EXPLAIN ANALYZE— первый инструмент: per-operator метрики (output_rows, elapsed_compute, spill_count, bytes_scanned) - Criterion.rs — статистически корректные микробенчмарки с автоматическим сравнением baseline
-
cargo-flamegraphиsamply— CPU-профилирование для поиска горячих путей в коде -
TrackConsumersPool— профилирование памяти: какие операторы потребляют больше всего - ClickBench — бенчмарк сканирования без JOIN; TPC-H — реалистичные аналитические запросы с JOIN
- Цикл оптимизации: benchmark → profile → fix → verify — без замера нет улучшения