Learning Platform
Глоссарий Troubleshooting
Урок 05.02 · 26 мин
Продвинутый
Managed memoryMemoryManagerRocksDBBatch sortSlot memoryNative off-heap

В прошлом уроке мы выделили managed memory как самый большой и самый «загадочный» из off-heap пулов. Сейчас разберём его на части: что это физически, кто его использует, как Flink делит его между задачами, и почему он критически важен для RocksDB-based jobs.

Виртуальная память и off-heap аллокация

Что такое managed memory

Managed memory — это native off-heap память, которую Flink аллоцирует сам через Unsafe.allocateMemory() или через JNI (для RocksDB) и сам же делит между задачами. Ключевые отличия от других пулов:

СвойствоJVM HeapDirect MemoryManaged Memory
GC контролируетданет (но Cleaner зовёт free() при GC объекта DirectByteBuffer)нет
Лимит JVM-Xmx-XX:MaxDirectMemorySizeнет JVM лимита, лимит — Flink
АллокаторJVMJVMFlink через Unsafe или JNI
Кто видитGC heap dumpBufferPoolMXBeanвнутренний MemoryManager
Типичные пользователиuserCode, HashMapStateBackendNetty, framework off-heap, network shuffleRocksDB, batch sort, Python UDF

То, что managed memory не контролируется JVM — это и плюс, и минус. Плюс: GC не тратит время на её обход (RocksDB block cache 5 GiB не давит на GC pause). Минус: если ты ошибся в учёте — JVM не подскажет, а просто пакет вылетит за лимит контейнера и получит SIGKILL от OOM killer’а.

MemoryManager: владелец пула

В каждом TaskManager — один MemoryManager (flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java). При старте он аллоцирует весь managed memory одним куском (через UnpooledOffHeapMemory.allocateUnsafe()) и делит на MemorySegment по 32 KiB. Дальше задачи запрашивают сегменты, MemoryManager выдаёт.

Параметр размера:

taskmanager.memory.managed.size: 2gb
# или относительно
taskmanager.memory.managed.fraction: 0.4

fraction имеет приоритет — если задан, size игнорируется. По умолчанию 0.4 — то есть 40% от Total Flink Memory отдаётся под managed.

При старте TM пишет в лог:

Managed Memory: 3.353 gb (3600000000 bytes)

Это количество — фиксировано на всё время жизни TM. Не растёт и не сжимается.

Слоты и распределение managed между задачами

В каждом TM N slots. Managed memory делится поровну между slot’ами. Если 4 slot’а и 2 GiB managed — у каждого slot’а свой пул 512 MiB.

Внутри слота managed память используется всеми задачами одного pipeline’а (то, что попало в этот slot благодаря slot sharing). Если в slot’е работают source + map + window-aggregate + sink, и window-aggregate использует RocksDB, то весь 512 MiB managed slot’а доступен ему.

Один важный nuance: managed memory распределяется по operatoram внутри слота через consumer weights (FLIP-141, в 1.13+). Раньше всё забирал RocksDB, теперь — пропорционально весам:

taskmanager.memory.managed.consumer-weights: OPERATOR:70,STATE_BACKEND:70,PYTHON:30

Здесь:

  • OPERATOR — batch sort/hash join (если в batch-режиме).
  • STATE_BACKEND — RocksDB.
  • PYTHON — буферы для PyFlink IPC.

В streaming-режиме OPERATOR обычно не используется, и весь slot получает RocksDB + Python (если есть). В batch — наоборот, OPERATOR съест почти всё, потому что hash table или sort на 10 GiB данных требует адекватного буфера.

Распределение managed memory в TaskManager

MemoryManager — один пул на TM. Делится поровну между slot'ами. Внутри slot'а — поровну (или по consumer-weights) между потребителями: RocksDB, batch ops, Python.

MemoryManager (TaskManager singleton)2 GiB managed, 65536 MemorySegment по 32 KiBАллокировано через Unsafe.allocateMemory или JNI один раз при старте.
Slot 0 (512 MiB)pipeline: source -> map -> window-agg(RocksDB) -> sink
STATE_BACKEND (RocksDB)block_cache + write_buffer = ~440 MiBС учётом weight 70 — основной потребитель. Block cache ~60%, write buffer ~40%.
PYTHON0 — нет PyFlink в этом slot'е
Slot 1 (512 MiB)другой job, batch sort heavy
OPERATOR (sort/hash)~450 MiB на sort buffer
STATE_BACKENDне используется в batch
Slot 2, Slot 3по 512 MiB каждый, аналогично распределяется по своим pipeline'ам

RocksDB и managed memory: как это связано

Главный потребитель managed memory в streaming-режиме — RocksDB. У RocksDB своя нативная сишная аллокация (через JNI), и по умолчанию она не знает про лимиты Flink. Это самая частая причина OOMKilled.

Чтобы привязать RocksDB к managed pool:

state.backend.rocksdb.memory.managed: true

Когда это включено, Flink при создании RocksDB instance передаёт ему shared WriteBufferManager и LRUCache (RocksDB-классы), которые предварительно зарезервировали managed-сегменты. RocksDB будет аллоцировать буферы и кэш-блоки из этих shared объектов, и итоговый размер всех его внутренних структур уложится в выделенный пул.

Без memory.managed: true каждая state primitive (ValueState, ListState, MapState — в RocksDB это column families) создаёт собственные write buffer и block cache. На job’е с 10 state primitives × 100 KeyGroup × 3 column families = тысячи независимых RocksDB-структур, и память уходит без контроля.

В уроке про rocksdb-tuning (модуль 05, урок 5) мы детально разберём как именно RocksDB делит выделенную ему managed память на block cache, write buffer и index/filter cache.

Batch операторы: sort и hash

