Learning Platform
Глоссарий Troubleshooting
Урок 06.03 · 22 мин
Продвинутый
TableProviderExecutionPlanMemoryStreamfilter pushdownprojection pushdownDataSinkSendableRecordBatchStreamdatafusion-table-providersPostgresMySQLSQLiteDuckDBClickHouseFlightSQLODBCfederated queries

TableProvider: собственный источник данных

Мы познакомились с TableProvider в модуле 02, где рассмотрели трёхуровневую систему каталогов и встроенные реализации (ListingTable, MemTable). Теперь реализуем TableProvider с нуля — от trait-методов до полноценного ExecutionPlan с поддержкой pushdown-оптимизаций.

От trait к работающему источнику

TableProvider связывает DataFusion с произвольным хранилищем данных. Ключевой метод — scan(), который возвращает ExecutionPlan:

TableProvider → ExecutionPlan Pipeline
SQL: SELECT a, b FROM my_source WHERE a > 10SQL-запрос с проекцией и фильтром запускает pipeline через TableProvider
Планировщик
TableProvider::scan(projection=[0,1], filters=[a > 10], limit=None)Планировщик вызывает scan() с проекцией колонок и фильтрами — TableProvider решает, какие применить
Физический план
ExecutionPlan::execute(partition) → RecordBatchStreamExecutionPlan исполняет физический оператор, возвращая асинхронный поток RecordBatch
Потоковое чтение
RecordBatch → RecordBatch → … → конец потокаРезультат — последовательность RecordBatch, потребляемых следующим оператором в конвейере

Минимальная реализация

Начнём с простейшего случая — in-memory источник без pushdown:

use datafusion::datasource::TableProvider;
use datafusion::logical_expr::{Expr, TableType};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::arrow::datatypes::{Schema, SchemaRef, Field, DataType};
use datafusion::arrow::array::{Int64Array, StringArray, RecordBatch};
use datafusion::execution::context::SessionState;
use datafusion::common::Result;
use async_trait::async_trait;
use std::sync::Arc;
use std::any::Any;

struct InMemorySource {
    schema: SchemaRef,
    data: Vec<RecordBatch>,
}

impl InMemorySource {
    fn new() -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("score", DataType::Int64, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Carol", "Dave", "Eve"])),
                Arc::new(Int64Array::from(vec![95, 87, 73, 91, 68])),
            ],
        ).unwrap();

        Self { schema, data: vec![batch] }
    }
}

#[async_trait]
impl TableProvider for InMemorySource {
    fn as_any(&self) -> &dyn Any { self }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn datafusion::catalog::Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let batches = vec![self.data.clone()]; // Vec<Vec<RecordBatch>> — по одному Vec на партицию
        Ok(Arc::new(MemoryExec::try_new(
            &batches,
            self.schema.clone(),
            projection.cloned(),
        )?))
    }
}

// Регистрация и использование
async fn demo() -> Result<()> {
    let ctx = datafusion::prelude::SessionContext::new();
    ctx.register_table("my_source", Arc::new(InMemorySource::new()))?;

    let df = ctx.sql("SELECT name, score FROM my_source WHERE score > 80").await?;
    df.show().await?;
    Ok(())
}
NOTE

MemoryExec — встроенный ExecutionPlan, который возвращает заранее вычисленные RecordBatch. Для простых случаев это всё, что нужно. Для настоящих источников (API, БД, файлы) потребуется собственный ExecutionPlan.

Собственный ExecutionPlan

Когда данные не помещаются в память или должны загружаться лениво, реализуйте свой ExecutionPlan:

use datafusion::physical_plan::{
    ExecutionPlan, PlanProperties, Partitioning,
    DisplayAs, DisplayFormatType, ExecutionMode,
};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::execution::TaskContext;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Result;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use std::sync::Arc;
use std::any::Any;
use futures::stream;

#[derive(Debug)]
struct HttpSourceExec {
    url: String,
    schema: SchemaRef,
    projection: Option<Vec<usize>>,
    properties: PlanProperties,
}

