Learning Platform
Troubleshooting
Глоссарий

Глоссарий — Apache DataFusion

Справочник ключевых терминов курса Apache DataFusion.

8 категорий · 50 терминов

Apache Arrow

RecordBatch

Record Batch
Термин

Основная единица данных в Apache Arrow — набор столбцов одинаковой длины, объединённых общей схемой. RecordBatch хранит данные в колоночном формате для эффективной SIMD-обработки. В DataFusion все промежуточные результаты передаются как поток RecordBatch-ей.

Пример:
use arrow::array::Int32Array;
use arrow::record_batch::RecordBatch;
use arrow::datatypes::{Schema, Field, DataType};
use std::sync::Arc;

let schema = Arc::new(Schema::new(vec![
    Field::new("id", DataType::Int32, false),
]));
let batch = RecordBatch::try_new(
    schema,
    vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)?;
Подробнее в уроках:

Array

Arrow Array
Термин

Иммутабельная колоночная коллекция значений одного типа в Apache Arrow. Каждый Array содержит буфер данных, буфер validity (для NULL-значений) и опциональные offset-буферы для строк и списков. DataFusion использует Arrow Arrays для всех вычислений на столбцах.

Пример:
use arrow::array::{StringArray, Int64Array, Float64Array};

let strings = StringArray::from(vec!["hello", "world"]);
let ints = Int64Array::from(vec![10, 20, 30]);
let floats = Float64Array::from(vec![Some(1.5), None, Some(3.0)]);
// None -> NULL в Arrow
Подробнее в уроках:

Schema

Arrow Schema
Термин

Описание структуры данных — упорядоченный набор полей (Field), каждое из которых содержит имя, тип данных (DataType) и флаг nullable. Schema определяет формат RecordBatch и используется для валидации данных при чтении, записи и трансформациях.

Пример:
use arrow::datatypes::{Schema, Field, DataType};

let schema = Schema::new(vec![
    Field::new("name", DataType::Utf8, false),
    Field::new("age", DataType::Int32, true),
    Field::new("score", DataType::Float64, true),
]);
println!("Полей: {}", schema.fields().len());
Подробнее в уроках:

DataType

Arrow Data Type
Термин

Перечисление (enum) всех поддерживаемых типов данных в Arrow — от примитивных (Int8–Int64, Float16–Float64, Boolean) до вложенных (List, Struct, Map, Union). DataType определяет формат хранения в памяти и набор доступных операций над столбцом.

Пример:
use arrow::datatypes::DataType;

// Примитивные типы
let int_type = DataType::Int64;
let str_type = DataType::Utf8;

// Вложенные типы
let list_type = DataType::List(
    Arc::new(Field::new("item", DataType::Int32, true))
);
let ts_type = DataType::Timestamp(
    TimeUnit::Microsecond, Some("UTC".into())
);
Подробнее в уроках:

IPC

Inter-Process Communication
Термин

Формат сериализации Arrow для обмена данными между процессами с нулевым копированием (zero-copy). Arrow IPC позволяет передавать RecordBatch между процессами и языками без десериализации — данные в памяти уже в нужном формате. Используется в Arrow Flight и при кешировании.

Пример:
use arrow::ipc::writer::FileWriter;
use arrow::ipc::reader::FileReader;
use std::fs::File;

// Запись в IPC файл
let file = File::create("data.arrow")?;
let mut writer = FileWriter::try_new(file, &schema)?;
writer.write(&batch)?;
writer.finish()?;

// Чтение из IPC файла
let file = File::open("data.arrow")?;
let reader = FileReader::try_new(file, None)?;
Подробнее в уроках:

Arrow Flight

Apache Arrow Flight
Термин

Высокопроизводительный RPC-протокол на основе gRPC для передачи данных в формате Arrow. Flight оптимизирован для потоковой передачи больших датасетов без сериализации/десериализации. В DataFusion используется в Ballista для распределённого выполнения запросов между узлами.

Пример:
// Ballista использует Flight для передачи данных
// между scheduler и executor узлами
use ballista::prelude::*;

let ctx = BallistaContext::remote(
    "localhost", 50050, &config
).await?;
// Данные передаются через Arrow Flight
let df = ctx.sql("SELECT * FROM t").await?;
Подробнее в уроках:

Архитектура

SessionContext

Session Context
Термин

Главная точка входа в DataFusion — объект, через который выполняются SQL-запросы, регистрируются таблицы и UDF, настраиваются параметры выполнения. SessionContext владеет каталогом, оптимизатором и runtime-конфигурацией. Каждый контекст изолирован — идеально для мультитенантных сценариев.

Пример:
use datafusion::prelude::*;

let ctx = SessionContext::new();

// Регистрация CSV как таблицы
ctx.register_csv("users", "data/users.csv",
    CsvReadOptions::default()
).await?;

// Выполнение SQL
let df = ctx.sql(
    "SELECT name, age FROM users WHERE age > 30"
).await?;
df.show().await?;
Подробнее в уроках:

LogicalPlan

Logical Plan
Термин