В batch-режиме managed memory — это место, где живут sort buffer’ы и hash table’ы.

  • ExternalSorter — алгоритм external merge sort. Использует managed buffer для in-memory partitioning, spill’ит на диск, когда buffer переполнен. Сколько помещается in-memory — столько меньше spill’ов и быстрее сортировка.
  • HashTable — для hash join. Внутренние bucket’ы — managed segments. Если data fit’ятся в memory — это hash join in-memory; если нет — переходит в hybrid hash join со spill.

Цитата из кода (flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorter.java):

// Allocate from MemoryManager — pre-sized via consumer weights
List<MemorySegment> memory = memoryManager.allocatePages(parentTask, numPages);

Это нативные сегменты, передаваемые operator’у на использование. Когда operator завершён — segments возвращаются в pool.

Когда оператор написан на Python (через PyFlink), Flink не выполняет Python внутри JVM. Вместо этого запускается отдельный Python worker (через flink-python runtime), и данные передаются туда через shared memory или gRPC. Буферы для этой передачи — managed memory.

Параметр:

taskmanager.memory.managed.consumer-weights: STATE_BACKEND:70,PYTHON:30

30% managed уходит под Python IPC. В mixed Java+Python job это разумный baseline. В чистом PyFlink — поднимают до 60.

Что произойдёт, если managed не хватит

В streaming-режиме с RocksDB — ничего страшного: RocksDB просто будет чаще делать flush и compaction, тратя CPU/IO вместо памяти. Job не упадёт, но throughput упадёт.

В batch — наоборот, недостаток managed = больше spill’ов на диск = медленнее. Если совсем мало managed выделено, может упасть с IOException: Cannot allocate enough memory for sort.

В Python — недостаток буферов = больше round-trip’ов между Java и Python через gRPC, throughput падает.

То есть managed — это в основном performance-параметр, а не correctness. Но переборщить с managed (взяв слишком много из total) опасно — урежется Task Heap или Network, и тогда уже будут падения с другой стороны.

Где смотреть в коде

  • flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java — owner всего пула. Метод allocatePages() — основная точка входа.
  • flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnpooledOffHeapMemory.java — реализация аллокатора через Unsafe.
  • flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMemoryManager.java — bridge между managed и RocksDB JNI.
  • flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java — где запрашиваются managed segments для Python IPC.

Production-наблюдение

В Web UI вкладка TaskManagers показывает по каждому TM текущее использование managed: Managed Memory used / total. Если используется ~0% на streaming с RocksDB — значит, rocksdb.memory.managed: false (RocksDB живёт где-то в native non-direct, и это пожароопасно).

В Prometheus:

flink_taskmanager_Status_Flink_Memory_Managed_Used
flink_taskmanager_Status_Flink_Memory_Managed_Total

Здоровое отношение Used/Total для RocksDB job — близко к 1.0 (RocksDB сразу занимает почти весь свой блок кэшем). Низкое отношение — повод проверить конфиг.

TIP

Если ты подключаешь BloomBerg-style multi-tenant платформу, где разные job’ы конкурируют за TM ресурсы — consumer-weights единственный честный способ зарезервировать памяти под Python без обиды для RocksDB. Без него мажоритарный потребитель съест всё.

Чтение source

В пакете flink-runtime/src/main/java/org/apache/flink/runtime/memory/ лежит вся механика. Самый ценный класс — MemoryManager, в нём 100 строк JavaDoc, описывающих логику consumer weights и allocation lifecycle. Параллельно почитай RocksDBSharedResources в state backend — там видно, как именно managed segments передаются в RocksDB JNI.

Чек-лист

  • Managed memory — native off-heap, аллоцированная Flink через Unsafe или JNI. Вне JVM heap, вне -XX:MaxDirectMemorySize.
  • Управляется одним MemoryManager на TM. Делится поровну между slot’ами.
  • Внутри slot’а — между потребителями по consumer-weights: OPERATOR (batch sort/hash), STATE_BACKEND (RocksDB), PYTHON (PyFlink IPC).
  • В streaming-режиме главный потребитель — RocksDB. Включи state.backend.rocksdb.memory.managed: true, иначе RocksDB живёт вне Flink контроля.
  • В batch — sort/hash buffers. Чем больше managed — тем меньше spill’ов на диск.
  • Недостаток managed — performance issue, не correctness. Но переборщить тоже плохо — урезается Task Heap / Network.
  • Мониторинг: Status_Flink_Memory_Managed_Used / _Total в Prometheus.
Проверка знанийKnowledge check
Job с RocksDB state backend в k8s pod 16 GiB. TM запущен, в логах ты видишь "Managed Memory: 5.2 gb". В Web UI на TM показано "Managed Memory Used: 0%". Job работает, но регулярно вылетает с OOMKilled через 10-15 минут. Что не так и как исправить?
ОтветAnswer
Used=0% при включённом RocksDB — главный симптом: state backend не использует managed pool, аллоцирует через native JNI calls, которые проходят мимо Flink memory accounting. Эта память уходит в "невидимую" область JVM Overhead и Native non-direct. Когда RocksDB наполняет block cache (пара GB), он выходит за лимит контейнера и OOMKilled. Исправление: добавить в flink-conf.yaml `state.backend.rocksdb.memory.managed: true`. Тогда RocksDB при создании каждого instance получит shared LRUCache и WriteBufferManager, которые предварительно резервируют managed сегменты. Used сразу прыгнет к 90-100%, а суммарное потребление вместится в контейнер. Дополнительно проверь `taskmanager.memory.jvm-overhead.fraction` — для RocksDB подними до 0.15.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. В streaming job с RocksDB и без PyFlink, ты задал taskmanager.memory.managed.consumer-weights: STATE_BACKEND:70,PYTHON:30. Что произойдёт с managed memory?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4