Learning Platform
Глоссарий Troubleshooting
Урок 06.04 · 22 мин
Продвинутый
RocksDBColumn FamiliesColumnFamilyHandleBlock cacheState isolation

В предыдущем уроке мы говорили про RocksDB как про единый LSM tree. На самом деле RocksDB поддерживает column families — нечто вроде «таблиц» внутри одной DB, у каждой свой набор SST, своя MemTable, свои настройки. Flink использует это для физического разделения state primitives внутри одного slot’а.

State stores в Kafka Streams

Знание column families объясняет загадочные вещи в RocksDB-based job: почему checkpoint содержит сотни SST файлов в разных папках, почему block cache shared между state primitives но WriteBufferManager — раздельный, и как считать общее число open file handles.

Что такое column family

В RocksDB column family (CF) — это логически изолированное keyspace внутри одной DB. У каждого CF свой:

  • MemTable.
  • WAL (но Flink WAL не использует — см. предыдущий урок).
  • Свой набор SST файлов.
  • Свои опции (write buffer size, compaction style, block size, etc).
  • Своя bloom filter конфигурация.

При этом все CF одной DB шарят:

  • Одну директорию на диске.
  • Block cache (если не настроено иначе).
  • Background threads compaction/flush.

Один DB = один путь типа /path/to/rocksdb-state/. Один CF = подкаталог + набор файлов в нём, либо общий каталог + префикс в именах файлов (зависит от версии).

В одном slot’е может быть несколько операторов, у каждого может быть несколько state primitive. Например, оператор-аггрегатор может иметь:

  • ValueState<Long> counter.
  • ListState<Event> window contents.
  • MapState<String, Stats> per-attribute stats.

Если хранить всё в одном keyspace, ключи нужно префиксовать ("counter:" + key, "window:" + key + ":" + namespace). Это работает, но добавляет 10-20 байт overhead на каждый ключ.

Гораздо чище — выделить каждому state primitive свой CF:

  • CF counter.
  • CF window.
  • CF attributes.

Тогда префикс не нужен, и опции каждого CF можно тюнить отдельно. Например, ValueState с маленькими значениями — увеличиваем block cache; ListState с большими списками — уменьшаем block size, чтобы не загружать лишнее.

В Flink каждый getOrCreateKeyedState(descriptor) создаёт новый ColumnFamilyHandle, если такого ещё не было:

// flink-state-backends/flink-statebackend-rocksdb/.../RocksDBOperationUtils.java
public static ColumnFamilyHandle createColumnFamily(
        ColumnFamilyDescriptor columnFamilyDescriptor,
        RocksDB db) throws RocksDBException {
    return db.createColumnFamily(columnFamilyDescriptor);
}

Физическая структура: что на диске

Когда у тебя slot с 5 state primitive (по 1 CF на каждый), на диске будет что-то вроде:

/var/flink/rocksdb-state/job_abc123/slot_2/
├── OPTIONS-000005
├── CURRENT
├── MANIFEST-000004
├── 000007.log               # WAL (если включён, у Flink нет)
├── 000010.sst               # CF "counter", level 0
├── 000011.sst               # CF "window", level 0
├── 000012.sst               # CF "attributes", level 0
├── 000013.sst               # CF "counter", level 1
├── 000014.sst               # CF "window", level 1
└── ...

То есть SST’ы всех CF лежат вперемешку в одной директории, но каждый принадлежит конкретному CF (это записано в metadata).

Имя файла — числовое (auto-incrementing). По имени нельзя сказать, какому CF он принадлежит. Эту информацию хранит MANIFEST файл.

При checkpoint Flink сериализует список SST файлов, относящихся к каждому CF, и копирует их в S3. Это происходит на уровне DB instance, не на уровне CF — поэтому все CF одного slot’а checkpoint’ятся атомарно.

Shared block cache между CF

По умолчанию block cache общий на DB: один LRU cache на все CF этого RocksDB instance. Это важно для эффективности — если ты выделил 1 GiB block cache, он распределяется автоматически между CF: горячий CF (часто читаемый) занимает больше места, холодный — меньше.

Это автоматический tuning через LRU eviction. Если ты не делал ничего специально, RocksDB сам разрулит.

Альтернатива (редко используется): per-CF block cache. Это даёт строгую изоляцию по памяти — каждый CF гарантированно получает свой кусок. Но теряется автотюнинг, и в среднем по системе менее эффективно.

В Flink с включённым state.backend.rocksdb.memory.managed: true (см. урок 2 модуля 04) используется shared cache on slot level: один cache на весь slot, общий для всех CF этого slot’а, и его размер регулируется managed memory pool. Это компромисс — не один global cache на весь TM, но и не per-CF.

Shared block cache внутри slot

Несколько state primitive внутри slot = несколько CF в одном RocksDB instance. Block cache — один на DB, делится автоматически по LRU.

Slot 2 of TM-3RocksDB instance: ~440 MiB managed memory
CF counterMemTable + SST + bloom
CF windowMemTable + SST + bloom
CF attributesMemTable + SST + bloom
Shared LRU Block Cache~440 × 0.5 = 220 MiB (block-cache portion)Распределяется автоматически: горячий CF получит больше, холодный — меньше. Eviction по LRU.
WriteBufferManager~440 × 0.5 = 220 MiB на active + immutable MemTable всех CFLimits sum of all CF write buffers — ограничение чтобы один CF не выел всю память на write.