Декларативное дерево операций, описывающее «что» нужно сделать — без указания «как». LogicalPlan содержит узлы: Scan, Filter, Projection, Aggregate, Join, Sort, Limit. Оптимизатор трансформирует LogicalPlan перед преобразованием в PhysicalPlan.

Пример:
-- Просмотр логического плана
EXPLAIN
SELECT name, COUNT(*)
FROM users
WHERE active = true
GROUP BY name
ORDER BY COUNT(*) DESC;

-- Результат:
-- Sort: COUNT(*) DESC
--   Aggregate: groupBy=[name], aggr=[COUNT(*)]
--     Filter: active = true
--       TableScan: users
Подробнее в уроках:

PhysicalPlan

Physical Plan
Термин

Исполняемое дерево операторов, определяющее конкретный способ выполнения запроса — стратегии join (HashJoin, SortMergeJoin), режимы сканирования (параллельный, с pruning), способы агрегации. PhysicalPlan создаётся из LogicalPlan с учётом статистик и настроек runtime.

Пример:
-- Просмотр физического плана с метриками
EXPLAIN ANALYZE
SELECT u.name, COUNT(o.id)
FROM users u JOIN orders o ON u.id = o.user_id
GROUP BY u.name;

-- Результат покажет:
-- HashJoinExec: mode=Partitioned
--   AggregateExec: groupBy=[name]
--   CoalesceBatchesExec
--     RepartitionExec: Hash([user_id])
Подробнее в уроках:

ExecutionPlan

Execution Plan (trait)
Термин

Trait в DataFusion, который реализует каждый физический оператор. Метод execute() возвращает SendableRecordBatchStream — асинхронный поток RecordBatch-ей. Все встроенные операторы (FilterExec, HashJoinExec, SortExec) и пользовательские расширения реализуют ExecutionPlan.

Пример:
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::SendableRecordBatchStream;

#[async_trait]
impl ExecutionPlan for MyCustomScan {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // возврат потока данных
    }
    fn schema(&self) -> SchemaRef { ... }
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { ... }
}
Подробнее в уроках:

Catalog

Catalog System
Термин

Трёхуровневая система метаданных в DataFusion: Catalog → Schema → Table. По умолчанию используется MemoryCatalogProvider, но можно реализовать CatalogProvider для подключения внешних хранилищ метаданных (Hive Metastore, Glue, Unity Catalog). Каталог отвечает за разрешение имён таблиц в SQL-запросах.

Пример:
use datafusion::catalog::CatalogProvider;

// Регистрация нового каталога
ctx.register_catalog(
    "production",
    Arc::new(MyHiveCatalog::new(metastore_url)),
);

// Использование в SQL
let df = ctx.sql(
    "SELECT * FROM production.analytics.events"
).await?;
Подробнее в уроках:

TableProvider

Table Provider (trait)
Термин

Trait, определяющий источник данных для DataFusion. TableProvider описывает схему таблицы и создаёт ExecutionPlan для сканирования. Реализации включают: MemTable (в памяти), ListingTable (файлы), DeltaTable (Delta Lake), и пользовательские провайдеры для любых внешних источников.

Пример:
use datafusion::datasource::TableProvider;

#[async_trait]
impl TableProvider for MyApiTable {
    fn schema(&self) -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("data", DataType::Utf8, true),
        ]))
    }
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Создание ExecutionPlan для чтения из API
    }
}
Подробнее в уроках:

Crate Architecture

Архитектура крейтов DataFusion
Термин

DataFusion состоит из нескольких крейтов с чёткими зависимостями: datafusion-common (типы и ошибки), datafusion-expr (выражения и логический план), datafusion-sql (SQL-парсер), datafusion-optimizer (правила оптимизации), datafusion-physical-plan (физическое выполнение), datafusion (верхнеуровневый фасад). Модульность позволяет использовать только нужные части.

Пример:
# Cargo.toml — можно зависеть от конкретных крейтов
[dependencies]
datafusion = "44"
# Или отдельные компоненты:
datafusion-common = "44"
datafusion-expr = "44"
datafusion-sql = "44"
datafusion-optimizer = "44"
datafusion-physical-plan = "44"
Подробнее в уроках:

Memory Management

Управление памятью
Термин

Система отслеживания и ограничения потребления оперативной памяти при выполнении запросов. DataFusion использует MemoryPool (GreedyMemoryPool, FairSpillPool, TrackConsumersPool) для контроля каждого оператора. При превышении лимита — автоматический spill на диск вместо OOM-краша.

Пример:
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;

// Установка лимита 4 GB
let pool = FairSpillPool::new(4 * 1024 * 1024 * 1024);
let runtime = RuntimeEnvBuilder::new()
    .with_memory_pool(Arc::new(pool))
    .build()?;

let ctx = SessionContext::new_with_config_rt(
    SessionConfig::new(), runtime
);
Подробнее в уроках:

SQL и DataFrame

DataFrame

DataFrame API
Термин

