Learning Platform
Глоссарий Troubleshooting
Урок 16.05 · 25 мин
Средний
MonitoringPrometheusGrafanaMetricsAlertsCheckpoint

Мониторинг Flink: Prometheus + Grafana

В production Flink-джоб без мониторинга — это бомба замедленного действия.

Мониторинг ресурсов в Kubernetes: kubectl top и метрики У вас может расти state до десятков гигабайт, накапливаться backpressure, отваливаться checkpoint-ы — и узнаете вы об этом, только когда джоб упадёт в три ночи. Все эти симптомы видны в метриках. Этот урок — про то, как включить Prometheus в Flink, какие метрики правда важны (а не “все 200 штук”), готовый Grafana dashboard и alert rules для production.


Flink имеет встроенную систему метрик с pluggable reporter-ами. Один из них — PrometheusReporter, который поднимает HTTP endpoint на каждом JM и TM, отдающий метрики в Prometheus формате.

В Flink 2.x reporter включается через flinkConfiguration в FlinkDeployment:

spec:
  flinkConfiguration:
    metrics.reporters: prom
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: "9249"
    # Опционально: фильтр для уменьшения количества метрик
    metrics.reporter.prom.scope.variables.excludes: "tm_id;task_attempt_id"

После apply каждый JM и TM поды откроют порт 9249. Дамп метрик доступен через curl http://pod:9249.

ServiceMonitor для Prometheus Operator (если используете kube-prometheus-stack):

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: flink-orders-pipeline
  namespace: flink-jobs
  labels:
    release: prometheus  # должно совпадать с serviceMonitorSelector в Prometheus CR
spec:
  selector:
    matchLabels:
      app: orders-pipeline
  endpoints:
    - port: metrics
      interval: 30s
      path: /metrics
  namespaceSelector:
    matchNames:
      - flink-jobs

Оператор Flink создаёт Service с лейблом app: <deployment-name> и портом metrics: 9249. ServiceMonitor по нему подцепится.

TIP

Если у вас не Prometheus Operator, а просто Prometheus с classic scrape config — используйте Kubernetes SD. Annotation на подах prometheus.io/scrape: "true", prometheus.io/port: "9249" обычно достаточно (зависит от конфигурации scrape job).


Какие метрики правда важны

Flink экспортирует 200+ метрик из коробки. Большинство из них — для глубокого дебага. Для production-мониторинга реально нужны 5-7 ключевых групп. Разберём каждую.

Критичные метрики Flink
Checkpoint healthDuration, size, fails. Главный индикатор здоровья джоба. Если checkpoint-ы долгие или падают - стейт растёт неконтролируемо.
BackpressureОтношение busy / idle для операторов. Backpressure означает, что upstream быстрее downstream - лаг растёт.
Source lagKafka consumer lag. Прямой business-метрик: насколько мы отстаём от realtime.
ThroughputRecords per second через каждый operator. Если упал - где-то задеплоилась регрессия.
State sizeРазмер чекпойнт-а в байтах. Растущий монотонно - утечка стейта (unbounded state). Останавливать раньше OOM.
Restart countЧисло рестартов джоба. > 0 за день = есть нестабильность, надо разбираться.

Это первоочередной набор. Если у вас всего один dashboard — пусть он покажет эти 6 групп.


Checkpoint duration и fails: самый важный сигнал

Чекпойнт-ы — это пульс Flink-джоба. Здоровый джоб делает их каждые 30-60 секунд за приемлемое время (обычно меньше 30 секунд). Если они начинают долго идти или фейлиться — это первый признак проблемы.

Метрики:

  • flink_jobmanager_job_lastCheckpointDuration — миллисекунды на последний checkpoint.
  • flink_jobmanager_job_lastCheckpointSize — байты последнего checkpoint.
  • flink_jobmanager_job_numFailedCheckpoints — total counter упавших chekpoint-ов.
  • flink_jobmanager_job_lastCheckpointDuration_seconds — same в seconds.

Production alert (Prometheus rule):

