Learning Platform
Глоссарий Troubleshooting
Урок 07.06 · 18 мин
Продвинутый
StatisticsColumnStatisticsTableProviderEXPLAIN ANALYZEProfilingPlanRegressionCostModeldynamic filtersTopK pushdownJoinFilter pushdownsideways information passingbloom filter

Статистика и стоимостная модель

Оптимизатор принимает решения: какой алгоритм join использовать, в каком порядке соединять таблицы, нужно ли перераспределение. Эти решения зависят от статистики — оценки размера данных. Без статистики оптимизатор работает вслепую. В этом уроке разберём, как предоставить статистику, как DataFusion её использует и как анализировать производительность через EXPLAIN ANALYZE.

Statistics struct

DataFusion представляет статистику через struct Statistics:

use datafusion::common::stats::Precision;
use datafusion::common::{ColumnStatistics, Statistics};

pub struct Statistics {
    /// Количество строк в источнике
    pub num_rows: Precision<usize>,

    /// Общий размер данных в байтах
    pub total_byte_size: Precision<usize>,

    /// Статистика по каждой колонке
    pub column_statistics: Vec<ColumnStatistics>,
}

Precision: точность оценки

Precision<T> указывает, насколько точна оценка:

pub enum Precision<T> {
    /// Точное значение (подсчитано полностью)
    Exact(T),

    /// Приблизительная оценка
    Inexact(T),

    /// Неизвестно
    Absent,
}
УровеньПримерКогда используется
ExactParquet footer содержит точное количество строкМетаданные формата
InexactОценка после фильтрации (50% selectivity)Heuristic-модели
AbsentCSV-файл без предварительного сканированияНет данных

ColumnStatistics

Статистика по отдельной колонке:

pub struct ColumnStatistics {
    /// Количество NULL-значений
    pub null_count: Precision<usize>,

    /// Минимальное значение
    pub min_value: Precision<ScalarValue>,

    /// Максимальное значение
    pub max_value: Precision<ScalarValue>,

    /// Количество уникальных значений (distinct count)
    pub distinct_count: Precision<usize>,
}
NOTE

Parquet автоматически предоставляет min_value/max_value через column chunk statistics. Это позволяет DataFusion пропускать целые row groups при predicate pushdown: если max_value(age) = 25 в row group, а фильтр age > 30, весь row group пропускается без чтения.

TableProvider::statistics()

Кастомный TableProvider предоставляет статистику через метод statistics():

use datafusion::catalog::Session;
use datafusion::datasource::TableProvider;
use datafusion::common::{Statistics, ColumnStatistics};
use datafusion::common::stats::Precision;
use datafusion::common::ScalarValue;

#[async_trait]
impl TableProvider for MyTableProvider {
    // ... schema(), table_type(), scan() ...

    fn statistics(&self) -> Option<Statistics> {
        Some(Statistics {
            num_rows: Precision::Exact(1_000_000),
            total_byte_size: Precision::Exact(256_000_000), // ~256MB

            column_statistics: vec![
                // Колонка 0: id (Int64, unique)
                ColumnStatistics {
                    null_count: Precision::Exact(0),
                    min_value: Precision::Exact(ScalarValue::Int64(Some(1))),
                    max_value: Precision::Exact(ScalarValue::Int64(Some(1_000_000))),
                    distinct_count: Precision::Exact(1_000_000),
                },
                // Колонка 1: region (Utf8, low cardinality)
                ColumnStatistics {
                    null_count: Precision::Exact(0),
                    min_value: Precision::Absent,
                    max_value: Precision::Absent,
                    distinct_count: Precision::Exact(12),
                },
                // Колонка 2: amount (Float64)
                ColumnStatistics {
                    null_count: Precision::Inexact(500),
                    min_value: Precision::Exact(ScalarValue::Float64(Some(0.01))),
                    max_value: Precision::Exact(ScalarValue::Float64(Some(99999.99))),
                    distinct_count: Precision::Absent,
                },
            ],
        })
    }
}

Как оптимизатор использует статистику

Выбор build side для HashJoin

orders: num_rows = 10_000_000
customers: num_rows = 100_000

→ customers (меньше) выбирается как build side
→ HashJoinExec: mode=CollectLeft (собрать customers в одну партицию)

Без статистики оптимизатор не знает, какая таблица меньше, и использует Partitioned mode (дороже из-за двойного перераспределения).