Fluent API для построения запросов без SQL-синтаксиса. DataFrame в DataFusion — это обёртка над LogicalPlan, предоставляющая методы filter(), select(), aggregate(), join(). Вызовы ленивые — выполнение происходит только при collect() или show(). Эквивалентен SQL по возможностям.

Пример:
use datafusion::prelude::*;

let df = ctx.read_csv("data.csv", CsvReadOptions::default()).await?;

let result = df
    .filter(col("age").gt(lit(18)))?
    .select(vec![col("name"), col("age")])?
    .sort(vec![col("age").sort(true, true)])?
    .limit(0, Some(10))?;

result.show().await?;
Подробнее в уроках:

Expr

Expression
Термин

Enum, описывающий выражения в логическом плане: Column, Literal, BinaryExpr (сравнения, арифметика), AggregateFunction, ScalarFunction, WindowFunction, и другие. Expr — строительный блок для условий фильтрации, проекций и агрегаций как в SQL, так и в DataFrame API.

Пример:
use datafusion::prelude::*;
use datafusion::logical_expr::Expr;

// Построение выражений
let filter = col("age").gt(lit(18))
    .and(col("status").eq(lit("active")));

// Агрегация
let avg_age = avg(col("age")).alias("avg_age");

// Window-выражение
let rank = Expr::WindowFunction(WindowFunction::new(
    WindowFunctionDefinition::BuiltInWindowFunction(
        BuiltInWindowFunction::RowNumber
    ),
    vec![],
));
Подробнее в уроках:

EXPLAIN / EXPLAIN ANALYZE

Query Explanation
Термин

SQL-команды для анализа планов выполнения. EXPLAIN показывает логический и физический план без выполнения запроса. EXPLAIN ANALYZE выполняет запрос и добавляет реальные метрики: время выполнения каждого оператора, количество обработанных строк, размер данных. Незаменимый инструмент для оптимизации.

Пример:
-- Логический + физический план
EXPLAIN SELECT * FROM users WHERE age > 30;

-- План с реальными метриками
EXPLAIN ANALYZE
SELECT department, AVG(salary)
FROM employees
GROUP BY department;

-- Результат: elapsed_compute=12ms, output_rows=5
--   HashAggregateExec: output_rows=5
--     CoalesceBatchesExec: output_rows=1000
--       FilterExec: output_rows=1000
Подробнее в уроках:

Window Function

Оконная функция
Термин

Функции, которые вычисляют значение для каждой строки на основе «окна» — набора связанных строк. DataFusion поддерживает ROW_NUMBER, RANK, DENSE_RANK, LAG, LEAD, FIRST_VALUE, NTH_VALUE и пользовательские WindowUDF. Окно задаётся через PARTITION BY и ORDER BY.

Пример:
-- Ранжирование по зарплате в каждом отделе
SELECT
    name,
    department,
    salary,
    RANK() OVER (
        PARTITION BY department
        ORDER BY salary DESC
    ) AS salary_rank,
    LAG(salary) OVER (
        PARTITION BY department
        ORDER BY salary
    ) AS prev_salary
FROM employees;
Подробнее в уроках:

CTE

Common Table Expression
Термин

Именованные подзапросы (WITH ... AS), улучшающие читаемость сложных SQL. DataFusion поддерживает нерекурсивные CTE, которые разворачиваются в логический план как inline-подзапросы. CTE позволяют разбить сложный запрос на логические шаги.

Пример:
-- CTE для пошагового анализа
WITH active_users AS (
    SELECT user_id, name
    FROM users
    WHERE last_login > '2024-01-01'
),
user_orders AS (
    SELECT u.name, COUNT(*) as order_count
    FROM active_users u
    JOIN orders o ON u.user_id = o.user_id
    GROUP BY u.name
)
SELECT name, order_count
FROM user_orders
WHERE order_count > 5
ORDER BY order_count DESC;
Подробнее в уроках:

SQL Engine

SQL-движок DataFusion
Термин

Компонент DataFusion, отвечающий за парсинг SQL-текста в AST (на основе sqlparser-rs), трансформацию AST в LogicalPlan, валидацию типов и разрешение имён через каталог. Поддерживает диалект, совместимый с PostgreSQL, включая CREATE TABLE, INSERT, COPY, и множество функций.

Пример:
use datafusion::prelude::*;

let ctx = SessionContext::new();

