Learning Platform
Глоссарий Troubleshooting
Урок 05.01 · 28 мин
Продвинутый
Memory modelTaskManager memoryJVM HeapOff-HeapManaged memoryDirect memory

Если ты деплоишь Flink в Kubernetes и видишь OOMKilled на TaskManager pod’е — почти наверняка ты неправильно поделил память. Не «Flink ест больше, чем выдал JVM heap», а где-то рядом — в Managed, в Direct, в Metaspace, в Native non-direct. И единственный способ это понять — выучить полную карту памяти Flink TM. Эта карта сложна не потому, что Flink усложняет жизнь, а потому что JVM в современных контейнерных средах усложнила её сама.

В этом уроке мы пройдём всю иерархию сверху вниз — от Total Process Memory до конкретных байтов в каждом фрагменте. Дальше в модуле — три урока, посвящённых самым неочевидным частям: managed memory, network memory и контейнерный тюнинг.

Memory hierarchy: registers, cache, RAM, disk и реальная latency Управление памятью в Spark: Unified Memory Manager

Иерархия: одна большая картина

Flink группирует память TM в несколько концентрических уровней. Все верхние — производные от нижних:

Иерархия памяти TaskManager

Total Process Memory = что видит OS. Total Flink Memory = что Flink контролирует. JVM Heap + JVM Off-Heap = что выделено JVM. Дальше внутри off-heap — managed, direct, native non-direct.

Total Process Memory (taskmanager.memory.process.size = 8 GiB)всё, что видит OS — это сумма всех нижеперечисленныхВ Kubernetes это совпадает с container memory limit. Если выйдет за — OOMKilled.
Total Flink Memory (taskmanager.memory.flink.size)что Flink планирует и контролирует
JVM HeapOn-heap user objects
Framework Heapdefault 128 MiB — Flink internals
Task HeapuserCode + state (только HashMapStateBackend)
Off-Heap (Direct + Native)off-heap, не GC
Framework Off-Heapdefault 128 MiB — Netty, framework direct
Task Off-Heapdefault 0 — пользовательский native (например JNI)
Networkfraction 0.1, для shuffle buffers (см. урок 3 модуля 03)
Managedfraction 0.4, для RocksDB и batch sort
JVM Metaspacedefault 256 MiB — class metadata
JVM Overheadfraction 0.1, не меньше 192 MiBGC structures, thread stacks, native code, JIT compile cache, etc. Не контролируется Flink напрямую.

Это девять разных байтовых границ, и каждая может стать виновником OOMKilled. Хорошая новость: Flink сам считает все эти границы из одного параметра — taskmanager.memory.process.size, и применяет их к JVM флагам и своему внутреннему пулу.

Два рекомендуемых способа задавать память

В Flink есть два mutually exclusive «корневых» параметра:

# Способ 1 — для контейнеров (рекомендуется для k8s, YARN, Docker)
taskmanager.memory.process.size: 8gb

# Способ 2 — для standalone (рекомендуется для bare-metal, on-prem)
taskmanager.memory.flink.size: 7gb

Никогда не задавай оба — Flink выкинет ошибку при старте. И никогда не задавай оба вместе с детальными ограничениями (task.heap.size, managed.size) без понимания формул — Flink не сможет согласовать.

Правило простое:

  • В Kubernetes — задавай taskmanager.memory.process.size, и точно равной container memory limit. Например, если pod resources.limits.memory = 8Gi, то process.size = 8gb. Flink сам разрежет на части, оставив правильный запас под Metaspace и Overhead.
  • На bare-metal — задавай taskmanager.memory.flink.size. Тогда Flink сам добавит Metaspace и Overhead сверху.

Что считается из чего: формулы

Когда задано process.size, Flink идёт сверху вниз:

process.size = 8192 MiB
- JVM Metaspace:  256 MiB     -> = 7936 MiB
- JVM Overhead:   fraction 0.1 от process, не меньше 192, не больше 1 GiB
                  -> max(192, 819) = 819 MiB
                                -> = 7117 MiB = Total Flink Memory

Total Flink Memory:
- Network:        fraction 0.1, min 64, max 1 GiB
                  -> max(64, 711) = 711 MiB -> но max=1GiB -> 711 MiB
