Learning Platform
Глоссарий Troubleshooting
Урок 13.04 · 16 мин
Продвинутый
ObservabilityTracingEXPLAIN ANALYZEPrometheusOpenTelemetryMetricsAlertingStructured Logging

Production observability: мониторинг, метрики и диагностика

В уроке по бенчмаркингу мы изучили профилирование отдельных запросов. Но production-мониторинг — это другая задача: нужно непрерывно отслеживать здоровье всей системы, обнаруживать деградацию до того, как пользователи пожалуются, и быстро находить причину при инцидентах. Этот урок показывает, как выстроить наблюдаемость DataFusion-сервиса.

Три столпа observability

Три столпа observability DataFusion-сервиса
ЛогиStructured logging: tracing crate + tracing-subscriber, per-request spans
Инструменты
Что логируем
МетрикиМетрики: prometheus + DataFusion execution stats
Инструменты
Что считаем
ТрейсыТрейсы: span-иерархия запроса от HTTP до RecordBatch
Инструменты
Что трейсим

Structured logging с tracing

Crate tracing — стандарт для structured logging в Rust. Вместо строковых println! используются spans (контексты) и events (записи) со structured полями:

Настройка tracing-subscriber

use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

fn init_logging() {
    tracing_subscriber::registry()
        .with(EnvFilter::try_from_default_env()
            .unwrap_or_else(|_| "info,datafusion=warn".into()))
        .with(fmt::layer()
            .json()              // JSON формат для machine parsing
            .with_target(true)   // Включаем module path
            .with_thread_ids(true)
            .with_span_events(fmt::format::FmtSpan::CLOSE))
        .init();
}
TIP

Фильтр datafusion=warn подавляет debug/info логи самого DataFusion (их много при выполнении запросов). Для отладки конкретного запроса временно включите datafusion=debug — увидите каждый шаг оптимизатора и executor-а.

Per-request spans

Каждый запрос оборачивается в span со structured полями:

use tracing::{instrument, info, warn, Span};

#[instrument(
    skip(state, request),
    fields(
        tenant_id = %request.tenant_id,
        query_hash = %hash_sql(&request.sql),
        query_length = request.sql.len(),
    )
)]
async fn query_handler(
    State(state): State<Arc<AppState>>,
    Json(request): Json<QueryRequest>,
) -> Result<Json<QueryResponse>, AppError> {
    let start = std::time::Instant::now();

    // Парсинг и валидация
    validate_read_only(&request.sql)?;

    let df = state.ctx.sql(&request.sql).await?;

    // Логируем оптимизированный план
    let plan = df.logical_plan();
    info!(
        plan = %plan.display_indent(),
        "Query planned"
    );

    // Выполнение
    let batches = df.collect().await?;
    let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();

    let duration = start.elapsed();

    // Structured event с метриками запроса
    info!(
        duration_ms = duration.as_millis() as u64,
        row_count = row_count,
        batch_count = batches.len(),
        "Query completed"
    );

    // Алерт на медленные запросы
    if duration > Duration::from_secs(30) {
        warn!(
            duration_ms = duration.as_millis() as u64,
            sql = %request.sql,
            "Slow query detected"
        );
    }

    Ok(Json(QueryResponse { rows: to_json(&batches)?, row_count }))
}

Structured error logging

use tracing::error;

// При ошибке DataFusion — логируем контекст
match df.collect().await {
    Ok(batches) => Ok(batches),
    Err(e) => {
        error!(
            error = %e,
            error_kind = ?e.find_root(),
            sql = %request.sql,
            "Query execution failed"
        );
        Err(AppError::QueryFailed(e.to_string()))
    }
}

EXPLAIN ANALYZE в production

EXPLAIN ANALYZE выполняет запрос и собирает статистику каждого оператора. В production используйте его для диагностики медленных запросов.

Программный доступ к execution stats

use datafusion::physical_plan::{
    accept, ExecutionPlan, ExecutionPlanVisitor,
    displayable,
};

struct StatsCollector {
    operators: Vec<OperatorStats>,
}

struct OperatorStats {
    name: String,
    output_rows: usize,
    elapsed_compute_ns: u64,
    spill_count: usize,
    spilled_bytes: usize,
}

impl ExecutionPlanVisitor for StatsCollector {
    type Error = DataFusionError;

    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool> {
        if let Some(metrics) = plan.metrics() {
            let stats = OperatorStats {
                name: plan.name().to_string(),
                output_rows: metrics.output_rows().unwrap_or(0),
                elapsed_compute_ns: metrics
                    .elapsed_compute()
                    .unwrap_or(0),
                spill_count: metrics.spill_count().unwrap_or(0),
                spilled_bytes: metrics.spilled_bytes().unwrap_or(0),
            };
            self.operators.push(stats);
        }
        Ok(true)
    }
}

