Learning Platform
Глоссарий Troubleshooting
Урок 04.04 · 26 мин
Продвинутый
BackpressureBuffer debloatingWeb UIBusy ratioIdle ratioCheckpoint alignment

Финальный урок про network stack — про две практические вещи: как детектить backpressure и как его уменьшать. Backpressure в современном Flink — это не red light в UI, а конкретные числовые метрики, которые ты должен уметь читать. А buffer debloating — это первая защита для систем, где главное ограничение не throughput, а длительность checkpoint’ов.

Web UI Flink: backpressure, task graph, метрики Kafka Performance Tuning: throughput и latency

Как UI узнаёт о backpressure: history

Изначально (до 1.13) backpressure в Web UI определялся sampling thread stack traces: каждые ~10 ms JM брал у TM stack trace всех task threads. Если в стеке встречался метод LocalBufferPool.requestBuffer() (то есть задача ждёт свободный буфер для отправки), считалось, что задача в backpressure. Раз в N секунд UI агрегировал результаты и красил оператор в OK, LOW, HIGH.

Подход работал, но имел недостатки:

  • Sampling-noise: на коротких эпизодах backpressure могло не попасть в выборку.
  • Overhead на JM при большом числе TM и parallelism.
  • Не давал численного значения — только три категории.

В 1.13 (FLIP-179) подход переписали на continuous task metrics: значения busy/idle/backpressured считаются прямо в RecordWriter и InputProcessor и экспортируются как обычные счётчики.

Busy/idle/backpressured: треугольник состояний

В каждый момент времени любая задача в одном из трёх состояний:

  • busy — оператор обрабатывает запись (выполняется userCode внутри processElement()).
  • idle — оператор ничего не делает, ждёт входящих данных (InputGate.pollNext() блокируется).
  • backpressured — оператор хочет отправить запись, но не может: нет credits на исходящем канале или нет свободного буфера в пуле.

Сумма всегда busy + idle + backpressured = 1 (нормировано на единицу времени).

В метриках это:

flink_taskmanager_job_task_busyTimeMsPerSecond
flink_taskmanager_job_task_idleTimeMsPerSecond
flink_taskmanager_job_task_backPressuredTimeMsPerSecond

И булева метрика flink_taskmanager_job_task_isBackPressured — текущее состояние.

Треугольник состояний задачи

busy + idle + backpressured = 1. Здоровая задача: ~80% busy, ~20% idle, ~0% backpressured. Перегруженная: ~30% busy, ~70% backpressured.

Здоровая задача (хорошо отбалансированный пайплайн)busy=820 ms/s, idle=180 ms/s, backpressured=0
Underutilized задача (узкое место — где-то выше по графу)busy=150 ms/s, idle=850 ms/s, backpressured=0
Bottleneck задача (это она держит весь pipeline)busy=950 ms/s, idle=50 ms/s, backpressured=0Это та задача, на которой visible красная индикация bottleneck. busy ≈ 1.0 — она работает на потолке.
Backpressured задача (downstream её душит)busy=200 ms/s, idle=50 ms/s, backpressured=750 ms/sЗадача не успевает отдать данные, потому что её consumer не успевает их забирать. Ищи bottleneck вниз по графу.

Алгоритм диагностики backpressure: пройди операторы снизу вверх (от sink к source), найди первый, у которого busy ≈ 1.0 и backpressured ≈ 0. Это твой bottleneck. Всё, что выше него по графу — будет показывать backpressure (они не могут отдать ему данные так быстро, как хотят).

Где это считается в коде

busyTimeMsPerSecond инкрементируется в StreamTask.processInput() каждый раз, когда задача активно вычисляет. idleTimeMsPerSecond — когда мейл-бокс пуст и задача спит в MailboxProcessor.runMailboxLoop(). backPressuredTimeMsPerSecond — когда RecordWriter.emit() не может писать из-за отсутствия credits и крутится в ожидании.

Точные классы:

  • flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java — общий lifecycle.
  • flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java — главный цикл, метрики busy/idle.
  • flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java — metrik backPressured.

Это continuous метрики (нет sampling-окна), их можно агрегировать в Prometheus с любой гранулярностью. В Grafana дашборде стандартный паттерн:

rate(flink_taskmanager_job_task_busyTimeMsPerSecond[1m])
  / 1000

даёт долю времени в busy от 0 до 1.

Как ослабить backpressure: общие подходы

