Learning Platform
Глоссарий Troubleshooting
Урок 20.05 · 26 мин
Продвинутый
PrometheusGrafanaLokiOpenTelemetryMetricsLogsTracing

Observability stack

В предыдущих уроках мы говорили о профилировании (JFR, async-profiler), tuning state backend (RocksDB), и о detection skew. Это все active investigation tools. Этот урок про passive observability: continuous metrics, logs aggregation, distributed tracing. То, что позволяет понимать состояние production Flink-кластеров 24/7 без manual investigation.

Будем разбирать конкретный stack: Prometheus + Grafana для metrics, Loki для logs, OpenTelemetry для tracing. Это de-facto standard для cloud-native Flink deployments.

Мониторинг Flink: Prometheus + Grafana (практика) JMX-метрики Kafka: таксономия и мониторинг Система метрик Spark и Prometheus

Three pillars of observability

Production observability стоит на трёх pillars:

  1. Metrics — numerical time-series. Counter, gauge, histogram. Aggregate, alertable, low cardinality. Что было.
  2. Logs — structured event records. High cardinality, detailed context, full info. Что происходило.
  3. Traces — end-to-end request paths through distributed system. Per-request granularity, causality. Что взаимодействовало с чем.

Flink natively produces metrics, logs, and partial trace context. Задача observability stack — collect, store, query, visualize, alert.


Flink имеет built-in metrics system с pluggable reporters. Метрики делятся на три уровня:

System metrics — JVM, network, IO. Available out of the box:

flink_taskmanager_Status_JVM_Memory_Heap_Used
flink_taskmanager_Status_JVM_CPU_Load
flink_taskmanager_Status_JVM_GarbageCollector_*_Count
flink_taskmanager_Status_Network_AvailableMemorySegments

Job metrics — per-job aggregates:

flink_jobmanager_job_uptime
flink_jobmanager_job_numRunningJobs
flink_jobmanager_job_lastCheckpointDuration
flink_jobmanager_job_lastCheckpointSize
flink_jobmanager_job_numberOfCompletedCheckpoints

Task/Operator metrics — per-operator detail:

flink_taskmanager_job_task_operator_numRecordsIn
flink_taskmanager_job_task_operator_numRecordsOut
flink_taskmanager_job_task_operator_currentInputWatermark
flink_taskmanager_job_task_operator_backPressuredTimeMsPerSecond

Configuration metrics reporter:

# flink-conf.yaml
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249

# Multiple reporters одновременно
metrics.reporter.prom2.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.prom2.host: pushgateway.monitoring.svc
metrics.reporter.prom2.port: 9091

PrometheusReporter exposes scrape endpoint на каждом TaskManager. PushGateway alternative — для short-lived jobs или firewalled environments, push metrics в central gateway.


Custom application metrics

Beyond built-in, очень полезно exposing domain-specific metrics из вашего кода:

public class FraudDetector extends KeyedProcessFunction{'<'}...{'>'} {
    private transient Counter alertCount;
    private transient Counter eventCount;
    private transient Histogram processingTime;
    private transient Gauge{'<'}Long{'>'} activeSessionsGauge;

    @Override
    public void open(Configuration parameters) {
        MetricGroup group = getRuntimeContext().getMetricGroup();
        alertCount = group.counter("fraud_alerts_fired");
        eventCount = group.counter("fraud_events_processed");
        processingTime = group.histogram("fraud_processing_ms",
            new DropwizardHistogramWrapper(new Histogram(new SlidingTimeWindowReservoir(60, TimeUnit.SECONDS))));
        activeSessionsGauge = group.gauge("fraud_active_sessions",
            () -{'>'} (long) activeSessions.size());
    }

    @Override
    public void processElement(Event e, Context ctx, Collector{'<'}Alert{'>'} out) {
        long start = System.nanoTime();
        eventCount.inc();

        // ... business logic
        if (isFraud) {
            alertCount.inc();
            out.collect(alert);
        }

        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        processingTime.update(elapsedMs);
    }
}

Эти метрики появляются в Prometheus как:

