Network memory мы уже трогали в модуле 03 — там мы говорили про NetworkBufferPool, exclusive и floating buffers, и distribution между каналами. Сейчас закрепим то же самое с памяти-bound стороны: как считать, сколько нужно, как не получить Insufficient number of network buffers, и почему network — это direct, а не managed.
Почему network — это direct, а не managed
Это правильный вопрос: оба пула off-heap, оба под контролем Flink, оба MemorySegment. Зачем два разных типа?
Ответ — в Netty. Netty написан в предположении, что байты для записи в сокет лежат в ByteBuffer, причём желательно direct (чтобы избежать копирования при socket.write()). Если бы network был managed (Unsafe.allocateMemory()), нам пришлось бы либо копировать каждый сегмент в direct buffer перед отправкой, либо костылить Netty.
Flink использует Netty в native (off-heap, direct) режиме: MemorySegment для network — это обёртка над DirectByteBuffer, который под капотом представлен куском native памяти, но через JVM API. ByteBuf.wrap(directByteBuffer) отдаётся Netty zero-copy.
Это и есть техническая причина, почему network — отдельный пул:
- Direct: совместим с Netty zero-copy, контролируется
-XX:MaxDirectMemorySize. - Managed: native non-direct, под управлением Flink, не совместим с Netty I/O, но идеален для RocksDB и sort buffer’ов.
Конфигурация network pool
Три параметра:
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
taskmanager.memory.network.fraction: 0.1
Размер пула считается так:
network_size = clamp(fraction × total_flink_memory, min, max)
То есть сначала пробуют fraction, затем зажимают в [min, max].
Пример: Total Flink Memory = 7 GiB, fraction = 0.1 -> 0.7 GiB -> внутри [64mb, 1gb] -> 700 MiB.
Если ты хочешь жёстко зафиксировать размер (без fraction), задай min = max:
taskmanager.memory.network.min: 512mb
taskmanager.memory.network.max: 512mb
Это полезно, когда ты хочешь точно предсказать, сколько Direct Memory будет использоваться, и не давать Flink «болтать» рамку.
Sizing формула: сколько буферов нужно
В уроке 3 модуля 03 мы вывели формулу:
required_segments =
sum(per ResultPartition) (#subpartitions + 1) +
sum(per InputGate) (#channels × exclusive + floating)
Здесь #subpartitions = parallelism downstream’а, #channels = parallelism upstream’а, exclusive = buffers-per-channel (2 по умолчанию), floating = floating-buffers-per-gate (8 по умолчанию).
Переведём в формулу для TM. Пусть на одном TM:
- N тасков (по slot’у на task в общем случае).
- Каждый таск имеет до 2 ResultPartition и до 2 InputGate (для большинства streaming job’ов).
- Среднее
#subpartitionsи#channels= parallelism P пайплайна.
Тогда минимум на TM:
min_segments = N × [2 × (P + 1) + 2 × (2P + 8)]
= N × [2P + 2 + 4P + 16]
= N × [6P + 18]
Для N = 4 slot, P = 16:
min_segments = 4 × (96 + 18) = 4 × 114 = 456 segments
min_bytes = 456 × 32 KiB = ~14 MiB
Это минимум — для функционирования. Но это не оптимум:
- Нужен запас на floating buffers под нагрузкой.
- Нужен запас на чekpoint barrier handling.
- Нужен запас на partial-buffer transitions.
Эмпирическое правило: factor 4-8x от минимума. То есть для нашего примера — ~60-120 MiB. Стандартный дефолт min=64mb это покрывает, и для большинства streaming job’ов хватает.
Когда дефолтов не хватает: real-world пример
Реальная ситуация: ETL job, parallelism 64, цепочка из 12 операторов с shuffle между каждой парой. На TM запущено 8 slot’ов.
min_segments = 8 × [6 × 64 + 18] × 6 # 6 пар shuffle на pipeline
= 8 × 402 × 6 = 19296 segments
min_bytes = 19296 × 32 KiB = 619 MiB
С запасом 4x — 2.4 GiB. Это уже не помещается в дефолтные 1 GiB (network.max). При старте job упадёт:
IOException: Insufficient number of network buffers: required 2476, but only 1024 available.
The total number of network buffers is currently set to 32768 of 32768 bytes each.
You can increase this number by setting the configuration keys
'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min',
and 'taskmanager.memory.network.max'.
Решение: поднимаем network.max до 2.5 GiB.
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.max: 3gb
И обязательно увеличиваем taskmanager.memory.process.size на эту же сумму, иначе остальные пулы (managed, task heap) пострадают.
Частая ошибка: разработчик увеличил network.max, но не тронул process.size. Flink взял память из task heap’а, и job начал ловить GC pause’ы и OutOfMemoryError в userCode. Всегда расширяй и process.size, если меняешь любой fraction в верхнюю сторону.
Контейнерный аспект: как считается -XX:MaxDirectMemorySize
Flink при старте TM считает направленный лимит direct memory как:
-XX:MaxDirectMemorySize = Framework_OffHeap + Task_OffHeap + Network
То есть в нашем примере:
-XX:MaxDirectMemorySize = 128 MiB + 0 + 3 GiB = 3.128 GiB
Это верхняя граница для всех direct buffers, не только Flink’овских. Если ты подключаешь какие-то библиотеки (например, drivers БД, которые тоже используют direct buffers) — они тоже считаются в этом лимите.
Превышение даст:
java.lang.OutOfMemoryError: Direct buffer memory
И вот это не убъёт контейнер, а просто упадёт задачей. Это сильно отличается от managed-OOM, который вылетает за лимит контейнера и приводит к SIGKILL.
Производительные следствия
Большой network pool — не бесплатно:
-
Direct memory вне GC, но
DirectByteBuffer-объекты-обёртки лежат в heap. Они освобождаются только при GC. Если у тебя 1 GiB direct memory = ~32768 сегментов = столько же объектов-обёрток. На больших G1GC heap это полезная нагрузка ~1 KiB на каждый, итого ~32 MiB heap allocation для wrapper’ов. -
Cold buffers вытесняются из CPU cache. Если network pool 8 GiB и редко используется — большинство сегментов будет cold, при обращении к ним cache miss. Не критично для большинства job, но на NUMA-системах добавляет latency.
-
Аллокация direct memory медленная. В отличие от heap (TLAB),
ByteBuffer.allocateDirect()идёт через системный вызовmmap. Поэтому Flink аллоцирует всё разом при старте. Если ты как-то динамически делаешьallocateDirect()внутри userCode (странная идея, но бывает) — это медленно.
Network vs Network: TCP socket buffers
Не путай! Помимо Flink’овского network pool в JVM, есть TCP socket buffers на уровне ядра. Они задаются параметрами:
taskmanager.network.netty.client.numThreads: 1
taskmanager.network.netty.server.numThreads: 1
taskmanager.network.netty.client.connectTimeoutSec: 120
taskmanager.network.netty.transport: auto # native epoll или nio
И на уровне OS:
sysctl net.core.rmem_max
sysctl net.core.wmem_max
Эти буферы — память ядра, к Flink’овскому network pool отношения не имеют. Но на high-throughput сетях (10 Gbps+) они тоже могут стать узким местом. Стандартно ядро держит ~256 KiB на сокет, для multi-GBps пайплайна это часто узко.
Чтение source
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java— аллокатор direct memory для network.flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java— расчёт network size из process.size.flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java— фабрика, выбирающая Hybrid vs OffHeap segment.flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java— Netty-level настройки, отделённые от network pool.
Чек-лист
- Network memory — direct off-heap (JVM-managed через
-XX:MaxDirectMemorySize), используется только под shuffle buffers. - Технически отдельна от managed, потому что Netty требует direct buffers для zero-copy.
- Sizing:
network_size = clamp(fraction × total_flink_memory, min, max). Дефолт 0.1, 64mb-1gb. - Минимум буферов на TM:
N_slot × (6P + 18), где P = parallelism пайплайна. Рекомендуется 4-8x запас. - Большой network pool увеличивает
-XX:MaxDirectMemorySize— следи за остальными библиотеками, которые могут аллоцировать direct. - Превышение даёт
OutOfMemoryError: Direct buffer memory, не SIGKILL контейнера. - TCP socket buffers (ядро) — отдельная история, не Flink network pool.
- При увеличении
network.maxвсегда увеличивайprocess.sizeна ту же сумму.