impl HttpSourceExec {
    fn new(url: String, schema: SchemaRef, projection: Option<Vec<usize>>) -> Self {
        let projected_schema = match &projection {
            Some(p) => Arc::new(schema.project(p).unwrap()),
            None => schema.clone(),
        };

        let properties = PlanProperties::new(
            EquivalenceProperties::new(projected_schema.clone()),
            Partitioning::UnknownPartitioning(1),  // 1 партиция
            ExecutionMode::Bounded,
        );

        Self { url, schema, projection, properties }
    }
}

impl DisplayAs for HttpSourceExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "HttpSourceExec: url={}", self.url)
    }
}

impl ExecutionPlan for HttpSourceExec {
    fn name(&self) -> &str { "HttpSourceExec" }
    fn as_any(&self) -> &dyn Any { self }

    fn schema(&self) -> SchemaRef {
        match &self.projection {
            Some(p) => Arc::new(self.schema.project(p).unwrap()),
            None => self.schema.clone(),
        }
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]  // Leaf node — нет дочерних планов
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)  // Leaf node — нечего заменять
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
        let schema = self.schema();
        let url = self.url.clone();

        // Создаём асинхронный поток RecordBatch
        let stream = stream::once(async move {
            // В реальной реализации: HTTP-запрос к url, парсинг ответа
            let batch = fetch_data_from_url(&url).await?;
            Ok(batch)
        });

        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }
}
WARNING

PlanProperties — обязательная часть ExecutionPlan в DataFusion 53.x (релиз 53.0.0 от 2026-04-02). Он описывает ordering (сортировку), partitioning (параллелизм) и execution mode (bounded/unbounded). Оптимизатор использует эти свойства для placement операторов Sort и Repartition.

Filter Pushdown

Filter pushdown позволяет источнику отфильтровать данные самостоятельно, до того как DataFusion применит оператор Filter. Это критично для удалённых источников — вместо загрузки всех данных и фильтрации в памяти, источник выполняет фильтрацию на стороне хранилища.

use datafusion::datasource::TableProviderFilterPushDown;

#[async_trait]
impl TableProvider for HttpSource {
    // ... schema, table_type ...

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        filters.iter().map(|f| {
            match f {
                // Простые сравнения с колонками — можем pushdown
                Expr::BinaryExpr(binary) if is_simple_comparison(binary) => {
                    Ok(TableProviderFilterPushDown::Exact)
                }
                // Сложные выражения — не можем
                _ => Ok(TableProviderFilterPushDown::Unsupported),
            }
        }).collect()
    }

    async fn scan(
        &self,
        _state: &dyn datafusion::catalog::Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Передаём фильтры в ExecutionPlan для выполнения на стороне источника
        Ok(Arc::new(HttpSourceExec::new_with_filters(
            self.url.clone(),
            self.schema(),
            projection.cloned(),
            filters.to_vec(),
            limit,
        )))
    }
}

Уровни pushdown:

УровеньЗначение
ExactИсточник гарантирует корректную фильтрацию. DataFusion не добавит повторный Filter
InexactИсточник отфильтрует часть строк. DataFusion добавит Filter для проверки
UnsupportedИсточник не может обработать фильтр. DataFusion применит Filter полностью
TIP

Если сомневаетесь в корректности pushdown — используйте Inexact. DataFusion добавит проверочный Filter, и корректность гарантирована. Exact используйте только когда вы уверены, что источник фильтрует без ошибок.

Projection Pushdown

Projection pushdown передаёт список нужных колонок в scan. Источник может читать только их:

async fn scan(
    &self,
    _state: &dyn datafusion::catalog::Session,
    projection: Option<&Vec<usize>>,  // Индексы нужных колонок
    _filters: &[Expr],
    _limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let columns: Vec<String> = match projection {
        Some(indices) => indices.iter()
            .map(|i| self.schema.field(*i).name().clone())
            .collect(),
        None => self.schema.fields().iter()
            .map(|f| f.name().clone())
            .collect(),
    };

    // Передаём только нужные колонки в запрос к источнику
    // SELECT col_a, col_b вместо SELECT *
    Ok(Arc::new(HttpSourceExec::new_with_columns(
        self.url.clone(),
        self.schema(),
        columns,
        projection.cloned(),
    )))
}

insert_into и DataSink

Для записи данных обратно в источник реализуйте insert_into:

use datafusion::datasource::TableProvider;
use datafusion::physical_plan::insert::DataSink;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::execution::TaskContext;