flink_taskmanager_job_task_operator_fraud_alerts_fired_total
flink_taskmanager_job_task_operator_fraud_events_processed_total
flink_taskmanager_job_task_operator_fraud_processing_ms{quantile="0.99"}
flink_taskmanager_job_task_operator_fraud_active_sessions

Critical metrics для каждого Flink job:

  • Business event counters (alerts, transactions, errors).
  • Processing time histograms (per-stage latency).
  • Domain state size gauges (active sessions, pending tasks).
  • External call latency (DB, API calls).

Prometheus setup

Prometheus scrapes metrics с TaskManager endpoints на configured interval (typically 15-60 seconds).

# prometheus.yml
scrape_configs:
  - job_name: 'flink'
    scrape_interval: 15s
    kubernetes_sd_configs:
      - role: pod
        namespaces:
          names: ['flink']
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_app]
        regex: 'flink-(jobmanager|taskmanager)'
        action: keep
      - source_labels: [__meta_kubernetes_pod_container_port_number]
        regex: '9249'
        action: keep
      - source_labels: [__meta_kubernetes_pod_name]
        target_label: instance

Kubernetes service discovery automatically finds Flink pods. PromQL queries для healthchecks:

# Job uptime
flink_jobmanager_job_uptime{job_name="my-job"}

# Throughput per operator
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])) by (operator_name)

# Backpressure ratio (0-1)
flink_taskmanager_job_task_operator_backPressuredTimeMsPerSecond / 1000

# Checkpoint duration
flink_jobmanager_job_lastCheckpointDuration{quantile="0.99"}

# State size growth
deriv(flink_jobmanager_job_lastCheckpointSize[1h])

Retention в Prometheus typically 15-30 days. Для long-term retention — Cortex, Thanos, или Mimir layers поверх Prometheus.


Grafana dashboards

Grafana visualizes Prometheus metrics. Production Flink dashboard typically включает:

1. Overview панель — job status, uptime, parallelism, version.

2. Throughput — events/sec per operator, source vs sink.

3. Latency — end-to-end latency (через custom metrics или Watermark lag).

4. Backpressure — per-operator backpressure ratio, color-coded.

5. Checkpoints — duration, size, success rate, last failure time.

6. State — total state size, per-operator size, growth rate.

7. Resources — CPU per TaskManager, memory (heap + RocksDB), network throughput.

8. RocksDB — block cache hit rate, compaction queue, write stalls.

9. Errors — restart count, job failures, exception count.

10. Custom business metrics — domain-specific.

Pre-built Flink dashboard available на Grafana.com:

# Import dashboard 13456 (Apache Flink Job Monitoring)
# Через Grafana UI: Dashboards -> Import -> 13456

Customize для your workload по необходимости.


Observability data flow: from Flink to Grafana
TaskManagerFlink TaskManager: emits metrics through MetricGroup API. PrometheusReporter exposes scrape endpoint на каждом TM. JFR rolling recording захватывает JVM events.
metrics
PrometheusPrometheus: scrapes /metrics endpoint каждые 15s. Stores в local TSDB. Retention 15-30 days. Queries via PromQL.
PromQL
GrafanaGrafana: dashboards с panels using PromQL queries. Real-time visualization, alerting via Alertmanager integration.
Flink logsFlink logs: log4j2 output в stdout/stderr контейнера. Включает JobManager и TaskManager logs, user code logs (logging from operators).
ship
Log shipperPromtail/Vector: log shipper, читает container stdout, парсит, добавляет labels (pod, namespace, app, job_id). Forwards в Loki.
LokiLoki: log aggregation, similar to Prometheus но для logs. Indexes labels, not full text. LogQL queries для search/filter. Storage на S3 typically.
OTel SDKOpenTelemetry instrumentation: код в Flink user functions emits spans с trace context. Trace propagation через Kafka headers, event metadata.
OTLP
OTel CollectorOTel Collector: receives spans, processes, exports в backend. Может delete, sample, enrich spans. Supports multiple destinations одновременно.
TempoTempo/Jaeger/Zipkin: trace storage backend. Indexed by trace ID. Grafana может query Tempo и render trace timeline directly.