Порядок join

-- Три таблицы: orders (10M), customers (100K), regions (50)
SELECT *
FROM orders o
JOIN customers c ON o.customer_id = c.id
JOIN regions r ON c.region_id = r.id;

-- С статистикой: regions JOIN customers → JOIN orders
-- Без статистики: порядок как написано (хуже)

Selectivity estimation

-- Фильтр: age > 30
-- Статистика: min=18, max=70, distinct=53
-- Оценка selectivity: (70 - 30) / (70 - 18) ≈ 0.77 → 77% строк пройдут
-- Estimated rows после фильтра: 1_000_000 * 0.77 = 770_000

Оценка selectivity влияет на выбор алгоритма последующих операций (join, aggregation).

TIP

Если оптимизатор выбирает неоптимальный план (например, NestedLoop вместо HashJoin для большой таблицы), первое что стоит проверить — наличие статистики. Отсутствие num_rows лишает оптимизатор главного сигнала для принятия решений.

EXPLAIN ANALYZE: метрики выполнения

EXPLAIN ANALYZE выполняет запрос и показывает реальные метрики каждого оператора:

EXPLAIN ANALYZE
SELECT region, SUM(amount) AS total
FROM orders
WHERE status = 'completed'
GROUP BY region
ORDER BY total DESC
LIMIT 5;

Ключевые метрики операторов

GlobalLimitExec: skip=0, fetch=5
  metrics=[output_rows=5, elapsed_compute=1.2µs]

SortExec: expr=[total DESC], fetch=5
  metrics=[output_rows=5, elapsed_compute=45.3µs]

AggregateExec: mode=FinalPartitioned
  metrics=[output_rows=12, elapsed_compute=234.5µs]

AggregateExec: mode=Partial
  metrics=[output_rows=192, elapsed_compute=45.2ms]

FilterExec: status = 'completed'
  metrics=[output_rows=487329, elapsed_compute=12.3ms]

DataSourceExec: file_groups={16 groups}, format=parquet
  metrics=[
    output_rows=1000000,
    elapsed_compute=89.4ms,
    bytes_scanned=45200000,
    predicate_evaluation_errors=0,
    row_groups_pruned=3,
    row_groups_matched=13
  ]

Метрики по категориям

МетрикаОператорЗначение
output_rowsВсеСтрок на выходе — главный индикатор эффективности фильтрации
elapsed_computeВсеCPU-время оператора — находит bottleneck
bytes_scannedScanПрочитано байт с диска — эффективность projection/predicate pushdown
row_groups_prunedDataSourceExecПропущенные row groups — эффективность min/max pruning
spill_countSort, AggregateКоличество spill-to-disk — нехватка памяти
spill_bytesSort, AggregateБайт записано на диск при spill
repart_timeRepartitionExecВремя перераспределения — сетевой/memory bottleneck
send_timeRepartitionExecВремя отправки между партициями

Профилирование производительности

Workflow диагностики

-- Шаг 1: Структура плана (без выполнения)
EXPLAIN SELECT ...;
-- → Проверить: выбор алгоритма, pushdown, порядок join

-- Шаг 2: Реальные метрики
EXPLAIN ANALYZE SELECT ...;
-- → Найти самый медленный оператор (max elapsed_compute)
-- → Проверить selectivity (output_rows на каждом этапе)
-- → Проверить spill_count > 0

-- Шаг 3: Конфигурация
SELECT name, value FROM information_schema.df_settings
WHERE name LIKE '%partitions%'
   OR name LIKE '%batch_size%'
   OR name LIKE '%memory%';

Типичные проблемы и решения

Диагностика по метрикам EXPLAIN ANALYZE
row_groups_pruned = 0Нулевой pruning означает, что min/max статистика row groups не помогает — данные не отсортированы по фильтруемой колонке
spill_count > 0Spill на диск означает, что оператору не хватает памяти — увеличьте memory pool или уменьшите объём данных
bytes_scanned высокБольшой объём чтения указывает на отсутствие projection или predicate pushdown — читаются лишние колонки/строки
NestedLoopJoin на больших таблицахNestedLoopJoin O(N*M) на больших данных — признак отсутствия equi-предиката или неверного типа join

Обнаружение регрессий плана

При обновлении DataFusion или изменении данных план может измениться. Регрессия плана — когда новый план медленнее старого.

