Learning Platform
Глоссарий Troubleshooting
Урок 04.03 · 24 мин
Продвинутый
NetworkBufferPoolMemorySegmentLocalBufferPoolExclusive buffersFloating buffers

В прошлом уроке мы говорили про credits, но обходили вопрос «откуда буферы физически берутся». Сейчас разберём подсистему, которая владеет всей сетевой памятью TaskManager: NetworkBufferPool и его потомков.

Buddy-система и slab-аллокатор ядра Здесь живёт ответ на «почему job упал с Insufficient number of network buffers» и «как правильно посчитать taskmanager.memory.network.max».

NetworkBufferPool: глобальный пул TM

Когда TaskManager запускается, он выделяет фиксированный объём direct (off-heap) памяти под сеть. Количество и размер сегментов:

  • taskmanager.memory.segment-size — размер одного буфера, по умолчанию 32kb.
  • taskmanager.memory.network.min / taskmanager.memory.network.max — границы пула в байтах. По умолчанию 64mb / 1gb.
  • taskmanager.memory.network.fraction — доля от Total Flink Memory, идущая в сеть. По умолчанию 0.1 (10%).

Реальное число сегментов = network memory / segment-size. Например, при 256 MiB network и 32 KiB segment — это 8192 сегмента.

Все они создаются разом при старте TM в виде одного большого DirectByteBuffer, после чего разбиваются на MemorySegment (обёртки с быстрым unsafe-доступом). После аллокации число сегментов не меняется до конца жизни TM. Это сделано специально — чтобы избежать GC pressure и фрагментации direct memory.

NetworkBufferPool: один пул на TaskManager

Direct memory выделяется один раз при старте, делится на MemorySegment по 32 KiB. Этот пул раздаёт буферы LocalBufferPool'ам для каждого ResultPartition и InputGate.

NetworkBufferPool (TaskManager singleton)8192 MemorySegment по 32 KiB = 256 MiB directАллоцируется один раз при старте через ByteBuffer.allocateDirect. Не растёт и не сжимается.
LocalBufferPool task1.ResultPartitionnumRequired=2 (exclusive), maxUsed=20
LocalBufferPool task1.InputGatenumRequired=2N (N каналов), maxUsed=2N+8
LocalBufferPool task2.ResultPartition...
LocalBufferPool task2.InputGate...
всё остальноесвободные сегменты, доступные на запрос (floating reserve)

Если сегментов не хватает на старте задачи — IOException: Insufficient number of network buffers. Если они закончились в runtime (что бывает крайне редко) — OutOfMemoryError, точнее direct memory OOM.

LocalBufferPool: пул на компонент задачи

Каждый ResultPartition и каждый InputGate запрашивает у глобального NetworkBufferPool свой LocalBufferPool. Этот subpool — основной интерфейс, через который producer/consumer берёт буферы для работы.

LocalBufferPool имеет три ключевых параметра:

  • numRequired (getNumberOfRequiredMemorySegments) — минимум, который пул держит в памяти. Если он не может его получить — задача не запустится.
  • maxUsed (getMaxNumberOfMemorySegments) — потолок. Сколько сегментов пул может позаимствовать сверху от глобального пула, если ему нужно.
  • excessMemorySegments — текущее число «возвращённых» сегментов, ждущих, когда их попросят другие пулы.

Идея: каждый LocalBufferPool сначала гарантированно получает свои numRequired сегменты. Остальное (maxUsed - numRequired) он может позаимствовать по требованию. Когда другой пул запросит больше, лишние сегменты с overflow returnsятся в глобальный.

Для ResultPartition numRequired считается просто: 1 сегмент на каждую subpartition (чтобы быть способным накопить хотя бы один буфер для каждого consumer’а) + ещё один в текущем BufferBuilder. То есть для 4 subpartitions = 4 * 1 + 1 = 5 минимум.

Для InputGate numRequired = exclusive buffers × число каналов. Если 8 каналов и 2 exclusive на канал — 8 * 2 = 16 минимум.

Exclusive buffers: per-channel reservation

Exclusive buffers — это сегменты, которые жёстко закреплены за конкретным RemoteInputChannel. Количество задаётся параметром:

taskmanager.network.memory.buffers-per-channel: 2

При создании канала InputGate просит у LocalBufferPool ровно столько сегментов и кладёт их в AvailableBufferQueue этого канала. Когда канал получает буфер от Netty (onBuffer), он обязан положить байты в один из exclusive буферов — нельзя занять чужой. Когда буфер освобождён — он возвращается в очередь этого же канала, не в общий пул.

Зачем эта жёсткая резервация? Гарантия прогресса. Если ты дашь все буферы общему пулу, то теоретически один жадный канал может высосать весь пул, и остальные каналы не смогут получить ни одного буфера, чтобы прочитать хотя бы checkpoint barrier. Exclusive обеспечивают, что каждый канал всегда может принять минимум 2 буфера — этого достаточно для протекания управляющих сообщений.

В уроке про credit-based мы видели формулу: initialCredit = exclusiveBuffers. То есть consumer при старте сразу выдаёт producer’у credits, равные числу exclusive буферов. Никаких сетевых round-trip’ов на стартовый negotiation.

Floating buffers: общий пул на InputGate

Floating buffers — это общая очередь на весь InputGate, из которой каналы тянут дополнительные сегменты, когда нагрузка высока. Параметр:

taskmanager.network.memory.floating-buffers-per-gate: 8

Динамика: канал, который сейчас активно работает (его credits быстро тратятся, производитель шлёт буферы непрерывно), просит floating buffers. Если они есть в пуле — получает; если все 8 уже разданы другим каналам того же gate — ждёт.