Logs aggregation с Loki

Loki — это log aggregation system от Grafana Labs. Похож на Prometheus, но для logs. Storage оптимизирован для labels, не full-text indexing — это делает Loki cheap (S3 storage) и fast для filtered queries.

Архитектура:

  • Log shipper (Promtail, Vector, Fluent Bit) collects container logs, ships в Loki.
  • Loki stores logs partitioned by labels (pod, namespace, app, job_id, level).
  • Grafana queries Loki через LogQL.

Setup для Flink в Kubernetes:

# Promtail config
clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: kubernetes-pods
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_app]
        regex: 'flink-(jobmanager|taskmanager)'
        action: keep
      - source_labels: [__meta_kubernetes_pod_label_flink_job_id]
        target_label: job_id
      - source_labels: [__meta_kubernetes_pod_name]
        target_label: pod
    pipeline_stages:
      - cri: {}
      - regex:
          expression: '(?P{'<'}level{'>'}INFO|WARN|ERROR|DEBUG) (?P{'<'}message{'>'}.*)'
      - labels:
          level:

После setup, LogQL queries в Grafana:

# All ERROR logs from Flink jobs за последний час
{app="flink-taskmanager", level="ERROR"} |= "exception"

# Errors per job
sum(count_over_time({app="flink-taskmanager", level="ERROR"}[5m])) by (job_id)

# Specific exception type
{app="flink-taskmanager"} |= "OutOfMemoryError"

Structured logging recommended для better querying:

import org.slf4j.MDC;

public class MyFunction extends RichMapFunction{'<'}...{'>'} {
    private static final Logger LOG = LoggerFactory.getLogger(MyFunction.class);

    public Object map(Event e) {
        MDC.put("user_id", e.userId);
        MDC.put("event_type", e.type);
        try {
            // ... process
            LOG.info("Processed event: amount={}", e.amount);
        } finally {
            MDC.clear();
        }
        return result;
    }
}

MDC adds labels к каждой log line, что enables better query filtering в Loki.


OpenTelemetry tracing

Distributed tracing критично для understanding end-to-end latency в multi-service pipeline. OpenTelemetry — open standard для tracing с support в всех major languages и vendors.

Setup для Flink:

{'<'}!-- pom.xml --{'>'}
{'<'}dependency{'>'}
    {'<'}groupId{'>'}io.opentelemetry{'<'}/groupId{'>'}
    {'<'}artifactId{'>'}opentelemetry-api{'<'}/artifactId{'>'}
{'<'}/dependency{'>'}
{'<'}dependency{'>'}
    {'<'}groupId{'>'}io.opentelemetry{'<'}/groupId{'>'}
    {'<'}artifactId{'>'}opentelemetry-sdk{'<'}/artifactId{'>'}
{'<'}/dependency{'>'}
{'<'}dependency{'>'}
    {'<'}groupId{'>'}io.opentelemetry{'<'}/groupId{'>'}
    {'<'}artifactId{'>'}opentelemetry-exporter-otlp{'<'}/artifactId{'>'}
{'<'}/dependency{'>'}

Initialize в job:

public class TraceInit {
    public static OpenTelemetry initialize() {
        OtlpGrpcSpanExporter exporter = OtlpGrpcSpanExporter.builder()
            .setEndpoint("http://otel-collector:4317")
            .build();

        BatchSpanProcessor processor = BatchSpanProcessor.builder(exporter).build();

        SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
            .addSpanProcessor(processor)
            .setResource(Resource.builder()
                .put("service.name", "flink-fraud-detector")
                .put("flink.job", System.getProperty("job.name", "unknown"))
                .build())
            .build();

        OpenTelemetry otel = OpenTelemetrySdk.builder()
            .setTracerProvider(tracerProvider)
            .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
            .buildAndRegisterGlobal();

        return otel;
    }
}

В operator code:

public class FraudDetector extends KeyedProcessFunction{'<'}...{'>'} {
    private transient Tracer tracer;