Стратегия мониторинга

use datafusion::prelude::SessionContext;

async fn capture_plan_fingerprint(
    ctx: &SessionContext,
    sql: &str,
) -> Result<String> {
    let df = ctx.sql(&format!("EXPLAIN {}", sql)).await?;
    let batches = df.collect().await?;

    // Извлечь текстовое представление плана
    let plan_text = batches
        .iter()
        .map(|b| {
            let col = b.column(1); // plan column
            let arr = col.as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            (0..arr.len())
                .map(|i| arr.value(i).to_string())
                .collect::<Vec<_>>()
                .join("\n")
        })
        .collect::<Vec<_>>()
        .join("\n");

    Ok(plan_text)
}

Что сравнивать между версиями

  1. Алгоритм join — HashJoin → NestedLoop = регрессия
  2. Наличие pushdownprojection=[*] вместо выборочных колонок = регрессия
  3. Количество RepartitionExec — лишние перераспределения = overhead
  4. Наличие SortExec — появление сортировки где раньше не было = регрессия
  5. output_rows на каждом этапе — резкий рост промежуточных строк = потеря оптимизации

Автоматическая проверка

#[tokio::test]
async fn test_plan_not_regressed() {
    let ctx = setup_test_context().await;

    let plan = ctx
        .sql("SELECT * FROM orders WHERE status = 'active' ORDER BY id LIMIT 10")
        .await
        .unwrap()
        .into_optimized_plan()
        .unwrap();

    let plan_str = format!("{}", plan.display_indent());

    // Проверить ключевые свойства плана
    assert!(plan_str.contains("TopK"), "Should use TopK optimization");
    assert!(!plan_str.contains("NestedLoopJoin"), "Should not use NestedLoop");
    assert!(plan_str.contains("filter=[status"), "Filter should be pushed down");
}
WARNING

Тесты на точное представление плана хрупкие — они ломаются при каждом обновлении DataFusion. Тестируйте свойства плана (наличие pushdown, тип join, TopK) а не точный текст.

Предоставление статистики для ListingTable

Для файловых источников DataFusion может собирать статистику автоматически:

use datafusion::datasource::listing::ListingOptions;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use std::sync::Arc;

let listing_options = ListingOptions::new(
    Arc::new(ParquetFormat::default())
)
.with_collect_stat(true);  // Включить сбор статистики

Или через SQL:

SET datafusion.execution.collect_statistics = true;

CREATE EXTERNAL TABLE orders
STORED AS PARQUET
LOCATION '/data/orders/';
-- Статистика будет собрана из Parquet footer

Parquet footer содержит точные num_rows и min/max по колонкам для каждого row group — это самый дешёвый и точный источник статистики.

NOTE

CSV-файлы не содержат встроенной статистики. Для CSV collect_statistics = true означает полный предварительный скан файла — дорого для больших файлов. Для CSV-источников лучше предоставить статистику программно через кастомный TableProvider.

Dynamic filters — runtime-pushdown в DataFusion 49+

В предыдущих разделах мы видели статический filter pushdown — фильтры из WHERE спускаются к TableScan на стадии оптимизации. С DataFusion 49+ доступен dynamic filter pushdown — фильтры формируются во время выполнения и пробрасываются обратно в Scan, что позволяет читать существенно меньше данных.

TopK → Scan dynamic pushdown

Классический случай — ORDER BY col LIMIT N:

SELECT id, score FROM events ORDER BY score DESC LIMIT 10;

До dynamic filters:

TopK: fetch=10
  DataSourceExec: parquet
    metrics=[output_rows=100_000_000, bytes_scanned=4 GB]

Все 100M строк читаются с диска, потом TopK выбирает top-10. Wasteful.

С dynamic filters (DataFusion 49+):

TopK: fetch=10, dynamic_filter=score > 12345 (threshold обновляется)
  DataSourceExec: parquet, dynamic_filter_pushdown=enabled
    metrics=[output_rows=2_500_000, bytes_scanned=80 MB, row_groups_pruned=478]

Как работает:

  1. TopK читает первый batch — определяет минимальный score среди top-10 (threshold).
  2. TopK пробрасывает фильтр score > threshold обратно в Scan через dynamic filter API.
  3. Scan использует Parquet column statistics: row groups, где max(score) ≤ threshold, пропускаются полностью.
  4. По мере того как TopK видит более высокие значения, threshold обновляется — Scan читает ещё меньше.

