Если ты деплоишь Flink в Kubernetes и видишь OOMKilled на TaskManager pod’е — почти наверняка ты неправильно поделил память. Не «Flink ест больше, чем выдал JVM heap», а где-то рядом — в Managed, в Direct, в Metaspace, в Native non-direct. И единственный способ это понять — выучить полную карту памяти Flink TM. Эта карта сложна не потому, что Flink усложняет жизнь, а потому что JVM в современных контейнерных средах усложнила её сама.
В этом уроке мы пройдём всю иерархию сверху вниз — от Total Process Memory до конкретных байтов в каждом фрагменте. Дальше в модуле — три урока, посвящённых самым неочевидным частям: managed memory, network memory и контейнерный тюнинг.
Memory hierarchy: registers, cache, RAM, disk и реальная latency Управление памятью в Spark: Unified Memory ManagerИерархия: одна большая картина
Flink группирует память TM в несколько концентрических уровней. Все верхние — производные от нижних:
Total Process Memory = что видит OS. Total Flink Memory = что Flink контролирует. JVM Heap + JVM Off-Heap = что выделено JVM. Дальше внутри off-heap — managed, direct, native non-direct.
Это девять разных байтовых границ, и каждая может стать виновником OOMKilled. Хорошая новость: Flink сам считает все эти границы из одного параметра — taskmanager.memory.process.size, и применяет их к JVM флагам и своему внутреннему пулу.
Два рекомендуемых способа задавать память
В Flink есть два mutually exclusive «корневых» параметра:
# Способ 1 — для контейнеров (рекомендуется для k8s, YARN, Docker)
taskmanager.memory.process.size: 8gb
# Способ 2 — для standalone (рекомендуется для bare-metal, on-prem)
taskmanager.memory.flink.size: 7gb
Никогда не задавай оба — Flink выкинет ошибку при старте. И никогда не задавай оба вместе с детальными ограничениями (task.heap.size, managed.size) без понимания формул — Flink не сможет согласовать.
Правило простое:
- В Kubernetes — задавай
taskmanager.memory.process.size, и точно равной container memory limit. Например, если pod resources.limits.memory = 8Gi, тоprocess.size = 8gb. Flink сам разрежет на части, оставив правильный запас под Metaspace и Overhead. - На bare-metal — задавай
taskmanager.memory.flink.size. Тогда Flink сам добавит Metaspace и Overhead сверху.
Что считается из чего: формулы
Когда задано process.size, Flink идёт сверху вниз:
process.size = 8192 MiB
- JVM Metaspace: 256 MiB -> = 7936 MiB
- JVM Overhead: fraction 0.1 от process, не меньше 192, не больше 1 GiB
-> max(192, 819) = 819 MiB
-> = 7117 MiB = Total Flink Memory
Total Flink Memory:
- Network: fraction 0.1, min 64, max 1 GiB
-> max(64, 711) = 711 MiB -> но max=1GiB -> 711 MiB
- Managed: fraction 0.4, min 0
-> 0.4 × 7117 = 2846 MiB
- Framework Heap: default 128 MiB
- Framework Off-Heap: default 128 MiB
- Task Off-Heap: default 0
- Task Heap: остаток = 7117 - 711 - 2846 - 128 - 128 - 0 = 3304 MiB
Эту разбивку Flink при старте печатает в лог:
TaskManager memory configuration:
Process Memory: 8.000 gb (8589934592 bytes)
Total Flink Memory: 6.946 gb (7461959859 bytes)
Total JVM Heap: 3.353 gb (3601307336 bytes)
Total Off-Heap: 3.594 gb (3860652523 bytes)
...
Если ты видишь не то, что ожидал — значит, какой-то fraction/min/max сработал не так, как ты думал. Перечитай лог внимательно.
JVM Heap: то, что управляет GC
Framework Heap (128 MiB) + Task Heap (рассчитан, в примере выше 3.3 GiB) = собственно JVM heap, передаваемый в -Xms / -Xmx. Это on-heap память, контролируемая GC.
- В
Framework Heapживут Flink-внутренние объекты: координаторы, watchdog’и, диспетчер задач. - В
Task Heapживут пользовательские объекты: твои POJO в operator state, ваш userCode, state в HashMapStateBackend (если используешь его, а не RocksDB).
Главный практический совет: если используешь HashMapStateBackend, размер state — это часть Task Heap. Если у тебя 1 GiB state, нужно как минимум 2 GiB Task Heap (state + headroom для GC). При больших state’ах HashMap — это смерть от GC pauses, и тебя выгонят в RocksDB.
JVM Off-Heap: четыре непохожих фрагмента
«Off-heap» — это не один пул, а четыре разных пула с разной природой:
1. Framework Off-Heap (128 MiB)
Что: direct buffers Netty (исходящие), framework direct. Контролируется JVM флагом -XX:MaxDirectMemorySize. Управляется JVM в том смысле, что превышение даст OutOfMemoryError: Direct buffer memory.
2. Task Off-Heap (0 по умолчанию)
Что: место для пользовательского native кода — JNI bindings, native libraries. По умолчанию 0, потому что 99% job’ов native не используют. Если ты вызываешь JNI (например, native parser) — ставь сюда явный лимит.
3. Network (711 MiB в примере)
Что: direct buffers для shuffle (см. урок 3 модуля 03). Тоже считается в -XX:MaxDirectMemorySize (вместе с Framework Off-Heap и Task Off-Heap).
4. Managed (2846 MiB в примере)
Что: native off-heap (НЕ direct buffer JVM!), под управлением Flink. Используется RocksDB как block_cache_size и write_buffer_size, а также batch sort’ом. Это самая большая часть памяти на streaming job’е с RocksDB.
Критично: managed memory — это unsafe.allocateMemory() или native alloc через RocksDB JNI. JVM её не видит в -XX:MaxDirectMemorySize. Это «правда off-heap».
Самый частый источник OOMKilled в Kubernetes — недооценка native non-direct. RocksDB через JNI выделяет native память, которая не учитывается ни в Heap, ни в Direct. Если RocksDB наплюёт на свой block_cache_size лимит (бывает, особенно при компактациях), он съест часть JVM Overhead — и контейнер вылетит. Решение: оставь Overhead 15-20%, не 10%, как по умолчанию.
JVM Metaspace и Overhead: то, что не контролируется Flink
JVM Metaspace (256 MiB) — место под class metadata. Через -XX:MaxMetaspaceSize. Если job динамически генерирует много классов (Calcite SQL queries в реальном времени, Janino) — Metaspace растёт. На больших ad-hoc SQL workload может понадобиться 512 MiB или больше.
JVM Overhead — это резерв для всего остального: thread stacks (по 512 KiB на поток), native структуры GC, JIT compile cache, internal JVM pools. Flink не использует этот пул, но он точно нужен — иначе JVM вылезет за процессный лимит.
Дефолт fraction 0.1 (от process.size) с минимумом 192 MiB и максимумом 1 GiB. Для типичных streaming job’ов хватает. Для крупных контейнеров (32+ GiB) поднимай до 0.15-0.20 — JIT кэш и thread stacks растут нелинейно.
Где смотреть в коде
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java— главная фабрика, которая считает все границы из конфига. МетодprocessSpecFromConfig().flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java— структура, которую Flink передаёт в скрипт запуска TM как JVM флаги.flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java— управление managed memory: аллокация native, раздача по slot’ам.
В исходниках есть JavaDoc-комментарий-«ASCII art» в TaskExecutorMemoryConfiguration, который буквально воспроизводит нашу диаграмму. Прочти его — это лучший reference, который у тебя будет.
Реальный production-пример
Типичный конфиг для streaming job в k8s (Kubernetes resources.limits.memory = 8Gi):
taskmanager.memory.process.size: 8gb
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.network.fraction: 0.08
taskmanager.memory.network.max: 1gb
taskmanager.memory.jvm-overhead.fraction: 0.15
taskmanager.memory.jvm-metaspace.size: 384mb
state.backend.type: rocksdb
state.backend.rocksdb.memory.managed: true
Что мы сделали:
process.size= pod limit.managed.fraction0.4 — стандарт для RocksDB.network.fraction0.08 — снизили, потому что у нас не shuffle-heavy job.jvm-overhead.fraction0.15 — увеличили с 0.10 для запаса под RocksDB JNI.metaspace.size384 MiB — поднимаем, потому что у нас есть Calcite SQL planning.rocksdb.memory.managed: true— критично: пускаем RocksDB через managed memory pool, чтобы он сидел в наших лимитах, а не лез в JVM Overhead.
В следующем уроке мы детально разберём, что значит «RocksDB сидит в managed memory» и как это работает.
Чтение source
Чтобы пройти всю цепочку, как параметры конвертируются в JVM флаги, начни с TaskExecutorProcessUtils.processSpecFromConfig() и иди по дереву вниз. Метод возвращает TaskExecutorProcessSpec, который потом конвертируется в командную строку для запуска JVM в BashJavaUtils. Все формулы там — pure-Java, без магии.
Чек-лист
- Total Process Memory = container limit. Total Flink Memory = Flink-controlled (без Metaspace и Overhead).
- Total Flink Memory = JVM Heap + Off-Heap (Framework + Task + Network + Managed).
- В контейнерах задавай
taskmanager.memory.process.size. На bare-metal —taskmanager.memory.flink.size. Никогда оба. - Off-Heap делится на четыре пула: Framework Off-Heap (JVM direct), Task Off-Heap (JNI), Network (JVM direct), Managed (native non-direct).
- Managed memory НЕ контролируется
-XX:MaxDirectMemorySize. RocksDB сидит здесь. - Главный риск OOMKilled — native non-direct (RocksDB JNI). Защита:
jvm-overhead.fraction0.15-0.20 иstate.backend.rocksdb.memory.managed: true. - HashMapStateBackend хранит state в Task Heap. RocksDB — в Managed.
- Все формулы в
TaskExecutorProcessUtils.