Так Flink динамически адаптирует ресурсы: каналы с большим трафиком получают больше буферов, простаивающие — отдают.

Распределение буферов в InputGate

3 канала, у каждого по 2 exclusive (всегда зарезервированы). Floating pool на 8 буферов делится между каналами по требованию. Канал C в backpressure: получил 6 floating сверх своих 2 exclusive.

InputGate (consumer task)exclusive=2/канал, floating-pool=8
Channel A2 exclusive + 0 floating = 2 buf
traficlow — успевает обработать
Channel B2 exclusive + 2 floating = 4 buf
traficmedium
Channel C2 exclusive + 6 floating = 8 bufКанал C активно копит данные. Получил больше floating — это его буферный запас, который пока даёт времени consumer'у догнать.
traficHIGH — копит данные, в очереди
оставшийся floating pool8 - 0 - 2 - 6 = 0 буферов свободно

Защита от одного жадного канала, который захватит весь floating pool:

taskmanager.network.memory.max-buffers-per-channel: 10

Канал не может иметь больше 10 буферов одновременно (exclusive + floating). При попытке выйти за лимит — он перестаёт запрашивать floating и просто ждёт.

Sizing: сколько сетевых буферов нужно

Готовая формула для большой задачи:

required_segments = 
    (за каждый ResultPartition) (#subpartitions + 1) +
    (за каждый InputGate) (#channels * exclusive + floating)

Пример. Job с 100 операторами, parallelism 16 на каждом, шаффлинг между каждой парой соседних операторов. На каждом TM работает по 4 slot’а. Тогда на одном TM:

  • 4 slot × 100 операторов / 16 (но на slot обычно один pipeline) — оценочно 4 × ~5 chained-pipelines = ~20 ResultPartition.
  • Каждая ResultPartition — 16 subpartitions = 17 сегментов минимум.
  • 20 ResultPartition × 17 = 340 сегментов на producer side.
  • 20 InputGate × (16 × 2 + 8) = 20 × 40 = 800 сегментов на consumer side.
  • Итого ~1140 сегментов = 1140 × 32 KiB ≈ 36 MiB минимум.

На практике закладывают 2-3x запас, и taskmanager.memory.network.max = 512mb — это обычная конфигурация для среднего job.

Если ты видишь ошибку:

IOException: Insufficient number of network buffers: required 800, but only 512 available.

— это значит, что numRequired LocalBufferPool’ов суммарно превышает размер NetworkBufferPool. Решение: увеличить taskmanager.memory.network.fraction или network.max.

Throughput vs latency: компромисс

Размер буфера 32 KiB — компромисс между throughput и latency.

  • Throughput: чем больше буфер, тем меньше overhead’а на каждый отправленный буфер (frame заголовки в Netty, переключения в event loop). На gigabit ethernet 32 KiB достаточно, чтобы амортизировать overhead до ~1%.
  • Latency: пока буфер не заполнен или не сработал execution.buffer-timeout (по умолчанию 100 ms), запись не уходит на consumer. Худшая latency = buffer_timeout, средняя — ~buffer_size / throughput.

Для low-latency пайплайнов:

execution.buffer-timeout: 10  # ms
taskmanager.memory.segment-size: 16kb

Для maximum throughput batch-режима: увеличивают и timeout, и segment-size.

В Flink 1.14 появилось buffer debloating (FLINK-23451) — автоматическая адаптация эффективного размера буфера под backpressure. Producer не отправляет полный 32 KiB, а отправляет ровно столько, сколько consumer успевает обработать за заданное окно (taskmanager.network.memory.buffer-debloat.target, по умолчанию 1s). Это резко уменьшает alignment time на checkpoints, не теряя throughput. Подробно — в следующем уроке.

WARNING

Если ты видишь в логах Buffer pool is destroyed. при shutdown — это нормально, LocalBufferPool деаллоцируется при завершении задачи. Если ты видишь Buffer pool is destroyed. в середине работы — задача упала, и кто-то ещё пытается писать в уже закрытый пул. Смотри stack trace выше: должно быть исключение, из-за которого задача провалилась.

Чтение source

  • flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java — глобальный пул, ленивая раздача через requestUnpooledMemorySegments.
  • flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java — основной workhorse, методы requestBuffer(), recycle(), lazyDestroy().
  • flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java — фабрика, считающая numRequired/maxUsed для каждого consumer’а.
  • flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java — фабрика сегментов, выбирает между HeapMemorySegment и HybridMemorySegment.
  • flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java — хорошие тесты с воспроизводимыми сценариями.

Чек-лист

  • NetworkBufferPool — единый пул на TaskManager. Direct memory выделяется один раз при старте, не растёт.
  • LocalBufferPool — один на каждый ResultPartition и InputGate. Параметры: numRequired (минимум, всегда зарезервирован) и maxUsed (потолок).
  • Exclusive buffers (buffers-per-channel, по умолчанию 2) — гарантированные per канал. Защита от голодания.
  • Floating buffers (floating-buffers-per-gate, по умолчанию 8) — общий пул на InputGate, делится по нагрузке. Ограничен max-buffers-per-channel.
  • Размер сегмента 32 KiB по умолчанию — компромисс throughput vs latency. execution.buffer-timeout ограничивает latency сверху.
  • Sizing формула: required = sum_per_partition + sum_per_gate. Закладывают 2-3x запас.
  • Insufficient number of network buffersnetwork.max слишком мал, увеличить fraction или max.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. NetworkBufferPool — это пул на каком уровне, и когда он аллоцирует свою память?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4