В предыдущем уроке мы говорили про RocksDB как про единый LSM tree. На самом деле RocksDB поддерживает column families — нечто вроде «таблиц» внутри одной DB, у каждой свой набор SST, своя MemTable, свои настройки. Flink использует это для физического разделения state primitives внутри одного slot’а.
State stores в Kafka StreamsЗнание column families объясняет загадочные вещи в RocksDB-based job: почему checkpoint содержит сотни SST файлов в разных папках, почему block cache shared между state primitives но WriteBufferManager — раздельный, и как считать общее число open file handles.
Что такое column family
В RocksDB column family (CF) — это логически изолированное keyspace внутри одной DB. У каждого CF свой:
- MemTable.
- WAL (но Flink WAL не использует — см. предыдущий урок).
- Свой набор SST файлов.
- Свои опции (write buffer size, compaction style, block size, etc).
- Своя bloom filter конфигурация.
При этом все CF одной DB шарят:
- Одну директорию на диске.
- Block cache (если не настроено иначе).
- Background threads compaction/flush.
Один DB = один путь типа /path/to/rocksdb-state/. Один CF = подкаталог + набор файлов в нём, либо общий каталог + префикс в именах файлов (зависит от версии).
Зачем Flink использует CF для state primitives
В одном slot’е может быть несколько операторов, у каждого может быть несколько state primitive. Например, оператор-аггрегатор может иметь:
ValueState<Long>counter.ListState<Event>window contents.MapState<String, Stats>per-attribute stats.
Если хранить всё в одном keyspace, ключи нужно префиксовать ("counter:" + key, "window:" + key + ":" + namespace). Это работает, но добавляет 10-20 байт overhead на каждый ключ.
Гораздо чище — выделить каждому state primitive свой CF:
- CF
counter. - CF
window. - CF
attributes.
Тогда префикс не нужен, и опции каждого CF можно тюнить отдельно. Например, ValueState с маленькими значениями — увеличиваем block cache; ListState с большими списками — уменьшаем block size, чтобы не загружать лишнее.
В Flink каждый getOrCreateKeyedState(descriptor) создаёт новый ColumnFamilyHandle, если такого ещё не было:
// flink-state-backends/flink-statebackend-rocksdb/.../RocksDBOperationUtils.java
public static ColumnFamilyHandle createColumnFamily(
ColumnFamilyDescriptor columnFamilyDescriptor,
RocksDB db) throws RocksDBException {
return db.createColumnFamily(columnFamilyDescriptor);
}
Физическая структура: что на диске
Когда у тебя slot с 5 state primitive (по 1 CF на каждый), на диске будет что-то вроде:
/var/flink/rocksdb-state/job_abc123/slot_2/
├── OPTIONS-000005
├── CURRENT
├── MANIFEST-000004
├── 000007.log # WAL (если включён, у Flink нет)
├── 000010.sst # CF "counter", level 0
├── 000011.sst # CF "window", level 0
├── 000012.sst # CF "attributes", level 0
├── 000013.sst # CF "counter", level 1
├── 000014.sst # CF "window", level 1
└── ...
То есть SST’ы всех CF лежат вперемешку в одной директории, но каждый принадлежит конкретному CF (это записано в metadata).
Имя файла — числовое (auto-incrementing). По имени нельзя сказать, какому CF он принадлежит. Эту информацию хранит MANIFEST файл.
При checkpoint Flink сериализует список SST файлов, относящихся к каждому CF, и копирует их в S3. Это происходит на уровне DB instance, не на уровне CF — поэтому все CF одного slot’а checkpoint’ятся атомарно.
Shared block cache между CF
По умолчанию block cache общий на DB: один LRU cache на все CF этого RocksDB instance. Это важно для эффективности — если ты выделил 1 GiB block cache, он распределяется автоматически между CF: горячий CF (часто читаемый) занимает больше места, холодный — меньше.
Это автоматический tuning через LRU eviction. Если ты не делал ничего специально, RocksDB сам разрулит.
Альтернатива (редко используется): per-CF block cache. Это даёт строгую изоляцию по памяти — каждый CF гарантированно получает свой кусок. Но теряется автотюнинг, и в среднем по системе менее эффективно.
В Flink с включённым state.backend.rocksdb.memory.managed: true (см. урок 2 модуля 04) используется shared cache on slot level: один cache на весь slot, общий для всех CF этого slot’а, и его размер регулируется managed memory pool. Это компромисс — не один global cache на весь TM, но и не per-CF.
Несколько state primitive внутри slot = несколько CF в одном RocksDB instance. Block cache — один на DB, делится автоматически по LRU.
WriteBufferManager: чтобы один CF не съел всё
Если у тебя 5 CF, у каждого write_buffer_size = 64 MiB и max_write_buffer_number = 2 — то теоретический максимум write buffer’ов = 5 × 64 × 2 = 640 MiB. Если managed memory выделил 440 MiB — мы переберём.
WriteBufferManager решает эту проблему: он отслеживает сумму размеров всех write buffer’ов в DB и форсит flush, если сумма превысила лимит. То есть жадный CF не сможет один съесть всю память — RocksDB заставит его flush’нуться.
В Flink это автоматически работает, если включён state.backend.rocksdb.memory.managed: true. WriteBufferManager создаётся с предварительно выделенными managed segments.
Опции на уровне CF: настройка под use case
RocksDB позволяет задавать опции на уровне DB (применятся ко всем CF по умолчанию) или на уровне отдельного CF (override’ит DB опции).
В Flink это делается через RocksDBOptionsFactory (flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBOptionsFactory.java):
public class MyOptionsFactory implements RocksDBOptionsFactory {
@Override
public ColumnFamilyOptions createColumnOptions(
ColumnFamilyOptions currentOptions,
Collection<AutoCloseable> handlesToClose) {
return currentOptions
.setWriteBufferSize(128 * 1024 * 1024) // 128 MiB instead of 64
.setMaxWriteBufferNumber(4)
.setLevel0FileNumCompactionTrigger(8);
}
}
Зарегистрировать в конфиге:
state.backend.rocksdb.options-factory: com.example.MyOptionsFactory
Но в большинстве случаев Flink задаёт опции через свои high-level параметры (см. урок 5).
State primitive -> CF mapping в коде
Каждый раз, когда userCode зовёт getRuntimeContext().getState(descriptor), backend смотрит, есть ли уже CF с таким именем (descriptor.getName()). Если есть — возвращает существующий handle. Если нет — создаёт новый.
// Псевдокод из RocksDBKeyedStateBackend
public <S extends State> S getOrCreateKeyedState(...) {
String stateName = descriptor.getName();
ColumnFamilyHandle handle = columnFamilies.get(stateName);
if (handle == null) {
handle = createColumnFamily(stateName, ...);
columnFamilies.put(stateName, handle);
}
return wrapInStateProxy(handle);
}
Это значит — изменение имени state primitive ломает совместимость. Если ты в новой версии job переименовал "counter" в "event-counter", RocksDB при restore не найдёт CF "event-counter" и создаст пустой. Старый CF "counter" останется в checkpoint, но не будет использоваться (и со временем будет вычищен State Processor API).
Переименование state primitive — breaking change. Используй StateMigration через savepoint и State Processor API, чтобы перенести данные. Или храни state name как константу с явным versioning’ом.
Сколько CF — много?
У RocksDB нет жёсткого лимита на число CF в DB, но есть practical limit:
- Каждый CF держит open file handles (для активных SST’ов).
- Каждый CF имеет свой MemTable — это память.
- На больших number CF (1000+) compaction strategy усложняется.
В Flink job типично 5-20 state primitives на slot. Это далеко от лимитов RocksDB. Однако если ты замечаешь exceptional growth Too many open files ошибки — проверь, не создаёшь ли ты dynamically state primitive (это antipattern).
Чтение source
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java— управление CF handles, методgetOrCreateKeyedState().flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBStateBackend.java— фабрика.flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBSharedResources.java— shared cache + WriteBufferManager для одного slot.- RocksDB documentation:
https://github.com/facebook/rocksdb/wiki/Column-Families— официальная страница про CF.
Чек-лист
- Column Family — изолированный keyspace внутри одной RocksDB DB. Свои MemTable, SST, опции, compaction.
- Flink использует CF для каждого state primitive (ValueState, ListState, MapState — каждый отдельный CF).
- На диске: все SST файлы всех CF в одной директории, mapping в MANIFEST.
- Shared block cache: один LRU на весь DB instance (= один slot). Распределяется автоматически по LRU eviction.
- WriteBufferManager: limits sum write buffer’ов всех CF, предотвращает жадность одного CF.
- Опции на уровне CF — через
RocksDBOptionsFactory. На уровне DB — через high-level Flink configs. - Имя state primitive = имя CF. Переименование = breaking change на restore.
- Практический лимит: 5-20 CF на slot — норма; 1000+ — проблема.