    @Override
    public void open(Configuration parameters) {
        tracer = GlobalOpenTelemetry.getTracer("fraud-detector");
    }

    @Override
    public void processElement(Event e, Context ctx, Collector{'<'}Alert{'>'} out) {
        Span span = tracer.spanBuilder("process_event")
            .setAttribute("user.id", e.userId)
            .setAttribute("event.amount", e.amount)
            .startSpan();

        try (Scope scope = span.makeCurrent()) {
            // ... business logic с child spans для slow operations
            Span dbSpan = tracer.spanBuilder("lookup_user_profile").startSpan();
            try {
                UserProfile profile = lookupUser(e.userId);
                dbSpan.setAttribute("user.country", profile.country);
            } finally {
                dbSpan.end();
            }

            // ... more processing
        } catch (Exception ex) {
            span.recordException(ex);
            span.setStatus(StatusCode.ERROR);
            throw ex;
        } finally {
            span.end();
        }
    }
}

Spans экспортируются в OpenTelemetry Collector -> Tempo/Jaeger.


Trace context propagation

Critical для distributed tracing — propagation trace context через async boundaries. В Flink — это Kafka topics, async I/O, external APIs.

Kafka context propagation:

// Producer: inject trace context в Kafka headers
public class TraceInjectingSerializer implements SerializationSchema{'<'}Event{'>'} {
    private static final TextMapSetter{'<'}Map{'<'}String, byte[]{'>'}{'>'} SETTER =
        (carrier, key, value) -{'>'} carrier.put(key, value.getBytes(StandardCharsets.UTF_8));

    @Override
    public byte[] serialize(Event e) {
        Map{'<'}String, byte[]{'>'} headers = new HashMap{'<'}{'>'}();
        W3CTraceContextPropagator.getInstance().inject(
            Context.current(),
            headers,
            SETTER
        );
        // ... serialize event + attach headers
        return bytes;
    }
}

// Consumer: extract trace context from headers
public class TraceExtractingDeserializer implements KafkaRecordDeserializationSchema{'<'}Event{'>'} {
    private static final TextMapGetter{'<'}Headers{'>'} GETTER = new TextMapGetter{'<'}{'>'}() {
        @Override
        public String get(Headers carrier, String key) {
            Header h = carrier.lastHeader(key);
            return h == null ? null : new String(h.value(), StandardCharsets.UTF_8);
        }

        @Override
        public Iterable{'<'}String{'>'} keys(Headers carrier) {
            return StreamSupport.stream(carrier.spliterator(), false)
                .map(Header::key).collect(Collectors.toList());
        }
    };

    @Override
    public void deserialize(ConsumerRecord{'<'}byte[], byte[]{'>'} record, Collector{'<'}Event{'>'} out) {
        Context extracted = W3CTraceContextPropagator.getInstance().extract(
            Context.current(),
            record.headers(),
            GETTER
        );

        try (Scope scope = extracted.makeCurrent()) {
            // ... deserialize, downstream operators будут видеть trace context
        }
    }
}

Это enables end-to-end tracing through Kafka -> Flink -> Kafka chains. В Jaeger UI вы видите full trace from upstream producer through Flink operators to downstream consumer.


Alerting strategy

Critical alerts для Flink production:

1. Job down.

up{job="flink"} == 0

2. Job restart count high.

increase(flink_jobmanager_job_numRestarts[1h]) > 3

3. Checkpoint failures.

flink_jobmanager_job_numberOfFailedCheckpoints / flink_jobmanager_job_totalNumberOfCheckpoints > 0.05

4. Backpressure persistent.

avg_over_time(flink_taskmanager_job_task_operator_backPressuredTimeMsPerSecond[15m]) > 500

5. Checkpoint duration excessive.

flink_jobmanager_job_lastCheckpointDuration > 300000  # 5 minutes

6. State size growth abnormal.

deriv(flink_jobmanager_job_lastCheckpointSize[1h]) > 1e9  # 1 GB per hour

7. Memory pressure.

flink_taskmanager_Status_JVM_Memory_Heap_Used / flink_taskmanager_Status_JVM_Memory_Heap_Max > 0.9

