Мониторинг Flink: Prometheus + Grafana
В production Flink-джоб без мониторинга — это бомба замедленного действия.
Мониторинг ресурсов в Kubernetes: kubectl top и метрики У вас может расти state до десятков гигабайт, накапливаться backpressure, отваливаться checkpoint-ы — и узнаете вы об этом, только когда джоб упадёт в три ночи. Все эти симптомы видны в метриках. Этот урок — про то, как включить Prometheus в Flink, какие метрики правда важны (а не “все 200 штук”), готовый Grafana dashboard и alert rules для production.
Включение Prometheus reporter в Flink
Flink имеет встроенную систему метрик с pluggable reporter-ами. Один из них — PrometheusReporter, который поднимает HTTP endpoint на каждом JM и TM, отдающий метрики в Prometheus формате.
В Flink 2.x reporter включается через flinkConfiguration в FlinkDeployment:
spec:
flinkConfiguration:
metrics.reporters: prom
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: "9249"
# Опционально: фильтр для уменьшения количества метрик
metrics.reporter.prom.scope.variables.excludes: "tm_id;task_attempt_id"
После apply каждый JM и TM поды откроют порт 9249. Дамп метрик доступен через curl http://pod:9249.
ServiceMonitor для Prometheus Operator (если используете kube-prometheus-stack):
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: flink-orders-pipeline
namespace: flink-jobs
labels:
release: prometheus # должно совпадать с serviceMonitorSelector в Prometheus CR
spec:
selector:
matchLabels:
app: orders-pipeline
endpoints:
- port: metrics
interval: 30s
path: /metrics
namespaceSelector:
matchNames:
- flink-jobs
Оператор Flink создаёт Service с лейблом app: <deployment-name> и портом metrics: 9249. ServiceMonitor по нему подцепится.
Если у вас не Prometheus Operator, а просто Prometheus с classic scrape config — используйте Kubernetes SD. Annotation на подах prometheus.io/scrape: "true", prometheus.io/port: "9249" обычно достаточно (зависит от конфигурации scrape job).
Какие метрики правда важны
Flink экспортирует 200+ метрик из коробки. Большинство из них — для глубокого дебага. Для production-мониторинга реально нужны 5-7 ключевых групп. Разберём каждую.
Это первоочередной набор. Если у вас всего один dashboard — пусть он покажет эти 6 групп.
Checkpoint duration и fails: самый важный сигнал
Чекпойнт-ы — это пульс Flink-джоба. Здоровый джоб делает их каждые 30-60 секунд за приемлемое время (обычно меньше 30 секунд). Если они начинают долго идти или фейлиться — это первый признак проблемы.
Метрики:
flink_jobmanager_job_lastCheckpointDuration— миллисекунды на последний checkpoint.flink_jobmanager_job_lastCheckpointSize— байты последнего checkpoint.flink_jobmanager_job_numFailedCheckpoints— total counter упавших chekpoint-ов.flink_jobmanager_job_lastCheckpointDuration_seconds— same в seconds.
Production alert (Prometheus rule):
groups:
- name: flink-checkpoint
rules:
- alert: FlinkCheckpointDurationHigh
expr: flink_jobmanager_job_lastCheckpointDuration > 60000
for: 5m
labels:
severity: warning
annotations:
summary: "Flink checkpoint duration > 60s for {{ $labels.job_name }}"
description: "Checkpoints are taking longer than 60s - check state size and S3 throughput"
- alert: FlinkCheckpointsFailing
expr: rate(flink_jobmanager_job_numFailedCheckpoints[5m]) > 0
for: 2m
labels:
severity: critical
annotations:
summary: "Flink checkpoints failing for {{ $labels.job_name }}"
Высокий duration — обычно симптом одной из трёх причин:
- Растущий стейт. RocksDB всё больше — snapshot всё дольше.
- Медленный S3. Сетевые проблемы или throttling.
- Backpressure. При aligned checkpoint барьеры тормозят — джоб не успевает все барьеры пройти.
Failed checkpoint-ы (rate > 0) — это всегда проблема. Без работающих checkpoint-ов вы теряете гарантии recovery и в случае краха откатитесь на час назад.
Backpressure: где затык?
Backpressure — это состояние, когда оператор Flink не может писать в downstream (downstream медленнее, не успевает обрабатывать). Метрика показывает, какую долю времени оператор провёл в состоянии “ждёт, пока downstream освободится”.
flink_taskmanager_job_task_backPressuredTimeMsPerSecond— мс в секунду в состоянии backpressure (0-1000).flink_taskmanager_job_task_busyTimeMsPerSecond— мс активной работы.flink_taskmanager_job_task_idleTimeMsPerSecond— мс ожидания input-а.
Интерпретация:
- backpressure > 500 ms/s (50% времени) на каком-то операторе — у этого оператора downstream-узкое горло. Смотрите следующий по графу.
- busy > 900 ms/s на конечном sink — sink не успевает (например, медленный JDBC или Iceberg commits).
- idle > 900 ms/s на source — нет данных (нормально для bursty workload, аномалия для постоянной нагрузки).
Production alert:
- alert: FlinkOperatorBackpressureHigh
expr: |
flink_taskmanager_job_task_backPressuredTimeMsPerSecond > 500
for: 10m
labels:
severity: warning
annotations:
summary: "Operator {{ $labels.task_name }} backpressured > 50% for 10 min"
description: "Downstream is slower than upstream. Check next operator in graph."
В Grafana по этой метрике делайте heatmap по task_name — сразу видно, какой оператор задыхается.
Source lag: насколько отстаём от realtime
Для Kafka source-ов Flink экспортирует event-time lag. Но более прямой метрик — лаг consumer group в Kafka (Kafka exporter kafka_consumergroup_lag). Они дополняют друг друга:
- Flink-метрик:
currentEmitEventTimeLag— разница между current processing time и event time только что эмитированной записи. Если 30 секунд — данные обрабатываются с лагом 30 сек. - Kafka-метрик:
kafka_consumergroup_lag{group="flink-orders-pipeline"}— сколько unread offset-ов в Kafka.
Production alert:
- alert: KafkaConsumerGroupLagHigh
expr: |
sum by (group) (kafka_consumergroup_lag{group=~"flink-.*"}) > 1000000
for: 15m
labels:
severity: warning
annotations:
summary: "Kafka lag for {{ $labels.group }} > 1M records"
Порог 1M записей условный — настройте под свой throughput. Если у вас 10k events/sec, то 1M = 100 секунд лага, это терпимо. Если 100 events/sec, то 1M = 10000 сек, это уже катастрофа.
State size: тревожный сигнал unbounded state
Постоянно растущий размер state — это симптом unbounded state: какие-то ключи в keyed state никогда не удаляются. В перспективе закончится disk на TM или RocksDB не сможет открыться при рестарте.
Метрика: flink_jobmanager_job_lastCheckpointSize (bytes).
Production alert на рост:
- alert: FlinkStateSizeGrowing
expr: |
deriv(flink_jobmanager_job_lastCheckpointSize[1h]) > 1000000
for: 24h
labels:
severity: warning
annotations:
summary: "State size growing for {{ $labels.job_name }}"
description: "State has grown > 1 MB/hour for 24 hours - check for unbounded state"
deriv(...[1h]) — производная за час, в bytes/sec. Если > 1MB/час = очевидный рост, что-то накапливается без TTL.
Лечение:
- Используйте
StateTtlConfigдля всех value/list/map states. - Window state автоматически очищается через
allowedLateness. - Для ProcessFunction явно вызывайте
state.clear()когда логика говорит, что ключ больше не нужен.
Throughput regression: автотест продакшна
Throughput (records/sec) обычно стабилен в normal load. Если в одной задеплоенной версии резко упал — есть regression в коде (новая лога медленнее, кто-то добавил неоптимальный join).
Метрика:
flink_taskmanager_job_task_numRecordsOutPerSecond— записей в секунду из оператора.flink_taskmanager_job_task_numRecordsInPerSecond— записей на вход.
Alert на drop:
- alert: FlinkThroughputDrop
expr: |
flink_taskmanager_job_task_numRecordsOutPerSecond
< (flink_taskmanager_job_task_numRecordsOutPerSecond offset 1h) * 0.7
for: 15m
labels:
severity: warning
annotations:
summary: "Throughput dropped > 30% in last hour"
Сравниваем текущее значение с тем, что было час назад. Если упало на 30%+ за час и держится 15 минут — что-то изменилось (deploy, schema change, downstream issue).
Готовый Grafana dashboard
Сообщество поддерживает популярный dashboard для Flink: ID 14911 на grafana.com. Полное название — “Apache Flink Dashboard” by Flink community.
Импорт:
# В Grafana UI
# Dashboards -> Import -> Enter 14911 -> Load
# Выбрать datasource Prometheus -> Import
Что показывает dashboard 14911:
- Overview: количество джобов, состояния (running / failed / restarting).
- Per-job panels:
- JM heap usage.
- TM heap usage per slot.
- Checkpoint duration / size / fails timeline.
- Records in/out per second.
- Watermark progression.
- Backpressure per operator.
- Сетевые I/O, RocksDB block cache hit rate.
Для production первого запуска — этого dashboard достаточно. Для специфических джобов (например, CDC с MySQL) добавляйте кастомные panels с метриками connector-а.
Dashboard 14911 требует labels в правильном виде в метриках. Если ваш Prometheus scraping добавляет лишние labels (например, pod, instance), может потребоваться скорректировать переменные dashboard-а. Альтернатива — fork dashboard и адаптировать под вашу метку.
Alert rules: production-ready набор
Минимальный набор alert rules для production Flink. Все они в формате PrometheusRule CRD (если используете Prometheus Operator):
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
name: flink-production
namespace: flink-jobs
spec:
groups:
- name: flink-job-health
rules:
- alert: FlinkJobNotRunning
expr: flink_jobmanager_job_uptime == 0
for: 1m
labels:
severity: critical
- alert: FlinkJobRestarting
expr: rate(flink_jobmanager_job_numRestarts[10m]) > 0
for: 1m
labels:
severity: warning
- alert: FlinkCheckpointDurationHigh
expr: flink_jobmanager_job_lastCheckpointDuration > 60000
for: 5m
labels:
severity: warning
- alert: FlinkCheckpointsFailing
expr: rate(flink_jobmanager_job_numFailedCheckpoints[5m]) > 0
for: 2m
labels:
severity: critical
- alert: FlinkOperatorBackpressureHigh
expr: flink_taskmanager_job_task_backPressuredTimeMsPerSecond > 500
for: 10m
labels:
severity: warning
- alert: FlinkStateSizeGrowing
expr: deriv(flink_jobmanager_job_lastCheckpointSize[1h]) > 1000000
for: 24h
labels:
severity: warning
- alert: KafkaConsumerGroupLagHigh
expr: sum by (group) (kafka_consumergroup_lag{group=~"flink-.*"}) > 1000000
for: 15m
labels:
severity: warning
- alert: FlinkTaskManagerOOM
expr: |
kube_pod_container_status_terminated_reason{reason="OOMKilled",pod=~".*taskmanager.*"} == 1
for: 0m
labels:
severity: critical
Severity-deltas: critical = ping pagerduty, warning = create ticket. Threshold-ы — подгоняйте под свой workload.
Что НЕ мониторить (или мониторить редко)
Несколько метрик кажутся важными, но в реальности шумные или бесполезные:
- JVM GC time — Flink использует G1GC, обычно ок. Смотреть только если есть симптомы (frequent GC pauses).
- Network bytes — interesting for capacity planning, не для алертинга.
- RocksDB metrics (compaction, cache) — глубокий дебаг, не daily monitoring.
- Watermark idle time — только для джобов с watermark-stuck issue (есть отдельные сигналы).
Принцип: мониторьте симптомы, которые напрямую означают проблему для бизнеса (отставание, потеря данных, downtime). Внутренности — на отдельные dashboard-ы для дебага.
Ключевые выводы
-
Включение Prometheus: один блок
flinkConfigurationсmetrics.reporters: promи port 9249. Service от оператора + ServiceMonitor. -
6 ключевых групп метрик: checkpoint duration / fails, backpressure, source lag, throughput, state size, restart count.
-
Checkpoint duration > 60s — первый сигнал проблемы. Failed checkpoints — критичный.
-
Backpressure показывает, какой оператор downstream-затыкается. Heatmap по task_name в Grafana.
-
State size монотонный рост — unbounded state. Используйте StateTtlConfig.
-
Готовый dashboard ID 14911 от Flink community — стартует за минуты.
-
PrometheusRule с 8 alerts покрывает 90% production-нужд: job uptime, restarts, checkpoint, backpressure, state, Kafka lag, OOM.
-
Не мониторьте всё: glubokie JVM/RocksDB метрики — только для дебага.