// CREATE TABLE
ctx.sql("
    CREATE EXTERNAL TABLE logs (
        timestamp TIMESTAMP,
        level VARCHAR,
        message VARCHAR
    ) STORED AS PARQUET
    LOCATION 'data/logs/'
").await?;

// Запрос с агрегацией
let df = ctx.sql("
    SELECT level, COUNT(*) as cnt
    FROM logs
    WHERE timestamp > '2024-01-01'
    GROUP BY level
").await?;
Подробнее в уроках:

Расширяемость

ScalarUDF

Scalar User-Defined Function
Термин

Пользовательская скалярная функция, которая принимает один или несколько столбцов и возвращает столбец с результатом (по одному значению на строку). ScalarUDF регистрируется в SessionContext и вызывается в SQL как обычная функция. DataFusion автоматически применяет её к каждому RecordBatch.

Пример:
use datafusion::logical_expr::{create_udf, Volatility};
use datafusion::arrow::datatypes::DataType;
use datafusion::physical_plan::functions::make_scalar_function;

let udf = create_udf(
    "my_upper",
    vec![DataType::Utf8],      // входные типы
    DataType::Utf8,             // тип результата
    Volatility::Immutable,
    make_scalar_function(|args| {
        let input = as_string_array(&args[0])?;
        let result: StringArray = input.iter()
            .map(|s| s.map(|v| v.to_uppercase()))
            .collect();
        Ok(Arc::new(result) as ArrayRef)
    }),
);
ctx.register_udf(udf);
Подробнее в уроках:

AggregateUDF

Aggregate User-Defined Function
Термин

Пользовательская агрегатная функция, которая принимает набор строк и возвращает одно значение (или одно значение на группу). AggregateUDF реализует trait Accumulator с методами update_batch(), merge(), evaluate(). Используется в GROUP BY и оконных выражениях.

Пример:
use datafusion::logical_expr::AggregateUDF;

// Пример: геометрическое среднее
struct GeoMeanAccumulator {
    product: f64,
    count: u64,
}
impl Accumulator for GeoMeanAccumulator {
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        let arr = as_float64_array(&values[0])?;
        for val in arr.iter().flatten() {
            self.product *= val;
            self.count += 1;
        }
        Ok(())
    }
    fn evaluate(&mut self) -> Result<ScalarValue> {
        Ok(ScalarValue::Float64(Some(
            self.product.powf(1.0 / self.count as f64)
        )))
    }
}
Подробнее в уроках:

WindowUDF

Window User-Defined Function
Термин

Пользовательская оконная функция, которая вычисляет значение для каждой строки на основе окна (OVER). В отличие от Aggregатных UDF, WindowUDF получает доступ ко всем строкам окна и может обращаться к соседним строкам (как LAG/LEAD). Реализует trait WindowUDFImpl.

Пример:
use datafusion::logical_expr::WindowUDF;

// Регистрация оконной UDF
ctx.register_udwf(my_window_udf);

// Использование в SQL
let df = ctx.sql("
    SELECT
        ts,
        value,
        MY_MOVING_AVG(value) OVER (
            ORDER BY ts
            ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
        ) as moving_avg
    FROM metrics
").await?;
Подробнее в уроках:

OptimizerRule

Optimizer Rule
Термин

Правило трансформации логического плана. OptimizerRule принимает LogicalPlan и возвращает эквивалентный, но более эффективный план. Встроенные правила: push-down предикатов/проекций, устранение подвыражений, свёртка констант. Можно добавить кастомные правила через SessionState.

Пример:
use datafusion::optimizer::OptimizerRule;

pub struct MyPruningRule;

impl OptimizerRule for MyPruningRule {
    fn name(&self) -> &str { "my_pruning_rule" }

    fn try_optimize(
        &self,
        plan: &LogicalPlan,
        config: &dyn OptimizerConfig,
    ) -> Result<Option<LogicalPlan>> {
        // Анализ и трансформация плана
        match plan {
            LogicalPlan::Filter(filter) => { ... }
            _ => Ok(None), // без изменений
        }
    }
}
Подробнее в уроках:

AnalyzerRule

Analyzer Rule
Термин

Правило анализа, выполняемое до оптимизации. AnalyzerRule валидирует и обогащает логический план: разрешает типы, проверяет корректность имён столбцов, встраивает значения по умолчанию. В отличие от OptimizerRule, AnalyzerRule может менять семантику (добавлять cast-ы, разрешать wildcard *).

Пример:
use datafusion::optimizer::AnalyzerRule;

pub struct TypeCoercionRule;

impl AnalyzerRule for TypeCoercionRule {
    fn name(&self) -> &str { "type_coercion" }

    fn analyze(
        &self,
        plan: LogicalPlan,
        config: &ConfigOptions,
    ) -> Result<LogicalPlan> {
        // Добавление CAST там, где типы не совпадают
        plan.transform_up(|node| {
            // ... вставка приведений типов
        })
    }
}
Подробнее в уроках:

DataFusionError

DataFusion Error
Термин

Единый тип ошибки DataFusion, покрывающий все уровни: парсинг SQL, планирование, оптимизация, выполнение. Варианты: Plan, SQL, ArrowError, Execution, Internal, NotImplemented. Для UDF и расширений — создавайте ошибки через datafusion_common::DataFusionError для консистентной обработки.

Пример:
use datafusion_common::DataFusionError;

// Ошибка планирования
return Err(DataFusionError::Plan(
    format!("Column '{}' not found in schema", name)
));

// Ошибка выполнения
return Err(DataFusionError::Execution(
    "Division by zero in UDF".to_string()
));

// Конвертация из Arrow ошибки
let arrow_result = batch.column(0)
    .as_any()
    .downcast_ref::<Int64Array>()
    .ok_or(DataFusionError::Internal(
        "Expected Int64Array".to_string()
    ))?;
Подробнее в уроках:

sqllogictest

SQL Logic Test
Термин

Фреймворк для тестирования SQL-движков, используемый в DataFusion. Тесты записываются как .slt файлы: SQL-запрос и ожидаемый результат в текстовом формате. Поддерживает statement ok, query, halt, include. DataFusion использует sqllogictest для регрессионного тестирования сотен SQL-сценариев.

Пример:
# Файл: my_tests.slt

# Создание таблицы
statement ok
CREATE TABLE t (a INT, b VARCHAR);

# Вставка данных
statement ok
INSERT INTO t VALUES (1, 'hello'), (2, 'world');

# Проверка результата (I = Integer, T = Text)
query IT
SELECT a, b FROM t ORDER BY a;
----
1 hello
2 world
Подробнее в уроках:

PhysicalPlanner

Physical Planner
Термин

Компонент, преобразующий LogicalPlan в ExecutionPlan (физический план). PhysicalPlanner выбирает конкретные реализации операторов: HashJoin vs SortMergeJoin, одно- или двухфазная агрегация. Можно подменить DefaultPhysicalPlanner через SessionState для кастомного маппинга логических узлов в физические.

Пример:
use datafusion::physical_planner::PhysicalPlanner;

// Кастомный physical planner для специализированных операторов
pub struct MyPlanner {
    default: DefaultPhysicalPlanner,
}

#[async_trait]
impl PhysicalPlanner for MyPlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        match logical_plan {
            // Подмена scan-оператора для GPU
            LogicalPlan::TableScan(scan)
                if is_gpu_table(scan) => {
                Ok(Arc::new(GpuScanExec::new(scan)))
            }
            _ => self.default.create_physical_plan(
                logical_plan, session_state
            ).await
        }
    }
}
Подробнее в уроках:

