Статистика и стоимостная модель
Оптимизатор принимает решения: какой алгоритм join использовать, в каком порядке соединять таблицы, нужно ли перераспределение. Эти решения зависят от статистики — оценки размера данных. Без статистики оптимизатор работает вслепую. В этом уроке разберём, как предоставить статистику, как DataFusion её использует и как анализировать производительность через EXPLAIN ANALYZE.
Statistics struct
DataFusion представляет статистику через struct Statistics:
use datafusion::common::stats::Precision;
use datafusion::common::{ColumnStatistics, Statistics};
pub struct Statistics {
/// Количество строк в источнике
pub num_rows: Precision<usize>,
/// Общий размер данных в байтах
pub total_byte_size: Precision<usize>,
/// Статистика по каждой колонке
pub column_statistics: Vec<ColumnStatistics>,
}
Precision: точность оценки
Precision<T> указывает, насколько точна оценка:
pub enum Precision<T> {
/// Точное значение (подсчитано полностью)
Exact(T),
/// Приблизительная оценка
Inexact(T),
/// Неизвестно
Absent,
}
| Уровень | Пример | Когда используется |
|---|---|---|
Exact | Parquet footer содержит точное количество строк | Метаданные формата |
Inexact | Оценка после фильтрации (50% selectivity) | Heuristic-модели |
Absent | CSV-файл без предварительного сканирования | Нет данных |
ColumnStatistics
Статистика по отдельной колонке:
pub struct ColumnStatistics {
/// Количество NULL-значений
pub null_count: Precision<usize>,
/// Минимальное значение
pub min_value: Precision<ScalarValue>,
/// Максимальное значение
pub max_value: Precision<ScalarValue>,
/// Количество уникальных значений (distinct count)
pub distinct_count: Precision<usize>,
}
Parquet автоматически предоставляет min_value/max_value через column chunk statistics. Это позволяет DataFusion пропускать целые row groups при predicate pushdown: если max_value(age) = 25 в row group, а фильтр age > 30, весь row group пропускается без чтения.
TableProvider::statistics()
Кастомный TableProvider предоставляет статистику через метод statistics():
use datafusion::catalog::Session;
use datafusion::datasource::TableProvider;
use datafusion::common::{Statistics, ColumnStatistics};
use datafusion::common::stats::Precision;
use datafusion::common::ScalarValue;
#[async_trait]
impl TableProvider for MyTableProvider {
// ... schema(), table_type(), scan() ...
fn statistics(&self) -> Option<Statistics> {
Some(Statistics {
num_rows: Precision::Exact(1_000_000),
total_byte_size: Precision::Exact(256_000_000), // ~256MB
column_statistics: vec![
// Колонка 0: id (Int64, unique)
ColumnStatistics {
null_count: Precision::Exact(0),
min_value: Precision::Exact(ScalarValue::Int64(Some(1))),
max_value: Precision::Exact(ScalarValue::Int64(Some(1_000_000))),
distinct_count: Precision::Exact(1_000_000),
},
// Колонка 1: region (Utf8, low cardinality)
ColumnStatistics {
null_count: Precision::Exact(0),
min_value: Precision::Absent,
max_value: Precision::Absent,
distinct_count: Precision::Exact(12),
},
// Колонка 2: amount (Float64)
ColumnStatistics {
null_count: Precision::Inexact(500),
min_value: Precision::Exact(ScalarValue::Float64(Some(0.01))),
max_value: Precision::Exact(ScalarValue::Float64(Some(99999.99))),
distinct_count: Precision::Absent,
},
],
})
}
}
Как оптимизатор использует статистику
Выбор build side для HashJoin
orders: num_rows = 10_000_000
customers: num_rows = 100_000
→ customers (меньше) выбирается как build side
→ HashJoinExec: mode=CollectLeft (собрать customers в одну партицию)
Без статистики оптимизатор не знает, какая таблица меньше, и использует Partitioned mode (дороже из-за двойного перераспределения).
Порядок join
-- Три таблицы: orders (10M), customers (100K), regions (50)
SELECT *
FROM orders o
JOIN customers c ON o.customer_id = c.id
JOIN regions r ON c.region_id = r.id;
-- С статистикой: regions JOIN customers → JOIN orders
-- Без статистики: порядок как написано (хуже)
Selectivity estimation
-- Фильтр: age > 30
-- Статистика: min=18, max=70, distinct=53
-- Оценка selectivity: (70 - 30) / (70 - 18) ≈ 0.77 → 77% строк пройдут
-- Estimated rows после фильтра: 1_000_000 * 0.77 = 770_000
Оценка selectivity влияет на выбор алгоритма последующих операций (join, aggregation).
Если оптимизатор выбирает неоптимальный план (например, NestedLoop вместо HashJoin для большой таблицы), первое что стоит проверить — наличие статистики. Отсутствие num_rows лишает оптимизатор главного сигнала для принятия решений.
EXPLAIN ANALYZE: метрики выполнения
EXPLAIN ANALYZE выполняет запрос и показывает реальные метрики каждого оператора:
EXPLAIN ANALYZE
SELECT region, SUM(amount) AS total
FROM orders
WHERE status = 'completed'
GROUP BY region
ORDER BY total DESC
LIMIT 5;
Ключевые метрики операторов
GlobalLimitExec: skip=0, fetch=5
metrics=[output_rows=5, elapsed_compute=1.2µs]
SortExec: expr=[total DESC], fetch=5
metrics=[output_rows=5, elapsed_compute=45.3µs]
AggregateExec: mode=FinalPartitioned
metrics=[output_rows=12, elapsed_compute=234.5µs]
AggregateExec: mode=Partial
metrics=[output_rows=192, elapsed_compute=45.2ms]
FilterExec: status = 'completed'
metrics=[output_rows=487329, elapsed_compute=12.3ms]
DataSourceExec: file_groups={16 groups}, format=parquet
metrics=[
output_rows=1000000,
elapsed_compute=89.4ms,
bytes_scanned=45200000,
predicate_evaluation_errors=0,
row_groups_pruned=3,
row_groups_matched=13
]
Метрики по категориям
| Метрика | Оператор | Значение |
|---|---|---|
output_rows | Все | Строк на выходе — главный индикатор эффективности фильтрации |
elapsed_compute | Все | CPU-время оператора — находит bottleneck |
bytes_scanned | Scan | Прочитано байт с диска — эффективность projection/predicate pushdown |
row_groups_pruned | DataSourceExec | Пропущенные row groups — эффективность min/max pruning |
spill_count | Sort, Aggregate | Количество spill-to-disk — нехватка памяти |
spill_bytes | Sort, Aggregate | Байт записано на диск при spill |
repart_time | RepartitionExec | Время перераспределения — сетевой/memory bottleneck |
send_time | RepartitionExec | Время отправки между партициями |
Профилирование производительности
Workflow диагностики
-- Шаг 1: Структура плана (без выполнения)
EXPLAIN SELECT ...;
-- → Проверить: выбор алгоритма, pushdown, порядок join
-- Шаг 2: Реальные метрики
EXPLAIN ANALYZE SELECT ...;
-- → Найти самый медленный оператор (max elapsed_compute)
-- → Проверить selectivity (output_rows на каждом этапе)
-- → Проверить spill_count > 0
-- Шаг 3: Конфигурация
SELECT name, value FROM information_schema.df_settings
WHERE name LIKE '%partitions%'
OR name LIKE '%batch_size%'
OR name LIKE '%memory%';
Типичные проблемы и решения
Обнаружение регрессий плана
При обновлении DataFusion или изменении данных план может измениться. Регрессия плана — когда новый план медленнее старого.
Стратегия мониторинга
use datafusion::prelude::SessionContext;
async fn capture_plan_fingerprint(
ctx: &SessionContext,
sql: &str,
) -> Result<String> {
let df = ctx.sql(&format!("EXPLAIN {}", sql)).await?;
let batches = df.collect().await?;
// Извлечь текстовое представление плана
let plan_text = batches
.iter()
.map(|b| {
let col = b.column(1); // plan column
let arr = col.as_any()
.downcast_ref::<StringArray>()
.unwrap();
(0..arr.len())
.map(|i| arr.value(i).to_string())
.collect::<Vec<_>>()
.join("\n")
})
.collect::<Vec<_>>()
.join("\n");
Ok(plan_text)
}
Что сравнивать между версиями
- Алгоритм join — HashJoin → NestedLoop = регрессия
- Наличие pushdown —
projection=[*]вместо выборочных колонок = регрессия - Количество RepartitionExec — лишние перераспределения = overhead
- Наличие SortExec — появление сортировки где раньше не было = регрессия
- output_rows на каждом этапе — резкий рост промежуточных строк = потеря оптимизации
Автоматическая проверка
#[tokio::test]
async fn test_plan_not_regressed() {
let ctx = setup_test_context().await;
let plan = ctx
.sql("SELECT * FROM orders WHERE status = 'active' ORDER BY id LIMIT 10")
.await
.unwrap()
.into_optimized_plan()
.unwrap();
let plan_str = format!("{}", plan.display_indent());
// Проверить ключевые свойства плана
assert!(plan_str.contains("TopK"), "Should use TopK optimization");
assert!(!plan_str.contains("NestedLoopJoin"), "Should not use NestedLoop");
assert!(plan_str.contains("filter=[status"), "Filter should be pushed down");
}
Тесты на точное представление плана хрупкие — они ломаются при каждом обновлении DataFusion. Тестируйте свойства плана (наличие pushdown, тип join, TopK) а не точный текст.
Предоставление статистики для ListingTable
Для файловых источников DataFusion может собирать статистику автоматически:
use datafusion::datasource::listing::ListingOptions;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use std::sync::Arc;
let listing_options = ListingOptions::new(
Arc::new(ParquetFormat::default())
)
.with_collect_stat(true); // Включить сбор статистики
Или через SQL:
SET datafusion.execution.collect_statistics = true;
CREATE EXTERNAL TABLE orders
STORED AS PARQUET
LOCATION '/data/orders/';
-- Статистика будет собрана из Parquet footer
Parquet footer содержит точные num_rows и min/max по колонкам для каждого row group — это самый дешёвый и точный источник статистики.
CSV-файлы не содержат встроенной статистики. Для CSV collect_statistics = true означает полный предварительный скан файла — дорого для больших файлов. Для CSV-источников лучше предоставить статистику программно через кастомный TableProvider.
Dynamic filters — runtime-pushdown в DataFusion 49+
В предыдущих разделах мы видели статический filter pushdown — фильтры из WHERE спускаются к TableScan на стадии оптимизации. С DataFusion 49+ доступен dynamic filter pushdown — фильтры формируются во время выполнения и пробрасываются обратно в Scan, что позволяет читать существенно меньше данных.
TopK → Scan dynamic pushdown
Классический случай — ORDER BY col LIMIT N:
SELECT id, score FROM events ORDER BY score DESC LIMIT 10;
До dynamic filters:
TopK: fetch=10
DataSourceExec: parquet
metrics=[output_rows=100_000_000, bytes_scanned=4 GB]
Все 100M строк читаются с диска, потом TopK выбирает top-10. Wasteful.
С dynamic filters (DataFusion 49+):
TopK: fetch=10, dynamic_filter=score > 12345 (threshold обновляется)
DataSourceExec: parquet, dynamic_filter_pushdown=enabled
metrics=[output_rows=2_500_000, bytes_scanned=80 MB, row_groups_pruned=478]
Как работает:
- TopK читает первый batch — определяет минимальный
scoreсреди top-10 (threshold). - TopK пробрасывает фильтр
score > thresholdобратно в Scan через dynamic filter API. - Scan использует Parquet column statistics: row groups, где
max(score) ≤ threshold, пропускаются полностью. - По мере того как TopK видит более высокие значения, threshold обновляется — Scan читает ещё меньше.
Результат — для типичных TopK запросов на отсортированных данных bytes_scanned уменьшается в 10–100×.
JoinFilter → Scan (Sideways Information Passing)
Hash Join с маленькой customers (build side) и большой orders (probe side):
SELECT o.id, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE c.country = 'JP';
С dynamic filters:
- Build phase: HashJoin собирает hashtable из
customers WHERE country = 'JP'(e.g., 100 customer_ids). - HashJoin генерирует dynamic filter —
customer_id IN (1, 7, 42, ..., 999)— bloom filter или explicit IN-list. - Probe phase: фильтр пробрасывается в Scan для
orders— большая таблица читает только row groups, гдеcustomer_idпересекается с filter.
HashJoinExec
├── BuildSide (small): customers WHERE country = 'JP' → 100 rows
│
└── ProbeSide (large): orders, dynamic_filter=customer_id IN bloom_filter
DataSourceExec: parquet, row_groups_pruned=85% → bytes_scanned reduction
Это аналог Bloom Filter Join в Spark / Sideways Information Passing в Trino.
EXPLAIN visualisation
EXPLAIN ANALYZE SELECT id, score FROM events ORDER BY score DESC LIMIT 10;
TopK: fetch=10
metrics=[output_rows=10, dynamic_filter_updates=12]
dynamic_filter=score > 9847 (final threshold)
DataSourceExec: file_groups={16 groups}, format=parquet
metrics=[
output_rows=2_500_000,
bytes_scanned=80_000_000,
row_groups_pruned_dynamic=478, -- за счёт dynamic filter
row_groups_pruned_statistics=3, -- за счёт static min/max
row_groups_matched=19
]
Метрика row_groups_pruned_dynamic показывает количество row groups, отсечённых dynamic filter — критический индикатор эффективности.
Performance impact
| Запрос | До 49 | DataFusion 49+ | Speedup |
|---|---|---|---|
ORDER BY score DESC LIMIT 10 (100M rows) | 4 GB scan, 12 sec | 80 MB scan, 0.4 sec | 30× |
| Hash Join, selective build (1% rows) | Full scan probe | 5–10% scan probe | 10–20× |
WHERE id IN (subquery) | Full scan | Bloom-filtered scan | 5–50× |
Конфигурация
Dynamic filter pushdown включён по умолчанию в DataFusion 49+ для standard operators (TopK, HashJoin):
SHOW datafusion.optimizer.enable_dynamic_filter_pushdown;
-- true (default since 49.0)
-- Отключить (для диагностики regression):
SET datafusion.optimizer.enable_dynamic_filter_pushdown = false;
Если dynamic filter pushdown работает, в EXPLAIN ANALYZE появляются метрики row_groups_pruned_dynamic или dynamic_filter_updates. Их отсутствие при enable_dynamic_filter_pushdown = true означает, что либо TableProvider не поддерживает dynamic filter API, либо план не позволяет применить (нет TopK/Join, фильтр не выводится). Для кастомных TableProvider нужно реализовать supports_dynamic_filters_pushdown() метод.
Dynamic filters имеют overhead на построение и проброс — для маленьких таблиц или непредсказуемых данных могут не давать выигрыша. DataFusion применяет cost-based heuristic — учитывает статистику build side для оценки целесообразности. На неподходящих планах оптимизатор пропускает dynamic filter generation автоматически.
Cite DataFusion 49 release notes — dynamic filters + Dynamic Filter Pushdown design doc.
Итоги
-
Statisticsсодержитnum_rows,total_byte_sizeиcolumn_statisticsс тремя уровнями точности (Exact,Inexact,Absent) -
TableProvider::statistics()— точка подключения статистики для кастомных источников - Статистика определяет выбор join algorithm, build side, порядок join и selectivity estimation
-
EXPLAIN ANALYZEпоказывает реальные метрики:output_rows,elapsed_compute,bytes_scanned,spill_count - Регрессии плана обнаруживаются через тесты свойств плана (тип join, наличие pushdown, TopK)
- Parquet предоставляет точную статистику через footer бесплатно; CSV требует полного скана