Справочник ключевых терминов курса Apache DataFusion.
Основная единица данных в Apache Arrow — набор столбцов одинаковой длины, объединённых общей схемой. RecordBatch хранит данные в колоночном формате для эффективной SIMD-обработки. В DataFusion все промежуточные результаты передаются как поток RecordBatch-ей.
use arrow::array::Int32Array;
use arrow::record_batch::RecordBatch;
use arrow::datatypes::{Schema, Field, DataType};
use std::sync::Arc;
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
]));
let batch = RecordBatch::try_new(
schema,
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)?;Иммутабельная колоночная коллекция значений одного типа в Apache Arrow. Каждый Array содержит буфер данных, буфер validity (для NULL-значений) и опциональные offset-буферы для строк и списков. DataFusion использует Arrow Arrays для всех вычислений на столбцах.
use arrow::array::{StringArray, Int64Array, Float64Array};
let strings = StringArray::from(vec!["hello", "world"]);
let ints = Int64Array::from(vec![10, 20, 30]);
let floats = Float64Array::from(vec![Some(1.5), None, Some(3.0)]);
// None -> NULL в ArrowОписание структуры данных — упорядоченный набор полей (Field), каждое из которых содержит имя, тип данных (DataType) и флаг nullable. Schema определяет формат RecordBatch и используется для валидации данных при чтении, записи и трансформациях.
use arrow::datatypes::{Schema, Field, DataType};
let schema = Schema::new(vec![
Field::new("name", DataType::Utf8, false),
Field::new("age", DataType::Int32, true),
Field::new("score", DataType::Float64, true),
]);
println!("Полей: {}", schema.fields().len());Перечисление (enum) всех поддерживаемых типов данных в Arrow — от примитивных (Int8–Int64, Float16–Float64, Boolean) до вложенных (List, Struct, Map, Union). DataType определяет формат хранения в памяти и набор доступных операций над столбцом.
use arrow::datatypes::DataType;
// Примитивные типы
let int_type = DataType::Int64;
let str_type = DataType::Utf8;
// Вложенные типы
let list_type = DataType::List(
Arc::new(Field::new("item", DataType::Int32, true))
);
let ts_type = DataType::Timestamp(
TimeUnit::Microsecond, Some("UTC".into())
);Формат сериализации Arrow для обмена данными между процессами с нулевым копированием (zero-copy). Arrow IPC позволяет передавать RecordBatch между процессами и языками без десериализации — данные в памяти уже в нужном формате. Используется в Arrow Flight и при кешировании.
use arrow::ipc::writer::FileWriter;
use arrow::ipc::reader::FileReader;
use std::fs::File;
// Запись в IPC файл
let file = File::create("data.arrow")?;
let mut writer = FileWriter::try_new(file, &schema)?;
writer.write(&batch)?;
writer.finish()?;
// Чтение из IPC файла
let file = File::open("data.arrow")?;
let reader = FileReader::try_new(file, None)?;Высокопроизводительный RPC-протокол на основе gRPC для передачи данных в формате Arrow. Flight оптимизирован для потоковой передачи больших датасетов без сериализации/десериализации. В DataFusion используется в Ballista для распределённого выполнения запросов между узлами.
// Ballista использует Flight для передачи данных
// между scheduler и executor узлами
use ballista::prelude::*;
let ctx = BallistaContext::remote(
"localhost", 50050, &config
).await?;
// Данные передаются через Arrow Flight
let df = ctx.sql("SELECT * FROM t").await?;Главная точка входа в DataFusion — объект, через который выполняются SQL-запросы, регистрируются таблицы и UDF, настраиваются параметры выполнения. SessionContext владеет каталогом, оптимизатором и runtime-конфигурацией. Каждый контекст изолирован — идеально для мультитенантных сценариев.
use datafusion::prelude::*;
let ctx = SessionContext::new();
// Регистрация CSV как таблицы
ctx.register_csv("users", "data/users.csv",
CsvReadOptions::default()
).await?;
// Выполнение SQL
let df = ctx.sql(
"SELECT name, age FROM users WHERE age > 30"
).await?;
df.show().await?;Декларативное дерево операций, описывающее «что» нужно сделать — без указания «как». LogicalPlan содержит узлы: Scan, Filter, Projection, Aggregate, Join, Sort, Limit. Оптимизатор трансформирует LogicalPlan перед преобразованием в PhysicalPlan.
-- Просмотр логического плана
EXPLAIN
SELECT name, COUNT(*)
FROM users
WHERE active = true
GROUP BY name
ORDER BY COUNT(*) DESC;
-- Результат:
-- Sort: COUNT(*) DESC
-- Aggregate: groupBy=[name], aggr=[COUNT(*)]
-- Filter: active = true
-- TableScan: usersИсполняемое дерево операторов, определяющее конкретный способ выполнения запроса — стратегии join (HashJoin, SortMergeJoin), режимы сканирования (параллельный, с pruning), способы агрегации. PhysicalPlan создаётся из LogicalPlan с учётом статистик и настроек runtime.
-- Просмотр физического плана с метриками
EXPLAIN ANALYZE
SELECT u.name, COUNT(o.id)
FROM users u JOIN orders o ON u.id = o.user_id
GROUP BY u.name;
-- Результат покажет:
-- HashJoinExec: mode=Partitioned
-- AggregateExec: groupBy=[name]
-- CoalesceBatchesExec
-- RepartitionExec: Hash([user_id])Trait в DataFusion, который реализует каждый физический оператор. Метод execute() возвращает SendableRecordBatchStream — асинхронный поток RecordBatch-ей. Все встроенные операторы (FilterExec, HashJoinExec, SortExec) и пользовательские расширения реализуют ExecutionPlan.
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::SendableRecordBatchStream;
#[async_trait]
impl ExecutionPlan for MyCustomScan {
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
// возврат потока данных
}
fn schema(&self) -> SchemaRef { ... }
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { ... }
}Трёхуровневая система метаданных в DataFusion: Catalog → Schema → Table. По умолчанию используется MemoryCatalogProvider, но можно реализовать CatalogProvider для подключения внешних хранилищ метаданных (Hive Metastore, Glue, Unity Catalog). Каталог отвечает за разрешение имён таблиц в SQL-запросах.
use datafusion::catalog::CatalogProvider;
// Регистрация нового каталога
ctx.register_catalog(
"production",
Arc::new(MyHiveCatalog::new(metastore_url)),
);
// Использование в SQL
let df = ctx.sql(
"SELECT * FROM production.analytics.events"
).await?;Trait, определяющий источник данных для DataFusion. TableProvider описывает схему таблицы и создаёт ExecutionPlan для сканирования. Реализации включают: MemTable (в памяти), ListingTable (файлы), DeltaTable (Delta Lake), и пользовательские провайдеры для любых внешних источников.
use datafusion::datasource::TableProvider;
#[async_trait]
impl TableProvider for MyApiTable {
fn schema(&self) -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("data", DataType::Utf8, true),
]))
}
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Создание ExecutionPlan для чтения из API
}
}DataFusion состоит из нескольких крейтов с чёткими зависимостями: datafusion-common (типы и ошибки), datafusion-expr (выражения и логический план), datafusion-sql (SQL-парсер), datafusion-optimizer (правила оптимизации), datafusion-physical-plan (физическое выполнение), datafusion (верхнеуровневый фасад). Модульность позволяет использовать только нужные части.
# Cargo.toml — можно зависеть от конкретных крейтов
[dependencies]
datafusion = "44"
# Или отдельные компоненты:
datafusion-common = "44"
datafusion-expr = "44"
datafusion-sql = "44"
datafusion-optimizer = "44"
datafusion-physical-plan = "44"Система отслеживания и ограничения потребления оперативной памяти при выполнении запросов. DataFusion использует MemoryPool (GreedyMemoryPool, FairSpillPool, TrackConsumersPool) для контроля каждого оператора. При превышении лимита — автоматический spill на диск вместо OOM-краша.
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
// Установка лимита 4 GB
let pool = FairSpillPool::new(4 * 1024 * 1024 * 1024);
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(pool))
.build()?;
let ctx = SessionContext::new_with_config_rt(
SessionConfig::new(), runtime
);Fluent API для построения запросов без SQL-синтаксиса. DataFrame в DataFusion — это обёртка над LogicalPlan, предоставляющая методы filter(), select(), aggregate(), join(). Вызовы ленивые — выполнение происходит только при collect() или show(). Эквивалентен SQL по возможностям.
use datafusion::prelude::*;
let df = ctx.read_csv("data.csv", CsvReadOptions::default()).await?;
let result = df
.filter(col("age").gt(lit(18)))?
.select(vec![col("name"), col("age")])?
.sort(vec![col("age").sort(true, true)])?
.limit(0, Some(10))?;
result.show().await?;Enum, описывающий выражения в логическом плане: Column, Literal, BinaryExpr (сравнения, арифметика), AggregateFunction, ScalarFunction, WindowFunction, и другие. Expr — строительный блок для условий фильтрации, проекций и агрегаций как в SQL, так и в DataFrame API.
use datafusion::prelude::*;
use datafusion::logical_expr::Expr;
// Построение выражений
let filter = col("age").gt(lit(18))
.and(col("status").eq(lit("active")));
// Агрегация
let avg_age = avg(col("age")).alias("avg_age");
// Window-выражение
let rank = Expr::WindowFunction(WindowFunction::new(
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::RowNumber
),
vec![],
));SQL-команды для анализа планов выполнения. EXPLAIN показывает логический и физический план без выполнения запроса. EXPLAIN ANALYZE выполняет запрос и добавляет реальные метрики: время выполнения каждого оператора, количество обработанных строк, размер данных. Незаменимый инструмент для оптимизации.
-- Логический + физический план
EXPLAIN SELECT * FROM users WHERE age > 30;
-- План с реальными метриками
EXPLAIN ANALYZE
SELECT department, AVG(salary)
FROM employees
GROUP BY department;
-- Результат: elapsed_compute=12ms, output_rows=5
-- HashAggregateExec: output_rows=5
-- CoalesceBatchesExec: output_rows=1000
-- FilterExec: output_rows=1000Функции, которые вычисляют значение для каждой строки на основе «окна» — набора связанных строк. DataFusion поддерживает ROW_NUMBER, RANK, DENSE_RANK, LAG, LEAD, FIRST_VALUE, NTH_VALUE и пользовательские WindowUDF. Окно задаётся через PARTITION BY и ORDER BY.
-- Ранжирование по зарплате в каждом отделе
SELECT
name,
department,
salary,
RANK() OVER (
PARTITION BY department
ORDER BY salary DESC
) AS salary_rank,
LAG(salary) OVER (
PARTITION BY department
ORDER BY salary
) AS prev_salary
FROM employees;Именованные подзапросы (WITH ... AS), улучшающие читаемость сложных SQL. DataFusion поддерживает нерекурсивные CTE, которые разворачиваются в логический план как inline-подзапросы. CTE позволяют разбить сложный запрос на логические шаги.
-- CTE для пошагового анализа
WITH active_users AS (
SELECT user_id, name
FROM users
WHERE last_login > '2024-01-01'
),
user_orders AS (
SELECT u.name, COUNT(*) as order_count
FROM active_users u
JOIN orders o ON u.user_id = o.user_id
GROUP BY u.name
)
SELECT name, order_count
FROM user_orders
WHERE order_count > 5
ORDER BY order_count DESC;Компонент DataFusion, отвечающий за парсинг SQL-текста в AST (на основе sqlparser-rs), трансформацию AST в LogicalPlan, валидацию типов и разрешение имён через каталог. Поддерживает диалект, совместимый с PostgreSQL, включая CREATE TABLE, INSERT, COPY, и множество функций.
use datafusion::prelude::*;
let ctx = SessionContext::new();
// CREATE TABLE
ctx.sql("
CREATE EXTERNAL TABLE logs (
timestamp TIMESTAMP,
level VARCHAR,
message VARCHAR
) STORED AS PARQUET
LOCATION 'data/logs/'
").await?;
// Запрос с агрегацией
let df = ctx.sql("
SELECT level, COUNT(*) as cnt
FROM logs
WHERE timestamp > '2024-01-01'
GROUP BY level
").await?;Пользовательская скалярная функция, которая принимает один или несколько столбцов и возвращает столбец с результатом (по одному значению на строку). ScalarUDF регистрируется в SessionContext и вызывается в SQL как обычная функция. DataFusion автоматически применяет её к каждому RecordBatch.
use datafusion::logical_expr::{create_udf, Volatility};
use datafusion::arrow::datatypes::DataType;
use datafusion::physical_plan::functions::make_scalar_function;
let udf = create_udf(
"my_upper",
vec![DataType::Utf8], // входные типы
DataType::Utf8, // тип результата
Volatility::Immutable,
make_scalar_function(|args| {
let input = as_string_array(&args[0])?;
let result: StringArray = input.iter()
.map(|s| s.map(|v| v.to_uppercase()))
.collect();
Ok(Arc::new(result) as ArrayRef)
}),
);
ctx.register_udf(udf);Пользовательская агрегатная функция, которая принимает набор строк и возвращает одно значение (или одно значение на группу). AggregateUDF реализует trait Accumulator с методами update_batch(), merge(), evaluate(). Используется в GROUP BY и оконных выражениях.
use datafusion::logical_expr::AggregateUDF;
// Пример: геометрическое среднее
struct GeoMeanAccumulator {
product: f64,
count: u64,
}
impl Accumulator for GeoMeanAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let arr = as_float64_array(&values[0])?;
for val in arr.iter().flatten() {
self.product *= val;
self.count += 1;
}
Ok(())
}
fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Float64(Some(
self.product.powf(1.0 / self.count as f64)
)))
}
}Пользовательская оконная функция, которая вычисляет значение для каждой строки на основе окна (OVER). В отличие от Aggregатных UDF, WindowUDF получает доступ ко всем строкам окна и может обращаться к соседним строкам (как LAG/LEAD). Реализует trait WindowUDFImpl.
use datafusion::logical_expr::WindowUDF;
// Регистрация оконной UDF
ctx.register_udwf(my_window_udf);
// Использование в SQL
let df = ctx.sql("
SELECT
ts,
value,
MY_MOVING_AVG(value) OVER (
ORDER BY ts
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as moving_avg
FROM metrics
").await?;Правило трансформации логического плана. OptimizerRule принимает LogicalPlan и возвращает эквивалентный, но более эффективный план. Встроенные правила: push-down предикатов/проекций, устранение подвыражений, свёртка констант. Можно добавить кастомные правила через SessionState.
use datafusion::optimizer::OptimizerRule;
pub struct MyPruningRule;
impl OptimizerRule for MyPruningRule {
fn name(&self) -> &str { "my_pruning_rule" }
fn try_optimize(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
// Анализ и трансформация плана
match plan {
LogicalPlan::Filter(filter) => { ... }
_ => Ok(None), // без изменений
}
}
}Правило анализа, выполняемое до оптимизации. AnalyzerRule валидирует и обогащает логический план: разрешает типы, проверяет корректность имён столбцов, встраивает значения по умолчанию. В отличие от OptimizerRule, AnalyzerRule может менять семантику (добавлять cast-ы, разрешать wildcard *).
use datafusion::optimizer::AnalyzerRule;
pub struct TypeCoercionRule;
impl AnalyzerRule for TypeCoercionRule {
fn name(&self) -> &str { "type_coercion" }
fn analyze(
&self,
plan: LogicalPlan,
config: &ConfigOptions,
) -> Result<LogicalPlan> {
// Добавление CAST там, где типы не совпадают
plan.transform_up(|node| {
// ... вставка приведений типов
})
}
}Единый тип ошибки DataFusion, покрывающий все уровни: парсинг SQL, планирование, оптимизация, выполнение. Варианты: Plan, SQL, ArrowError, Execution, Internal, NotImplemented. Для UDF и расширений — создавайте ошибки через datafusion_common::DataFusionError для консистентной обработки.
use datafusion_common::DataFusionError;
// Ошибка планирования
return Err(DataFusionError::Plan(
format!("Column '{}' not found in schema", name)
));
// Ошибка выполнения
return Err(DataFusionError::Execution(
"Division by zero in UDF".to_string()
));
// Конвертация из Arrow ошибки
let arrow_result = batch.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.ok_or(DataFusionError::Internal(
"Expected Int64Array".to_string()
))?;Фреймворк для тестирования SQL-движков, используемый в DataFusion. Тесты записываются как .slt файлы: SQL-запрос и ожидаемый результат в текстовом формате. Поддерживает statement ok, query, halt, include. DataFusion использует sqllogictest для регрессионного тестирования сотен SQL-сценариев.
# Файл: my_tests.slt
# Создание таблицы
statement ok
CREATE TABLE t (a INT, b VARCHAR);
# Вставка данных
statement ok
INSERT INTO t VALUES (1, 'hello'), (2, 'world');
# Проверка результата (I = Integer, T = Text)
query IT
SELECT a, b FROM t ORDER BY a;
----
1 hello
2 worldКомпонент, преобразующий LogicalPlan в ExecutionPlan (физический план). PhysicalPlanner выбирает конкретные реализации операторов: HashJoin vs SortMergeJoin, одно- или двухфазная агрегация. Можно подменить DefaultPhysicalPlanner через SessionState для кастомного маппинга логических узлов в физические.
use datafusion::physical_planner::PhysicalPlanner;
// Кастомный physical planner для специализированных операторов
pub struct MyPlanner {
default: DefaultPhysicalPlanner,
}
#[async_trait]
impl PhysicalPlanner for MyPlanner {
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
match logical_plan {
// Подмена scan-оператора для GPU
LogicalPlan::TableScan(scan)
if is_gpu_table(scan) => {
Ok(Arc::new(GpuScanExec::new(scan)))
}
_ => self.default.create_physical_plan(
logical_plan, session_state
).await
}
}
}Правило оптимизации, перемещающее фильтры (WHERE) как можно ближе к источнику данных. Если фильтр может применяться на уровне сканирования файлов, Parquet-ридер пропускает целые row group-ы, а CSV/JSON-ридеры отбрасывают строки раньше. Одна из самых эффективных оптимизаций.
-- До оптимизации:
-- Projection: [name]
-- Filter: age > 30
-- TableScan: users
-- После Predicate Pushdown:
-- Projection: [name]
-- TableScan: users, filter=[age > 30]
-- (фильтр применён на уровне scan —
-- row groups с max(age) <= 30 пропущены)Правило оптимизации, ограничивающее набор читаемых столбцов до реально используемых в запросе. Если из 100 столбцов Parquet-файла используются только 3, остальные 97 не будут десериализованы. Критично для колоночных форматов, где каждый столбец хранится отдельно.
-- Запрос использует только name и age
SELECT name FROM users WHERE age > 30;
-- Без Projection Pushdown: читаются все столбцы
-- С Projection Pushdown:
-- TableScan: users, projection=[name, age]
-- Parquet читает только 2 column chunk вместо всехОптимизация, изменяющая порядок соединений таблиц для минимизации промежуточных результатов. DataFusion использует статистики таблиц (row count, distinct values) для выбора оптимального порядка. Ставит таблицу с меньшей кардинальностью как build-сторону HashJoin.
-- Исходный запрос: 3 JOIN-а
SELECT *
FROM large_table l -- 10M rows
JOIN medium_table m -- 100K rows
ON l.id = m.large_id
JOIN small_table s -- 1K rows
ON m.id = s.medium_id;
-- После Join Reorder:
-- small × medium × large
-- (минимизация промежуточных данных)Метаданные о распределении данных: количество строк, min/max и distinct count для каждого столбца, размер в байтах. Parquet хранит статистики в footer-е каждого row group. DataFusion использует статистики для оптимизаций: pruning файлов, выбор порядка join, оценка стоимости планов.
-- Просмотр статистик через EXPLAIN
EXPLAIN SELECT * FROM parquet_table;
-- TableScan: parquet_table
-- statistics: rows=1000000
-- column stats:
-- id: min=1, max=1000000, distinct=1000000
-- created: min=2020-01-01, max=2024-12-31
-- Настройка сбора статистик
SET datafusion.execution.collect_statistics = true;Подход к оптимизации запросов, при котором оптимизатор оценивает «стоимость» (CPU, I/O, память) нескольких альтернативных планов и выбирает самый дешёвый. В DataFusion CBO используется для выбора порядка join-ов и стратегий агрегации на основе статистик таблиц.
-- CBO влияет на выбор стратегии join:
-- Если build-сторона помещается в память:
-- → HashJoinExec (быстрый)
-- Если данные уже отсортированы:
-- → SortMergeJoinExec (без доп. памяти)
-- Если одна сторона очень маленькая:
-- → NestedLoopJoinExec с broadcast
-- Включение CBO-оптимизаций
SET datafusion.optimizer.prefer_hash_join = true;
SET datafusion.optimizer.top_down_join_key_reordering = true;Runtime-оптимизация, при которой результат одной стороны join используется для фильтрации другой стороны до передачи данных. Вместо чтения всех строк probe-стороны, DataFusion строит Bloom-фильтр или диапазон из build-стороны и применяет его к scan. Особенно эффективно для star-schema запросов.
-- Пример: join большой fact-таблицы с малой dimension
SELECT f.amount, d.category
FROM fact_sales f
JOIN dim_products d ON f.product_id = d.id
WHERE d.category = 'electronics';
-- Без dynamic filtering: scan всех 100M строк fact
-- С dynamic filtering:
-- 1. Scan dim_products → product_ids для 'electronics'
-- 2. Bloom filter из этих id-шников
-- 3. Scan fact_sales с фильтром → пропуск row groupsРаспределённый движок на базе DataFusion для выполнения запросов на кластере. Архитектура: Scheduler распределяет stage-и по Executor-ам, данные передаются через Arrow Flight. Ballista масштабирует DataFusion горизонтально, сохраняя совместимость с SQL и DataFrame API.
use ballista::prelude::*;
// Подключение к кластеру Ballista
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "16")
.build()?;
let ctx = BallistaContext::remote(
"scheduler-host", 50050, &config
).await?;
// SQL выполняется распределённо
let df = ctx.sql("
SELECT region, SUM(amount)
FROM sales
GROUP BY region
").await?;Интеграция DataFusion с Ray — распределённым фреймворком из Python-экосистемы. DataFusion-Ray использует Ray для распределения физических планов по узлам кластера. Преимущество перед Ballista: зрелая экосистема Ray (auto-scaling, GPU, ML), привычная Python-среда.
import datafusion_ray
import ray
ray.init()
# Создание контекста DataFusion на Ray
ctx = datafusion_ray.DatafusionRayContext(
num_workers=4
)
# SQL выполняется на Ray-кластере
df = ctx.sql("
SELECT date, SUM(revenue)
FROM sales_data
GROUP BY date
")
result = df.collect()Stage — этап распределённого плана выполнения, ограниченный shuffle-границей (repartition). Каждый stage выполняется параллельно на нескольких partition-ах. Partition — единица параллелизма: один RecordBatch-поток на одном ядре. Количество партиций задаётся target_partitions.
-- Физический план с 2 стадиями:
-- Stage 1: Параллельный scan + фильтрация
-- Partition 0: файлы 1-4
-- Partition 1: файлы 5-8
-- [Shuffle по hash(user_id)]
-- Stage 2: Агрегация по user_id
-- Partition 0: users A-M
-- Partition 1: users N-Z
SET datafusion.execution.target_partitions = 8;Ускоритель Apache Spark, заменяющий Spark JVM-операторы на нативные DataFusion-операторы. Comet перехватывает физический план Spark и выполняет поддерживаемые операции (scan, filter, aggregate, join) через DataFusion, получая 2-5x ускорение без изменения Spark-кода.
# Spark + Comet: нулевые изменения в коде
# spark-defaults.conf:
spark.plugins org.apache.spark.CometPlugin
spark.comet.enabled true
spark.comet.exec.enabled true
spark.comet.exec.all.enabled true
# Обычный PySpark-код ускоряется автоматически
df = spark.read.parquet("data/")
result = df.groupBy("key").agg(sum("value"))
result.show()Стек технологий для построения аналитических систем: Parquet (хранение), Arrow (память), DataFusion (выполнение), Flight (транспорт). Все компоненты нативно совместимы — данные передаются между уровнями без сериализации. FDAP используется в InfluxDB, GlareDB, SeaFowl и десятках других проектов.
// Типичный FDAP-стек:
// 1. Parquet — данные на диске (S3, HDFS)
// 2. Arrow — колоночный формат в памяти
// 3. DataFusion — SQL/DataFrame выполнение
// 4. Flight — передача клиенту
let ctx = SessionContext::new();
ctx.register_parquet("data", "s3://bucket/data/",
ParquetReadOptions::default()
).await?;
// DataFusion читает Parquet → Arrow → результат
let df = ctx.sql("SELECT * FROM data").await?;
// Flight передаёт Arrow RecordBatch клиенту
df.collect().await?;Интерфейс управления памятью в DataFusion. MemoryPool отслеживает потребление памяти каждым оператором и принимает решение: продолжить выполнение или сбросить данные на диск (spill). Реализации: GreedyMemoryPool (первый забирает всё), FairSpillPool (справедливое распределение).
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
// Ограничение памяти до 4 GB
let pool = FairSpillPool::new(4 * 1024 * 1024 * 1024);
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(pool))
.build()?;
let config = SessionConfig::new()
.with_target_partitions(8);
let ctx = SessionContext::new_with_config_rt(config, runtime);Реализация MemoryPool, которая справедливо распределяет доступную память между всеми активными операторами. Когда общее потребление достигает лимита, FairSpillPool уведомляет оператора-потребителя о необходимости spill — сброса промежуточных данных на диск для освобождения RAM.
use datafusion::execution::memory_pool::FairSpillPool;
// 2 GB лимит, справедливое распределение
let pool = FairSpillPool::new(2 * 1024 * 1024 * 1024);
// Если 4 оператора активны, каждый получит ~512 MB
// При превышении — spill на диск
// FairSpillPool выбирает оператора с наибольшим
// потреблением для spillОбёртка над другими MemoryPool, добавляющая отслеживание потребления памяти по каждому оператору. Когда лимит памяти превышен, TrackConsumersPool включает в сообщение об ошибке детальную информацию — какие операторы потребляют сколько. Критически полезно для отладки OOM-ситуаций.
use datafusion::execution::memory_pool::{
TrackConsumersPool, FairSpillPool
};
// Обернуть FairSpillPool для отладки
let inner = FairSpillPool::new(2_000_000_000);
let pool = TrackConsumersPool::new(
inner,
NonZeroUsize::new(10).unwrap(), // top 10 consumers
);
// При OOM: "Resources exhausted:
// HashJoinExec[0]: 1.2 GB
// SortExec[1]: 600 MB
// AggregateExec[2]: 200 MB"Настройка DataFusion, определяющая степень параллелизма при выполнении запросов. По умолчанию равна количеству ядер CPU. Каждый partition обрабатывается отдельной Tokio-задачей. Увеличение target_partitions улучшает параллелизм, но увеличивает потребление памяти и overhead на координацию.
-- SQL: установка партиций
SET datafusion.execution.target_partitions = 16;
-- Rust: через SessionConfig
let config = SessionConfig::new()
.with_target_partitions(16);
let ctx = SessionContext::new_with_config(config);
-- Проверка: EXPLAIN покажет RepartitionExec
EXPLAIN SELECT region, SUM(sales)
FROM data GROUP BY region;
-- RepartitionExec: partitioning=Hash([region], 16)Механизм работы с данными, не помещающимися в оперативную память. Когда MemoryPool сигнализирует о превышении лимита, операторы (Sort, HashAggregate, HashJoin) сбрасывают промежуточные данные на диск и продолжают выполнение. Без spill — OOM и аварийное завершение запроса.
-- Настройка директории для spill
SET datafusion.execution.sort_spill_reservation_bytes = 10485760;
-- В EXPLAIN ANALYZE видно spill-метрики:
EXPLAIN ANALYZE
SELECT * FROM large_table ORDER BY value;
-- SortExec:
-- elapsed_compute: 4.2s
-- spill_count: 3
-- spilled_bytes: 2147483648
-- output_rows: 50000000Расширенный формат вывода плана выполнения, показывающий дерево операторов с индентацией, метриками каждого узла и потоком данных. В отличие от обычного EXPLAIN, FORMAT TREE визуально структурирует сложные планы с множеством join-ов и подзапросов.
-- Древовидный формат плана
EXPLAIN FORMAT TREE
SELECT u.name, COUNT(o.id)
FROM users u
JOIN orders o ON u.id = o.user_id
GROUP BY u.name;
-- Результат:
-- AggregateExec [name, COUNT(o.id)]
-- └─ HashJoinExec [u.id = o.user_id]
-- ├─ DataSourceExec [users]
-- └─ DataSourceExec [orders]Открытый формат таблиц с ACID-транзакциями поверх объектного хранилища. DataFusion интегрируется с Delta Lake через крейт delta-rs, что даёт time travel, schema evolution, Z-ordering и MERGE. Delta Lake — один из ключевых Lakehouse-форматов, поддерживаемых DataFusion.
use deltalake::DeltaTableBuilder;
use datafusion::prelude::*;
let ctx = SessionContext::new();
// Регистрация Delta-таблицы
let table = DeltaTableBuilder::from_uri("s3://bucket/delta-table")
.with_storage_options(storage_opts)
.load().await?;
ctx.register_table("events", Arc::new(table))?;
// Time travel: чтение версии 5
let df = ctx.sql("
SELECT * FROM events VERSION AS OF 5
").await?;Формат таблиц для больших аналитических датасетов с поддержкой schema evolution, partition evolution и hidden partitioning. DataFusion интегрируется с Iceberg через iceberg-rust. Iceberg использует manifest-файлы для эффективного pruning — DataFusion пропускает ненужные data-файлы на основе статистик.
use iceberg_datafusion::IcebergTableProvider;
use datafusion::prelude::*;
let ctx = SessionContext::new();
// Подключение к Iceberg-каталогу
let provider = IcebergTableProvider::try_new(
catalog,
"db.events"
).await?;
ctx.register_table("events", Arc::new(provider))?;
// Iceberg partition pruning работает автоматически
let df = ctx.sql("
SELECT event_type, COUNT(*)
FROM events
WHERE date = '2024-03-15'
GROUP BY event_type
").await?;Абстракция в экосистеме Arrow для доступа к объектным хранилищам: S3, GCS, Azure Blob, HDFS и локальная файловая система. DataFusion использует object_store для чтения и записи данных. Поддерживает multipart upload, conditional put, range reads для эффективного доступа к Parquet column chunks.
use object_store::aws::AmazonS3Builder;
use datafusion::prelude::*;
use url::Url;
let s3 = AmazonS3Builder::new()
.with_bucket_name("my-bucket")
.with_region("us-east-1")
.build()?;
let ctx = SessionContext::new();
ctx.register_object_store(
&Url::parse("s3://my-bucket")?,
Arc::new(s3),
);
// Чтение напрямую из S3
let df = ctx.read_parquet(
"s3://my-bucket/data/*.parquet",
ParquetReadOptions::default(),
).await?;Колоночный формат хранения данных, оптимизированный для аналитических запросов. Parquet хранит данные по столбцам с компрессией (Snappy, Zstd, LZ4), содержит встроенные статистики (min/max, null count) для каждого row group. DataFusion нативно поддерживает Parquet с predicate pushdown и projection pushdown.
use datafusion::prelude::*;
let ctx = SessionContext::new();
// Чтение Parquet с автоматическим pruning
ctx.register_parquet(
"logs",
"data/logs/*.parquet",
ParquetReadOptions::default()
.parquet_pruning(true) // row group pruning
.skip_metadata(false), // читать footer
).await?;
// Запись результата в Parquet
let df = ctx.sql("SELECT * FROM logs WHERE level = 'ERROR'").await?;
df.write_parquet(
"output/errors.parquet",
WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.build(),
None,
).await?;Паттерн изоляции нескольких клиентов (tenant-ов) в одном экземпляре DataFusion. Каждый tenant получает собственный SessionContext с отдельным каталогом, MemoryPool-лимитами и правами доступа. Изоляция предотвращает утечки данных между tenant-ами и гарантирует справедливое распределение ресурсов.
// Создание изолированного контекста для каждого tenant
fn create_tenant_context(
tenant_id: &str,
memory_limit: usize,
) -> SessionContext {
let pool = FairSpillPool::new(memory_limit);
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(pool))
.build().unwrap();
let config = SessionConfig::new()
.with_default_catalog_and_schema(
tenant_id, "public"
);
SessionContext::new_with_config_rt(config, runtime)
}
// Каждый tenant видит только свои таблицы
let ctx_a = create_tenant_context("tenant_a", GB_2);
let ctx_b = create_tenant_context("tenant_b", GB_4);