После четырёх уроков про state backends — финальный, самый практический. Тюнинг RocksDB. У этой части два уровня сложности: на одном — крутить десяток high-level параметров Flink (это работает в 80% случаев). На другом — копаться в самих RocksDB опциях через RocksDBOptionsFactory. Этот урок покрывает оба уровня.
В основе всего — три величины (write amplification, read amplification, space amplification) и треугольник между ними. Любая ручка в RocksDB двигает баланс, а не улучшает всё сразу. Понимая trade-off — ты можешь настраивать осознанно.
Треугольник: write / read / space amplification
Каждая запись в RocksDB amplification:
- Write amplification (WA): сколько раз байт записывается на диск на одну логическую запись. Для leveled compaction = ~10-30x. Для universal — ~5-10x. Это IO traffic диска.
- Read amplification (RA): сколько SST файлов нужно прочитать в худшем случае на один read. Зависит от числа уровней. Для leveled = O(L), для universal = O(N), где N — число SST.
- Space amplification (SA): сколько дополнительного места на диске занято дублями и tombstones. Leveled = 1.1-1.3x, universal = 2-3x.
Правило игры: ты не можешь улучшить все три одновременно. Можно snizit’ WA ценой SA (больше дубликатов на диске -> реже compaction -> меньше IO). Можно snizit’ RA ценой WA (больше compaction -> меньше дубликатов -> но больше IO). И так далее.
Любая ручка двигает баланс между тремя величинами. Понимание твоего workload — какой ты можешь себе позволить тратить, какой нет — основа тюнинга.
Главные high-level параметры Flink
В flink-conf.yaml:
# === MEMORY ===
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.memory.fixed-per-slot: false # использовать pool вместо fixed
state.backend.rocksdb.memory.write-buffer-ratio: 0.5 # 50% memory на write buffers
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1 # cache priority for index/filter
# === THREADS ===
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.threads.num-shared: false
# === FILES ===
state.backend.rocksdb.files.open: -1 # unlimited
# === LOGGING ===
state.backend.rocksdb.log.level: WARN_LEVEL
# === COMPACTION ===
state.backend.rocksdb.compaction.style: LEVEL # LEVEL, UNIVERSAL, FIFO
state.backend.rocksdb.compaction.level.use-dynamic-size: true
# === BLOOM ===
state.backend.rocksdb.use-bloom-filter: true
state.backend.rocksdb.bloom-filter.bits-per-key: 10
Разбор по группам.
Memory
memory.managed: true (см. урок 2 модуля 04 и урок 4 модуля 05) — must для production. Без него RocksDB живёт вне Flink budgeting и легко OOMKill’ит контейнер.
memory.write-buffer-ratio: 0.5 — какая доля выделенной RocksDB managed memory идёт на write buffer (MemTable + immutable). Остальное идёт в block cache.
- Write-heavy workload: 0.5-0.7 (больше write buffer, реже flush).
- Read-heavy: 0.2-0.3 (больше block cache, лучше hit rate).
memory.high-prio-pool-ratio: 0.1 — доля block cache, выделенная под index и filter blocks с высоким priority. Эти блоки нужны для каждого read, и их eviction — катастрофа. 0.1 = 10% — обычно достаточно для job с разумным числом keyspace.
Threads
thread.num: 4 — число background threads на flush и compaction. По умолчанию 1 (мало!).
- На современных контейнерах с 4+ CPU — увеличь до 4.
- На high-throughput jobs — до 8.
threads.num-shared: false — separate thread pool на каждый RocksDB instance (= каждый slot). Если true, один thread pool на TM — экономия threads, но возможна contention.
Files
files.open: -1 — unlimited open file handles. Для больших state’ов с тысячами SST это критично. Если ограничить — RocksDB будет постоянно открывать/закрывать файлы (медленно).
Не забудь поднять и system limit:
ulimit -n 1048576
Compaction
compaction.style: LEVEL vs UNIVERSAL:
| Leveled | Universal | |
|---|---|---|
| Write amplification | 10-30x | 5-10x |
| Read amplification | O(L) | O(N) |
| Space amplification | 1.1-1.3x | 2-3x |
| Used by | Дефолт RocksDB, Flink дефолт | Cassandra-style, бывает быстрее на bursty write |
Для большинства Flink job — LEVEL правильный выбор. UNIVERSAL имеет смысл при write-burst характере и наличии запаса диска.
compaction.level.use-dynamic-size: true — adaptive size для каждого level. По умолчанию off в RocksDB. Flink включает по умолчанию. Это позволяет L1, L2 расти пропорционально реальному size, а не по жёстким лимитам.
Bloom filters
use-bloom-filter: true — bloom filters на каждом SST. Это must для read-heavy workload и для пропуска несуществующих ключей.
bloom-filter.bits-per-key: 10 — точность bloom filter. 10 бит на ключ дают false positive rate ~1%. Память: ~1.25 байт на ключ × число ключей в SST. Для большого state — это сотни MiB. Если памяти жмёт — снижай до 6-8 (false positive rate 5-10%).
Когда дефолтных параметров мало: per-CF тюнинг
Через RocksDBOptionsFactory можно тюнить опции конкретного CF. Это особенно полезно, когда у тебя в job сильно разные state primitive — например, один state маленький и горячий (читается на каждом event), другой большой и редко читается.
Пример: оператор с двумя state — counter (ValueState<Long>, 8 байт значение) и events (ListState<Event>, 10 MiB значение).
public class PerStateFactory implements RocksDBOptionsFactory {
@Override
public ColumnFamilyOptions createColumnOptions(
ColumnFamilyOptions currentOptions,
Collection<AutoCloseable> handlesToClose) {
// Different settings per CF would require detecting by CF name
// which isn't directly available here — но Flink через
// ConfigurableRocksDBOptionsFactory даёт extended API
return currentOptions
.setWriteBufferSize(64 * 1024 * 1024)
.setMaxWriteBufferNumber(3)
.setLevel0FileNumCompactionTrigger(4);
}
@Override
public DBOptions createDBOptions(
DBOptions currentOptions,
Collection<AutoCloseable> handlesToClose) {
return currentOptions
.setIncreaseParallelism(8)
.setMaxBackgroundJobs(8);
}
}
Главные RocksDB metrics
Что мониторить через Prometheus или JMX:
flink_taskmanager_job_task_operator_rocksdb_estimate-num-keys
- оценочное число ключей в DB
flink_taskmanager_job_task_operator_rocksdb_total-sst-files-size
- сумма размера всех SST. Это твой "state size".
flink_taskmanager_job_task_operator_rocksdb_estimate-live-data-size
- оценка реального data size (без дубликатов от LSM).
total / live = space amplification.
flink_taskmanager_job_task_operator_rocksdb_compaction-pending
- сколько compaction job ожидают. Если > 0 устойчиво — compaction отстаёт.
flink_taskmanager_job_task_operator_rocksdb_num-immutable-mem-table
- число immutable MemTable ожидающих flush. Если > 0 устойчиво — flush thread отстаёт, увеличь thread.num.
flink_taskmanager_job_task_operator_rocksdb_block-cache-hit
flink_taskmanager_job_task_operator_rocksdb_block-cache-miss
- hit rate = hit / (hit + miss). Healthy ≥ 95% для cached working set.
Эти метрики по умолчанию выключены в Flink из-за overhead’а. Включи:
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.total-sst-files-size: true
state.backend.rocksdb.metrics.compaction-pending: true
state.backend.rocksdb.metrics.num-immutable-mem-table: true
state.backend.rocksdb.metrics.block-cache-hit: true
state.backend.rocksdb.metrics.block-cache-miss: true
И только в проблемные периоды — детальные:
state.backend.rocksdb.metrics.cur-size-active-mem-table: true
state.backend.rocksdb.metrics.compaction-pending: true
state.backend.rocksdb.metrics.background-errors: true
Production playbook: типичные ситуации
Симптом: высокая checkpoint latency
Симптомы:
flink_jobmanager_job_lastCheckpointDuration> 30 сек.- В TM логах frequent:
Slow disk write detected.
Диагноз: либо flush не успевает, либо async snapshot к S3 медленный.
Исправление:
- Увеличь
thread.num: 8для большего IO parallelism. - Включи compression:
state.backend.rocksdb.compression.per.level: SNAPPY_COMPRESSION. Уменьшит SST size -> быстрее upload в S3. - Если S3 медленный — переключи на S3 multipart upload с большим chunk size.
Симптом: high backpressure на оператор с RocksDB
Симптомы:
busyTimeMsPerSecondвысокий на keyed operator.- В метриках RocksDB:
num-immutable-mem-table > 0устойчиво.
Диагноз: write throughput state ≥ flush throughput -> MemTable не успевают сбрасываться -> задача стопится в ожидании.
Исправление:
thread.num: 8— больше flush thread’ов.- Поднять
write-buffer-ratio: 0.7— больший write buffer, реже flush. - Проверить, не использует ли job очень большое значение (10 MiB+ ListState entries) — переделать на MapState или ValueState с компактным форматом.
Симптом: hit rate block cache низкий (ниже 70%)
Диагноз: working set не помещается в cache.
Исправление:
- Снизить
write-buffer-ratio(с 0.5 до 0.3) — больше памяти под cache. - Включить
cache_index_and_filter_blocks: false(DBOption) — pin index в memory. - Если ничего не помогает — увеличить выделенный managed memory (через
taskmanager.memory.managed.fraction).
Симптом: state size растёт без видимого growth работающего keys
Диагноз: либо tombstone hell, либо forgotten state без TTL.
Исправление:
- Включить TTL на state, который должен expire’ить.
- Запустить
compactRange()через State Processor API. - Проверить через
compact-pendingметрику — может, compaction просто отстаёт.
Главная рекомендация по тюнингу RocksDB: не тюнь, пока нет конкретной проблемы. Дефолты Flink — разумны для большинства workload. Проверь через метрики что у тебя bottleneck, потом меняй точечно. Слепое крутение параметров «потому что в блоге так написано» — обычно делает хуже.
Чтение source
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBConfigurableOptions.java— все Flink параметры тюнинга.flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/DefaultConfigurableOptionsFactory.java— как параметры мапятся на RocksDB опции.flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBMemoryConfiguration.java— memory management.- Внешнее: RocksDB Tuning Guide
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide— официальный must-read. - Внешнее: Facebook engineering blog про tuning RocksDB в проде.
Чек-лист
- Тюнинг RocksDB = balance треугольника write / read / space amplification.
- Дефолт Flink — разумный baseline, не тюнь без диагноза.
state.backend.rocksdb.memory.managed: true— must для prod.thread.num: 4-8— больше background thread’ов для flush/compaction.write-buffer-ratio— баланс между write throughput (больше -> реже flush) и read latency (меньше -> больше cache).- Bloom filters:
bits-per-key: 10стандарт, снижение до 6-8 при memory pressure. - Включай RocksDB metrics в Prometheus, но только нужные (overhead!).
- Симптомы и решения: slow checkpoint -> больше threads + compression; backpressure -> thread.num + write-buffer; low cache hit -> больше cache memory; growing state -> TTL + manual compact.