groups:
  - name: flink-checkpoint
    rules:
      - alert: FlinkCheckpointDurationHigh
        expr: flink_jobmanager_job_lastCheckpointDuration > 60000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Flink checkpoint duration > 60s for {{ $labels.job_name }}"
          description: "Checkpoints are taking longer than 60s - check state size and S3 throughput"

      - alert: FlinkCheckpointsFailing
        expr: rate(flink_jobmanager_job_numFailedCheckpoints[5m]) > 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Flink checkpoints failing for {{ $labels.job_name }}"

Высокий duration — обычно симптом одной из трёх причин:

  1. Растущий стейт. RocksDB всё больше — snapshot всё дольше.
  2. Медленный S3. Сетевые проблемы или throttling.
  3. Backpressure. При aligned checkpoint барьеры тормозят — джоб не успевает все барьеры пройти.

Failed checkpoint-ы (rate > 0) — это всегда проблема. Без работающих checkpoint-ов вы теряете гарантии recovery и в случае краха откатитесь на час назад.


Backpressure: где затык?

Backpressure — это состояние, когда оператор Flink не может писать в downstream (downstream медленнее, не успевает обрабатывать). Метрика показывает, какую долю времени оператор провёл в состоянии “ждёт, пока downstream освободится”.

  • flink_taskmanager_job_task_backPressuredTimeMsPerSecond — мс в секунду в состоянии backpressure (0-1000).
  • flink_taskmanager_job_task_busyTimeMsPerSecond — мс активной работы.
  • flink_taskmanager_job_task_idleTimeMsPerSecond — мс ожидания input-а.

Интерпретация:

  • backpressure > 500 ms/s (50% времени) на каком-то операторе — у этого оператора downstream-узкое горло. Смотрите следующий по графу.
  • busy > 900 ms/s на конечном sink — sink не успевает (например, медленный JDBC или Iceberg commits).
  • idle > 900 ms/s на source — нет данных (нормально для bursty workload, аномалия для постоянной нагрузки).

Production alert:

- alert: FlinkOperatorBackpressureHigh
  expr: |
    flink_taskmanager_job_task_backPressuredTimeMsPerSecond > 500
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Operator {{ $labels.task_name }} backpressured > 50% for 10 min"
    description: "Downstream is slower than upstream. Check next operator in graph."

В Grafana по этой метрике делайте heatmap по task_name — сразу видно, какой оператор задыхается.


Source lag: насколько отстаём от realtime

Для Kafka source-ов Flink экспортирует event-time lag. Но более прямой метрик — лаг consumer group в Kafka (Kafka exporter kafka_consumergroup_lag). Они дополняют друг друга:

  • Flink-метрик: currentEmitEventTimeLag — разница между current processing time и event time только что эмитированной записи. Если 30 секунд — данные обрабатываются с лагом 30 сек.
  • Kafka-метрик: kafka_consumergroup_lag{group="flink-orders-pipeline"} — сколько unread offset-ов в Kafka.

Production alert:

- alert: KafkaConsumerGroupLagHigh
  expr: |
    sum by (group) (kafka_consumergroup_lag{group=~"flink-.*"}) > 1000000
  for: 15m
  labels:
    severity: warning
  annotations:
    summary: "Kafka lag for {{ $labels.group }} > 1M records"

Порог 1M записей условный — настройте под свой throughput. Если у вас 10k events/sec, то 1M = 100 секунд лага, это терпимо. Если 100 events/sec, то 1M = 10000 сек, это уже катастрофа.


State size: тревожный сигнал unbounded state

Постоянно растущий размер state — это симптом unbounded state: какие-то ключи в keyed state никогда не удаляются. В перспективе закончится disk на TM или RocksDB не сможет открыться при рестарте.

Метрика: flink_jobmanager_job_lastCheckpointSize (bytes).

Production alert на рост:

- alert: FlinkStateSizeGrowing
  expr: |
    deriv(flink_jobmanager_job_lastCheckpointSize[1h]) > 1000000
  for: 24h
  labels:
    severity: warning
  annotations:
    summary: "State size growing for {{ $labels.job_name }}"
    description: "State has grown > 1 MB/hour for 24 hours - check for unbounded state"

deriv(...[1h]) — производная за час, в bytes/sec. Если > 1MB/час = очевидный рост, что-то накапливается без TTL.

