Learning Platform
Глоссарий Troubleshooting
Урок 13.03 · 14 мин
Продвинутый
StreamingTableContinuous QueriesIncremental ProcessingUnbounded SourcesWatermarkEvent ProcessingSendableRecordBatchStream

Streaming и инкрементальная обработка в DataFusion

DataFusion — это прежде всего batch query engine. Но реальные аналитические системы часто требуют обработки данных по мере их поступления: мониторинг метрик, обнаружение аномалий, real-time dashboards. DataFusion поддерживает streaming через StreamingTable — trait, который позволяет подключать unbounded источники данных к движку запросов.

NOTE

DataFusion — не полноценный stream processing engine (как Apache Flink или Kafka Streams). Он не поддерживает event time, exactly-once semantics и stateful windowing из коробки. Но StreamingTable позволяет строить continuous query паттерны поверх batch-движка — для многих use case этого достаточно.

Batch vs Streaming: модель выполнения

Чтобы понять возможности и ограничения, сравним модели:

Batch vs Streaming в DataFusion
Batch ModeДанные конечны — файлы на диске, таблицы с известным размером
Источник
Выполнение
Гарантии
Streaming ModeДанные бесконечны — новые записи поступают непрерывно
Источник
Выполнение
Гарантии

StreamingTable trait

StreamingTable — это TableProvider, который сообщает DataFusion, что источник бесконечен. Это влияет на планирование: оптимизатор не может использовать операции, требующие полного scan (сортировку всех данных, полный hash join).

Реализация StreamingTable

use datafusion::datasource::streaming::StreamingTable;
use datafusion::datasource::TableProvider;
use datafusion::physical_plan::streaming::PartitionStream;
use arrow::datatypes::SchemaRef;

/// Источник метрик — бесконечный поток RecordBatch
struct MetricsSource {
    schema: SchemaRef,
    receiver: tokio::sync::mpsc::Receiver<RecordBatch>,
}

impl PartitionStream for MetricsSource {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(
        &self,
        _ctx: Arc<TaskContext>,
    ) -> SendableRecordBatchStream {
        // Конвертируем mpsc::Receiver в RecordBatchStream
        let schema = self.schema.clone();
        let stream = ReceiverStream::new(self.receiver)
            .map(Ok);
        Box::pin(RecordBatchStreamAdapter::new(schema, stream))
    }
}

// Регистрация unbounded источника
let streaming_table = StreamingTable::try_new(
    schema.clone(),
    vec![Arc::new(metrics_source) as Arc<dyn PartitionStream>],
)?;

ctx.register_table("live_metrics", Arc::new(streaming_table))?;
TIP

StreamingTable принимает вектор PartitionStream — каждый partition стримится независимо. Для Kafka-источника каждый partition может соответствовать одному Kafka partition, обеспечивая параллельное чтение.

Continuous query

После регистрации StreamingTable можно выполнять запросы, которые не завершаются — они продолжают обрабатывать данные по мере поступления:

// Continuous aggregation — не завершается, стримит результаты
let df = ctx.sql(
    "SELECT host, COUNT(*) as cnt, AVG(cpu_usage) as avg_cpu
     FROM live_metrics
     GROUP BY host"
).await?;

// execute_stream() возвращает SendableRecordBatchStream
let mut stream = df.execute_stream().await?;

// Каждый batch — промежуточный результат агрегации
while let Some(batch) = stream.next().await {
    let batch = batch?;
    // Отправляем в WebSocket, записываем в time-series DB,
    // проверяем алерты...
    process_incremental_result(&batch).await;
}
WARNING

Aggregation на unbounded источнике выдаёт промежуточные результаты — они пересчитываются с каждым новым batch. Это не windowed aggregation (как в Flink). Если вам нужны точные результаты за временное окно — реализуйте windowing логику в PartitionStream или используйте micro-batch подход.

Ограничения streaming-режима

Не все SQL-операторы работают с unbounded данными:

Совместимость операторов с unbounded источниками
РаботаютРаботают: streaming-операторы с O(1) или O(state) памятью
Операторы
Почему
ПроблемныПроблемны: buffering-операторы, которые ждут конца данных
Операторы
Почему

DataFusion обнаруживает конфликт на этапе планирования: если оптимизатор вставляет SortExec перед unbounded источником, физический план не будет сгенерирован.

Micro-batch паттерн

Когда нужна семантика batch-обработки (полные агрегации, сортировка), но данные поступают непрерывно — используйте micro-batch подход:

use tokio::time::{interval, Duration};

struct MicroBatchProcessor {
    ctx: SessionContext,
    batch_interval: Duration,
}

impl MicroBatchProcessor {
    async fn run(&self) {
        let mut tick = interval(self.batch_interval);

        loop {
            tick.tick().await;

            // Определяем временное окно
            let window_end = chrono::Utc::now();
            let window_start = window_end - self.batch_interval;

            // Читаем новые данные за окно (из Kafka, файлов, API)
            let new_data = self.fetch_window(window_start, window_end).await;

            if new_data.is_empty() {
                continue;
            }

            // Регистрируем как in-memory таблицу
            let mem_table = MemTable::try_new(
                new_data[0].schema(),
                vec![new_data],
            )?;
            self.ctx.register_table("window_data", Arc::new(mem_table))?;

            // Выполняем batch-запрос — все операторы работают
            let result = self.ctx.sql(
                "SELECT category, COUNT(*) as cnt,
                        AVG(value) as avg_val
                 FROM window_data
                 GROUP BY category
                 ORDER BY cnt DESC"
            ).await?.collect().await?;

            // Обрабатываем результат
            self.emit_results(&result).await;

            // Очищаем временную таблицу
            self.ctx.deregister_table("window_data")?;
        }
    }
}
TIP