Каждый alert routed в PagerDuty/Slack/Opsgenie через Alertmanager. Severity levels: critical (page on-call), warning (Slack notification), info (daily digest).


Production observability playbook

Stack components:

  1. Prometheus — metrics, retention 15 days, federated в Mimir для long-term.
  2. Loki — logs, S3-backed, retention 30 days, indexed by labels.
  3. Tempo — traces, S3-backed, retention 7 days.
  4. Grafana — unified UI для metrics + logs + traces, alerting via Alertmanager.
  5. PrometheusOperator — manages CRD-based Prometheus rules.

Operational practices:

  • Dashboards per job — каждый production job имеет dedicated dashboard.
  • Alert runbooks — каждый alert linked к Confluence/Notion runbook с recovery steps.
  • Weekly review — team review top jobs metrics, identify regressions.
  • SLO definition — formal SLO для critical jobs (latency p99, throughput, error rate).
  • Error budget tracking — alert when error budget consumption accelerates.
TIP

Observability — это not infrastructure project. Это ongoing operational discipline. Setup правильного stack — это 20% работы; remaining 80% — это ongoing tuning dashboards, refining alerts, training team. Plan for continuous investment.


Попробуй сам

  1. Local stack with docker-compose. Поднимите Prometheus + Grafana + Loki в Docker. Setup Flink session cluster, configure Prometheus reporter. Verify metrics flowing.

  2. Custom metric in user code. Add counter и histogram в простой Flink job. Verify видимость в Prometheus.

  3. End-to-end trace через Kafka. Setup Tempo, instrument Flink job с OpenTelemetry, send events через Kafka. Verify в Grafana trace view full path.

Проверка знанийKnowledge check
Production Flink kafka-to-iceberg pipeline показал inconsistent behavior: 95% requests завершаются за 50ms, но 5% занимают 5+ seconds. Метрики показывают normal averages (нет obvious spike), backpressure intermittent. Как использовать observability stack (metrics + logs + traces) для root cause analysis? Конкретный план.
ОтветAnswer
Хвостовая latency 5% при normal averages — classic case where averages lie, нужны percentiles и per-request detail. План: (1) METRICS: посмотреть histogram percentiles вместо averages. PromQL: histogram_quantile(0.95, flink_taskmanager_job_task_operator_processing_time_bucket). Если p95 = 5 seconds — confirmed tail latency. Дальше per-operator breakdown — какой operator показывает spread. (2) METRICS: backpressure intermittent — посмотреть когда именно. Если correlates с checkpoint times — checkpoint barrier blocking. Если correlates с GC pauses — JVM issue. Если random — нужны traces. (3) TRACES: для slow requests get trace ID. В Tempo посмотреть full trace timeline — где конкретно spent 5+ seconds. Возможные patterns: (a) External API call slow (DB/HTTP) — span показывает long external call. (b) State backend stall — long span на 'rocksdb_get' или similar. (c) Network retransmit — span gap без visible work. (d) JVM GC pause — happens within operator, shows как long gap в spans, ничто не explicit. (4) LOGS: для same time range as slow trace — search Loki: {app="flink-taskmanager"} |= "WARN|ERROR" | json. Часто slow requests correlate с warnings — slow downstream, retry, exception. (5) Cross-reference: combine trace ID + timestamp с logs. Например trace shows 5s spent в processElement, logs show 'WARN: Kafka producer retrying' в same window — root cause network glitch к downstream Kafka. (6) Если patterns unclear — collect представительную sample: trigger async-profiler в wall mode на 60 seconds, ищите 'park' времена для blocking. JFR rolling snapshot для GC analysis в same window. (7) Verification: после fix monitor p99 latency в Grafana по последним 24h — should drop. Setup alert: histogram_quantile(0.99, ...) > 1000ms для 5 minutes — page on-call. Без observability such tail issues могут существовать undetected месяцами.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Production Flink kafka-to-iceberg pipeline: 95% requests завершаются за 50ms, но 5% занимают 5+ seconds. Метрики показывают normal averages. Как использовать observability stack для root cause analysis?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5