Production observability: мониторинг, метрики и диагностика
В уроке по бенчмаркингу мы изучили профилирование отдельных запросов. Но production-мониторинг — это другая задача: нужно непрерывно отслеживать здоровье всей системы, обнаруживать деградацию до того, как пользователи пожалуются, и быстро находить причину при инцидентах. Этот урок показывает, как выстроить наблюдаемость DataFusion-сервиса.
Три столпа observability
Structured logging с tracing
Crate tracing — стандарт для structured logging в Rust. Вместо строковых println! используются spans (контексты) и events (записи) со structured полями:
Настройка tracing-subscriber
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
fn init_logging() {
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info,datafusion=warn".into()))
.with(fmt::layer()
.json() // JSON формат для machine parsing
.with_target(true) // Включаем module path
.with_thread_ids(true)
.with_span_events(fmt::format::FmtSpan::CLOSE))
.init();
}
Фильтр datafusion=warn подавляет debug/info логи самого DataFusion (их много при выполнении запросов). Для отладки конкретного запроса временно включите datafusion=debug — увидите каждый шаг оптимизатора и executor-а.
Per-request spans
Каждый запрос оборачивается в span со structured полями:
use tracing::{instrument, info, warn, Span};
#[instrument(
skip(state, request),
fields(
tenant_id = %request.tenant_id,
query_hash = %hash_sql(&request.sql),
query_length = request.sql.len(),
)
)]
async fn query_handler(
State(state): State<Arc<AppState>>,
Json(request): Json<QueryRequest>,
) -> Result<Json<QueryResponse>, AppError> {
let start = std::time::Instant::now();
// Парсинг и валидация
validate_read_only(&request.sql)?;
let df = state.ctx.sql(&request.sql).await?;
// Логируем оптимизированный план
let plan = df.logical_plan();
info!(
plan = %plan.display_indent(),
"Query planned"
);
// Выполнение
let batches = df.collect().await?;
let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
let duration = start.elapsed();
// Structured event с метриками запроса
info!(
duration_ms = duration.as_millis() as u64,
row_count = row_count,
batch_count = batches.len(),
"Query completed"
);
// Алерт на медленные запросы
if duration > Duration::from_secs(30) {
warn!(
duration_ms = duration.as_millis() as u64,
sql = %request.sql,
"Slow query detected"
);
}
Ok(Json(QueryResponse { rows: to_json(&batches)?, row_count }))
}
Structured error logging
use tracing::error;
// При ошибке DataFusion — логируем контекст
match df.collect().await {
Ok(batches) => Ok(batches),
Err(e) => {
error!(
error = %e,
error_kind = ?e.find_root(),
sql = %request.sql,
"Query execution failed"
);
Err(AppError::QueryFailed(e.to_string()))
}
}
EXPLAIN ANALYZE в production
EXPLAIN ANALYZE выполняет запрос и собирает статистику каждого оператора. В production используйте его для диагностики медленных запросов.
Программный доступ к execution stats
use datafusion::physical_plan::{
accept, ExecutionPlan, ExecutionPlanVisitor,
displayable,
};
struct StatsCollector {
operators: Vec<OperatorStats>,
}
struct OperatorStats {
name: String,
output_rows: usize,
elapsed_compute_ns: u64,
spill_count: usize,
spilled_bytes: usize,
}
impl ExecutionPlanVisitor for StatsCollector {
type Error = DataFusionError;
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool> {
if let Some(metrics) = plan.metrics() {
let stats = OperatorStats {
name: plan.name().to_string(),
output_rows: metrics.output_rows().unwrap_or(0),
elapsed_compute_ns: metrics
.elapsed_compute()
.unwrap_or(0),
spill_count: metrics.spill_count().unwrap_or(0),
spilled_bytes: metrics.spilled_bytes().unwrap_or(0),
};
self.operators.push(stats);
}
Ok(true)
}
}
// Использование: собираем статистику после выполнения
async fn collect_with_stats(
df: DataFrame,
) -> Result<(Vec<RecordBatch>, Vec<OperatorStats>)> {
let (state, plan) = df.into_parts();
let physical = state.create_physical_plan(&plan).await?;
let task_ctx = state.task_ctx();
let batches = collect(physical.clone(), task_ctx).await?;
// После выполнения physical plan содержит метрики
let mut collector = StatsCollector { operators: vec![] };
accept(physical.as_ref(), &mut collector)?;
Ok((batches, collector.operators))
}
EXPLAIN ANALYZE удваивает нагрузку — запрос реально выполняется. Не вызывайте его для каждого запроса в production. Используйте для диагностики: включайте по флагу (?analyze=true) или автоматически для запросов, превышающих порог latency.
Метрики с Prometheus
Определение метрик
use prometheus::{
Histogram, HistogramOpts, HistogramVec,
IntCounter, IntCounterVec, IntGauge,
Opts, Registry,
};
struct QueryMetrics {
/// Histogram латентности запросов (секунды)
query_duration: HistogramVec,
/// Количество запросов (total)
queries_total: IntCounterVec,
/// Количество ошибок
query_errors: IntCounterVec,
/// Текущее потребление памяти MemoryPool (bytes)
memory_pool_used: IntGauge,
/// Количество spill операций
spill_count: IntCounter,
/// Количество активных запросов
active_queries: IntGauge,
}
impl QueryMetrics {
fn new(registry: &Registry) -> Self {
let query_duration = HistogramVec::new(
HistogramOpts::new(
"datafusion_query_duration_seconds",
"Query execution duration in seconds",
)
.buckets(vec![
0.01, 0.05, 0.1, 0.25, 0.5,
1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 300.0,
]),
&["tenant", "query_class"],
)
.unwrap();
registry.register(Box::new(query_duration.clone())).unwrap();
let queries_total = IntCounterVec::new(
Opts::new(
"datafusion_queries_total",
"Total number of queries executed",
),
&["tenant", "status"],
)
.unwrap();
registry.register(Box::new(queries_total.clone())).unwrap();
// ... аналогично для остальных метрик
Self {
query_duration,
queries_total,
query_errors: IntCounterVec::new(
Opts::new(
"datafusion_query_errors_total",
"Total query errors",
),
&["tenant", "error_type"],
).unwrap(),
memory_pool_used: IntGauge::new(
"datafusion_memory_pool_used_bytes",
"Current memory pool usage",
).unwrap(),
spill_count: IntCounter::new(
"datafusion_spill_operations_total",
"Total spill-to-disk operations",
).unwrap(),
active_queries: IntGauge::new(
"datafusion_active_queries",
"Currently executing queries",
).unwrap(),
}
}
}
Инструментирование запроса
impl QueryMetrics {
fn observe_query(
&self,
tenant: &str,
class: &str,
duration: Duration,
success: bool,
stats: &[OperatorStats],
) {
// Латентность
self.query_duration
.with_label_values(&[tenant, class])
.observe(duration.as_secs_f64());
// Счётчик запросов
let status = if success { "ok" } else { "error" };
self.queries_total
.with_label_values(&[tenant, status])
.inc();
// Spill-счётчик из execution stats
let total_spills: usize = stats.iter()
.map(|s| s.spill_count)
.sum();
if total_spills > 0 {
self.spill_count.inc_by(total_spills as u64);
}
}
}
Prometheus endpoint
use axum::{routing::get, response::IntoResponse};
use prometheus::{Encoder, TextEncoder};
async fn metrics_handler(
State(registry): State<Arc<Registry>>,
) -> impl IntoResponse {
let encoder = TextEncoder::new();
let mut buffer = Vec::new();
let metrics = registry.gather();
encoder.encode(&metrics, &mut buffer).unwrap();
(
[("content-type", "text/plain; charset=utf-8")],
buffer,
)
}
// В router:
// .route("/metrics", get(metrics_handler))
OpenTelemetry трейсинг
Для end-to-end трейсинга — от HTTP-запроса до отдельных операторов DataFusion — интегрируйте tracing с OpenTelemetry:
Настройка
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::WithExportConfig;
use tracing_opentelemetry::OpenTelemetryLayer;
use opentelemetry_sdk::trace::TracerProvider;
fn init_tracing_with_otel() -> TracerProvider {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint("http://localhost:4317")
.build()
.expect("OTLP exporter build failed");
let provider = TracerProvider::builder()
.with_batch_exporter(exporter)
.build();
let tracer = provider.tracer("datafusion-service");
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info".into()))
.with(fmt::layer().json())
.with(OpenTelemetryLayer::new(tracer))
.init();
provider
}
Span-иерархия запроса
use tracing::{instrument, info_span, Instrument};
#[instrument(skip_all, fields(tenant_id, sql_hash))]
async fn handle_query(/* ... */) {
// Span: parse
let plan = async {
ctx.sql(&sql).await
}
.instrument(info_span!("sql_parse_plan"))
.await?;
// Span: optimize
let physical = async {
ctx.state().create_physical_plan(plan.logical_plan()).await
}
.instrument(info_span!("query_optimize"))
.await?;
// Span: execute
let batches = async {
collect(physical, ctx.task_ctx()).await
}
.instrument(info_span!("query_execute"))
.await?;
// Span: serialize
let response = async {
to_json(&batches)
}
.instrument(info_span!("serialize_response"))
.await?;
}
В Jaeger/Tempo вы увидите: handle_query → sql_parse_plan → query_optimize → query_execute → serialize_response. Это сразу показывает, какая фаза занимает больше всего времени. Для большинства аналитических запросов bottleneck — query_execute.
Алертинг на медленные запросы
Prometheus alerting rules
# prometheus/alerts.yml
groups:
- name: datafusion
rules:
- alert: SlowQueryP95
expr: |
histogram_quantile(0.95,
rate(datafusion_query_duration_seconds_bucket[5m])
) > 10
for: 5m
labels:
severity: warning
annotations:
summary: "P95 query latency exceeds 10s"
- alert: HighErrorRate
expr: |
rate(datafusion_query_errors_total[5m])
/ rate(datafusion_queries_total[5m]) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "Query error rate exceeds 5%"
- alert: MemoryPoolNearLimit
expr: |
datafusion_memory_pool_used_bytes
/ datafusion_memory_pool_limit_bytes > 0.9
for: 2m
labels:
severity: warning
annotations:
summary: "Memory pool usage exceeds 90%"
- alert: ExcessiveSpilling
expr: |
rate(datafusion_spill_operations_total[5m]) > 10
for: 5m
labels:
severity: warning
annotations:
summary: "High spill rate - consider increasing memory"
Программные алерты в коде
/// Middleware: логирует медленные запросы с планом
async fn slow_query_logger(
sql: &str,
duration: Duration,
stats: &[OperatorStats],
) {
if duration > Duration::from_secs(30) {
// Находим самый тяжёлый оператор
let bottleneck = stats.iter()
.max_by_key(|s| s.elapsed_compute_ns)
.map(|s| format!("{}: {:.2}s", s.name,
s.elapsed_compute_ns as f64 / 1e9));
tracing::warn!(
duration_ms = duration.as_millis() as u64,
bottleneck = ?bottleneck,
sql = %sql,
"Slow query — consider adding partition pruning or reducing scan scope"
);
}
}
Dashboard: ключевые панели
Для Grafana-дашборда DataFusion-сервиса рекомендуются следующие панели:
| Панель | Метрика | Визуализация |
|---|---|---|
| QPS | rate(datafusion_queries_total[1m]) | Time series |
| Latency distribution | datafusion_query_duration_seconds | Heatmap |
| P50 / P95 / P99 | histogram_quantile(...) | Time series, 3 линии |
| Memory usage | datafusion_memory_pool_used_bytes | Gauge + time series |
| Spill rate | rate(datafusion_spill_operations_total[5m]) | Time series |
| Active queries | datafusion_active_queries | Gauge |
| Error rate | rate(errors) / rate(total) | Time series + threshold |
| Per-tenant breakdown | Все метрики с label tenant | Table |
Начните с 4 панелей: QPS, P95 latency, memory usage, error rate. Остальные добавляйте по мере роста — преждевременный monitoring добавляет overhead без пользы.
Резюме
- Логи:
tracingcrate с JSON-форматом и per-request spans — structured fields вместо строк - Метрики: Prometheus histograms для latency, counters для QPS и ошибок, gauges для memory pool
- Трейсы: OpenTelemetry через
tracing-opentelemetry— span-иерархия от HTTP до execute - EXPLAIN ANALYZE: программный доступ через
ExecutionPlanVisitor— для диагностики, не для каждого запроса - Алерты: P95 latency, error rate, memory usage, spill rate — 4 алерта достаточно для начала
- Начинайте с логов + метрик. Трейсинг добавляйте когда появятся запросы, проходящие через несколько сервисов