Финальный урок про network stack — про две практические вещи: как детектить backpressure и как его уменьшать. Backpressure в современном Flink — это не red light в UI, а конкретные числовые метрики, которые ты должен уметь читать. А buffer debloating — это первая защита для систем, где главное ограничение не throughput, а длительность checkpoint’ов.
Web UI Flink: backpressure, task graph, метрики Kafka Performance Tuning: throughput и latencyКак UI узнаёт о backpressure: history
Изначально (до 1.13) backpressure в Web UI определялся sampling thread stack traces: каждые ~10 ms JM брал у TM stack trace всех task threads. Если в стеке встречался метод LocalBufferPool.requestBuffer() (то есть задача ждёт свободный буфер для отправки), считалось, что задача в backpressure. Раз в N секунд UI агрегировал результаты и красил оператор в OK, LOW, HIGH.
Подход работал, но имел недостатки:
- Sampling-noise: на коротких эпизодах backpressure могло не попасть в выборку.
- Overhead на JM при большом числе TM и parallelism.
- Не давал численного значения — только три категории.
В 1.13 (FLIP-179) подход переписали на continuous task metrics: значения busy/idle/backpressured считаются прямо в RecordWriter и InputProcessor и экспортируются как обычные счётчики.
Busy/idle/backpressured: треугольник состояний
В каждый момент времени любая задача в одном из трёх состояний:
- busy — оператор обрабатывает запись (выполняется userCode внутри
processElement()). - idle — оператор ничего не делает, ждёт входящих данных (InputGate.pollNext() блокируется).
- backpressured — оператор хочет отправить запись, но не может: нет credits на исходящем канале или нет свободного буфера в пуле.
Сумма всегда busy + idle + backpressured = 1 (нормировано на единицу времени).
В метриках это:
flink_taskmanager_job_task_busyTimeMsPerSecond
flink_taskmanager_job_task_idleTimeMsPerSecond
flink_taskmanager_job_task_backPressuredTimeMsPerSecond
И булева метрика flink_taskmanager_job_task_isBackPressured — текущее состояние.
busy + idle + backpressured = 1. Здоровая задача: ~80% busy, ~20% idle, ~0% backpressured. Перегруженная: ~30% busy, ~70% backpressured.
Алгоритм диагностики backpressure: пройди операторы снизу вверх (от sink к source), найди первый, у которого busy ≈ 1.0 и backpressured ≈ 0. Это твой bottleneck. Всё, что выше него по графу — будет показывать backpressure (они не могут отдать ему данные так быстро, как хотят).
Где это считается в коде
busyTimeMsPerSecond инкрементируется в StreamTask.processInput() каждый раз, когда задача активно вычисляет. idleTimeMsPerSecond — когда мейл-бокс пуст и задача спит в MailboxProcessor.runMailboxLoop(). backPressuredTimeMsPerSecond — когда RecordWriter.emit() не может писать из-за отсутствия credits и крутится в ожидании.
Точные классы:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java— общий lifecycle.flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java— главный цикл, метрикиbusy/idle.flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java— metrikbackPressured.
Это continuous метрики (нет sampling-окна), их можно агрегировать в Prometheus с любой гранулярностью. В Grafana дашборде стандартный паттерн:
rate(flink_taskmanager_job_task_busyTimeMsPerSecond[1m])
/ 1000
даёт долю времени в busy от 0 до 1.
Как ослабить backpressure: общие подходы
Если ты увидел, что оператор X — bottleneck, есть пять основных рычагов:
- Увеличить parallelism X (если можно). Чем больше параллельных subtasks, тем меньше нагрузка на каждый.
- Оптимизировать userCode. Часто backpressure — это просто медленный сериализатор (Kryo вместо POJO) или synchronous внешний вызов (HTTP в
processElement— анти-паттерн). - Увеличить slot ресурсы (CPU/memory). Не всегда помогает, но иногда узкое место — это GC.
- Переразместить операторы, чтобы тяжёлый shuffle стал co-located (LocalInputChannel вместо Remote). Через
slotSharingGroupилиdisableChaining. - Включить buffer debloating — это и про latency, и про checkpoint alignment.
Пункт 5 — самый интересный, о нём весь оставшийся урок.
Buffer debloating: проблема и решение
Сценарий проблемы. У тебя нагруженный пайплайн, downstream временно не справляется (например, sink в S3 timeout). Между оператором A и B — RemoteInputChannel с 2 exclusive + 8 floating буферов на 32 KiB. Накопление: 10 × 32 KiB = 320 KiB. Если B обрабатывает 1 MB/s — это 320 мс «висящих» данных в очереди.
Когда coordinator стартует checkpoint, он шлёт barriers в source. Barriers идут через все каналы. На consumer side B видит barrier на одном входном канале и ждёт его на всех остальных (это alignment, без него exactly-once невозможен). Пока он ждёт — он должен разобрать все буферы, накопленные до barrier’а. Это те самые 320 ms задержки на каждом канале.
В сложном пайплайне с 10 операторами в цепочке alignment time накапливается: каждый next operator должен ждать predecessor’а. Реальные числа: на нагруженной job alignment может быть 30 секунд при checkpoint interval 60 секунд. Половина checkpoint’а — это просто alignment.
FLIP-183 (FLINK-23451, реализовано в 1.14) ввёл adaptive buffer size:
- Producer не отправляет полный 32 KiB буфер, а отправляет ровно столько, сколько consumer способен прожевать за заданное время.
- Время задаётся параметром
taskmanager.network.memory.buffer-debloat.target(по умолчанию 1 секунда). - Размер пересчитывается раз в окне
buffer-debloat.period(по умолчанию 200 ms) на основании измеренного throughput.
Идея: «у меня в очереди должно быть не более N байт, где N = consumer.throughput × 1 секунда». В нагруженный момент N маленькое, очередь маленькая, alignment time маленький. В свободный момент N большое — throughput не пострадает.
Производитель адаптирует filling-уровень буфера под throughput. При backpressure буферы 'тоньше', alignment time меньше. Когда нагрузка спадает — буферы заполняются полностью, throughput возвращается.
В коде это реализовано не через изменение размера MemorySegment (он остаётся 32 KiB), а через partial filling: producer закрывает буфер раньше, не дожидаясь его полного заполнения. Сам сегмент по-прежнему 32 KiB direct memory, но в нём могут жить только 8 KiB полезных данных, остальное — wasted.
Конфигурация и тюнинг
# Включить debloating (включён по умолчанию с 1.14, но проверь)
taskmanager.network.memory.buffer-debloat.enabled: true
# Целевое время, в течение которого consumer должен прожевать все буферы
# в очереди (включая те, что в полёте через TCP)
taskmanager.network.memory.buffer-debloat.target: 1s
# Период пересчёта размера буфера
taskmanager.network.memory.buffer-debloat.period: 200ms
# Сколько последних замеров усреднять (стабильность vs реактивность)
taskmanager.network.memory.buffer-debloat.samples: 20
# Минимальный размер «эффективного» буфера (нижний лимит)
taskmanager.network.memory.buffer-debloat.threshold-percentages: 25
threshold-percentages: 25 означает: размер пересчитывается, только если новое значение отличается от текущего больше чем на 25%. Защита от микро-флуктуаций.
Что мониторить:
flink_taskmanager_job_task_estimatedTimeToConsumeBuffersMs— текущая оценка времени до прожёвывания очереди.flink_taskmanager_job_task_debloatedBufferSize— текущий эффективный размер буфера.
Buffer debloating даёт максимальный профит на пайплайнах с большим числом операторов в цепочке (8+) и большим parallelism (50+). На маленьких graphs alignment time и так минимален, debloating добавляет небольшой overhead и снижает throughput на 1-3%. Решение: если у тебя checkpoints стабильно меньше 1 сек — debloating можно выключить. Если стабильно больше — оставлять.
Альтернатива: unaligned checkpoints
Если debloating не справляется (a alignment всё равно гигантский), есть более радикальное решение — unaligned checkpoints (FLIP-76, 1.11+). Barrier «перепрыгивает» через накопленные in-flight данные и сразу включает snapshot. In-flight буферы сохраняются вместе с состоянием. На restore они воспроизводятся.
Параметр:
execution.checkpointing.unaligned: true
execution.checkpointing.aligned-checkpoint-timeout: 30s
aligned-checkpoint-timeout: сначала пытаемся aligned, если за 30 сек не уложились — автоматически falling back to unaligned. Это разумный compromise.
Подробно про unaligned — в модуле 06 (Checkpoint internals).
Где смотреть в Web UI
Современный Flink UI (1.13+) показывает на каждом операторе три полосы:
- Busy (зелёная) — busy ratio.
- Idle (серая) — idle ratio.
- Backpressure (красная) — backpressured ratio.
Кликаешь на оператор -> видишь точные значения и историю за последние минуты. На вкладке BackPressure — топ subtasks с наибольшим backpressure (полезно если в parallelism один subtask висит, а остальные ок).
В REST API: GET /jobs/<jobId>/vertices/<vertexId>/backpressure — возвращает JSON с распределением по subtask’ам.
Чтение source
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDebloater.java— алгоритм адаптации размера. МетодrecalculateBufferSize()— основная математика.flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java— экспоненциальное усреднение throughput.flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java— куда подключается debloater, как читаются метрики busy/idle.flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferDebloaterTest.java— unit-тесты, показывающие как ведёт себя алгоритм при разных throughput.
Чек-лист
- В современном Flink backpressure — это continuous metrics
busy/idle/backpressuredTimeMsPerSecond, считаются в RecordWriter и StreamTask. - Сумма
busy + idle + backpressured = 1.0. Алгоритм диагностики: ищи снизу вверх первый оператор сbusy ≈ 1.0, backpressured ≈ 0— это bottleneck. - Sampling-based подход (до 1.13) заменён continuous metrics в FLIP-179.
- Buffer debloating (FLINK-23451, 1.14+) — partial filling буферов под backpressure. Target по умолчанию 1 сек.
- Размер сегмента не меняется (32 KiB direct), меняется эффективное заполнение.
- Параметры:
buffer-debloat.target,buffer-debloat.period,buffer-debloat.threshold-percentages. - Альтернатива — unaligned checkpoints (
execution.checkpointing.unaligned: true), которые пропускают alignment целиком.