// Использование: собираем статистику после выполнения
async fn collect_with_stats(
    df: DataFrame,
) -> Result<(Vec<RecordBatch>, Vec<OperatorStats>)> {
    let (state, plan) = df.into_parts();
    let physical = state.create_physical_plan(&plan).await?;

    let task_ctx = state.task_ctx();
    let batches = collect(physical.clone(), task_ctx).await?;

    // После выполнения physical plan содержит метрики
    let mut collector = StatsCollector { operators: vec![] };
    accept(physical.as_ref(), &mut collector)?;

    Ok((batches, collector.operators))
}
WARNING

EXPLAIN ANALYZE удваивает нагрузку — запрос реально выполняется. Не вызывайте его для каждого запроса в production. Используйте для диагностики: включайте по флагу (?analyze=true) или автоматически для запросов, превышающих порог latency.

Метрики с Prometheus

Определение метрик

use prometheus::{
    Histogram, HistogramOpts, HistogramVec,
    IntCounter, IntCounterVec, IntGauge,
    Opts, Registry,
};

struct QueryMetrics {
    /// Histogram латентности запросов (секунды)
    query_duration: HistogramVec,
    /// Количество запросов (total)
    queries_total: IntCounterVec,
    /// Количество ошибок
    query_errors: IntCounterVec,
    /// Текущее потребление памяти MemoryPool (bytes)
    memory_pool_used: IntGauge,
    /// Количество spill операций
    spill_count: IntCounter,
    /// Количество активных запросов
    active_queries: IntGauge,
}

impl QueryMetrics {
    fn new(registry: &Registry) -> Self {
        let query_duration = HistogramVec::new(
            HistogramOpts::new(
                "datafusion_query_duration_seconds",
                "Query execution duration in seconds",
            )
            .buckets(vec![
                0.01, 0.05, 0.1, 0.25, 0.5,
                1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 300.0,
            ]),
            &["tenant", "query_class"],
        )
        .unwrap();
        registry.register(Box::new(query_duration.clone())).unwrap();

        let queries_total = IntCounterVec::new(
            Opts::new(
                "datafusion_queries_total",
                "Total number of queries executed",
            ),
            &["tenant", "status"],
        )
        .unwrap();
        registry.register(Box::new(queries_total.clone())).unwrap();

        // ... аналогично для остальных метрик

        Self {
            query_duration,
            queries_total,
            query_errors: IntCounterVec::new(
                Opts::new(
                    "datafusion_query_errors_total",
                    "Total query errors",
                ),
                &["tenant", "error_type"],
            ).unwrap(),
            memory_pool_used: IntGauge::new(
                "datafusion_memory_pool_used_bytes",
                "Current memory pool usage",
            ).unwrap(),
            spill_count: IntCounter::new(
                "datafusion_spill_operations_total",
                "Total spill-to-disk operations",
            ).unwrap(),
            active_queries: IntGauge::new(
                "datafusion_active_queries",
                "Currently executing queries",
            ).unwrap(),
        }
    }
}

Инструментирование запроса

impl QueryMetrics {
    fn observe_query(
        &self,
        tenant: &str,
        class: &str,
        duration: Duration,
        success: bool,
        stats: &[OperatorStats],
    ) {
        // Латентность
        self.query_duration
            .with_label_values(&[tenant, class])
            .observe(duration.as_secs_f64());

        // Счётчик запросов
        let status = if success { "ok" } else { "error" };
        self.queries_total
            .with_label_values(&[tenant, status])
            .inc();

        // Spill-счётчик из execution stats
        let total_spills: usize = stats.iter()
            .map(|s| s.spill_count)
            .sum();
        if total_spills > 0 {
            self.spill_count.inc_by(total_spills as u64);
        }
    }
}

Prometheus endpoint

use axum::{routing::get, response::IntoResponse};
use prometheus::{Encoder, TextEncoder};

async fn metrics_handler(
    State(registry): State<Arc<Registry>>,
) -> impl IntoResponse {
    let encoder = TextEncoder::new();
    let mut buffer = Vec::new();
    let metrics = registry.gather();
    encoder.encode(&metrics, &mut buffer).unwrap();
    (
        [("content-type", "text/plain; charset=utf-8")],
        buffer,
    )
}

// В router:
// .route("/metrics", get(metrics_handler))

OpenTelemetry трейсинг

Для end-to-end трейсинга — от HTTP-запроса до отдельных операторов DataFusion — интегрируйте tracing с OpenTelemetry:

Настройка

use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::WithExportConfig;
use tracing_opentelemetry::OpenTelemetryLayer;
use opentelemetry_sdk::trace::TracerProvider;