WriteBufferManager: чтобы один CF не съел всё

Если у тебя 5 CF, у каждого write_buffer_size = 64 MiB и max_write_buffer_number = 2 — то теоретический максимум write buffer’ов = 5 × 64 × 2 = 640 MiB. Если managed memory выделил 440 MiB — мы переберём.

WriteBufferManager решает эту проблему: он отслеживает сумму размеров всех write buffer’ов в DB и форсит flush, если сумма превысила лимит. То есть жадный CF не сможет один съесть всю память — RocksDB заставит его flush’нуться.

В Flink это автоматически работает, если включён state.backend.rocksdb.memory.managed: true. WriteBufferManager создаётся с предварительно выделенными managed segments.

Опции на уровне CF: настройка под use case

RocksDB позволяет задавать опции на уровне DB (применятся ко всем CF по умолчанию) или на уровне отдельного CF (override’ит DB опции).

В Flink это делается через RocksDBOptionsFactory (flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBOptionsFactory.java):

public class MyOptionsFactory implements RocksDBOptionsFactory {
    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions,
            Collection<AutoCloseable> handlesToClose) {
        return currentOptions
            .setWriteBufferSize(128 * 1024 * 1024)  // 128 MiB instead of 64
            .setMaxWriteBufferNumber(4)
            .setLevel0FileNumCompactionTrigger(8);
    }
}

Зарегистрировать в конфиге:

state.backend.rocksdb.options-factory: com.example.MyOptionsFactory

Но в большинстве случаев Flink задаёт опции через свои high-level параметры (см. урок 5).

State primitive -> CF mapping в коде

Каждый раз, когда userCode зовёт getRuntimeContext().getState(descriptor), backend смотрит, есть ли уже CF с таким именем (descriptor.getName()). Если есть — возвращает существующий handle. Если нет — создаёт новый.

// Псевдокод из RocksDBKeyedStateBackend
public <S extends State> S getOrCreateKeyedState(...) {
    String stateName = descriptor.getName();
    ColumnFamilyHandle handle = columnFamilies.get(stateName);
    if (handle == null) {
        handle = createColumnFamily(stateName, ...);
        columnFamilies.put(stateName, handle);
    }
    return wrapInStateProxy(handle);
}

Это значит — изменение имени state primitive ломает совместимость. Если ты в новой версии job переименовал "counter" в "event-counter", RocksDB при restore не найдёт CF "event-counter" и создаст пустой. Старый CF "counter" останется в checkpoint, но не будет использоваться (и со временем будет вычищен State Processor API).

WARNING

Переименование state primitive — breaking change. Используй StateMigration через savepoint и State Processor API, чтобы перенести данные. Или храни state name как константу с явным versioning’ом.

Сколько CF — много?

У RocksDB нет жёсткого лимита на число CF в DB, но есть practical limit:

  • Каждый CF держит open file handles (для активных SST’ов).
  • Каждый CF имеет свой MemTable — это память.
  • На больших number CF (1000+) compaction strategy усложняется.

В Flink job типично 5-20 state primitives на slot. Это далеко от лимитов RocksDB. Однако если ты замечаешь exceptional growth Too many open files ошибки — проверь, не создаёшь ли ты dynamically state primitive (это antipattern).

Чтение source

  • flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java — управление CF handles, метод getOrCreateKeyedState().
  • flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBStateBackend.java — фабрика.
  • flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBSharedResources.java — shared cache + WriteBufferManager для одного slot.
  • RocksDB documentation: https://github.com/facebook/rocksdb/wiki/Column-Families — официальная страница про CF.

Чек-лист

  • Column Family — изолированный keyspace внутри одной RocksDB DB. Свои MemTable, SST, опции, compaction.
  • Flink использует CF для каждого state primitive (ValueState, ListState, MapState — каждый отдельный CF).
  • На диске: все SST файлы всех CF в одной директории, mapping в MANIFEST.
  • Shared block cache: один LRU на весь DB instance (= один slot). Распределяется автоматически по LRU eviction.
  • WriteBufferManager: limits sum write buffer’ов всех CF, предотвращает жадность одного CF.
  • Опции на уровне CF — через RocksDBOptionsFactory. На уровне DB — через high-level Flink configs.
  • Имя state primitive = имя CF. Переименование = breaking change на restore.
  • Практический лимит: 5-20 CF на slot — норма; 1000+ — проблема.
Проверка знанийKnowledge check
Job имеет один оператор-аггрегатор с тремя state primitives: ValueState<Long> counter, ListState<Event> windowContents, MapState<String, Double> stats. Parallelism 8, на TM 4 slot'а. Сколько ColumnFamilyHandle инициализирован суммарно в RocksDB на одном TM?
ОтветAnswer
Для каждого slot'а создаётся отдельный RocksDB instance (через отдельную директорию). В каждом RocksDB instance — отдельный набор CF для каждого state primitive, плюс один default CF (всегда). У нас 3 state primitive: counter, windowContents, stats. Плюс default. Итого 4 CF в каждом RocksDB instance. На TM 4 slot, значит 4 RocksDB instance. Суммарно: 4 × 4 = 16 ColumnFamilyHandle. Каждый CF — это отдельный MemTable, отдельный набор SST, но shared block cache в рамках одного RocksDB instance (один slot). Между slot'ами cache не shared — каждый слот изолирован в своём RocksDB.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что такое RocksDB column family и зачем Flink использует CF для каждого state primitive?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5