В прошлом уроке мы выделили managed memory как самый большой и самый «загадочный» из off-heap пулов. Сейчас разберём его на части: что это физически, кто его использует, как Flink делит его между задачами, и почему он критически важен для RocksDB-based jobs.
Виртуальная память и off-heap аллокацияЧто такое managed memory
Managed memory — это native off-heap память, которую Flink аллоцирует сам через Unsafe.allocateMemory() или через JNI (для RocksDB) и сам же делит между задачами. Ключевые отличия от других пулов:
| Свойство | JVM Heap | Direct Memory | Managed Memory |
|---|---|---|---|
| GC контролирует | да | нет (но Cleaner зовёт free() при GC объекта DirectByteBuffer) | нет |
| Лимит JVM | -Xmx | -XX:MaxDirectMemorySize | нет JVM лимита, лимит — Flink |
| Аллокатор | JVM | JVM | Flink через Unsafe или JNI |
| Кто видит | GC heap dump | BufferPoolMXBean | внутренний MemoryManager |
| Типичные пользователи | userCode, HashMapStateBackend | Netty, framework off-heap, network shuffle | RocksDB, batch sort, Python UDF |
То, что managed memory не контролируется JVM — это и плюс, и минус. Плюс: GC не тратит время на её обход (RocksDB block cache 5 GiB не давит на GC pause). Минус: если ты ошибся в учёте — JVM не подскажет, а просто пакет вылетит за лимит контейнера и получит SIGKILL от OOM killer’а.
MemoryManager: владелец пула
В каждом TaskManager — один MemoryManager (flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java). При старте он аллоцирует весь managed memory одним куском (через UnpooledOffHeapMemory.allocateUnsafe()) и делит на MemorySegment по 32 KiB. Дальше задачи запрашивают сегменты, MemoryManager выдаёт.
Параметр размера:
taskmanager.memory.managed.size: 2gb
# или относительно
taskmanager.memory.managed.fraction: 0.4
fraction имеет приоритет — если задан, size игнорируется. По умолчанию 0.4 — то есть 40% от Total Flink Memory отдаётся под managed.
При старте TM пишет в лог:
Managed Memory: 3.353 gb (3600000000 bytes)
Это количество — фиксировано на всё время жизни TM. Не растёт и не сжимается.
Слоты и распределение managed между задачами
В каждом TM N slots. Managed memory делится поровну между slot’ами. Если 4 slot’а и 2 GiB managed — у каждого slot’а свой пул 512 MiB.
Внутри слота managed память используется всеми задачами одного pipeline’а (то, что попало в этот slot благодаря slot sharing). Если в slot’е работают source + map + window-aggregate + sink, и window-aggregate использует RocksDB, то весь 512 MiB managed slot’а доступен ему.
Один важный nuance: managed memory распределяется по operatoram внутри слота через consumer weights (FLIP-141, в 1.13+). Раньше всё забирал RocksDB, теперь — пропорционально весам:
taskmanager.memory.managed.consumer-weights: OPERATOR:70,STATE_BACKEND:70,PYTHON:30
Здесь:
OPERATOR— batch sort/hash join (если в batch-режиме).STATE_BACKEND— RocksDB.PYTHON— буферы для PyFlink IPC.
В streaming-режиме OPERATOR обычно не используется, и весь slot получает RocksDB + Python (если есть). В batch — наоборот, OPERATOR съест почти всё, потому что hash table или sort на 10 GiB данных требует адекватного буфера.
MemoryManager — один пул на TM. Делится поровну между slot'ами. Внутри slot'а — поровну (или по consumer-weights) между потребителями: RocksDB, batch ops, Python.
RocksDB и managed memory: как это связано
Главный потребитель managed memory в streaming-режиме — RocksDB. У RocksDB своя нативная сишная аллокация (через JNI), и по умолчанию она не знает про лимиты Flink. Это самая частая причина OOMKilled.
Чтобы привязать RocksDB к managed pool:
state.backend.rocksdb.memory.managed: true
Когда это включено, Flink при создании RocksDB instance передаёт ему shared WriteBufferManager и LRUCache (RocksDB-классы), которые предварительно зарезервировали managed-сегменты. RocksDB будет аллоцировать буферы и кэш-блоки из этих shared объектов, и итоговый размер всех его внутренних структур уложится в выделенный пул.
Без memory.managed: true каждая state primitive (ValueState, ListState, MapState — в RocksDB это column families) создаёт собственные write buffer и block cache. На job’е с 10 state primitives × 100 KeyGroup × 3 column families = тысячи независимых RocksDB-структур, и память уходит без контроля.
В уроке про rocksdb-tuning (модуль 05, урок 5) мы детально разберём как именно RocksDB делит выделенную ему managed память на block cache, write buffer и index/filter cache.
Batch операторы: sort и hash
В batch-режиме managed memory — это место, где живут sort buffer’ы и hash table’ы.
ExternalSorter— алгоритм external merge sort. Использует managed buffer для in-memory partitioning, spill’ит на диск, когда buffer переполнен. Сколько помещается in-memory — столько меньше spill’ов и быстрее сортировка.HashTable— для hash join. Внутренние bucket’ы — managed segments. Если data fit’ятся в memory — это hash join in-memory; если нет — переходит в hybrid hash join со spill.
Цитата из кода (flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorter.java):
// Allocate from MemoryManager — pre-sized via consumer weights
List<MemorySegment> memory = memoryManager.allocatePages(parentTask, numPages);
Это нативные сегменты, передаваемые operator’у на использование. Когда operator завершён — segments возвращаются в pool.
Python: IPC buffers для PyFlink
Когда оператор написан на Python (через PyFlink), Flink не выполняет Python внутри JVM. Вместо этого запускается отдельный Python worker (через flink-python runtime), и данные передаются туда через shared memory или gRPC. Буферы для этой передачи — managed memory.
Параметр:
taskmanager.memory.managed.consumer-weights: STATE_BACKEND:70,PYTHON:30
30% managed уходит под Python IPC. В mixed Java+Python job это разумный baseline. В чистом PyFlink — поднимают до 60.
Что произойдёт, если managed не хватит
В streaming-режиме с RocksDB — ничего страшного: RocksDB просто будет чаще делать flush и compaction, тратя CPU/IO вместо памяти. Job не упадёт, но throughput упадёт.
В batch — наоборот, недостаток managed = больше spill’ов на диск = медленнее. Если совсем мало managed выделено, может упасть с IOException: Cannot allocate enough memory for sort.
В Python — недостаток буферов = больше round-trip’ов между Java и Python через gRPC, throughput падает.
То есть managed — это в основном performance-параметр, а не correctness. Но переборщить с managed (взяв слишком много из total) опасно — урежется Task Heap или Network, и тогда уже будут падения с другой стороны.
Где смотреть в коде
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java— owner всего пула. МетодallocatePages()— основная точка входа.flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnpooledOffHeapMemory.java— реализация аллокатора черезUnsafe.flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMemoryManager.java— bridge между managed и RocksDB JNI.flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java— где запрашиваются managed segments для Python IPC.
Production-наблюдение
В Web UI вкладка TaskManagers показывает по каждому TM текущее использование managed: Managed Memory used / total. Если используется ~0% на streaming с RocksDB — значит, rocksdb.memory.managed: false (RocksDB живёт где-то в native non-direct, и это пожароопасно).
В Prometheus:
flink_taskmanager_Status_Flink_Memory_Managed_Used
flink_taskmanager_Status_Flink_Memory_Managed_Total
Здоровое отношение Used/Total для RocksDB job — близко к 1.0 (RocksDB сразу занимает почти весь свой блок кэшем). Низкое отношение — повод проверить конфиг.
Если ты подключаешь BloomBerg-style multi-tenant платформу, где разные job’ы конкурируют за TM ресурсы — consumer-weights единственный честный способ зарезервировать памяти под Python без обиды для RocksDB. Без него мажоритарный потребитель съест всё.
Чтение source
В пакете flink-runtime/src/main/java/org/apache/flink/runtime/memory/ лежит вся механика. Самый ценный класс — MemoryManager, в нём 100 строк JavaDoc, описывающих логику consumer weights и allocation lifecycle. Параллельно почитай RocksDBSharedResources в state backend — там видно, как именно managed segments передаются в RocksDB JNI.
Чек-лист
- Managed memory — native off-heap, аллоцированная Flink через
Unsafeили JNI. Вне JVM heap, вне-XX:MaxDirectMemorySize. - Управляется одним
MemoryManagerна TM. Делится поровну между slot’ами. - Внутри slot’а — между потребителями по
consumer-weights: OPERATOR (batch sort/hash), STATE_BACKEND (RocksDB), PYTHON (PyFlink IPC). - В streaming-режиме главный потребитель — RocksDB. Включи
state.backend.rocksdb.memory.managed: true, иначе RocksDB живёт вне Flink контроля. - В batch — sort/hash buffers. Чем больше managed — тем меньше spill’ов на диск.
- Недостаток managed — performance issue, не correctness. Но переборщить тоже плохо — урезается Task Heap / Network.
- Мониторинг:
Status_Flink_Memory_Managed_Used/_Totalв Prometheus.