Оптимизация запросов

Predicate Pushdown

Проталкивание предикатов
Термин

Правило оптимизации, перемещающее фильтры (WHERE) как можно ближе к источнику данных. Если фильтр может применяться на уровне сканирования файлов, Parquet-ридер пропускает целые row group-ы, а CSV/JSON-ридеры отбрасывают строки раньше. Одна из самых эффективных оптимизаций.

Пример:
-- До оптимизации:
-- Projection: [name]
--   Filter: age > 30
--     TableScan: users

-- После Predicate Pushdown:
-- Projection: [name]
--   TableScan: users, filter=[age > 30]
--   (фильтр применён на уровне scan —
--    row groups с max(age) <= 30 пропущены)
Подробнее в уроках:

Projection Pushdown

Проталкивание проекций
Термин

Правило оптимизации, ограничивающее набор читаемых столбцов до реально используемых в запросе. Если из 100 столбцов Parquet-файла используются только 3, остальные 97 не будут десериализованы. Критично для колоночных форматов, где каждый столбец хранится отдельно.

Пример:
-- Запрос использует только name и age
SELECT name FROM users WHERE age > 30;

-- Без Projection Pushdown: читаются все столбцы
-- С Projection Pushdown:
-- TableScan: users, projection=[name, age]
-- Parquet читает только 2 column chunk вместо всех
Подробнее в уроках:

Join Reorder

Переупорядочивание JOIN-ов
Термин

Оптимизация, изменяющая порядок соединений таблиц для минимизации промежуточных результатов. DataFusion использует статистики таблиц (row count, distinct values) для выбора оптимального порядка. Ставит таблицу с меньшей кардинальностью как build-сторону HashJoin.

Пример:
-- Исходный запрос: 3 JOIN-а
SELECT *
FROM large_table l       -- 10M rows
JOIN medium_table m      -- 100K rows
  ON l.id = m.large_id
JOIN small_table s       -- 1K rows
  ON m.id = s.medium_id;

-- После Join Reorder:
-- small × medium × large
-- (минимизация промежуточных данных)
Подробнее в уроках:

Statistics

Статистики таблиц
Термин

Метаданные о распределении данных: количество строк, min/max и distinct count для каждого столбца, размер в байтах. Parquet хранит статистики в footer-е каждого row group. DataFusion использует статистики для оптимизаций: pruning файлов, выбор порядка join, оценка стоимости планов.

Пример:
-- Просмотр статистик через EXPLAIN
EXPLAIN SELECT * FROM parquet_table;
-- TableScan: parquet_table
--   statistics: rows=1000000
--   column stats:
--     id: min=1, max=1000000, distinct=1000000
--     created: min=2020-01-01, max=2024-12-31

-- Настройка сбора статистик
SET datafusion.execution.collect_statistics = true;
Подробнее в уроках:

Cost-Based Optimization

CBO — оптимизация на основе стоимости
Термин

Подход к оптимизации запросов, при котором оптимизатор оценивает «стоимость» (CPU, I/O, память) нескольких альтернативных планов и выбирает самый дешёвый. В DataFusion CBO используется для выбора порядка join-ов и стратегий агрегации на основе статистик таблиц.