- Managed:        fraction 0.4, min 0
                  -> 0.4 × 7117 = 2846 MiB
- Framework Heap: default 128 MiB
- Framework Off-Heap: default 128 MiB
- Task Off-Heap:  default 0
- Task Heap:      остаток = 7117 - 711 - 2846 - 128 - 128 - 0 = 3304 MiB

Эту разбивку Flink при старте печатает в лог:

TaskManager memory configuration:
  Process Memory:     8.000 gb (8589934592 bytes)
  Total Flink Memory: 6.946 gb (7461959859 bytes)
  Total JVM Heap:     3.353 gb (3601307336 bytes)
  Total Off-Heap:     3.594 gb (3860652523 bytes)
  ...

Если ты видишь не то, что ожидал — значит, какой-то fraction/min/max сработал не так, как ты думал. Перечитай лог внимательно.

JVM Heap: то, что управляет GC

Framework Heap (128 MiB) + Task Heap (рассчитан, в примере выше 3.3 GiB) = собственно JVM heap, передаваемый в -Xms / -Xmx. Это on-heap память, контролируемая GC.

  • В Framework Heap живут Flink-внутренние объекты: координаторы, watchdog’и, диспетчер задач.
  • В Task Heap живут пользовательские объекты: твои POJO в operator state, ваш userCode, state в HashMapStateBackend (если используешь его, а не RocksDB).

Главный практический совет: если используешь HashMapStateBackend, размер state — это часть Task Heap. Если у тебя 1 GiB state, нужно как минимум 2 GiB Task Heap (state + headroom для GC). При больших state’ах HashMap — это смерть от GC pauses, и тебя выгонят в RocksDB.

JVM Off-Heap: четыре непохожих фрагмента

«Off-heap» — это не один пул, а четыре разных пула с разной природой:

1. Framework Off-Heap (128 MiB)

Что: direct buffers Netty (исходящие), framework direct. Контролируется JVM флагом -XX:MaxDirectMemorySize. Управляется JVM в том смысле, что превышение даст OutOfMemoryError: Direct buffer memory.

2. Task Off-Heap (0 по умолчанию)

Что: место для пользовательского native кода — JNI bindings, native libraries. По умолчанию 0, потому что 99% job’ов native не используют. Если ты вызываешь JNI (например, native parser) — ставь сюда явный лимит.

3. Network (711 MiB в примере)

Что: direct buffers для shuffle (см. урок 3 модуля 03). Тоже считается в -XX:MaxDirectMemorySize (вместе с Framework Off-Heap и Task Off-Heap).

4. Managed (2846 MiB в примере)

Что: native off-heap (НЕ direct buffer JVM!), под управлением Flink. Используется RocksDB как block_cache_size и write_buffer_size, а также batch sort’ом. Это самая большая часть памяти на streaming job’е с RocksDB.

Критично: managed memory — это unsafe.allocateMemory() или native alloc через RocksDB JNI. JVM её не видит в -XX:MaxDirectMemorySize. Это «правда off-heap».

WARNING

Самый частый источник OOMKilled в Kubernetes — недооценка native non-direct. RocksDB через JNI выделяет native память, которая не учитывается ни в Heap, ни в Direct. Если RocksDB наплюёт на свой block_cache_size лимит (бывает, особенно при компактациях), он съест часть JVM Overhead — и контейнер вылетит. Решение: оставь Overhead 15-20%, не 10%, как по умолчанию.

JVM Metaspace (256 MiB) — место под class metadata. Через -XX:MaxMetaspaceSize. Если job динамически генерирует много классов (Calcite SQL queries в реальном времени, Janino) — Metaspace растёт. На больших ad-hoc SQL workload может понадобиться 512 MiB или больше.

JVM Overhead — это резерв для всего остального: thread stacks (по 512 KiB на поток), native структуры GC, JIT compile cache, internal JVM pools. Flink не использует этот пул, но он точно нужен — иначе JVM вылезет за процессный лимит.

Дефолт fraction 0.1 (от process.size) с минимумом 192 MiB и максимумом 1 GiB. Для типичных streaming job’ов хватает. Для крупных контейнеров (32+ GiB) поднимай до 0.15-0.20 — JIT кэш и thread stacks растут нелинейно.

