Observability stack
In the previous lessons we covered profiling (JFR, async-profiler), tuning the state backend (RocksDB), and detecting skew. Those are all active investigation tools. This lesson is about passive observability: continuous metrics, log aggregation, and distributed tracing, the things that let you understand the state of production Flink clusters 24/7 without manual investigation.
We will walk through a concrete stack: Prometheus + Grafana for metrics, Loki for logs, OpenTelemetry for tracing. This is the de facto standard for cloud-native Flink deployments.
Three pillars of observability
Production observability rests on three pillars:
- Metrics: numerical time series (counters, gauges, histograms). Aggregated, alertable, low cardinality. They tell you what happened.
- Logs: structured event records. High cardinality, detailed context, full information. They tell you what was going on in detail.
- Traces: end-to-end request paths through a distributed system. Per-request granularity and causality. They tell you what interacted with what.
Flink natively produces metrics, logs, and partial trace context. The job of the observability stack is to collect, store, query, visualize, and alert on them.
Flink metrics system
Flink has a built-in metrics system with pluggable reporters. Metrics fall into three levels:
System metrics: JVM, network, I/O. Available out of the box:
flink_taskmanager_Status_JVM_Memory_Heap_Used
flink_taskmanager_Status_JVM_CPU_Load
flink_taskmanager_Status_JVM_GarbageCollector_*_Count
flink_taskmanager_Status_Network_AvailableMemorySegments
Job metrics: per-job aggregates (numRunningJobs is the exception, it is reported once per JobManager):
flink_jobmanager_job_uptime
flink_jobmanager_numRunningJobs
flink_jobmanager_job_lastCheckpointDuration
flink_jobmanager_job_lastCheckpointSize
flink_jobmanager_job_numberOfCompletedCheckpoints
Task/Operator metrics: per-task and per-operator detail (backpressure time is reported at task scope):
flink_taskmanager_job_task_operator_numRecordsIn
flink_taskmanager_job_task_operator_numRecordsOut
flink_taskmanager_job_task_operator_currentInputWatermark
flink_taskmanager_job_task_backPressuredTimeMsPerSecond
Configuring the metrics reporter:
# flink-conf.yaml
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
# Several reporters can run at the same time
metrics.reporter.prom2.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.prom2.host: pushgateway.monitoring.svc
metrics.reporter.prom2.port: 9091
PrometheusReporter exposes a scrape endpoint on every JobManager and TaskManager. The PushGateway reporter is the alternative for short-lived jobs or firewalled environments: it pushes metrics to a central gateway.
Custom application metrics
Beyond the built-in metrics, it is very useful to expose domain-specific metrics from your own code:
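import com.codahale.metrics.SlidingTimeWindowReservoir;
import java.util.concurrent.TimeUnit;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;

public class FraudDetector extends KeyedProcessFunction<...> {
    private transient Counter alertCount;
    private transient Counter eventCount;
    private transient Histogram processingTime;
    private transient Gauge<Long> activeSessionsGauge;

    @Override
    public void open(Configuration parameters) {
        MetricGroup group = getRuntimeContext().getMetricGroup();
        alertCount = group.counter("fraud_alerts_fired");
        eventCount = group.counter("fraud_events_processed");
        // Flink's Histogram wraps a Dropwizard histogram (flink-metrics-dropwizard);
        // the Dropwizard class is fully qualified to avoid the name clash.
        processingTime = group.histogram("fraud_processing_ms",
            new DropwizardHistogramWrapper(new com.codahale.metrics.Histogram(
                new SlidingTimeWindowReservoir(60, TimeUnit.SECONDS))));
        // The cast gives the lambda an explicit Gauge<Long> target type.
        activeSessionsGauge = group.gauge("fraud_active_sessions",
            (Gauge<Long>) () -> (long) activeSessions.size());
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Alert> out) {
        long start = System.nanoTime();
        eventCount.inc();
        // ... business logic
        if (isFraud) {
            alertCount.inc();
            out.collect(alert);
        }
        // Record the per-event processing time in milliseconds.
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        processingTime.update(elapsedMs);
    }
}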
These metrics show up in Prometheus as follows (Flink's Prometheus reporter exports counters as gauges, so no _total suffix is added):
flink_taskmanager_job_task_operator_fraud_alerts_fired
flink_taskmanager_job_task_operator_fraud_events_processed
flink_taskmanager_job_task_operator_fraud_processing_ms{quantile="0.99"}
flink_taskmanager_job_task_operator_fraud_active_sessions
Critical metrics for every Flink job:
- Business event counters (alerts, transactions, errors).
- Processing time histograms (per-stage latency).
- Domain state size gauges (active sessions, pending tasks).
- External call latency (DB, API calls).
Prometheus setup
Prometheus scrapes metrics from the JobManager and TaskManager endpoints at a configured interval (typically 15-60 seconds).
# prometheus.yml
scrape_configs:
  - job_name: 'flink'
    scrape_interval: 15s
    kubernetes_sd_configs:
      - role: pod
        namespaces:
          names: ['flink']
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_app]
        regex: 'flink-(jobmanager|taskmanager)'
        action: keep
      - source_labels: [__meta_kubernetes_pod_container_port_number]
        regex: '9249'
        action: keep
      - source_labels: [__meta_kubernetes_pod_name]
        target_label: instance
Kubernetes service discovery finds the Flink pods automatically. PromQL queries for health checks:
# Job uptime
flink_jobmanager_job_uptime{job_name="my-job"}
# Throughput per operator
sum(rate(flink_taskmanager_job_task_operator_numRecordsIn[5m])) by (operator_name)
# Backpressure ratio (0-1); the metric is reported per task
flink_taskmanager_job_task_backPressuredTimeMsPerSecond / 1000
# Checkpoint duration, p99 over the last hour (the metric is a gauge with no quantile label)
quantile_over_time(0.99, flink_jobmanager_job_lastCheckpointDuration[1h])
# State size growth (bytes per second)
deriv(flink_jobmanager_job_lastCheckpointSize[1h])
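To make the units of the state size growth query concrete: PromQL's deriv() fits a least-squares line through the samples in the window and returns its slope per second, so a per-hour growth figure needs a factor of 3600. The sketch below reproduces that arithmetic in plain Java. The GrowthRate class and its method names are illustrative only (they are not part of Flink or Prometheus), and the sketch ignores details such as staleness handling.

```java
/** Least-squares slope over scraped samples, the idea behind PromQL's deriv(). */
final class GrowthRate {
    private GrowthRate() {}

    /** Returns the fitted slope in value units per second. */
    static double slopePerSecond(double[] timestampsSeconds, double[] values) {
        int n = timestampsSeconds.length;
        if (n < 2 || n != values.length) {
            throw new IllegalArgumentException("need two or more samples with matching lengths");
        }
        double sumT = 0.0;
        double sumV = 0.0;
        for (int i = 0; i < n; i++) {
            sumT += timestampsSeconds[i];
            sumV += values[i];
        }
        double meanT = sumT / n;
        double meanV = sumV / n;

        // slope = covariance(t, v) / variance(t)
        double covariance = 0.0;
        double variance = 0.0;
        for (int i = 0; i < n; i++) {
            double dt = timestampsSeconds[i] - meanT;
            covariance += dt * (values[i] - meanV);
            variance += dt * dt;
        }
        if (variance == 0.0) {
            throw new IllegalArgumentException("timestamps must not all be equal");
        }
        return covariance / variance;
    }

    /** Converts the per-second slope into growth per hour. */
    static double perHour(double[] timestampsSeconds, double[] values) {
        return slopePerSecond(timestampsSeconds, values) * 3600.0;
    }
}
```

For a checkpoint size that grows steadily by 1 GB over an hour, slopePerSecond returns roughly 277,778 bytes per second, and perHour recovers the 1e9 figure. A threshold written against deriv() is therefore a bytes-per-second threshold.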
Prometheus retention is typically 15-30 days. For long-term retention, layer Cortex, Thanos, or Mimir on top of Prometheus.
Grafana dashboards
Grafana visualizes Prometheus metrics. A production Flink dashboard typically includes:
1. Overview panel: job status, uptime, parallelism, version.
2. Throughput: events/sec per operator, source vs sink.
3. Latency: end-to-end latency (via custom metrics or watermark lag).
4. Backpressure: per-operator backpressure ratio, color-coded.
5. Checkpoints: duration, size, success rate, last failure time.
6. State: total state size, per-operator size, growth rate.
7. Resources: CPU per TaskManager, memory (heap + RocksDB), network throughput.
8. RocksDB: block cache hit rate, compaction queue, write stalls.
9. Errors: restart count, job failures, exception count.
10. Custom business metrics: domain-specific.
A pre-built Flink dashboard is available on Grafana.com:
# Import dashboard 13456 (Apache Flink Job Monitoring)
# Via the Grafana UI: Dashboards -> Import -> 13456
Customize it for your workload as needed.
Log aggregation with Loki
Loki is a log aggregation system from Grafana Labs. It resembles Prometheus, but for logs. Loki indexes only the labels rather than the full text of each line, which makes it cheap (S3 storage) and fast for label-filtered queries.
Architecture:
- A log shipper (Promtail, Vector, Fluent Bit) collects container logs and ships them to Loki.
- Loki stores logs partitioned by labels (pod, namespace, app, job_id, level).
- Grafana queries Loki via LogQL.
Setup for Flink on Kubernetes:
# Promtail config
clients:
  - url: http://loki:3100/loki/api/v1/push
scrape_configs:
  - job_name: kubernetes-pods
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_app]
        regex: 'flink-(jobmanager|taskmanager)'
        action: keep
      # Expose the pod's app label as "app" so queries can filter on it
      - source_labels: [__meta_kubernetes_pod_label_app]
        target_label: app
      - source_labels: [__meta_kubernetes_pod_label_flink_job_id]
        target_label: job_id
      - source_labels: [__meta_kubernetes_pod_name]
        target_label: pod
    pipeline_stages:
      - cri: {}
      - regex:
          expression: '(?P<level>INFO|WARN|ERROR|DEBUG) (?P<message>.*)'
      - labels:
          level:
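To see what the regex stage extracts from a Flink log line, the same expression can be tried outside Promtail. The sketch below is plain Java, so the named groups are spelled (?<name>...) rather than the (?P<name>...) form Promtail expects. LevelExtractor and the sample log lines are illustrative assumptions, not part of Promtail or Flink.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Mirrors the Promtail regex stage: pulls the log level and the rest of the line. */
final class LevelExtractor {
    // Same expression as in the pipeline stage, in Java's named-group syntax.
    private static final Pattern LINE =
        Pattern.compile("(?<level>INFO|WARN|ERROR|DEBUG) (?<message>.*)");

    private LevelExtractor() {}

    /** Returns {level, message}, or empty when the line carries no recognizable level. */
    static Optional<String[]> extract(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new String[] {m.group("level"), m.group("message")});
    }
}
```

Lines without a level, such as stack-trace continuation lines, do not match, so under this pipeline they would reach Loki without a level label. The expression also matches the first occurrence of a level word followed by a space anywhere in the line, which is worth keeping in mind if messages themselves contain such words.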
Once this is set up, you can run LogQL queries in Grafana:
# ERROR logs that mention an exception, case-insensitive (the time range comes from the Grafana time picker)
{app="flink-taskmanager", level="ERROR"} |~ "(?i)exception"
# Errors per job
sum(count_over_time({app="flink-taskmanager", level="ERROR"}[5m])) by (job_id)
# Specific exception type
{app="flink-taskmanager"} |= "OutOfMemoryError"
Structured logging is recommended for better querying:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class MyFunction extends RichMapFunction<...> {
    private static final Logger LOG = LoggerFactory.getLogger(MyFunction.class);

    public Object map(Event e) {
        // Attach per-event context to every log line written below.
        MDC.put("user_id", e.userId);
        MDC.put("event_type", e.type);
        try {
            // ... process
            LOG.info("Processed event: amount={}", e.amount);
        } finally {
            // Always clear, so the context does not leak into the next record.
            MDC.clear();
        }
        return result;
    }
}
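MDC attaches these fields to every log line written inside the try block, provided the log layout prints them (for example %X in the pattern, or a JSON layout). That makes them filterable in Loki at query time. Keep high-cardinality fields such as user_id out of Loki labels.
OpenTelemetry tracing
Distributed tracing is critical for understanding end-to-end latency in a multi-service pipeline. OpenTelemetry is an open standard for tracing, supported across all major languages and vendors.
Setup for Flink: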
<!-- pom.xml -->
<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-exporter-otlp</artifactId>
</dependency>
Initialize it in the job:
public class TraceInit {
    public static OpenTelemetry initialize() {
        OtlpGrpcSpanExporter exporter = OtlpGrpcSpanExporter.builder()
            .setEndpoint("http://otel-collector:4317")
            .build();
        BatchSpanProcessor processor = BatchSpanProcessor.builder(exporter).build();
        SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
            .addSpanProcessor(processor)
            .setResource(Resource.builder()
                .put("service.name", "flink-fraud-detector")
                .put("flink.job", System.getProperty("job.name", "unknown"))
                .build())
            .build();
        OpenTelemetry otel = OpenTelemetrySdk.builder()
            .setTracerProvider(tracerProvider)
            .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
            .buildAndRegisterGlobal();
        return otel;
    }
}
In the operator code:
public class FraudDetector extends KeyedProcessFunction<...> {
    private transient Tracer tracer;

    @Override
    public void open(Configuration parameters) {
        tracer = GlobalOpenTelemetry.getTracer("fraud-detector");
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Alert> out) throws Exception {
        Span span = tracer.spanBuilder("process_event")
            .setAttribute("user.id", e.userId)
            .setAttribute("event.amount", e.amount)
            .startSpan();
        try (Scope scope = span.makeCurrent()) {
            // ... business logic, with child spans for slow operations.
            // The child picks up process_event as its parent from the current context.
            Span dbSpan = tracer.spanBuilder("lookup_user_profile").startSpan();
            try {
                UserProfile profile = lookupUser(e.userId);
                dbSpan.setAttribute("user.country", profile.country);
            } finally {
                dbSpan.end();
            }
            // ... more processing
        } catch (Exception ex) {
            span.recordException(ex);
            span.setStatus(StatusCode.ERROR);
            throw ex;
        } finally {
            span.end();
        }
    }
}
Spans are exported to the OpenTelemetry Collector, which forwards them to Tempo or Jaeger.
Trace context propagation
Distributed tracing depends on propagating the trace context across asynchronous boundaries. In Flink these are Kafka topics, async I/O, and external APIs.
Kafka context propagation:
// Producer: inject the trace context into Kafka record headers.
// SerializationSchema returns only the payload bytes, so this sketch uses
// KafkaRecordSerializationSchema, which builds the whole ProducerRecord.
public class TraceInjectingSerializer implements KafkaRecordSerializationSchema<Event> {
    private static final TextMapSetter<Headers> SETTER =
        (carrier, key, value) -> carrier.add(key, value.getBytes(StandardCharsets.UTF_8));

    private final String topic; // hypothetical: target topic, set in a constructor (omitted)

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Event e, KafkaSinkContext context, Long timestamp) {
        Headers headers = new RecordHeaders();
        W3CTraceContextPropagator.getInstance().inject(Context.current(), headers, SETTER);
        // ... serialize event into bytes
        return new ProducerRecord<>(topic, null, timestamp, null, bytes, headers);
    }
}

// Consumer: extract the trace context from the record headers.
public class TraceExtractingDeserializer implements KafkaRecordDeserializationSchema<Event> {
    private static final TextMapGetter<Headers> GETTER = new TextMapGetter<>() {
        @Override
        public String get(Headers carrier, String key) {
            Header h = carrier.lastHeader(key);
            return h == null ? null : new String(h.value(), StandardCharsets.UTF_8);
        }

        @Override
        public Iterable<String> keys(Headers carrier) {
            return StreamSupport.stream(carrier.spliterator(), false)
                .map(Header::key).collect(Collectors.toList());
        }
    };

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Event> out) {
        Context extracted = W3CTraceContextPropagator.getInstance().extract(
            Context.current(), record.headers(), GETTER);
        try (Scope scope = extracted.makeCurrent()) {
            // ... deserialize and emit. Only code that runs synchronously inside this
            // scope sees the trace context; to carry it across a shuffle to other
            // operators, copy the trace context into the event itself.
        }
    }
    // getProducedType() omitted for brevity
}
This enables end-to-end tracing through Kafka -> Flink -> Kafka chains. In the Jaeger UI you see the full trace from the upstream producer through the Flink operators to the downstream consumer.
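What actually travels in the Kafka headers is small. The W3C propagator writes a traceparent header of the form version-traceid-parentid-flags, for example 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01. The sketch below parses and re-serializes that value in plain Java, to show what the consumer side recovers. TraceParent is an illustrative class, not an OpenTelemetry API, and it handles only the version 00 layout; in a real job the propagator does this for you.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Minimal model of a W3C traceparent header value (version 00 layout only). */
final class TraceParent {
    // 32 hex chars of trace id, 16 hex chars of parent span id, 2 hex chars of flags.
    private static final Pattern VERSION_00 =
        Pattern.compile("00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})");

    final String traceId;
    final String parentSpanId;
    final boolean sampled;

    private TraceParent(String traceId, String parentSpanId, boolean sampled) {
        this.traceId = traceId;
        this.parentSpanId = parentSpanId;
        this.sampled = sampled;
    }

    /** Returns empty for malformed values and for all-zero ids, which the spec treats as invalid. */
    static Optional<TraceParent> parse(String header) {
        if (header == null) {
            return Optional.empty();
        }
        Matcher m = VERSION_00.matcher(header.trim());
        if (!m.matches()) {
            return Optional.empty();
        }
        String traceId = m.group(1);
        String spanId = m.group(2);
        if (traceId.matches("0+") || spanId.matches("0+")) {
            return Optional.empty();
        }
        // Bit 0 of the flags byte is the "sampled" flag.
        boolean sampled = (Integer.parseInt(m.group(3), 16) & 0x01) != 0;
        return Optional.of(new TraceParent(traceId, spanId, sampled));
    }

    /** Serializes back to the header form; flag bits other than "sampled" are dropped. */
    String toHeader() {
        return "00-" + traceId + "-" + parentSpanId + "-" + (sampled ? "01" : "00");
    }
}
```

The trace id stays the same across every hop, which is what lets the tracing backend stitch the producer, the Flink operators, and the downstream consumer into one trace. The parent span id changes at each hop to point at the span that sent the message.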
Alerting strategy
Critical alerts for Flink in production:
1. Job down.
up{job="flink"} == 0
2. Job restart count high.
increase(flink_jobmanager_job_numRestarts[1h]) > 3
3. Checkpoint failures.
flink_jobmanager_job_numberOfFailedCheckpoints / flink_jobmanager_job_totalNumberOfCheckpoints > 0.05
4. Backpressure persistent.
avg_over_time(flink_taskmanager_job_task_backPressuredTimeMsPerSecond[15m]) > 500
5. Checkpoint duration excessive.
flink_jobmanager_job_lastCheckpointDuration > 300000 # 5 minutes
6. State size growth abnormal.
delta(flink_jobmanager_job_lastCheckpointSize[1h]) > 1e9 # more than 1 GB of growth over the last hour (deriv() would give bytes per second)
7. Memory pressure.
flink_taskmanager_Status_JVM_Memory_Heap_Used / flink_taskmanager_Status_JVM_Memory_Heap_Max > 0.9
Each alert is routed to PagerDuty, Slack, or Opsgenie through Alertmanager. Severity levels: critical (page the on-call), warning (Slack notification), info (daily digest).
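The backpressure alert compares a 15-minute average against 500 because backPressuredTimeMsPerSecond is measured in milliseconds per second: 500 means the task spent about half of the window backpressured. Averaging over the window keeps a single spike from paging anyone. The sketch below works through that logic in plain Java. BackpressureAlert and its methods are illustrative names; in the stack described here, Prometheus evaluates the rule, not application code.

```java
/** Window-average check for a milliseconds-per-second metric. */
final class BackpressureAlert {
    private BackpressureAlert() {}

    /** Mean of the samples in one window, the counterpart of avg_over_time(). */
    static double averageOverWindow(double[] msPerSecondSamples) {
        if (msPerSecondSamples.length == 0) {
            throw new IllegalArgumentException("window contains no samples");
        }
        double sum = 0.0;
        for (double sample : msPerSecondSamples) {
            sum += sample;
        }
        return sum / msPerSecondSamples.length;
    }

    /** Converts milliseconds per second into a 0-1 fraction of time. */
    static double ratio(double msPerSecond) {
        return msPerSecond / 1000.0;
    }

    /** True when the window average exceeds the threshold (500 in the rule above). */
    static boolean shouldFire(double[] msPerSecondSamples, double thresholdMsPerSecond) {
        return averageOverWindow(msPerSecondSamples) > thresholdMsPerSecond;
    }
}
```

With samples 900, 100, 800, 700 the average is 625, so the alert fires. With a single spike of 1000 followed by three zeros the average is 250, so it stays quiet even though one sample showed full backpressure.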
Production observability playbook
Stack components:
- Prometheus: metrics, 15-day retention, federated into Mimir for long-term storage.
- Loki: logs, S3-backed, 30-day retention, indexed by labels.
- Tempo: traces, S3-backed, 7-day retention.
- Grafana: unified UI for metrics + logs + traces, alerting via Alertmanager.
- Prometheus Operator: manages CRD-based Prometheus rules.
Operational practices:
- Dashboards per job: every production job has a dedicated dashboard.
- Alert runbooks: every alert links to a Confluence/Notion runbook with recovery steps.
- Weekly review: the team reviews metrics for the top jobs and identifies regressions.
- SLO definition: formal SLOs for critical jobs (latency p99, throughput, error rate).
- Error budget tracking: alert when error budget consumption accelerates.
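One common way to quantify "error budget consumption accelerates" is the burn rate from SRE practice: the observed error rate divided by the error rate the SLO allows. The lesson does not prescribe a formula, so treat the sketch below as one possible definition. ErrorBudget and its method names are illustrative.

```java
/** Burn-rate arithmetic for an availability-style SLO. */
final class ErrorBudget {
    private ErrorBudget() {}

    /**
     * Burn rate = observed error rate / allowed error rate (1 - SLO target).
     * A value of 1.0 spends the budget exactly over the SLO period.
     */
    static double burnRate(double observedErrorRate, double sloTarget) {
        if (sloTarget <= 0.0 || sloTarget >= 1.0) {
            throw new IllegalArgumentException("SLO target must be between 0 and 1, exclusive");
        }
        return observedErrorRate / (1.0 - sloTarget);
    }

    /** Days until the budget is gone if the current burn rate holds. */
    static double daysUntilExhausted(double burnRate, double periodDays) {
        if (burnRate <= 0.0) {
            return Double.POSITIVE_INFINITY;
        }
        return periodDays / burnRate;
    }
}
```

With a 99.9% SLO and an observed error rate of 0.5%, the burn rate is about 5, so a 30-day budget would be exhausted in about 6 days. Alerting on burn rate rather than on the raw error rate ties the page to how quickly the SLO is at risk.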
Observability is not a one-off infrastructure project; it is an ongoing operational discipline. Setting up the right stack is 20% of the work; the remaining 80% is continuously tuning dashboards, refining alerts, and training the team. Plan for continuous investment.
Try it yourself
- Local stack with docker-compose. Bring up Prometheus + Grafana + Loki in Docker. Set up a Flink session cluster and configure the Prometheus reporter. Verify that metrics are flowing.
- Custom metric in user code. Add a counter and a histogram to a simple Flink job. Verify that they are visible in Prometheus.
- End-to-end trace through Kafka. Set up Tempo, instrument a Flink job with OpenTelemetry, and send events through Kafka. Verify that the Grafana trace view shows the full path.