#[async_trait]
impl TableProvider for WritableSource {
    // ... scan, schema ...

    async fn insert_into(
        &self,
        _state: &dyn datafusion::catalog::Session,
        input: Arc<dyn ExecutionPlan>,
        _overwrite: bool,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(
            datafusion::physical_plan::insert::DataSinkExec::new(
                input,
                Arc::new(MyDataSink::new(self.connection.clone())),
                self.schema(),
                None,
            )
        ))
    }
}

#[derive(Debug)]
struct MyDataSink {
    connection: String,
}

#[async_trait]
impl DataSink for MyDataSink {
    fn as_any(&self) -> &dyn Any { self }
    fn name(&self) -> &str { "MyDataSink" }

    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        _context: &Arc<TaskContext>,
    ) -> Result<u64> {
        use futures::StreamExt;

        let mut rows_written = 0u64;
        let mut stream = data;

        while let Some(batch) = stream.next().await {
            let batch = batch?;
            // Записываем batch в целевое хранилище
            write_batch_to_storage(&self.connection, &batch).await?;
            rows_written += batch.num_rows() as u64;
        }

        Ok(rows_written)
    }
}

После реализации insert_into становится доступен SQL-синтаксис:

INSERT INTO my_writable_source SELECT * FROM other_table WHERE condition;

Полный пример: CSV API-источник

use datafusion::datasource::TableProvider;
use datafusion::logical_expr::{Expr, TableType};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::arrow::datatypes::{Schema, SchemaRef, Field, DataType};
use datafusion::arrow::array::{StringArray, Float64Array, RecordBatch};
use datafusion::common::Result;
use datafusion::prelude::SessionContext;
use async_trait::async_trait;
use std::sync::Arc;
use std::any::Any;

/// TableProvider, загружающий CSV-данные из URL
struct CsvApiSource {
    schema: SchemaRef,
    api_url: String,
}

impl CsvApiSource {
    fn new(api_url: &str) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("city", DataType::Utf8, false),
            Field::new("temperature", DataType::Float64, true),
            Field::new("humidity", DataType::Float64, true),
        ]));
        Self { schema, api_url: api_url.to_string() }
    }
}

#[async_trait]
impl TableProvider for CsvApiSource {
    fn as_any(&self) -> &dyn Any { self }
    fn schema(&self) -> SchemaRef { self.schema.clone() }
    fn table_type(&self) -> TableType { TableType::Base }

    async fn scan(
        &self,
        _state: &dyn datafusion::catalog::Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // В production: HTTP GET → parse CSV → RecordBatch
        // Здесь: демонстрационные данные
        let batch = RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["Moscow", "Berlin", "Tokyo"])),
                Arc::new(Float64Array::from(vec![5.2, 12.8, 18.5])),
                Arc::new(Float64Array::from(vec![78.0, 65.0, 72.0])),
            ],
        )?;

        Ok(Arc::new(MemoryExec::try_new(
            &[vec![batch]],
            self.schema.clone(),
            projection.cloned(),
        )?))
    }
}

async fn demo_csv_api() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_table(
        "weather",
        Arc::new(CsvApiSource::new("https://api.example.com/weather")),
    )?;

    // Стандартный SQL — DataFusion не знает, что за источником стоит HTTP API
    let df = ctx.sql("SELECT city, temperature FROM weather WHERE humidity > 70").await?;
    df.show().await
}

datafusion-table-providers — готовый набор провайдеров

Полная реализация TableProvider для production-источника (Postgres / MySQL / ClickHouse) требует значительной работы: connection pooling, type mapping, SQL generation для filter pushdown, async I/O. Apache DataFusion community поддерживает contrib repositorydatafusion-contrib/datafusion-table-providers — готовые TableProvider реализации для популярных систем.

Что в репозитории

ПровайдерТип источникаCrate
PostgresPostgreSQL via tokio-postgresdatafusion-table-providers (feature postgres)
MySQLMySQL/MariaDB via mysql_asyncfeature mysql
SQLiteSQLite via rusqlitefeature sqlite
DuckDBDuckDB embeddedfeature duckdb
ClickHouseClickHouse via HTTP/nativefeature clickhouse
ODBCGeneric ODBC (SQL Server, Oracle, …)feature odbc
FlightSQLApache Arrow FlightSQL endpointfeature flight-sql