Если ты увидел, что оператор X — bottleneck, есть пять основных рычагов:

  1. Увеличить parallelism X (если можно). Чем больше параллельных subtasks, тем меньше нагрузка на каждый.
  2. Оптимизировать userCode. Часто backpressure — это просто медленный сериализатор (Kryo вместо POJO) или synchronous внешний вызов (HTTP в processElement — анти-паттерн).
  3. Увеличить slot ресурсы (CPU/memory). Не всегда помогает, но иногда узкое место — это GC.
  4. Переразместить операторы, чтобы тяжёлый shuffle стал co-located (LocalInputChannel вместо Remote). Через slotSharingGroup или disableChaining.
  5. Включить buffer debloating — это и про latency, и про checkpoint alignment.

Пункт 5 — самый интересный, о нём весь оставшийся урок.

Buffer debloating: проблема и решение

Сценарий проблемы. У тебя нагруженный пайплайн, downstream временно не справляется (например, sink в S3 timeout). Между оператором A и B — RemoteInputChannel с 2 exclusive + 8 floating буферов на 32 KiB. Накопление: 10 × 32 KiB = 320 KiB. Если B обрабатывает 1 MB/s — это 320 мс «висящих» данных в очереди.

Когда coordinator стартует checkpoint, он шлёт barriers в source. Barriers идут через все каналы. На consumer side B видит barrier на одном входном канале и ждёт его на всех остальных (это alignment, без него exactly-once невозможен). Пока он ждёт — он должен разобрать все буферы, накопленные до barrier’а. Это те самые 320 ms задержки на каждом канале.

В сложном пайплайне с 10 операторами в цепочке alignment time накапливается: каждый next operator должен ждать predecessor’а. Реальные числа: на нагруженной job alignment может быть 30 секунд при checkpoint interval 60 секунд. Половина checkpoint’а — это просто alignment.

FLIP-183 (FLINK-23451, реализовано в 1.14) ввёл adaptive buffer size:

  • Producer не отправляет полный 32 KiB буфер, а отправляет ровно столько, сколько consumer способен прожевать за заданное время.
  • Время задаётся параметром taskmanager.network.memory.buffer-debloat.target (по умолчанию 1 секунда).
  • Размер пересчитывается раз в окне buffer-debloat.period (по умолчанию 200 ms) на основании измеренного throughput.

Идея: «у меня в очереди должно быть не более N байт, где N = consumer.throughput × 1 секунда». В нагруженный момент N маленькое, очередь маленькая, alignment time маленький. В свободный момент N большое — throughput не пострадает.

С buffer debloating: динамический размер

Производитель адаптирует filling-уровень буфера под throughput. При backpressure буферы 'тоньше', alignment time меньше. Когда нагрузка спадает — буферы заполняются полностью, throughput возвращается.

Steady state (нет backpressure)filling = 32768 байт = full segment
consumer throughput 100 MiB/sза 1 сек = 100 MiB. 10 буферов по 32 KiB = 320 KiB. alignment = 3 ms
Backpressure: throughput упал до 1 MiB/sза 1 сек = 1 MiB. 10 буферов должны весить 1 MiB / 10 = 100 KiB на канал, но 32 KiB полный — это перебор.
Debloating включилсяfilling = 1 MiB / 10 / число параллельных каналов = ~10 KiB вместо 32 KiB. alignment = 1 сек а не 32 сек.
Throughput восстановился до 100 MiB/sfilling вернулся к 32 KiB, throughput 100 MiB/s — без потери производительности

В коде это реализовано не через изменение размера MemorySegment (он остаётся 32 KiB), а через partial filling: producer закрывает буфер раньше, не дожидаясь его полного заполнения. Сам сегмент по-прежнему 32 KiB direct memory, но в нём могут жить только 8 KiB полезных данных, остальное — wasted.

Конфигурация и тюнинг

# Включить debloating (включён по умолчанию с 1.14, но проверь)
taskmanager.network.memory.buffer-debloat.enabled: true

# Целевое время, в течение которого consumer должен прожевать все буферы
# в очереди (включая те, что в полёте через TCP)
taskmanager.network.memory.buffer-debloat.target: 1s

# Период пересчёта размера буфера
taskmanager.network.memory.buffer-debloat.period: 200ms

# Сколько последних замеров усреднять (стабильность vs реактивность)
taskmanager.network.memory.buffer-debloat.samples: 20

# Минимальный размер «эффективного» буфера (нижний лимит)
taskmanager.network.memory.buffer-debloat.threshold-percentages: 25

threshold-percentages: 25 означает: размер пересчитывается, только если новое значение отличается от текущего больше чем на 25%. Защита от микро-флуктуаций.