Лечение:

  • Используйте StateTtlConfig для всех value/list/map states.
  • Window state автоматически очищается через allowedLateness.
  • Для ProcessFunction явно вызывайте state.clear() когда логика говорит, что ключ больше не нужен.

Throughput regression: автотест продакшна

Throughput (records/sec) обычно стабилен в normal load. Если в одной задеплоенной версии резко упал — есть regression в коде (новая лога медленнее, кто-то добавил неоптимальный join).

Метрика:

  • flink_taskmanager_job_task_numRecordsOutPerSecond — записей в секунду из оператора.
  • flink_taskmanager_job_task_numRecordsInPerSecond — записей на вход.

Alert на drop:

- alert: FlinkThroughputDrop
  expr: |
    flink_taskmanager_job_task_numRecordsOutPerSecond
    < (flink_taskmanager_job_task_numRecordsOutPerSecond offset 1h) * 0.7
  for: 15m
  labels:
    severity: warning
  annotations:
    summary: "Throughput dropped > 30% in last hour"

Сравниваем текущее значение с тем, что было час назад. Если упало на 30%+ за час и держится 15 минут — что-то изменилось (deploy, schema change, downstream issue).


Готовый Grafana dashboard

Сообщество поддерживает популярный dashboard для Flink: ID 14911 на grafana.com. Полное название — “Apache Flink Dashboard” by Flink community.

Импорт:

# В Grafana UI
# Dashboards -> Import -> Enter 14911 -> Load
# Выбрать datasource Prometheus -> Import

Что показывает dashboard 14911:

  • Overview: количество джобов, состояния (running / failed / restarting).
  • Per-job panels:
    • JM heap usage.
    • TM heap usage per slot.
    • Checkpoint duration / size / fails timeline.
    • Records in/out per second.
    • Watermark progression.
    • Backpressure per operator.
  • Сетевые I/O, RocksDB block cache hit rate.

Для production первого запуска — этого dashboard достаточно. Для специфических джобов (например, CDC с MySQL) добавляйте кастомные panels с метриками connector-а.

NOTE

Dashboard 14911 требует labels в правильном виде в метриках. Если ваш Prometheus scraping добавляет лишние labels (например, pod, instance), может потребоваться скорректировать переменные dashboard-а. Альтернатива — fork dashboard и адаптировать под вашу метку.


Alert rules: production-ready набор

Минимальный набор alert rules для production Flink. Все они в формате PrometheusRule CRD (если используете Prometheus Operator):

apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: flink-production
  namespace: flink-jobs
spec:
  groups:
    - name: flink-job-health
      rules:
        - alert: FlinkJobNotRunning
          expr: flink_jobmanager_job_uptime == 0
          for: 1m
          labels:
            severity: critical

        - alert: FlinkJobRestarting
          expr: rate(flink_jobmanager_job_numRestarts[10m]) > 0
          for: 1m
          labels:
            severity: warning

        - alert: FlinkCheckpointDurationHigh
          expr: flink_jobmanager_job_lastCheckpointDuration > 60000
          for: 5m
          labels:
            severity: warning

        - alert: FlinkCheckpointsFailing
          expr: rate(flink_jobmanager_job_numFailedCheckpoints[5m]) > 0
          for: 2m
          labels:
            severity: critical

        - alert: FlinkOperatorBackpressureHigh
          expr: flink_taskmanager_job_task_backPressuredTimeMsPerSecond > 500
          for: 10m
          labels:
            severity: warning

        - alert: FlinkStateSizeGrowing
          expr: deriv(flink_jobmanager_job_lastCheckpointSize[1h]) > 1000000
          for: 24h
          labels:
            severity: warning

        - alert: KafkaConsumerGroupLagHigh
          expr: sum by (group) (kafka_consumergroup_lag{group=~"flink-.*"}) > 1000000
          for: 15m
          labels:
            severity: warning

        - alert: FlinkTaskManagerOOM
          expr: |
            kube_pod_container_status_terminated_reason{reason="OOMKilled",pod=~".*taskmanager.*"} == 1
          for: 0m
          labels:
            severity: critical

