TableProvider: собственный источник данных
Мы познакомились с TableProvider в модуле 02, где рассмотрели трёхуровневую систему каталогов и встроенные реализации (ListingTable, MemTable). Теперь реализуем TableProvider с нуля — от trait-методов до полноценного ExecutionPlan с поддержкой pushdown-оптимизаций.
От trait к работающему источнику
TableProvider связывает DataFusion с произвольным хранилищем данных. Ключевой метод — scan(), который возвращает ExecutionPlan:
Минимальная реализация
Начнём с простейшего случая — in-memory источник без pushdown:
use datafusion::datasource::TableProvider;
use datafusion::logical_expr::{Expr, TableType};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::arrow::datatypes::{Schema, SchemaRef, Field, DataType};
use datafusion::arrow::array::{Int64Array, StringArray, RecordBatch};
use datafusion::execution::context::SessionState;
use datafusion::common::Result;
use async_trait::async_trait;
use std::sync::Arc;
use std::any::Any;
struct InMemorySource {
schema: SchemaRef,
data: Vec<RecordBatch>,
}
impl InMemorySource {
fn new() -> Self {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, true),
Field::new("score", DataType::Int64, true),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol", "Dave", "Eve"])),
Arc::new(Int64Array::from(vec![95, 87, 73, 91, 68])),
],
).unwrap();
Self { schema, data: vec![batch] }
}
}
#[async_trait]
impl TableProvider for InMemorySource {
fn as_any(&self) -> &dyn Any { self }
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn table_type(&self) -> TableType {
TableType::Base
}
async fn scan(
&self,
_state: &dyn datafusion::catalog::Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let batches = vec![self.data.clone()]; // Vec<Vec<RecordBatch>> — по одному Vec на партицию
Ok(Arc::new(MemoryExec::try_new(
&batches,
self.schema.clone(),
projection.cloned(),
)?))
}
}
// Регистрация и использование
async fn demo() -> Result<()> {
let ctx = datafusion::prelude::SessionContext::new();
ctx.register_table("my_source", Arc::new(InMemorySource::new()))?;
let df = ctx.sql("SELECT name, score FROM my_source WHERE score > 80").await?;
df.show().await?;
Ok(())
}
MemoryExec — встроенный ExecutionPlan, который возвращает заранее вычисленные RecordBatch. Для простых случаев это всё, что нужно. Для настоящих источников (API, БД, файлы) потребуется собственный ExecutionPlan.
Собственный ExecutionPlan
Когда данные не помещаются в память или должны загружаться лениво, реализуйте свой ExecutionPlan:
use datafusion::physical_plan::{
ExecutionPlan, PlanProperties, Partitioning,
DisplayAs, DisplayFormatType, ExecutionMode,
};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::execution::TaskContext;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Result;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use std::sync::Arc;
use std::any::Any;
use futures::stream;
#[derive(Debug)]
struct HttpSourceExec {
url: String,
schema: SchemaRef,
projection: Option<Vec<usize>>,
properties: PlanProperties,
}
impl HttpSourceExec {
fn new(url: String, schema: SchemaRef, projection: Option<Vec<usize>>) -> Self {
let projected_schema = match &projection {
Some(p) => Arc::new(schema.project(p).unwrap()),
None => schema.clone(),
};
let properties = PlanProperties::new(
EquivalenceProperties::new(projected_schema.clone()),
Partitioning::UnknownPartitioning(1), // 1 партиция
ExecutionMode::Bounded,
);
Self { url, schema, projection, properties }
}
}
impl DisplayAs for HttpSourceExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "HttpSourceExec: url={}", self.url)
}
}
impl ExecutionPlan for HttpSourceExec {
fn name(&self) -> &str { "HttpSourceExec" }
fn as_any(&self) -> &dyn Any { self }
fn schema(&self) -> SchemaRef {
match &self.projection {
Some(p) => Arc::new(self.schema.project(p).unwrap()),
None => self.schema.clone(),
}
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![] // Leaf node — нет дочерних планов
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(self) // Leaf node — нечего заменять
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
let schema = self.schema();
let url = self.url.clone();
// Создаём асинхронный поток RecordBatch
let stream = stream::once(async move {
// В реальной реализации: HTTP-запрос к url, парсинг ответа
let batch = fetch_data_from_url(&url).await?;
Ok(batch)
});
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
}
PlanProperties — обязательная часть ExecutionPlan в DataFusion 53.x (релиз 53.0.0 от 2026-04-02). Он описывает ordering (сортировку), partitioning (параллелизм) и execution mode (bounded/unbounded). Оптимизатор использует эти свойства для placement операторов Sort и Repartition.
Filter Pushdown
Filter pushdown позволяет источнику отфильтровать данные самостоятельно, до того как DataFusion применит оператор Filter. Это критично для удалённых источников — вместо загрузки всех данных и фильтрации в памяти, источник выполняет фильтрацию на стороне хранилища.
use datafusion::datasource::TableProviderFilterPushDown;
#[async_trait]
impl TableProvider for HttpSource {
// ... schema, table_type ...
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
filters.iter().map(|f| {
match f {
// Простые сравнения с колонками — можем pushdown
Expr::BinaryExpr(binary) if is_simple_comparison(binary) => {
Ok(TableProviderFilterPushDown::Exact)
}
// Сложные выражения — не можем
_ => Ok(TableProviderFilterPushDown::Unsupported),
}
}).collect()
}
async fn scan(
&self,
_state: &dyn datafusion::catalog::Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Передаём фильтры в ExecutionPlan для выполнения на стороне источника
Ok(Arc::new(HttpSourceExec::new_with_filters(
self.url.clone(),
self.schema(),
projection.cloned(),
filters.to_vec(),
limit,
)))
}
}
Уровни pushdown:
| Уровень | Значение |
|---|---|
Exact | Источник гарантирует корректную фильтрацию. DataFusion не добавит повторный Filter |
Inexact | Источник отфильтрует часть строк. DataFusion добавит Filter для проверки |
Unsupported | Источник не может обработать фильтр. DataFusion применит Filter полностью |
Если сомневаетесь в корректности pushdown — используйте Inexact. DataFusion добавит проверочный Filter, и корректность гарантирована. Exact используйте только когда вы уверены, что источник фильтрует без ошибок.
Projection Pushdown
Projection pushdown передаёт список нужных колонок в scan. Источник может читать только их:
async fn scan(
&self,
_state: &dyn datafusion::catalog::Session,
projection: Option<&Vec<usize>>, // Индексы нужных колонок
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let columns: Vec<String> = match projection {
Some(indices) => indices.iter()
.map(|i| self.schema.field(*i).name().clone())
.collect(),
None => self.schema.fields().iter()
.map(|f| f.name().clone())
.collect(),
};
// Передаём только нужные колонки в запрос к источнику
// SELECT col_a, col_b вместо SELECT *
Ok(Arc::new(HttpSourceExec::new_with_columns(
self.url.clone(),
self.schema(),
columns,
projection.cloned(),
)))
}
insert_into и DataSink
Для записи данных обратно в источник реализуйте insert_into:
use datafusion::datasource::TableProvider;
use datafusion::physical_plan::insert::DataSink;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::execution::TaskContext;
#[async_trait]
impl TableProvider for WritableSource {
// ... scan, schema ...
async fn insert_into(
&self,
_state: &dyn datafusion::catalog::Session,
input: Arc<dyn ExecutionPlan>,
_overwrite: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(
datafusion::physical_plan::insert::DataSinkExec::new(
input,
Arc::new(MyDataSink::new(self.connection.clone())),
self.schema(),
None,
)
))
}
}
#[derive(Debug)]
struct MyDataSink {
connection: String,
}
#[async_trait]
impl DataSink for MyDataSink {
fn as_any(&self) -> &dyn Any { self }
fn name(&self) -> &str { "MyDataSink" }
async fn write_all(
&self,
data: SendableRecordBatchStream,
_context: &Arc<TaskContext>,
) -> Result<u64> {
use futures::StreamExt;
let mut rows_written = 0u64;
let mut stream = data;
while let Some(batch) = stream.next().await {
let batch = batch?;
// Записываем batch в целевое хранилище
write_batch_to_storage(&self.connection, &batch).await?;
rows_written += batch.num_rows() as u64;
}
Ok(rows_written)
}
}
После реализации insert_into становится доступен SQL-синтаксис:
INSERT INTO my_writable_source SELECT * FROM other_table WHERE condition;
Полный пример: CSV API-источник
use datafusion::datasource::TableProvider;
use datafusion::logical_expr::{Expr, TableType};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::arrow::datatypes::{Schema, SchemaRef, Field, DataType};
use datafusion::arrow::array::{StringArray, Float64Array, RecordBatch};
use datafusion::common::Result;
use datafusion::prelude::SessionContext;
use async_trait::async_trait;
use std::sync::Arc;
use std::any::Any;
/// TableProvider, загружающий CSV-данные из URL
struct CsvApiSource {
schema: SchemaRef,
api_url: String,
}
impl CsvApiSource {
fn new(api_url: &str) -> Self {
let schema = Arc::new(Schema::new(vec![
Field::new("city", DataType::Utf8, false),
Field::new("temperature", DataType::Float64, true),
Field::new("humidity", DataType::Float64, true),
]));
Self { schema, api_url: api_url.to_string() }
}
}
#[async_trait]
impl TableProvider for CsvApiSource {
fn as_any(&self) -> &dyn Any { self }
fn schema(&self) -> SchemaRef { self.schema.clone() }
fn table_type(&self) -> TableType { TableType::Base }
async fn scan(
&self,
_state: &dyn datafusion::catalog::Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// В production: HTTP GET → parse CSV → RecordBatch
// Здесь: демонстрационные данные
let batch = RecordBatch::try_new(
self.schema.clone(),
vec![
Arc::new(StringArray::from(vec!["Moscow", "Berlin", "Tokyo"])),
Arc::new(Float64Array::from(vec![5.2, 12.8, 18.5])),
Arc::new(Float64Array::from(vec![78.0, 65.0, 72.0])),
],
)?;
Ok(Arc::new(MemoryExec::try_new(
&[vec![batch]],
self.schema.clone(),
projection.cloned(),
)?))
}
}
async fn demo_csv_api() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_table(
"weather",
Arc::new(CsvApiSource::new("https://api.example.com/weather")),
)?;
// Стандартный SQL — DataFusion не знает, что за источником стоит HTTP API
let df = ctx.sql("SELECT city, temperature FROM weather WHERE humidity > 70").await?;
df.show().await
}
datafusion-table-providers — готовый набор провайдеров
Полная реализация TableProvider для production-источника (Postgres / MySQL / ClickHouse) требует значительной работы: connection pooling, type mapping, SQL generation для filter pushdown, async I/O. Apache DataFusion community поддерживает contrib repository — datafusion-contrib/datafusion-table-providers — готовые TableProvider реализации для популярных систем.
Что в репозитории
| Провайдер | Тип источника | Crate |
|---|---|---|
| Postgres | PostgreSQL via tokio-postgres | datafusion-table-providers (feature postgres) |
| MySQL | MySQL/MariaDB via mysql_async | feature mysql |
| SQLite | SQLite via rusqlite | feature sqlite |
| DuckDB | DuckDB embedded | feature duckdb |
| ClickHouse | ClickHouse via HTTP/native | feature clickhouse |
| ODBC | Generic ODBC (SQL Server, Oracle, …) | feature odbc |
| FlightSQL | Apache Arrow FlightSQL endpoint | feature flight-sql |
Базовый use case — Postgres provider
use datafusion::prelude::SessionContext;
use datafusion_table_providers::postgres::PostgresTableFactory;
use datafusion_table_providers::common::DatabaseConnectionPool;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connection pool для Postgres
let pool = Arc::new(
PostgresConnectionPool::new("postgresql://user:pass@localhost/db").await?
);
// Factory создаёт TableProvider по имени таблицы
let factory = PostgresTableFactory::new(pool);
let table = factory.table_provider(TableReference::bare("orders")).await?;
// Регистрируем в DataFusion как обычную таблицу
let ctx = SessionContext::new();
ctx.register_table("orders", table)?;
// SQL-запросы прозрачно идут в Postgres с filter/projection pushdown
let df = ctx
.sql("SELECT customer_id, SUM(amount) FROM orders WHERE status = 'completed' GROUP BY 1")
.await?;
df.show().await?;
Ok(())
}
Под капотом factory:
- Получает schema из Postgres
information_schema.columnsдля запрошенной таблицы. - Создаёт
TableProvider, реализующийscan()через SQL-запрос к Postgres. - Filter pushdown — переводит DataFusion
Exprв SQLWHEREclause. - Projection pushdown — выбирает только нужные колонки в
SELECT.
Federated queries — несколько источников в одной сессии
let ctx = SessionContext::new();
// Postgres для orders
let pg_factory = PostgresTableFactory::new(pg_pool);
ctx.register_table("orders", pg_factory.table_provider(TableReference::bare("orders")).await?)?;
// ClickHouse для events (analytical large fact table)
let ch_factory = ClickHouseTableFactory::new(ch_client);
ctx.register_table("events", ch_factory.table_provider(TableReference::bare("events")).await?)?;
// SQLite для local lookup table
let sqlite_factory = SqliteTableFactory::new(sqlite_pool);
ctx.register_table("regions", sqlite_factory.table_provider(TableReference::bare("regions")).await?)?;
// JOIN across heterogeneous sources — DataFusion координирует
let df = ctx.sql("
SELECT e.event_type, r.region_name, COUNT(*)
FROM events e
JOIN orders o ON e.order_id = o.id
JOIN regions r ON o.region_id = r.id
GROUP BY 1, 2
").await?;
DataFusion выполняет federation: каждый TableProvider получает свою часть запроса (с pushdown), результаты объединяются в-памяти через Arrow. Это паттерн “DataFusion as federated query engine” — аналог Trino / Presto на Rust.
FlightSQL provider — Arrow-native protocol
use datafusion_table_providers::flight::sql::FlightSqlDriver;
// Подключение к любому FlightSQL-совместимому endpoint
// (DataFusion FlightSQL server, Dremio, InfluxDB IOx, Cloudera Impala 4.4+)
let driver = FlightSqlDriver::new("grpc://flight-server:50051").await?;
let table = driver.table_provider("my_table").await?;
ctx.register_table("my_table", table)?;
FlightSQL — Arrow-native wire protocol — zero-copy чтение RecordBatch без сериализации в JSON/CSV. Производительность критично важна для analytical workloads.
Перед написанием собственного TableProvider проверьте datafusion-table-providers — есть высокая вероятность, что готовая реализация существует. Это экономит сотни строк кода: connection pooling, error mapping, type coercion, async I/O. Для production используйте либо contrib provider напрямую, либо его как референс при создании специализированной версии.
Cite datafusion-contrib/datafusion-table-providers + crates.io/crates/datafusion-table-providers.
Итоги
-
TableProvider::scan()— ключевой метод: принимает projection, filters, limit и возвращаетExecutionPlan -
MemoryExec— встроенный план для данных в памяти; для ленивой загрузки реализуйте свойExecutionPlan -
PlanPropertiesописывает ordering, partitioning и execution mode для оптимизатора - Filter pushdown:
Exact(источник фильтрует),Inexact(с проверкой),Unsupported(DataFusion фильтрует) - Projection pushdown через параметр
projectionвscan()— читайте только нужные колонки -
insert_into+DataSinkпревращает источник в writable target для INSERT INTO