Что мониторить:

  • flink_taskmanager_job_task_estimatedTimeToConsumeBuffersMs — текущая оценка времени до прожёвывания очереди.
  • flink_taskmanager_job_task_debloatedBufferSize — текущий эффективный размер буфера.
TIP

Buffer debloating даёт максимальный профит на пайплайнах с большим числом операторов в цепочке (8+) и большим parallelism (50+). На маленьких graphs alignment time и так минимален, debloating добавляет небольшой overhead и снижает throughput на 1-3%. Решение: если у тебя checkpoints стабильно меньше 1 сек — debloating можно выключить. Если стабильно больше — оставлять.

Альтернатива: unaligned checkpoints

Если debloating не справляется (a alignment всё равно гигантский), есть более радикальное решение — unaligned checkpoints (FLIP-76, 1.11+). Barrier «перепрыгивает» через накопленные in-flight данные и сразу включает snapshot. In-flight буферы сохраняются вместе с состоянием. На restore они воспроизводятся.

Параметр:

execution.checkpointing.unaligned: true
execution.checkpointing.aligned-checkpoint-timeout: 30s

aligned-checkpoint-timeout: сначала пытаемся aligned, если за 30 сек не уложились — автоматически falling back to unaligned. Это разумный compromise.

Подробно про unaligned — в модуле 06 (Checkpoint internals).

Где смотреть в Web UI

Современный Flink UI (1.13+) показывает на каждом операторе три полосы:

  • Busy (зелёная) — busy ratio.
  • Idle (серая) — idle ratio.
  • Backpressure (красная) — backpressured ratio.

Кликаешь на оператор -> видишь точные значения и историю за последние минуты. На вкладке BackPressure — топ subtasks с наибольшим backpressure (полезно если в parallelism один subtask висит, а остальные ок).

В REST API: GET /jobs/<jobId>/vertices/<vertexId>/backpressure — возвращает JSON с распределением по subtask’ам.

Чтение source

  • flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDebloater.java — алгоритм адаптации размера. Метод recalculateBufferSize() — основная математика.
  • flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java — экспоненциальное усреднение throughput.
  • flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java — куда подключается debloater, как читаются метрики busy/idle.
  • flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferDebloaterTest.java — unit-тесты, показывающие как ведёт себя алгоритм при разных throughput.

Чек-лист

  • В современном Flink backpressure — это continuous metrics busy/idle/backpressuredTimeMsPerSecond, считаются в RecordWriter и StreamTask.
  • Сумма busy + idle + backpressured = 1.0. Алгоритм диагностики: ищи снизу вверх первый оператор с busy ≈ 1.0, backpressured ≈ 0 — это bottleneck.
  • Sampling-based подход (до 1.13) заменён continuous metrics в FLIP-179.
  • Buffer debloating (FLINK-23451, 1.14+) — partial filling буферов под backpressure. Target по умолчанию 1 сек.
  • Размер сегмента не меняется (32 KiB direct), меняется эффективное заполнение.
  • Параметры: buffer-debloat.target, buffer-debloat.period, buffer-debloat.threshold-percentages.
  • Альтернатива — unaligned checkpoints (execution.checkpointing.unaligned: true), которые пропускают alignment целиком.
Проверка знанийKnowledge check
Job с 6 операторами в цепочке. Web UI показывает: op1 (source) busy=10%, backpressured=80%; op2 busy=20%, backpressured=70%; op3 busy=60%, backpressured=20%; op4 busy=95%, backpressured=0%; op5 busy=30%, backpressured=60%; op6 (sink) busy=40%, backpressured=50%. Где bottleneck и почему?
ОтветAnswer
Bottleneck — op4 (busy=95%, backpressured=0%). Алгоритм: проходим снизу вверх (op6, op5, op4...), ищем первого с busy около 1.0 и backpressured около 0 — это и есть кандидат, который держит весь pipeline. op6 и op5 в backpressure, значит их кто-то выше душит. op4 — почти 100% busy и не в backpressure, значит он работает на потолке, и это его задача — bottleneck. op1-op3 видят backpressure, потому что не могут отдать данные через op4. Решения: увеличить parallelism op4; оптимизировать его userCode; вынести в отдельный slot sharing group, если он конкурирует за ресурсы; включить buffer debloating, чтобы хотя бы checkpoint'ы не страдали.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В современном Flink (1.13+) backpressure определяется через метрики busy/idle/backPressuredTimeMsPerSecond. Какое утверждение про сумму этих метрик верное?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4