Пример:
-- CBO влияет на выбор стратегии join:
-- Если build-сторона помещается в память:
--   → HashJoinExec (быстрый)
-- Если данные уже отсортированы:
--   → SortMergeJoinExec (без доп. памяти)
-- Если одна сторона очень маленькая:
--   → NestedLoopJoinExec с broadcast

-- Включение CBO-оптимизаций
SET datafusion.optimizer.prefer_hash_join = true;
SET datafusion.optimizer.top_down_join_key_reordering = true;
Подробнее в уроках:

Dynamic Filtering

Динамическая фильтрация
Термин

Runtime-оптимизация, при которой результат одной стороны join используется для фильтрации другой стороны до передачи данных. Вместо чтения всех строк probe-стороны, DataFusion строит Bloom-фильтр или диапазон из build-стороны и применяет его к scan. Особенно эффективно для star-schema запросов.

Пример:
-- Пример: join большой fact-таблицы с малой dimension
SELECT f.amount, d.category
FROM fact_sales f
JOIN dim_products d ON f.product_id = d.id
WHERE d.category = 'electronics';

-- Без dynamic filtering: scan всех 100M строк fact
-- С dynamic filtering:
--   1. Scan dim_products → product_ids для 'electronics'
--   2. Bloom filter из этих id-шников
--   3. Scan fact_sales с фильтром → пропуск row groups
Подробнее в уроках:

Распределённое выполнение

Ballista

Apache Ballista
Термин

Распределённый движок на базе DataFusion для выполнения запросов на кластере. Архитектура: Scheduler распределяет stage-и по Executor-ам, данные передаются через Arrow Flight. Ballista масштабирует DataFusion горизонтально, сохраняя совместимость с SQL и DataFrame API.

Пример:
use ballista::prelude::*;

// Подключение к кластеру Ballista
let config = BallistaConfig::builder()
    .set("ballista.shuffle.partitions", "16")
    .build()?;

let ctx = BallistaContext::remote(
    "scheduler-host", 50050, &config
).await?;