Micro-batch — самый прагматичный подход для DataFusion. Вы получаете полную мощь SQL (JOIN, ORDER BY, оконные функции) за счёт задержки в один batch_interval. Для большинства аналитических задач задержка 5-30 секунд допустима.

Интеграция с Kafka

Пример PartitionStream, который читает из Kafka:

use rdkafka::consumer::{StreamConsumer, Consumer};
use rdkafka::Message;

struct KafkaPartitionStream {
    schema: SchemaRef,
    consumer: Arc<StreamConsumer>,
    topic: String,
    partition: i32,
}

impl PartitionStream for KafkaPartitionStream {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(
        &self,
        _ctx: Arc<TaskContext>,
    ) -> SendableRecordBatchStream {
        let consumer = self.consumer.clone();
        let schema = self.schema.clone();

        let stream = async_stream::stream! {
            let mut buffer = Vec::new();
            let batch_size = 1024;

            loop {
                match consumer.recv().await {
                    Ok(msg) => {
                        if let Some(payload) = msg.payload() {
                            let row = parse_message(payload, &schema);
                            buffer.push(row);
                        }

                        if buffer.len() >= batch_size {
                            let batch = rows_to_record_batch(
                                &buffer, &schema
                            );
                            buffer.clear();
                            yield Ok(batch);
                        }
                    }
                    Err(e) => {
                        tracing::error!("Kafka recv error: {}", e);
                        // Продолжаем — transient ошибки нормальны
                    }
                }
            }
        };

        Box::pin(RecordBatchStreamAdapter::new(schema, stream))
    }
}

Watermark и event time

DataFusion не имеет встроенной поддержки watermark (в отличие от Flink). Но вы можете реализовать watermark-логику в PartitionStream:

struct WatermarkedStream {
    inner: SendableRecordBatchStream,
    watermark: Arc<AtomicI64>,
    event_time_column: String,
}

impl Stream for WatermarkedStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        match self.inner.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(batch))) => {
                // Обновляем watermark по max event_time в batch
                let event_col = batch
                    .column_by_name(&self.event_time_column)
                    .expect("event_time column");
                let max_time = compute::max(
                    event_col.as_any()
                        .downcast_ref::<TimestampMillisecondArray>()
                        .unwrap()
                ).unwrap();
                self.watermark.store(max_time, Ordering::Release);
                Poll::Ready(Some(Ok(batch)))
            }
            other => other,
        }
    }
}
NOTE

Watermark в DataFusion — ответственность вашего кода, не движка. Это означает полный контроль, но и полную ответственность за корректность. Для сложных event-time semantics рассмотрите Flink или Arroyo, которые интегрируются с DataFusion внутри.

Инкрементальная обработка файлов

Помимо streaming sources, распространённый паттерн — инкрементальная обработка новых файлов:

use notify::{Watcher, RecursiveMode, Event, EventKind};

struct IncrementalFileProcessor {
    ctx: SessionContext,
    watch_dir: PathBuf,
    processed: HashSet<PathBuf>,
}

impl IncrementalFileProcessor {
    async fn run(&mut self) -> Result<()> {
        let (tx, mut rx) = tokio::sync::mpsc::channel(100);

        let mut watcher = notify::recommended_watcher(
            move |event: Result<Event, _>| {
                if let Ok(event) = event {
                    if matches!(event.kind, EventKind::Create(_)) {
                        for path in event.paths {
                            let _ = tx.blocking_send(path);
                        }
                    }
                }
            },
        )?;

        watcher.watch(&self.watch_dir, RecursiveMode::NonRecursive)?;

        while let Some(path) = rx.recv().await {
            if self.processed.contains(&path) {
                continue;
            }

            if path.extension().map_or(false, |e| e == "parquet") {
                tracing::info!(file = %path.display(), "Processing new file");

                self.ctx.register_parquet(
                    "new_data",
                    path.to_str().unwrap(),
                    ParquetReadOptions::default(),
                ).await?;

                let result = self.ctx.sql(
                    "INSERT INTO processed_data SELECT * FROM new_data"
                ).await?.collect().await?;

                self.ctx.deregister_table("new_data")?;
                self.processed.insert(path);
            }
        }

        Ok(())
    }
}

Выбор подхода

ПодходЗадержкаСложностьSQL-возможностиКогда использовать
StreamingTableМинимальная (ms)ВысокаяОграничены (нет sort, full join)Real-time мониторинг, алерты
Micro-batchСредняя (секунды)СредняяПолныеАналитика с допустимой задержкой
Incremental filesВысокая (минуты)НизкаяПолныеETL, data pipeline
Polling (periodic query)НастраиваемаяНизкаяПолныеDashboard refresh

Резюме

  • DataFusion — batch engine, но поддерживает streaming через StreamingTable trait
  • StreamingTable регистрирует unbounded источники — запрос не завершается, стримит результаты
  • Не все операторы работают с unbounded данными — Sort, full Hash JOIN, DISTINCT требуют конечного входа
  • Micro-batch — прагматичный компромисс: полный SQL с задержкой в один batch_interval
  • Watermark и event time — ответственность вашего кода, не движка
  • Для сложного stream processing (exactly-once, event time, stateful windows) — рассмотрите Flink или Arroyo

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. DataFusion — это прежде всего batch engine. Через какой trait он поддерживает streaming-источники?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4