Severity-deltas: critical = ping pagerduty, warning = create ticket. Threshold-ы — подгоняйте под свой workload.


Что НЕ мониторить (или мониторить редко)

Несколько метрик кажутся важными, но в реальности шумные или бесполезные:

  • JVM GC time — Flink использует G1GC, обычно ок. Смотреть только если есть симптомы (frequent GC pauses).
  • Network bytes — interesting for capacity planning, не для алертинга.
  • RocksDB metrics (compaction, cache) — глубокий дебаг, не daily monitoring.
  • Watermark idle time — только для джобов с watermark-stuck issue (есть отдельные сигналы).

Принцип: мониторьте симптомы, которые напрямую означают проблему для бизнеса (отставание, потеря данных, downtime). Внутренности — на отдельные dashboard-ы для дебага.


Ключевые выводы

  1. Включение Prometheus: один блок flinkConfiguration с metrics.reporters: prom и port 9249. Service от оператора + ServiceMonitor.

  2. 6 ключевых групп метрик: checkpoint duration / fails, backpressure, source lag, throughput, state size, restart count.

  3. Checkpoint duration > 60s — первый сигнал проблемы. Failed checkpoints — критичный.

  4. Backpressure показывает, какой оператор downstream-затыкается. Heatmap по task_name в Grafana.

  5. State size монотонный рост — unbounded state. Используйте StateTtlConfig.

  6. Готовый dashboard ID 14911 от Flink community — стартует за минуты.

  7. PrometheusRule с 8 alerts покрывает 90% production-нужд: job uptime, restarts, checkpoint, backpressure, state, Kafka lag, OOM.

  8. Не мониторьте всё: glubokie JVM/RocksDB метрики — только для дебага.

Проверка знанийKnowledge check
У вас Flink-джоб обрабатывает 10k events/sec. Резко за час: throughput упал до 6k events/sec, checkpoint duration вырос с 5s до 90s, Kafka lag начал расти, backpressure на operator 'enrichment' стал 800 ms/s. Какая наиболее вероятная причина и план диагностики?
ОтветAnswer
Наиболее вероятная причина: enrichment operator стал медленным - бутылочное горло переместилось туда. Симптомы складываются в единую картину: 1. Throughput упал на 40% (10k -> 6k) - downstream меньше принимает. 2. Backpressure 800 ms/s на enrichment - этот оператор не успевает обработать input. 3. Checkpoint duration вырос с 5s до 90s - aligned checkpoint барьеры стопорятся в backpressured пайплайне (барьер ждёт, пока буферы flush). Это классический симптом backpressure + aligned checkpoints. 4. Kafka lag растёт - source не успевает потреблять (потому что backpressure упирается до источника). Что делать пошагово: 1. Срочно: проверить, не было ли deploy в последний час. Если был - подозревать regression в enrichment коде (например, более медленный lookup, синхронная блокировка). git log --since='1 hour ago' для repo джоба. 2. Параллельно: переключиться на unaligned checkpoints (execution.checkpointing.unaligned: true), чтобы checkpoint duration не зависел от backpressure. Это быстрый workaround на проблему чекпойнтов. 3. Дебаг enrichment: - Если enrichment делает sync I/O (например, lookup в REST API) - переписать на async I/O (Flink AsyncFunction). - Если уже async - проверить latency external API (может, downstream service деградировал). - kubectl exec в TM-под, посмотреть thread dump - jstack PID. Что блокирует worker threads. 4. Долго: масштабировать параллелизм enrichment оператора. Если сейчас parallelism 4, поднять до 8 - больше parallelism на этом этапе. 5. Если проблема не в коде - проверить, не происходит ли scale-down ноды Kubernetes (TM-под потерял CPU из-за noisy neighbor). Что НЕ делать сразу: рестартовать джоб - симптомы могут пропасть на минуту и вернуться. Сначала диагностика.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Метрика flink_jobmanager_job_lastCheckpointDuration выросла с 5s до 120s за последний час. Метрика flink_jobmanager_job_lastCheckpointSize не изменилась (~500 MB). Backpressure на одном operator > 700 ms/s. Что наиболее вероятно?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5