Результат — для типичных TopK запросов на отсортированных данных bytes_scanned уменьшается в 10–100×.

JoinFilter → Scan (Sideways Information Passing)

Hash Join с маленькой customers (build side) и большой orders (probe side):

SELECT o.id, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE c.country = 'JP';

С dynamic filters:

  1. Build phase: HashJoin собирает hashtable из customers WHERE country = 'JP' (e.g., 100 customer_ids).
  2. HashJoin генерирует dynamic filtercustomer_id IN (1, 7, 42, ..., 999) — bloom filter или explicit IN-list.
  3. Probe phase: фильтр пробрасывается в Scan для orders — большая таблица читает только row groups, где customer_id пересекается с filter.
HashJoinExec
  ├── BuildSide (small): customers WHERE country = 'JP' → 100 rows

  └── ProbeSide (large): orders, dynamic_filter=customer_id IN bloom_filter
        DataSourceExec: parquet, row_groups_pruned=85% → bytes_scanned reduction

Это аналог Bloom Filter Join в Spark / Sideways Information Passing в Trino.

EXPLAIN visualisation

EXPLAIN ANALYZE SELECT id, score FROM events ORDER BY score DESC LIMIT 10;
TopK: fetch=10
  metrics=[output_rows=10, dynamic_filter_updates=12]
  dynamic_filter=score > 9847 (final threshold)

DataSourceExec: file_groups={16 groups}, format=parquet
  metrics=[
    output_rows=2_500_000,
    bytes_scanned=80_000_000,
    row_groups_pruned_dynamic=478,    -- за счёт dynamic filter
    row_groups_pruned_statistics=3,    -- за счёт static min/max
    row_groups_matched=19
  ]

Метрика row_groups_pruned_dynamic показывает количество row groups, отсечённых dynamic filter — критический индикатор эффективности.

Performance impact

ЗапросДо 49DataFusion 49+Speedup
ORDER BY score DESC LIMIT 10 (100M rows)4 GB scan, 12 sec80 MB scan, 0.4 sec30×
Hash Join, selective build (1% rows)Full scan probe5–10% scan probe10–20×
WHERE id IN (subquery)Full scanBloom-filtered scan5–50×

Конфигурация

Dynamic filter pushdown включён по умолчанию в DataFusion 49+ для standard operators (TopK, HashJoin):

SHOW datafusion.optimizer.enable_dynamic_filter_pushdown;
-- true (default since 49.0)

-- Отключить (для диагностики regression):
SET datafusion.optimizer.enable_dynamic_filter_pushdown = false;
TIP

Если dynamic filter pushdown работает, в EXPLAIN ANALYZE появляются метрики row_groups_pruned_dynamic или dynamic_filter_updates. Их отсутствие при enable_dynamic_filter_pushdown = true означает, что либо TableProvider не поддерживает dynamic filter API, либо план не позволяет применить (нет TopK/Join, фильтр не выводится). Для кастомных TableProvider нужно реализовать supports_dynamic_filters_pushdown() метод.

WARNING

Dynamic filters имеют overhead на построение и проброс — для маленьких таблиц или непредсказуемых данных могут не давать выигрыша. DataFusion применяет cost-based heuristic — учитывает статистику build side для оценки целесообразности. На неподходящих планах оптимизатор пропускает dynamic filter generation автоматически.

Cite DataFusion 49 release notes — dynamic filters + Dynamic Filter Pushdown design doc.


Итоги

  • Statistics содержит num_rows, total_byte_size и column_statistics с тремя уровнями точности (Exact, Inexact, Absent)
  • TableProvider::statistics() — точка подключения статистики для кастомных источников
  • Статистика определяет выбор join algorithm, build side, порядок join и selectivity estimation
  • EXPLAIN ANALYZE показывает реальные метрики: output_rows, elapsed_compute, bytes_scanned, spill_count
  • Регрессии плана обнаруживаются через тесты свойств плана (тип join, наличие pushdown, TopK)
  • Parquet предоставляет точную статистику через footer бесплатно; CSV требует полного скана

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Precision<T> в Statistics имеет три уровня: Exact, Inexact, Absent. Parquet footer содержит точное количество строк. Какой уровень использует DataFusion для num_rows из Parquet?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6