Где смотреть в коде

  • flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java — главная фабрика, которая считает все границы из конфига. Метод processSpecFromConfig().
  • flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java — структура, которую Flink передаёт в скрипт запуска TM как JVM флаги.
  • flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java — управление managed memory: аллокация native, раздача по slot’ам.

В исходниках есть JavaDoc-комментарий-«ASCII art» в TaskExecutorMemoryConfiguration, который буквально воспроизводит нашу диаграмму. Прочти его — это лучший reference, который у тебя будет.

Реальный production-пример

Типичный конфиг для streaming job в k8s (Kubernetes resources.limits.memory = 8Gi):

taskmanager.memory.process.size: 8gb
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.network.fraction: 0.08
taskmanager.memory.network.max: 1gb
taskmanager.memory.jvm-overhead.fraction: 0.15
taskmanager.memory.jvm-metaspace.size: 384mb
state.backend.type: rocksdb
state.backend.rocksdb.memory.managed: true

Что мы сделали:

  1. process.size = pod limit.
  2. managed.fraction 0.4 — стандарт для RocksDB.
  3. network.fraction 0.08 — снизили, потому что у нас не shuffle-heavy job.
  4. jvm-overhead.fraction 0.15 — увеличили с 0.10 для запаса под RocksDB JNI.
  5. metaspace.size 384 MiB — поднимаем, потому что у нас есть Calcite SQL planning.
  6. rocksdb.memory.managed: true — критично: пускаем RocksDB через managed memory pool, чтобы он сидел в наших лимитах, а не лез в JVM Overhead.

В следующем уроке мы детально разберём, что значит «RocksDB сидит в managed memory» и как это работает.

Чтение source

Чтобы пройти всю цепочку, как параметры конвертируются в JVM флаги, начни с TaskExecutorProcessUtils.processSpecFromConfig() и иди по дереву вниз. Метод возвращает TaskExecutorProcessSpec, который потом конвертируется в командную строку для запуска JVM в BashJavaUtils. Все формулы там — pure-Java, без магии.

Чек-лист

  • Total Process Memory = container limit. Total Flink Memory = Flink-controlled (без Metaspace и Overhead).
  • Total Flink Memory = JVM Heap + Off-Heap (Framework + Task + Network + Managed).
  • В контейнерах задавай taskmanager.memory.process.size. На bare-metal — taskmanager.memory.flink.size. Никогда оба.
  • Off-Heap делится на четыре пула: Framework Off-Heap (JVM direct), Task Off-Heap (JNI), Network (JVM direct), Managed (native non-direct).
  • Managed memory НЕ контролируется -XX:MaxDirectMemorySize. RocksDB сидит здесь.
  • Главный риск OOMKilled — native non-direct (RocksDB JNI). Защита: jvm-overhead.fraction 0.15-0.20 и state.backend.rocksdb.memory.managed: true.
  • HashMapStateBackend хранит state в Task Heap. RocksDB — в Managed.
  • Все формулы в TaskExecutorProcessUtils.
Проверка знанийKnowledge check
У тебя в k8s pod с resources.limits.memory = 16Gi. Job использует RocksDB для state. Какие 4-5 параметров в flink-conf.yaml ты бы установил для здорового сайзинга, и почему?
ОтветAnswer
Базовый набор: (1) taskmanager.memory.process.size: 16gb — должно совпадать с container limit. (2) taskmanager.memory.managed.fraction: 0.4 — стандартный для RocksDB, даст ~5-6 GiB на state backend. (3) taskmanager.memory.jvm-overhead.fraction: 0.15 — увеличить с дефолта 0.10, чтобы оставить запас под RocksDB JNI native allocation, которая не учтена в Direct и Managed. (4) state.backend.rocksdb.memory.managed: true — критично, без этого RocksDB будет аллоцировать память вне Flink контроля и легко уйдёт в OOMKilled. (5) Опционально: taskmanager.memory.jvm-metaspace.size: 384mb если используешь SQL планнер (Calcite много генерит классов). Что НЕ делать: не задавай одновременно process.size и flink.size; не оставляй jvm-overhead.fraction по умолчанию 0.10 на RocksDB workload; не отключай rocksdb.memory.managed.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какие из следующих утверждений про Total Process Memory и Total Flink Memory верны?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4