fn init_tracing_with_otel() -> TracerProvider {
    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .with_endpoint("http://localhost:4317")
        .build()
        .expect("OTLP exporter build failed");

    let provider = TracerProvider::builder()
        .with_batch_exporter(exporter)
        .build();

    let tracer = provider.tracer("datafusion-service");

    tracing_subscriber::registry()
        .with(EnvFilter::try_from_default_env()
            .unwrap_or_else(|_| "info".into()))
        .with(fmt::layer().json())
        .with(OpenTelemetryLayer::new(tracer))
        .init();

    provider
}

Span-иерархия запроса

use tracing::{instrument, info_span, Instrument};

#[instrument(skip_all, fields(tenant_id, sql_hash))]
async fn handle_query(/* ... */) {
    // Span: parse
    let plan = async {
        ctx.sql(&sql).await
    }
    .instrument(info_span!("sql_parse_plan"))
    .await?;

    // Span: optimize
    let physical = async {
        ctx.state().create_physical_plan(plan.logical_plan()).await
    }
    .instrument(info_span!("query_optimize"))
    .await?;

    // Span: execute
    let batches = async {
        collect(physical, ctx.task_ctx()).await
    }
    .instrument(info_span!("query_execute"))
    .await?;

    // Span: serialize
    let response = async {
        to_json(&batches)
    }
    .instrument(info_span!("serialize_response"))
    .await?;
}
TIP

В Jaeger/Tempo вы увидите: handle_query → sql_parse_plan → query_optimize → query_execute → serialize_response. Это сразу показывает, какая фаза занимает больше всего времени. Для большинства аналитических запросов bottleneck — query_execute.

Алертинг на медленные запросы

Prometheus alerting rules

# prometheus/alerts.yml
groups:
  - name: datafusion
    rules:
      - alert: SlowQueryP95
        expr: |
          histogram_quantile(0.95,
            rate(datafusion_query_duration_seconds_bucket[5m])
          ) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "P95 query latency exceeds 10s"

      - alert: HighErrorRate
        expr: |
          rate(datafusion_query_errors_total[5m])
          / rate(datafusion_queries_total[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Query error rate exceeds 5%"

      - alert: MemoryPoolNearLimit
        expr: |
          datafusion_memory_pool_used_bytes
          / datafusion_memory_pool_limit_bytes > 0.9
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Memory pool usage exceeds 90%"

      - alert: ExcessiveSpilling
        expr: |
          rate(datafusion_spill_operations_total[5m]) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High spill rate - consider increasing memory"

Программные алерты в коде

/// Middleware: логирует медленные запросы с планом
async fn slow_query_logger(
    sql: &str,
    duration: Duration,
    stats: &[OperatorStats],
) {
    if duration > Duration::from_secs(30) {
        // Находим самый тяжёлый оператор
        let bottleneck = stats.iter()
            .max_by_key(|s| s.elapsed_compute_ns)
            .map(|s| format!("{}: {:.2}s", s.name,
                s.elapsed_compute_ns as f64 / 1e9));

        tracing::warn!(
            duration_ms = duration.as_millis() as u64,
            bottleneck = ?bottleneck,
            sql = %sql,
            "Slow query — consider adding partition pruning or reducing scan scope"
        );
    }
}

Dashboard: ключевые панели

Для Grafana-дашборда DataFusion-сервиса рекомендуются следующие панели:

ПанельМетрикаВизуализация
QPSrate(datafusion_queries_total[1m])Time series
Latency distributiondatafusion_query_duration_secondsHeatmap
P50 / P95 / P99histogram_quantile(...)Time series, 3 линии
Memory usagedatafusion_memory_pool_used_bytesGauge + time series
Spill raterate(datafusion_spill_operations_total[5m])Time series
Active queriesdatafusion_active_queriesGauge
Error raterate(errors) / rate(total)Time series + threshold
Per-tenant breakdownВсе метрики с label tenantTable
NOTE

Начните с 4 панелей: QPS, P95 latency, memory usage, error rate. Остальные добавляйте по мере роста — преждевременный monitoring добавляет overhead без пользы.

Резюме

  • Логи: tracing crate с JSON-форматом и per-request spans — structured fields вместо строк
  • Метрики: Prometheus histograms для latency, counters для QPS и ошибок, gauges для memory pool
  • Трейсы: OpenTelemetry через tracing-opentelemetry — span-иерархия от HTTP до execute
  • EXPLAIN ANALYZE: программный доступ через ExecutionPlanVisitor — для диагностики, не для каждого запроса
  • Алерты: P95 latency, error rate, memory usage, spill rate — 4 алерта достаточно для начала
  • Начинайте с логов + метрик. Трейсинг добавляйте когда появятся запросы, проходящие через несколько сервисов

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какие три столпа observability используются для мониторинга DataFusion-сервиса в production?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4