В прошлом уроке мы говорили про credits, но обходили вопрос «откуда буферы физически берутся». Сейчас разберём подсистему, которая владеет всей сетевой памятью TaskManager: NetworkBufferPool и его потомков.
Buddy-система и slab-аллокатор ядра Здесь живёт ответ на «почему job упал с Insufficient number of network buffers» и «как правильно посчитать taskmanager.memory.network.max».
NetworkBufferPool: глобальный пул TM
Когда TaskManager запускается, он выделяет фиксированный объём direct (off-heap) памяти под сеть. Количество и размер сегментов:
taskmanager.memory.segment-size— размер одного буфера, по умолчанию32kb.taskmanager.memory.network.min/taskmanager.memory.network.max— границы пула в байтах. По умолчанию64mb/1gb.taskmanager.memory.network.fraction— доля от Total Flink Memory, идущая в сеть. По умолчанию0.1(10%).
Реальное число сегментов = network memory / segment-size. Например, при 256 MiB network и 32 KiB segment — это 8192 сегмента.
Все они создаются разом при старте TM в виде одного большого DirectByteBuffer, после чего разбиваются на MemorySegment (обёртки с быстрым unsafe-доступом). После аллокации число сегментов не меняется до конца жизни TM. Это сделано специально — чтобы избежать GC pressure и фрагментации direct memory.
Direct memory выделяется один раз при старте, делится на MemorySegment по 32 KiB. Этот пул раздаёт буферы LocalBufferPool'ам для каждого ResultPartition и InputGate.
Если сегментов не хватает на старте задачи — IOException: Insufficient number of network buffers. Если они закончились в runtime (что бывает крайне редко) — OutOfMemoryError, точнее direct memory OOM.
LocalBufferPool: пул на компонент задачи
Каждый ResultPartition и каждый InputGate запрашивает у глобального NetworkBufferPool свой LocalBufferPool. Этот subpool — основной интерфейс, через который producer/consumer берёт буферы для работы.
LocalBufferPool имеет три ключевых параметра:
- numRequired (
getNumberOfRequiredMemorySegments) — минимум, который пул держит в памяти. Если он не может его получить — задача не запустится. - maxUsed (
getMaxNumberOfMemorySegments) — потолок. Сколько сегментов пул может позаимствовать сверху от глобального пула, если ему нужно. - excessMemorySegments — текущее число «возвращённых» сегментов, ждущих, когда их попросят другие пулы.
Идея: каждый LocalBufferPool сначала гарантированно получает свои numRequired сегменты. Остальное (maxUsed - numRequired) он может позаимствовать по требованию. Когда другой пул запросит больше, лишние сегменты с overflow returnsятся в глобальный.
Для ResultPartition numRequired считается просто: 1 сегмент на каждую subpartition (чтобы быть способным накопить хотя бы один буфер для каждого consumer’а) + ещё один в текущем BufferBuilder. То есть для 4 subpartitions = 4 * 1 + 1 = 5 минимум.
Для InputGate numRequired = exclusive buffers × число каналов. Если 8 каналов и 2 exclusive на канал — 8 * 2 = 16 минимум.
Exclusive buffers: per-channel reservation
Exclusive buffers — это сегменты, которые жёстко закреплены за конкретным RemoteInputChannel. Количество задаётся параметром:
taskmanager.network.memory.buffers-per-channel: 2
При создании канала InputGate просит у LocalBufferPool ровно столько сегментов и кладёт их в AvailableBufferQueue этого канала. Когда канал получает буфер от Netty (onBuffer), он обязан положить байты в один из exclusive буферов — нельзя занять чужой. Когда буфер освобождён — он возвращается в очередь этого же канала, не в общий пул.
Зачем эта жёсткая резервация? Гарантия прогресса. Если ты дашь все буферы общему пулу, то теоретически один жадный канал может высосать весь пул, и остальные каналы не смогут получить ни одного буфера, чтобы прочитать хотя бы checkpoint barrier. Exclusive обеспечивают, что каждый канал всегда может принять минимум 2 буфера — этого достаточно для протекания управляющих сообщений.
В уроке про credit-based мы видели формулу: initialCredit = exclusiveBuffers. То есть consumer при старте сразу выдаёт producer’у credits, равные числу exclusive буферов. Никаких сетевых round-trip’ов на стартовый negotiation.
Floating buffers: общий пул на InputGate
Floating buffers — это общая очередь на весь InputGate, из которой каналы тянут дополнительные сегменты, когда нагрузка высока. Параметр:
taskmanager.network.memory.floating-buffers-per-gate: 8
Динамика: канал, который сейчас активно работает (его credits быстро тратятся, производитель шлёт буферы непрерывно), просит floating buffers. Если они есть в пуле — получает; если все 8 уже разданы другим каналам того же gate — ждёт.
Так Flink динамически адаптирует ресурсы: каналы с большим трафиком получают больше буферов, простаивающие — отдают.
3 канала, у каждого по 2 exclusive (всегда зарезервированы). Floating pool на 8 буферов делится между каналами по требованию. Канал C в backpressure: получил 6 floating сверх своих 2 exclusive.
Защита от одного жадного канала, который захватит весь floating pool:
taskmanager.network.memory.max-buffers-per-channel: 10
Канал не может иметь больше 10 буферов одновременно (exclusive + floating). При попытке выйти за лимит — он перестаёт запрашивать floating и просто ждёт.
Sizing: сколько сетевых буферов нужно
Готовая формула для большой задачи:
required_segments =
(за каждый ResultPartition) (#subpartitions + 1) +
(за каждый InputGate) (#channels * exclusive + floating)
Пример. Job с 100 операторами, parallelism 16 на каждом, шаффлинг между каждой парой соседних операторов. На каждом TM работает по 4 slot’а. Тогда на одном TM:
- 4 slot × 100 операторов / 16 (но на slot обычно один pipeline) — оценочно 4 × ~5 chained-pipelines = ~20 ResultPartition.
- Каждая ResultPartition — 16 subpartitions = 17 сегментов минимум.
- 20 ResultPartition × 17 = 340 сегментов на producer side.
- 20 InputGate × (16 × 2 + 8) = 20 × 40 = 800 сегментов на consumer side.
- Итого ~1140 сегментов =
1140 × 32 KiB ≈ 36 MiBминимум.
На практике закладывают 2-3x запас, и taskmanager.memory.network.max = 512mb — это обычная конфигурация для среднего job.
Если ты видишь ошибку:
IOException: Insufficient number of network buffers: required 800, but only 512 available.
— это значит, что numRequired LocalBufferPool’ов суммарно превышает размер NetworkBufferPool. Решение: увеличить taskmanager.memory.network.fraction или network.max.
Throughput vs latency: компромисс
Размер буфера 32 KiB — компромисс между throughput и latency.
- Throughput: чем больше буфер, тем меньше overhead’а на каждый отправленный буфер (frame заголовки в Netty, переключения в event loop). На gigabit ethernet 32 KiB достаточно, чтобы амортизировать overhead до ~1%.
- Latency: пока буфер не заполнен или не сработал
execution.buffer-timeout(по умолчанию100 ms), запись не уходит на consumer. Худшая latency =buffer_timeout, средняя —~buffer_size / throughput.
Для low-latency пайплайнов:
execution.buffer-timeout: 10 # ms
taskmanager.memory.segment-size: 16kb
Для maximum throughput batch-режима: увеличивают и timeout, и segment-size.
В Flink 1.14 появилось buffer debloating (FLINK-23451) — автоматическая адаптация эффективного размера буфера под backpressure. Producer не отправляет полный 32 KiB, а отправляет ровно столько, сколько consumer успевает обработать за заданное окно (taskmanager.network.memory.buffer-debloat.target, по умолчанию 1s). Это резко уменьшает alignment time на checkpoints, не теряя throughput. Подробно — в следующем уроке.
Если ты видишь в логах Buffer pool is destroyed. при shutdown — это нормально, LocalBufferPool деаллоцируется при завершении задачи. Если ты видишь Buffer pool is destroyed. в середине работы — задача упала, и кто-то ещё пытается писать в уже закрытый пул. Смотри stack trace выше: должно быть исключение, из-за которого задача провалилась.
Чтение source
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java— глобальный пул, ленивая раздача черезrequestUnpooledMemorySegments.flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java— основной workhorse, методыrequestBuffer(),recycle(),lazyDestroy().flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java— фабрика, считающая numRequired/maxUsed для каждого consumer’а.flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java— фабрика сегментов, выбирает между HeapMemorySegment и HybridMemorySegment.flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java— хорошие тесты с воспроизводимыми сценариями.
Чек-лист
NetworkBufferPool— единый пул на TaskManager. Direct memory выделяется один раз при старте, не растёт.LocalBufferPool— один на каждыйResultPartitionиInputGate. Параметры: numRequired (минимум, всегда зарезервирован) и maxUsed (потолок).- Exclusive buffers (
buffers-per-channel, по умолчанию 2) — гарантированные per канал. Защита от голодания. - Floating buffers (
floating-buffers-per-gate, по умолчанию 8) — общий пул на InputGate, делится по нагрузке. Ограниченmax-buffers-per-channel. - Размер сегмента 32 KiB по умолчанию — компромисс throughput vs latency.
execution.buffer-timeoutограничивает latency сверху. - Sizing формула:
required = sum_per_partition + sum_per_gate. Закладывают 2-3x запас. Insufficient number of network buffers—network.maxслишком мал, увеличитьfractionилиmax.