// SQL выполняется распределённо
let df = ctx.sql("
    SELECT region, SUM(amount)
    FROM sales
    GROUP BY region
").await?;
Подробнее в уроках:

DataFusion-Ray

DataFusion on Ray
Термин

Интеграция DataFusion с Ray — распределённым фреймворком из Python-экосистемы. DataFusion-Ray использует Ray для распределения физических планов по узлам кластера. Преимущество перед Ballista: зрелая экосистема Ray (auto-scaling, GPU, ML), привычная Python-среда.

Пример:
import datafusion_ray
import ray

ray.init()

# Создание контекста DataFusion на Ray
ctx = datafusion_ray.DatafusionRayContext(
    num_workers=4
)

# SQL выполняется на Ray-кластере
df = ctx.sql("
    SELECT date, SUM(revenue)
    FROM sales_data
    GROUP BY date
")
result = df.collect()
Подробнее в уроках:

Stage / Partition

Стадия / Партиция
Термин

Stage — этап распределённого плана выполнения, ограниченный shuffle-границей (repartition). Каждый stage выполняется параллельно на нескольких partition-ах. Partition — единица параллелизма: один RecordBatch-поток на одном ядре. Количество партиций задаётся target_partitions.

Пример:
-- Физический план с 2 стадиями:
-- Stage 1: Параллельный scan + фильтрация
--   Partition 0: файлы 1-4
--   Partition 1: файлы 5-8
-- [Shuffle по hash(user_id)]
-- Stage 2: Агрегация по user_id
--   Partition 0: users A-M
--   Partition 1: users N-Z

SET datafusion.execution.target_partitions = 8;
Подробнее в уроках:

Comet

Apache DataFusion Comet
Термин

Ускоритель Apache Spark, заменяющий Spark JVM-операторы на нативные DataFusion-операторы. Comet перехватывает физический план Spark и выполняет поддерживаемые операции (scan, filter, aggregate, join) через DataFusion, получая 2-5x ускорение без изменения Spark-кода.

Пример:
# Spark + Comet: нулевые изменения в коде
# spark-defaults.conf:
spark.plugins org.apache.spark.CometPlugin
spark.comet.enabled true
spark.comet.exec.enabled true
spark.comet.exec.all.enabled true

# Обычный PySpark-код ускоряется автоматически
df = spark.read.parquet("data/")
result = df.groupBy("key").agg(sum("value"))
result.show()
Подробнее в уроках:

FDAP Stack

Flight, DataFusion, Arrow, Parquet
Термин

Стек технологий для построения аналитических систем: Parquet (хранение), Arrow (память), DataFusion (выполнение), Flight (транспорт). Все компоненты нативно совместимы — данные передаются между уровнями без сериализации. FDAP используется в InfluxDB, GlareDB, SeaFowl и десятках других проектов.

Пример:
// Типичный FDAP-стек:
// 1. Parquet — данные на диске (S3, HDFS)
// 2. Arrow — колоночный формат в памяти
// 3. DataFusion — SQL/DataFrame выполнение
// 4. Flight — передача клиенту

let ctx = SessionContext::new();
ctx.register_parquet("data", "s3://bucket/data/",
    ParquetReadOptions::default()
).await?;

// DataFusion читает Parquet → Arrow → результат
let df = ctx.sql("SELECT * FROM data").await?;
// Flight передаёт Arrow RecordBatch клиенту
df.collect().await?;
Подробнее в уроках:

Performance и Memory

MemoryPool

Memory Pool
Термин

Интерфейс управления памятью в DataFusion. MemoryPool отслеживает потребление памяти каждым оператором и принимает решение: продолжить выполнение или сбросить данные на диск (spill). Реализации: GreedyMemoryPool (первый забирает всё), FairSpillPool (справедливое распределение).

Пример:
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;

// Ограничение памяти до 4 GB
let pool = FairSpillPool::new(4 * 1024 * 1024 * 1024);

let runtime = RuntimeEnvBuilder::new()
    .with_memory_pool(Arc::new(pool))
    .build()?;

let config = SessionConfig::new()
    .with_target_partitions(8);

let ctx = SessionContext::new_with_config_rt(config, runtime);
Подробнее в уроках:

FairSpillPool

Fair Spill Pool
Термин

Реализация MemoryPool, которая справедливо распределяет доступную память между всеми активными операторами. Когда общее потребление достигает лимита, FairSpillPool уведомляет оператора-потребителя о необходимости spill — сброса промежуточных данных на диск для освобождения RAM.

Пример:
use datafusion::execution::memory_pool::FairSpillPool;

// 2 GB лимит, справедливое распределение
let pool = FairSpillPool::new(2 * 1024 * 1024 * 1024);

// Если 4 оператора активны, каждый получит ~512 MB
// При превышении — spill на диск
// FairSpillPool выбирает оператора с наибольшим
// потреблением для spill
Подробнее в уроках:

TrackConsumersPool

Track Consumers Pool
Термин

Обёртка над другими MemoryPool, добавляющая отслеживание потребления памяти по каждому оператору. Когда лимит памяти превышен, TrackConsumersPool включает в сообщение об ошибке детальную информацию — какие операторы потребляют сколько. Критически полезно для отладки OOM-ситуаций.

Пример:
use datafusion::execution::memory_pool::{
    TrackConsumersPool, FairSpillPool
};

// Обернуть FairSpillPool для отладки
let inner = FairSpillPool::new(2_000_000_000);
let pool = TrackConsumersPool::new(
    inner,
    NonZeroUsize::new(10).unwrap(), // top 10 consumers
);

// При OOM: "Resources exhausted:
//   HashJoinExec[0]: 1.2 GB
//   SortExec[1]: 600 MB
//   AggregateExec[2]: 200 MB"
Подробнее в уроках:

target_partitions

Target Partitions
Термин

Настройка DataFusion, определяющая степень параллелизма при выполнении запросов. По умолчанию равна количеству ядер CPU. Каждый partition обрабатывается отдельной Tokio-задачей. Увеличение target_partitions улучшает параллелизм, но увеличивает потребление памяти и overhead на координацию.

Пример:
-- SQL: установка партиций
SET datafusion.execution.target_partitions = 16;

-- Rust: через SessionConfig
let config = SessionConfig::new()
    .with_target_partitions(16);
let ctx = SessionContext::new_with_config(config);

-- Проверка: EXPLAIN покажет RepartitionExec
EXPLAIN SELECT region, SUM(sales)
FROM data GROUP BY region;
-- RepartitionExec: partitioning=Hash([region], 16)
Подробнее в уроках:

Spill-to-Disk

Сброс на диск
Термин

Механизм работы с данными, не помещающимися в оперативную память. Когда MemoryPool сигнализирует о превышении лимита, операторы (Sort, HashAggregate, HashJoin) сбрасывают промежуточные данные на диск и продолжают выполнение. Без spill — OOM и аварийное завершение запроса.

Пример:
-- Настройка директории для spill
SET datafusion.execution.sort_spill_reservation_bytes = 10485760;

-- В EXPLAIN ANALYZE видно spill-метрики:
EXPLAIN ANALYZE
SELECT * FROM large_table ORDER BY value;
-- SortExec:
--   elapsed_compute: 4.2s
--   spill_count: 3
--   spilled_bytes: 2147483648
--   output_rows: 50000000
Подробнее в уроках:

EXPLAIN FORMAT TREE

Древовидный формат EXPLAIN
Термин

Расширенный формат вывода плана выполнения, показывающий дерево операторов с индентацией, метриками каждого узла и потоком данных. В отличие от обычного EXPLAIN, FORMAT TREE визуально структурирует сложные планы с множеством join-ов и подзапросов.

Пример:
-- Древовидный формат плана
EXPLAIN FORMAT TREE
SELECT u.name, COUNT(o.id)
FROM users u
JOIN orders o ON u.id = o.user_id
GROUP BY u.name;

-- Результат:
-- AggregateExec [name, COUNT(o.id)]
-- └─ HashJoinExec [u.id = o.user_id]
--    ├─ DataSourceExec [users]
--    └─ DataSourceExec [orders]
Подробнее в уроках:

Lakehouse и интеграции

Delta Lake

Delta Lake
Термин

Открытый формат таблиц с ACID-транзакциями поверх объектного хранилища. DataFusion интегрируется с Delta Lake через крейт delta-rs, что даёт time travel, schema evolution, Z-ordering и MERGE. Delta Lake — один из ключевых Lakehouse-форматов, поддерживаемых DataFusion.

Пример:
use deltalake::DeltaTableBuilder;
use datafusion::prelude::*;

let ctx = SessionContext::new();

// Регистрация Delta-таблицы
let table = DeltaTableBuilder::from_uri("s3://bucket/delta-table")
    .with_storage_options(storage_opts)
    .load().await?;

ctx.register_table("events", Arc::new(table))?;

// Time travel: чтение версии 5
let df = ctx.sql("
    SELECT * FROM events VERSION AS OF 5
").await?;
Подробнее в уроках:

Apache Iceberg

Apache Iceberg
Термин

Формат таблиц для больших аналитических датасетов с поддержкой schema evolution, partition evolution и hidden partitioning. DataFusion интегрируется с Iceberg через iceberg-rust. Iceberg использует manifest-файлы для эффективного pruning — DataFusion пропускает ненужные data-файлы на основе статистик.

Пример:
use iceberg_datafusion::IcebergTableProvider;
use datafusion::prelude::*;

let ctx = SessionContext::new();

// Подключение к Iceberg-каталогу
let provider = IcebergTableProvider::try_new(
    catalog,
    "db.events"
).await?;

ctx.register_table("events", Arc::new(provider))?;

// Iceberg partition pruning работает автоматически
let df = ctx.sql("
    SELECT event_type, COUNT(*)
    FROM events
    WHERE date = '2024-03-15'
    GROUP BY event_type
").await?;
Подробнее в уроках:

ObjectStore

Object Store
Термин

Абстракция в экосистеме Arrow для доступа к объектным хранилищам: S3, GCS, Azure Blob, HDFS и локальная файловая система. DataFusion использует object_store для чтения и записи данных. Поддерживает multipart upload, conditional put, range reads для эффективного доступа к Parquet column chunks.

Пример:
use object_store::aws::AmazonS3Builder;
use datafusion::prelude::*;
use url::Url;

let s3 = AmazonS3Builder::new()
    .with_bucket_name("my-bucket")
    .with_region("us-east-1")
    .build()?;

let ctx = SessionContext::new();
ctx.register_object_store(
    &Url::parse("s3://my-bucket")?,
    Arc::new(s3),
);

// Чтение напрямую из S3
let df = ctx.read_parquet(
    "s3://my-bucket/data/*.parquet",
    ParquetReadOptions::default(),
).await?;
Подробнее в уроках:

Parquet

Apache Parquet
Термин

Колоночный формат хранения данных, оптимизированный для аналитических запросов. Parquet хранит данные по столбцам с компрессией (Snappy, Zstd, LZ4), содержит встроенные статистики (min/max, null count) для каждого row group. DataFusion нативно поддерживает Parquet с predicate pushdown и projection pushdown.

Пример:
use datafusion::prelude::*;

let ctx = SessionContext::new();

// Чтение Parquet с автоматическим pruning
ctx.register_parquet(
    "logs",
    "data/logs/*.parquet",
    ParquetReadOptions::default()
        .parquet_pruning(true)     // row group pruning
        .skip_metadata(false),     // читать footer
).await?;

// Запись результата в Parquet
let df = ctx.sql("SELECT * FROM logs WHERE level = 'ERROR'").await?;
df.write_parquet(
    "output/errors.parquet",
    WriterProperties::builder()
        .set_compression(Compression::ZSTD(ZstdLevel::default()))
        .build(),
    None,
).await?;
Подробнее в уроках:

Multi-Tenant Isolation

Мультитенантная изоляция
Термин

Паттерн изоляции нескольких клиентов (tenant-ов) в одном экземпляре DataFusion. Каждый tenant получает собственный SessionContext с отдельным каталогом, MemoryPool-лимитами и правами доступа. Изоляция предотвращает утечки данных между tenant-ами и гарантирует справедливое распределение ресурсов.

Пример:
// Создание изолированного контекста для каждого tenant
fn create_tenant_context(
    tenant_id: &str,
    memory_limit: usize,
) -> SessionContext {
    let pool = FairSpillPool::new(memory_limit);
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(pool))
        .build().unwrap();

    let config = SessionConfig::new()
        .with_default_catalog_and_schema(
            tenant_id, "public"
        );

    SessionContext::new_with_config_rt(config, runtime)
}

// Каждый tenant видит только свои таблицы
let ctx_a = create_tenant_context("tenant_a", GB_2);
let ctx_b = create_tenant_context("tenant_b", GB_4);
Подробнее в уроках: