Web UI: task graph, backpressure, metrics, checkpoints
Flink Web UI — это не игрушка, а инструмент диагностики. Каждый production-инженер открывает его несколько раз в день: проверить job graph, посмотреть, не накопился ли backpressure, разобрать failed checkpoints. Этот урок — практический tour по разделам, которые вы будете использовать чаще всего.
Скриншотов в этом уроке нет (UI меняется между версиями) — есть описание словами того, что вы видите. Откройте Web UI вашего локального кластера и идите параллельно с уроком.
Главная навигация
После открытия http://localhost:8081 слева — главное меню:
- Overview — состояние кластера (TaskManager’ы, slots, job slots).
- Jobs
- Running Jobs — то, что сейчас работает.
- Completed Jobs — то, что завершилось (FINISHED, FAILED, CANCELED).
- Task Managers — список TM’ов в кластере.
- Job Manager — состояние JM, конфигурация, логи.
- Submit New Job — UI для submit JAR (Standalone/Session mode).
В правом верхнем углу — поиск, settings. В нижнем — Flink version.
Overview: пульс кластера
Overview — первый экран. Здесь вы видите:
- Available Task Slots — сколько свободных slots. Если 0 — кластер на пределе, новый job не пометится.
- Running Jobs / Finished Jobs / Canceled Jobs / Failed Jobs — счётчики.
- Total Task Slots — общее количество (например, 4 для нашего docker compose).
- Task Managers — количество подключённых TM’ов.
Если Available Task Slots = 0 и Running Jobs = 1 — ваш job занял всё. Это нормально для production single-app кластеров. Если несколько jobs в одном Session mode кластере конкурируют — оптимизируйте parallelism или добавьте TM’ов.
Observability в Kubernetes: метрики, логи, трейсыJob Graph: топология вашего pipeline
Кликаете на running job -> попадаете на job page. Главный экран — Plan / Job Graph.
Это визуализация DAG операторов. Вершины — операторы (или operator chains), стрелки между ними — потоки данных.
Что вы видите на каждой вершине:
- Имя оператора — то, что вы дали через
.name("...")в коде. Если не дали — Flink сгенерирует (“Source: KafkaSource”, “Map”, “Filter”). - Parallelism — сколько subtasks (например, “(4/4)” — 4 запущенных из 4 ожидаемых).
- Цвет — состояние:
- Зелёный — RUNNING.
- Жёлтый — RUNNING с backpressure / частично deployed.
- Серый — CREATED / SCHEDULED.
- Красный — FAILED.
Что вы видите на стрелках:
- Partitioning type: FORWARD, HASH, REBALANCE, BROADCAST, RESCALE, SHUFFLE, CUSTOM.
- Сколько records передано (если activated metrics).
- Backpressure indicator (если включён детальный мониторинг).
ВСЕГДА давайте операторам осмысленные имена через .name("Filter Suspicious Transactions"). В Web UI это сильно облегчает чтение графа. Без явного имени Flink покажет “Filter” — и непонятно, что именно фильтруется.
Что искать в Job Graph
- Backpressure: если оператор жёлтый, или вы видите HIGH backpressure indicator — это узкое место. Кликаете на оператор, переходите в BackPressure tab — там показано, какой % времени subtask проводит в “backpressured” состоянии.
- Failed subtasks: красный оператор — кликаете, переходите в SubTasks tab — видите, какие subtask упали, и есть exception в logs.
- Skew: один subtask обработал 80% records, остальные по 5% — это hot key. Решение: лучшая стратегия keyBy.
SubTasks tab: per-subtask метрики
Кликаете на оператор в графе. Открываются sub-tabs: Overview, SubTasks, TaskManagers, Watermarks, Accumulators, BackPressure, Metrics, FlameGraph, Configuration.
SubTasks tab — таблица со строкой на каждый subtask:
| Столбец | Что значит |
|---|---|
| ID | Индекс subtask (0, 1, 2, …) |
| Bytes Received | Сколько байт получил от upstream |
| Records Received | Сколько records получил |
| Bytes Sent | Сколько отправил downstream |
| Records Sent | Сколько отправил |
| Status | RUNNING / FAILED / FINISHED |
| TaskManager | На каком TM запущен |
| Start Time / Duration | Когда стартовал и сколько работает |
Что искать:
- Skew между subtasks: если subtask 0 — 1 M records, subtask 1 — 1.1 M, subtask 2 — 10 M — это hot key. Subtask 2 — bottleneck.
- FAILED subtasks: индикатор проблемы. Click на TaskManager в той же строке -> переходите к logs того TM.
BackPressure tab: главный диагностический инструмент
Backpressure — главное явление, которое отличает streaming от batch. Когда down-stream не успевает обрабатывать, up-stream замедляется, чтобы не переполнять buffers.
В Web UI на BackPressure tab вы видите для каждого subtask % времени в backpressured state:
- OK (зелёный, 0-10%): subtask работает свободно, не ждёт на отправке.
- LOW (жёлтый, 10-50%): иногда ждёт, но не критично.
- HIGH (красный, 50-100%): большую часть времени subtask ждёт, когда down-stream освободит buffer.
Source: HIGH
Source: HIGH backpressure. Означает, что Source производит данные, но Map не успевает их принять. Buffer between Source and Map переполнен. Source замедляется.Map: HIGH
Map: HIGH backpressure. Означает, что Map не может протолкнуть данные в Aggregate. Bullet'ы buffer между ними переполнены.Aggregate: HIGH
Aggregate: HIGH backpressure. Означает, что Aggregate не успевает отправить в Sink. ПЛЮС — Aggregate сам медленный (поэтому держит входной buffer полным).Sink: OK
Sink: OK. Означает, что Sink работает свободно. Но! Если Sink OK, а up-stream HIGH — bottleneck НЕ В SINK. Bottleneck в Aggregate (он не успевает обработать).Правило диагностики: backpressure распространяется вверх от bottleneck. Bottleneck — это первый оператор сверху-вниз, у которого backpressure НИЖЕ, чем у предыдущего. В нашей картинке: Source/Map/Aggregate все HIGH, Sink — OK. Значит, bottleneck — Aggregate (он не успевает обработать, поэтому держит свой input buffer полным, что вызывает backpressure у upstream).
Решения для backpressure на Aggregate:
- Увеличить parallelism Aggregate:
.setParallelism(8)вместо 4. Распределяет нагрузку. - Профайлить с FlameGraph tab: где Aggregate тратит время? GC? Сериализация? Внешний вызов?
- Уменьшить state operations: возможно, Aggregate делает много state.get/put — оптимизировать структуру.
- Заменить sync на async если есть внешние вызовы (модуль 09).
Backpressure — нормальная защитная мера, не bug сам по себе. Проблема, когда HIGH backpressure постоянный (не временный). Если только при пиках — это OK, кластер защищает себя от перегрузки.
Checkpoints tab: критично для production
Кликаете на job -> Checkpoints. Здесь история всех checkpoint’ов.
Overview:
- Latest Acknowledged: ID и время последнего успешного checkpoint. Должен быть recent (за последние minute-two).
- Failed Checkpoints: счётчик. Должен быть низкий или 0.
- In Progress: текущий checkpoint, если в процессе.
History таблица — строка на каждый checkpoint:
| Столбец | Что значит |
|---|---|
| ID | Номер checkpoint (1, 2, 3, …) |
| Status | COMPLETED / FAILED / IN_PROGRESS / DISCARDED |
| Trigger Time | Когда начался |
| End-to-End Duration | Сколько длился (от trigger до acknowledge) |
| Checkpointed Data Size | Размер snapshot (МБ / ГБ) |
| Processed Data | Сколько данных обработано во время checkpoint |
Что искать:
- Failed Checkpoints растёт — критическая проблема. Без успешных checkpoint’ов восстановление невозможно.
- Длительность растёт — state растёт, IO медленнее, нужно investigate.
- Размер state очень большой (десятки ГБ) — нужно подумать о ForSt (disaggregated state) или incremental checkpoints в RocksDB.
Подробно про чекпоинты — модуль 10. Но повседневная проверка простая: открыл Checkpoints tab, увидел зелёные галочки на последних 10 chechpoint’ах, размер state стабильный — всё в порядке.
Metrics tab: real-time показатели
Кликаете на оператор -> Metrics tab. Здесь можно добавить графики для отдельных метрик:
- numRecordsIn / numRecordsOut: throughput оператора.
- numBytesIn / numBytesOut: байтовый throughput.
- currentInputWatermark: текущий watermark на входе (для event time jobs).
- numLateRecordsDropped: сколько событий отброшено как too-late.
- Кастомные: если вы добавили
metrics.counter("my_counter")в коде — здесь будет.
Для production вы экспортируете метрики в Prometheus / StatsD / Datadog (модуль 16), но во время разработки и troubleshooting Web UI достаточен.
FlameGraph tab: профайлинг на лету
Если в конфиге включено rest.flamegraph.enabled: true, можно получить flame graph для каждого subtask на лету:
- Открыть оператор -> FlameGraph tab.
- Кликнуть “Start Sampling” — Flink будет samplеть стеки треды subtask.
- Через 1-2 минуты получите flame graph: где субтаск проводит время.
Это бесценно для performance troubleshooting. Видите, что 60% времени в Kryo serialization — переходите на POJO или Avro. Видите много времени в RocksDB get — рассмотрите кеширование. Видите GC pauses — настройте heap.
TaskManagers: per-TM перспектива
Левое меню -> Task Managers. Видите всех TM’ов:
- ID, host, slots (used / total), heap usage, last heartbeat.
Кликаете на TM -> подробности:
- Metrics: heap usage, GC time, network IO, garbage collection.
- Logs: реальные логи (stdout/stderr/.log) этого TM. Здесь stack traces failed subtasks.
- Thread Dump: stack снимок всех потоков.
Когда заходить сюда: когда нужны логи. Web UI хорошо для observability, но конкретные ошибки — в логах TM.
Job Manager: логи и конфигурация
Левое меню -> Job Manager:
- Configuration: текущая конфигурация Flink (все параметры из
flink-conf.yaml+ override). - Logs: JM логи. Здесь видны события scheduling, checkpoint coordinator, leader election (в HA mode).
- Stdout: тоже полезно.
Когда заходить: для системных проблем (HA failover, slot allocation, ResourceManager).
Submit New Job: UI для submit (Session mode)
В Session mode (как наш docker compose) есть UI для submit:
- Левое меню -> Submit New Job.
- Click “Add New” -> выбираете JAR файл.
- Указываете arguments, parallelism, savepoint path (если restart с savepoint).
- Click “Submit”.
Это работает только в Session mode. В Application mode нет “submit” в UI — job уже определён при создании кластера.
Sticky: что проверять каждый день
Если вы оператор Flink-приложений, ежедневная рутина:
- Все running jobs в RUNNING: зелёные, не RESTARTING.
- Available slots больше 0: кластер не на пределе.
- Failed Jobs cumulative не растёт: если растёт — investigate.
- Checkpoints зелёные для всех jobs: open Checkpoints tab каждого, проверить последние 10.
- Backpressure OK в критичных операторах: open BackPressure tab.
Это 2-3 минуты на job. После этого можно идти заниматься делом.
Попробуй сам
С running WordCount job из урока 00.3:
- Откройте Job Graph. Нарисуйте на бумаге, что видите: операторы, parallelism каждого, partitioning стрелок.
- Откройте Checkpoints tab. Подождите 2-3 минуты — должны появиться новые checkpoints. Какой размер state? Растёт ли он?
- Откройте BackPressure tab Source: загрузите много данных через kafka-console-producer (вставьте 1000 строк сразу). Появится ли backpressure?
- Откройте Task Manager logs для одного из TM’ов. Что видите? Найдите сообщения о deploy subtask и checkpoint.
- Сделайте кастомный test: пусть в код WordCount добавляется намеренное замедление (
Thread.sleep(100)в Map). Запустите снова. Какие операторы покажут backpressure? Это упражнение делает диагностику осязаемой.