Базовый use case — Postgres provider

use datafusion::prelude::SessionContext;
use datafusion_table_providers::postgres::PostgresTableFactory;
use datafusion_table_providers::common::DatabaseConnectionPool;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connection pool для Postgres
    let pool = Arc::new(
        PostgresConnectionPool::new("postgresql://user:pass@localhost/db").await?
    );

    // Factory создаёт TableProvider по имени таблицы
    let factory = PostgresTableFactory::new(pool);
    let table = factory.table_provider(TableReference::bare("orders")).await?;

    // Регистрируем в DataFusion как обычную таблицу
    let ctx = SessionContext::new();
    ctx.register_table("orders", table)?;

    // SQL-запросы прозрачно идут в Postgres с filter/projection pushdown
    let df = ctx
        .sql("SELECT customer_id, SUM(amount) FROM orders WHERE status = 'completed' GROUP BY 1")
        .await?;
    df.show().await?;
    Ok(())
}

Под капотом factory:

  1. Получает schema из Postgres information_schema.columns для запрошенной таблицы.
  2. Создаёт TableProvider, реализующий scan() через SQL-запрос к Postgres.
  3. Filter pushdown — переводит DataFusion Expr в SQL WHERE clause.
  4. Projection pushdown — выбирает только нужные колонки в SELECT.

Federated queries — несколько источников в одной сессии

let ctx = SessionContext::new();

// Postgres для orders
let pg_factory = PostgresTableFactory::new(pg_pool);
ctx.register_table("orders", pg_factory.table_provider(TableReference::bare("orders")).await?)?;

// ClickHouse для events (analytical large fact table)
let ch_factory = ClickHouseTableFactory::new(ch_client);
ctx.register_table("events", ch_factory.table_provider(TableReference::bare("events")).await?)?;

// SQLite для local lookup table
let sqlite_factory = SqliteTableFactory::new(sqlite_pool);
ctx.register_table("regions", sqlite_factory.table_provider(TableReference::bare("regions")).await?)?;

// JOIN across heterogeneous sources — DataFusion координирует
let df = ctx.sql("
    SELECT e.event_type, r.region_name, COUNT(*)
    FROM events e
    JOIN orders o ON e.order_id = o.id
    JOIN regions r ON o.region_id = r.id
    GROUP BY 1, 2
").await?;

DataFusion выполняет federation: каждый TableProvider получает свою часть запроса (с pushdown), результаты объединяются в-памяти через Arrow. Это паттерн “DataFusion as federated query engine” — аналог Trino / Presto на Rust.

FlightSQL provider — Arrow-native protocol

use datafusion_table_providers::flight::sql::FlightSqlDriver;

// Подключение к любому FlightSQL-совместимому endpoint
// (DataFusion FlightSQL server, Dremio, InfluxDB IOx, Cloudera Impala 4.4+)
let driver = FlightSqlDriver::new("grpc://flight-server:50051").await?;
let table = driver.table_provider("my_table").await?;
ctx.register_table("my_table", table)?;

FlightSQL — Arrow-native wire protocol — zero-copy чтение RecordBatch без сериализации в JSON/CSV. Производительность критично важна для analytical workloads.

TIP

Перед написанием собственного TableProvider проверьте datafusion-table-providers — есть высокая вероятность, что готовая реализация существует. Это экономит сотни строк кода: connection pooling, error mapping, type coercion, async I/O. Для production используйте либо contrib provider напрямую, либо его как референс при создании специализированной версии.

Cite datafusion-contrib/datafusion-table-providers + crates.io/crates/datafusion-table-providers.


Итоги

  • TableProvider::scan() — ключевой метод: принимает projection, filters, limit и возвращает ExecutionPlan
  • MemoryExec — встроенный план для данных в памяти; для ленивой загрузки реализуйте свой ExecutionPlan
  • PlanProperties описывает ordering, partitioning и execution mode для оптимизатора
  • Filter pushdown: Exact (источник фильтрует), Inexact (с проверкой), Unsupported (DataFusion фильтрует)
  • Projection pushdown через параметр projection в scan() — читайте только нужные колонки
  • insert_into + DataSink превращает источник в writable target для INSERT INTO

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какой метод TableProvider является ключевым для выполнения запросов и что он возвращает?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8