В первых трёх уроках мы говорили про то, как Flink делит память внутри JVM-процесса. Сейчас — про внешнюю границу: контейнер. В k8s container memory limit — это не рекомендация и не soft cap, это kill signal.
Flink Kubernetes Operator: ресурсы и конфигурация pod-овSwap, overcommit и OOM killer в Linux Когда cgroup’а превышена, OOM killer срабатывает за миллисекунды, без возможности для процесса вызвать shutdown hook или сохранить checkpoint. Это особенно болезненно для Flink, где такая смерть означает 1-5 минут на recovery и rebuild RocksDB state.
В этом уроке — production playbook для k8s sizing: какие параметры выставлять, какие защитные slack’и оставлять, как мониторить, и как диагностировать OOMKilled постфактум.
Контейнерный SIGKILL: что именно происходит
Когда RSS контейнера превышает resources.limits.memory, происходит следующее:
- Linux kernel определяет нарушение через cgroup memory controller (
memory.usage_in_bytes > memory.limit_in_bytes). - Срабатывает OOM killer внутри cgroup’ы. Он выбирает «жертву» по
oom_score_adjиoom_score. - В контейнере с одним основным процессом (JVM) — жертвой будет именно он.
- Процесс получает
SIGKILL. Не SIGTERM, а сразу SIGKILL. Это не shutdown hook, не finally, не try/catch. - Контейнер останавливается, kubelet видит
ExitCode 137(128 + 9 = SIGKILL). - Pod в зависимости от restartPolicy перезапускается. Для Flink JobManager — это потеря lease и failover.
- Для TaskManager — другие TM детектят отсутствие через heartbeat, JM начинает recovery от последнего checkpoint’а.
Финальный pod status показывает:
state:
terminated:
reason: OOMKilled
exitCode: 137
Это самый распространённый аварийный сценарий Flink в k8s. По нашему опыту — ~70% всех неожиданных рестартов TM на нагруженных платформах.
Почему OOMKilled происходит даже при правильном process.size
Главный источник проблем — то, что JVM не контролирует все аллокации в своём процессе. Кроме heap и direct memory есть native non-direct — память, выделенная через malloc() нативным кодом, JNI, или системными библиотеками.
Что входит в native non-direct:
- RocksDB JNI: block cache, write buffer, index/filter blocks, compaction tmp buffers.
- Kryo native pools: сериализатор внутри использует native allocations для off-heap pools.
- JNI libraries: любая user’ская native dependency (например, SnappyCompressor, native cryptography).
- JIT compiler caches: код, сгенерированный JIT, лежит в Code Cache. По умолчанию до 240 MiB, может вырасти.
- GC structures: card table, remembered sets, G1 region metadata.
- Thread stacks: каждый Java thread = 512 KiB по умолчанию (
-Xss). При сотне потоков — 50 MiB. - NIO внутренние буферы: temp buffers, которые JVM использует для I/O.
Все эти аллокации не учитываются в -Xmx и -XX:MaxDirectMemorySize. JVM их не видит, не управляет, не ограничивает.
Flink покрывает их через JVM Overhead буфер:
taskmanager.memory.jvm-overhead.fraction: 0.1
taskmanager.memory.jvm-overhead.min: 192mb
taskmanager.memory.jvm-overhead.max: 1gb
Это резерв — не передаётся в JVM флаги, не используется Flink’ом, просто «не отдаётся» другим пулам.
Проблема: дефолтные 10% — это очень тонко для production. На RocksDB-heavy job native non-direct легко вырастает до 15-25%. Если ты оставил дефолт 10% — рано или поздно вылетишь за лимит.
Production-проверенный baseline
После наблюдения за несколькими сотнями TM в проде, мы пришли к такому baseline для RocksDB streaming jobs в k8s:
# === KEY PARAMETERS ===
taskmanager.memory.process.size: ${CONTAINER_MEMORY_LIMIT} # точное совпадение
# === CONSERVATIVE OVERHEAD ===
taskmanager.memory.jvm-overhead.fraction: 0.15
taskmanager.memory.jvm-overhead.max: 2gb
taskmanager.memory.jvm-metaspace.size: 384mb # для Calcite/Janino
# === BUDGETS ===
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.network.fraction: 0.08
taskmanager.memory.network.max: 1gb
# === ROCKSDB ===
state.backend.type: rocksdb
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.5
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1
# === GC ===
env.java.opts.taskmanager: >-
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:+ParallelRefProcEnabled
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/flink/heap-dumps/
Разбор изменений против дефолта:
jvm-overhead.fraction: 0.15— основная защита. На 8 GiB pod это 1.2 GiB слака. Достаточно для типичных native non-direct превышений.jvm-overhead.max: 2gb— снимает потолок 1 GiB для больших pod’ов (16+ GiB).metaspace.size: 384mb— поднимаем с 256, потому что Calcite/Janino много генерят классов.rocksdb.memory.managed: true— критично (см. урок 2 модуля 04).write-buffer-ratio: 0.5— внутри managed 50% уйдёт на active write buffer (а не block cache). На write-heavy workload.+ExitOnOutOfMemoryError— при любом JVM OOM (heap, direct) JVM сразу exit’ится. Без этого процесс может зависнуть в полу-мёртвом состоянии.
Как мониторить и предсказывать OOMKilled
Главные метрики, по которым видно приближение к лимиту:
# Container memory usage / limit
container_memory_working_set_bytes{pod="flink-taskmanager-..."}
/ container_spec_memory_limit_bytes
# JVM heap usage
flink_taskmanager_Status_JVM_Memory_Heap_Used
/ flink_taskmanager_Status_JVM_Memory_Heap_Max
# Direct memory (Network + Framework off-heap + Task off-heap)
flink_taskmanager_Status_JVM_Memory_Direct_Count
flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed
# RocksDB managed
flink_taskmanager_Status_Flink_Memory_Managed_Used
/ flink_taskmanager_Status_Flink_Memory_Managed_Total
Alert правила, которые мы используем:
- alert: FlinkTMMemoryNearLimit
expr: |
container_memory_working_set_bytes / container_spec_memory_limit_bytes > 0.92
for: 5m
annotations:
summary: "Flink TM {{ $labels.pod }} at >92% container memory for 5 min"
- alert: FlinkTMGCPausesHigh
expr: |
rate(flink_taskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Time[5m]) > 0.1
for: 10m
annotations:
summary: "Flink TM {{ $labels.pod }} spending >10% time in Old GC"
Первый — самый важный. Если 92% устойчиво — следующий пик RocksDB compaction уведёт тебя в OOMKilled.
Диагностика после OOMKilled: heap dumps и kernel logs
Если TM упал с OOMKilled, посмертный анализ:
kubectl describe pod ...— увидишь reason=OOMKilled, exitCode=137.- Kernel logs на узле:
dmesg | grep -i oom. Покажет, какой именно процесс убит и текущий memory map cgroup’ы:[12345.678] Memory cgroup out of memory: Killed process 4567 (java) total-vm:8845632kB, anon-rss:6291456kB, file-rss:32768kB, shmem-rss:0kB - Heap dump — если ты включил
-XX:+HeapDumpOnOutOfMemoryError, в/var/log/flink/heap-dumps/появятся.hprofфайлы (но только если упало с JVM OOM, не с cgroup OOMKilled!). - Container memory logs в Prometheus за период перед смертью — посмотри график
container_memory_working_set_bytes. Если был резкий пик — это либо RocksDB compaction, либо GC.
Heap dump vs container OOMKilled
Важный момент: +HeapDumpOnOutOfMemoryError срабатывает только при JVM-level OOM (когда JVM сама детектит, что не хватает heap или direct). Если ты вышел за cgroup limit — kernel OOM killer пришёл раньше, SIGKILL получен мгновенно, JVM не успела ничего записать.
Для post-mortem container OOM нужны другие инструменты:
- JFR (Java Flight Recorder) в continuous режиме:
-XX:StartFlightRecording=name=cont,settings=profile,maxsize=512m,filename=/var/log/flink/jfr.jfr,dumponexit=true. Тогда даже после SIGKILL последний record остаётся на диске. - NMT (Native Memory Tracking):
-XX:NativeMemoryTracking=detail. В логи можно периодически дампитьjcmd <pid> VM.native_memory baseline. Видно, сколько каждая subsystem потребляет. - Memory pressure sidecar: отдельный pod, который раз в 5 сек снимает
cat /proc/<pid>/statusи пишет в лог.
Pod sizing: с чего начинать
Если у тебя нет исторических данных, baseline для streaming job с RocksDB:
| Job complexity | Pod size | TM slots | State size cap (per TM) |
|---|---|---|---|
| Lightweight (1-5 operators, P=4) | 4 GiB | 2 | 500 MiB |
| Medium (5-15 operators, P=16) | 8 GiB | 4 | 2 GiB |
| Heavy (15+ operators, P=64+) | 16 GiB | 4 | 5 GiB |
| Very heavy (large state) | 32 GiB | 4 | 10 GiB |
Правило: state size на slot не больше 30% от total managed. То есть на 8 GiB pod (~3.2 GiB managed) — не больше ~800 MiB rocksdb state на slot. Превышение — RocksDB постоянно будет flush’ить и compaction’ить, throughput упадёт.
Если ты видишь, что в реальном проде state на TM растёт без видимого ограничения — у тебя где-то протекает state в keyBy. Например, временные ключи (timestamp как часть ключа) или unbounded broadcast state. Это причина 80% всех «случайных» OOMKilled — не сайзинг, а утечка state.
Tips & tricks из реальной эксплуатации
1. Различай initial OOMKilled и постепенный
Сразу после старта job (первые 2-3 минуты): значит, ты неправильно посчитал budget’ы или Kafka source держит слишком большой prefetch. Решение — пересчитать конфиг или увеличить pod.
Через час-два после старта: значит, утекают какие-то ресурсы — обычно RocksDB compaction накапливается. Решение — rocksdb.compaction.style (level vs universal), rocksdb.block.cache-size.
Резкие spike’и раз в N часов: window aggregations / checkpoints. Решение — state.backend.rocksdb.thread.num, чтобы параллелить compaction.
2. Используй init container для preloading
Когда job запускается, RocksDB сначала пуст, потом начинает наполняться state’ом. В этот момент memory низкая, но через 10-20 минут она вырастает до steady-state. Если ты сайзишь под steady-state — первые 10 минут будешь сильно недозагружен. Если под initial — упадёшь через час.
Решение: init container, который проходит read-warmup на checkpoint’е перед стартом основного TM. RocksDB сразу видит, сколько ему нужно block cache.
3. Multi-tenant: ResourceQuota и LimitRange
В shared k8s намecпейсе используй:
apiVersion: v1
kind: ResourceQuota
spec:
hard:
requests.memory: 100Gi
limits.memory: 100Gi
И never используй requests != limits для Flink TM. Burstable QoS class даёт высокий риск OOMKilled при memory pressure на узле, потому что k8s eviction начнётся с burstable.
Чтение source
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java— все k8s-specific параметры Flink.flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java— формулы calculate.- Документация:
docs/content/docs/deployment/memory/mem_setup_tm.md— официальный playbook от Flink team. - Реальные incident reports: ticket’ы FLINK-25538, FLINK-28074 — обсуждение OOMKilled root cause analysis.
Чек-лист
- В k8s
resources.limits.memory— абсолютный потолок. Превышение = SIGKILL без shutdown hook. - Главный источник OOMKilled — native non-direct (RocksDB JNI, Kryo, JVM internal), не учтённая в Heap и Direct.
- Защита:
jvm-overhead.fraction0.15 (не дефолт 0.10),jvm-overhead.max2gb для больших pod. - Обязательно
state.backend.rocksdb.memory.managed: trueдля streaming с RocksDB. +ExitOnOutOfMemoryError— при любом JVM OOM сразу exit, не пытаться продолжить.- Мониторинг:
container_memory_working_set_bytes / container_spec_memory_limit_bytes > 0.92— alert. - Post-mortem: heap dumps только при JVM OOM, не при cgroup OOMKilled. Для cgroup — JFR continuous и NMT.
- Sizing rule: state на slot не более 30% от managed. Иначе постоянные compaction’ы.
- Always используй
requests == limitsдля TM. Burstable QoS = риск eviction под нагрузкой.