Streaming и инкрементальная обработка в DataFusion
DataFusion — это прежде всего batch query engine. Но реальные аналитические системы часто требуют обработки данных по мере их поступления: мониторинг метрик, обнаружение аномалий, real-time dashboards. DataFusion поддерживает streaming через StreamingTable — trait, который позволяет подключать unbounded источники данных к движку запросов.
DataFusion — не полноценный stream processing engine (как Apache Flink или Kafka Streams). Он не поддерживает event time, exactly-once semantics и stateful windowing из коробки. Но StreamingTable позволяет строить continuous query паттерны поверх batch-движка — для многих use case этого достаточно.
Batch vs Streaming: модель выполнения
Чтобы понять возможности и ограничения, сравним модели:
StreamingTable trait
StreamingTable — это TableProvider, который сообщает DataFusion, что источник бесконечен. Это влияет на планирование: оптимизатор не может использовать операции, требующие полного scan (сортировку всех данных, полный hash join).
Реализация StreamingTable
use datafusion::datasource::streaming::StreamingTable;
use datafusion::datasource::TableProvider;
use datafusion::physical_plan::streaming::PartitionStream;
use arrow::datatypes::SchemaRef;
/// Источник метрик — бесконечный поток RecordBatch
struct MetricsSource {
schema: SchemaRef,
receiver: tokio::sync::mpsc::Receiver<RecordBatch>,
}
impl PartitionStream for MetricsSource {
fn schema(&self) -> &SchemaRef {
&self.schema
}
fn execute(
&self,
_ctx: Arc<TaskContext>,
) -> SendableRecordBatchStream {
// Конвертируем mpsc::Receiver в RecordBatchStream
let schema = self.schema.clone();
let stream = ReceiverStream::new(self.receiver)
.map(Ok);
Box::pin(RecordBatchStreamAdapter::new(schema, stream))
}
}
// Регистрация unbounded источника
let streaming_table = StreamingTable::try_new(
schema.clone(),
vec![Arc::new(metrics_source) as Arc<dyn PartitionStream>],
)?;
ctx.register_table("live_metrics", Arc::new(streaming_table))?;
StreamingTable принимает вектор PartitionStream — каждый partition стримится независимо. Для Kafka-источника каждый partition может соответствовать одному Kafka partition, обеспечивая параллельное чтение.
Continuous query
После регистрации StreamingTable можно выполнять запросы, которые не завершаются — они продолжают обрабатывать данные по мере поступления:
// Continuous aggregation — не завершается, стримит результаты
let df = ctx.sql(
"SELECT host, COUNT(*) as cnt, AVG(cpu_usage) as avg_cpu
FROM live_metrics
GROUP BY host"
).await?;
// execute_stream() возвращает SendableRecordBatchStream
let mut stream = df.execute_stream().await?;
// Каждый batch — промежуточный результат агрегации
while let Some(batch) = stream.next().await {
let batch = batch?;
// Отправляем в WebSocket, записываем в time-series DB,
// проверяем алерты...
process_incremental_result(&batch).await;
}
Aggregation на unbounded источнике выдаёт промежуточные результаты — они пересчитываются с каждым новым batch. Это не windowed aggregation (как в Flink). Если вам нужны точные результаты за временное окно — реализуйте windowing логику в PartitionStream или используйте micro-batch подход.
Ограничения streaming-режима
Не все SQL-операторы работают с unbounded данными:
DataFusion обнаруживает конфликт на этапе планирования: если оптимизатор вставляет SortExec перед unbounded источником, физический план не будет сгенерирован.
Micro-batch паттерн
Когда нужна семантика batch-обработки (полные агрегации, сортировка), но данные поступают непрерывно — используйте micro-batch подход:
use tokio::time::{interval, Duration};
struct MicroBatchProcessor {
ctx: SessionContext,
batch_interval: Duration,
}
impl MicroBatchProcessor {
async fn run(&self) {
let mut tick = interval(self.batch_interval);
loop {
tick.tick().await;
// Определяем временное окно
let window_end = chrono::Utc::now();
let window_start = window_end - self.batch_interval;
// Читаем новые данные за окно (из Kafka, файлов, API)
let new_data = self.fetch_window(window_start, window_end).await;
if new_data.is_empty() {
continue;
}
// Регистрируем как in-memory таблицу
let mem_table = MemTable::try_new(
new_data[0].schema(),
vec![new_data],
)?;
self.ctx.register_table("window_data", Arc::new(mem_table))?;
// Выполняем batch-запрос — все операторы работают
let result = self.ctx.sql(
"SELECT category, COUNT(*) as cnt,
AVG(value) as avg_val
FROM window_data
GROUP BY category
ORDER BY cnt DESC"
).await?.collect().await?;
// Обрабатываем результат
self.emit_results(&result).await;
// Очищаем временную таблицу
self.ctx.deregister_table("window_data")?;
}
}
}
Micro-batch — самый прагматичный подход для DataFusion. Вы получаете полную мощь SQL (JOIN, ORDER BY, оконные функции) за счёт задержки в один batch_interval. Для большинства аналитических задач задержка 5-30 секунд допустима.
Интеграция с Kafka
Пример PartitionStream, который читает из Kafka:
use rdkafka::consumer::{StreamConsumer, Consumer};
use rdkafka::Message;
struct KafkaPartitionStream {
schema: SchemaRef,
consumer: Arc<StreamConsumer>,
topic: String,
partition: i32,
}
impl PartitionStream for KafkaPartitionStream {
fn schema(&self) -> &SchemaRef {
&self.schema
}
fn execute(
&self,
_ctx: Arc<TaskContext>,
) -> SendableRecordBatchStream {
let consumer = self.consumer.clone();
let schema = self.schema.clone();
let stream = async_stream::stream! {
let mut buffer = Vec::new();
let batch_size = 1024;
loop {
match consumer.recv().await {
Ok(msg) => {
if let Some(payload) = msg.payload() {
let row = parse_message(payload, &schema);
buffer.push(row);
}
if buffer.len() >= batch_size {
let batch = rows_to_record_batch(
&buffer, &schema
);
buffer.clear();
yield Ok(batch);
}
}
Err(e) => {
tracing::error!("Kafka recv error: {}", e);
// Продолжаем — transient ошибки нормальны
}
}
}
};
Box::pin(RecordBatchStreamAdapter::new(schema, stream))
}
}
Watermark и event time
DataFusion не имеет встроенной поддержки watermark (в отличие от Flink). Но вы можете реализовать watermark-логику в PartitionStream:
struct WatermarkedStream {
inner: SendableRecordBatchStream,
watermark: Arc<AtomicI64>,
event_time_column: String,
}
impl Stream for WatermarkedStream {
type Item = Result<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.inner.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(batch))) => {
// Обновляем watermark по max event_time в batch
let event_col = batch
.column_by_name(&self.event_time_column)
.expect("event_time column");
let max_time = compute::max(
event_col.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
).unwrap();
self.watermark.store(max_time, Ordering::Release);
Poll::Ready(Some(Ok(batch)))
}
other => other,
}
}
}
Watermark в DataFusion — ответственность вашего кода, не движка. Это означает полный контроль, но и полную ответственность за корректность. Для сложных event-time semantics рассмотрите Flink или Arroyo, которые интегрируются с DataFusion внутри.
Инкрементальная обработка файлов
Помимо streaming sources, распространённый паттерн — инкрементальная обработка новых файлов:
use notify::{Watcher, RecursiveMode, Event, EventKind};
struct IncrementalFileProcessor {
ctx: SessionContext,
watch_dir: PathBuf,
processed: HashSet<PathBuf>,
}
impl IncrementalFileProcessor {
async fn run(&mut self) -> Result<()> {
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
let mut watcher = notify::recommended_watcher(
move |event: Result<Event, _>| {
if let Ok(event) = event {
if matches!(event.kind, EventKind::Create(_)) {
for path in event.paths {
let _ = tx.blocking_send(path);
}
}
}
},
)?;
watcher.watch(&self.watch_dir, RecursiveMode::NonRecursive)?;
while let Some(path) = rx.recv().await {
if self.processed.contains(&path) {
continue;
}
if path.extension().map_or(false, |e| e == "parquet") {
tracing::info!(file = %path.display(), "Processing new file");
self.ctx.register_parquet(
"new_data",
path.to_str().unwrap(),
ParquetReadOptions::default(),
).await?;
let result = self.ctx.sql(
"INSERT INTO processed_data SELECT * FROM new_data"
).await?.collect().await?;
self.ctx.deregister_table("new_data")?;
self.processed.insert(path);
}
}
Ok(())
}
}
Выбор подхода
| Подход | Задержка | Сложность | SQL-возможности | Когда использовать |
|---|---|---|---|---|
| StreamingTable | Минимальная (ms) | Высокая | Ограничены (нет sort, full join) | Real-time мониторинг, алерты |
| Micro-batch | Средняя (секунды) | Средняя | Полные | Аналитика с допустимой задержкой |
| Incremental files | Высокая (минуты) | Низкая | Полные | ETL, data pipeline |
| Polling (periodic query) | Настраиваемая | Низкая | Полные | Dashboard refresh |
Резюме
- DataFusion — batch engine, но поддерживает streaming через
StreamingTabletrait StreamingTableрегистрирует unbounded источники — запрос не завершается, стримит результаты- Не все операторы работают с unbounded данными — Sort, full Hash JOIN, DISTINCT требуют конечного входа
- Micro-batch — прагматичный компромисс: полный SQL с задержкой в один batch_interval
- Watermark и event time — ответственность вашего кода, не движка
- Для сложного stream processing (exactly-once, event time, stateful windows) — рассмотрите Flink или Arroyo