Learning Platform
Глоссарий Troubleshooting
Урок 10.04 · 32 мин
Продвинутый
RocksDBRocksDBStateStoreProviderChangelog CheckpointingSPARK-43421State TuningLSM Tree

RocksDB state backend

Когда state вашего streaming-запроса перестаёт помещаться в heap executor’а, HDFSBackedStateStoreProvider превращается в проблему: GC-паузы, OOM-ошибки, непредсказуемые latency. Решение — RocksDBStateStoreProvider, который хранит state в embedded LSM-tree базе данных off-heap, используя нативный RocksDB. В этом уроке разберём архитектуру, changelog checkpointing (самое большое улучшение производительности для RocksDB в Spark), тюнинг и особенности при миграции.

Почему HDFSBacked недостаточен при большом state

HDFSBackedStateStoreProvider хранит всё в JVM heap (ConcurrentHashMap). Это хорошо для малого state (миллионы ключей), но при десятках и сотнях миллионов ключей:

  1. GC давление: все ключи и значения — объекты в heap. Full GC при нескольких сотнях гигабайт занимает секунды, что прерывает батчи.
  2. Memory overhead: каждый Java-объект несёт overhead ~16 байт. HashMap-entry добавляет ещё ~32 байта. Для state с 10-байтовым ключом реальное потребление памяти в 4-5 раз выше.
  3. Snapshot latency: создание snapshot = синхронная копия всей ConcurrentHashMap. При 50 GB state это занимает несколько секунд с остановкой потока.

RocksDBStateStoreProvider решает все три проблемы: данные в native memory (off-heap), нет Java-объектов, нет GC-давления, snapshot — асинхронная операция RocksDB.

RocksDB: ключевые концепты

RocksDB — это embedded LSM-tree (Log-Structured Merge-Tree) key-value storage от Meta/Facebook. Ключевые свойства:

  • Данные off-heap: RocksDB использует native memory для block cache и memtable, не нагружая JVM GC
  • LSM-tree: записи идут в memtable (in-memory буфер), затем flush в SST-файлы (Sorted String Table) на диск; чтение = memtable + bloom filter + SST files
  • Compaction: фоновые потоки RocksDB периодически сливают SST-файлы, удаляя tombstone’ы и дублирующиеся версии ключей
  • Column families: изолированные namespace’ы внутри одной RocksDB instance (в Spark 4.0: отдельный column family для каждого state variable в transformWithState)

В Spark каждая partition имеет свой экземпляр RocksDB, открытый на локальной файловой системе executor’а (обычно /tmp/spark-rocksdb-...).

RocksDB State Store: архитектура на executor

RocksDB занимает local disk executor'а. Checkpoint (SST файлы) синхронизируется с HDFS/S3 только при commit.

JVM (Spark Executor)RocksDBStateStore.get/put/remove -> JNI callsJava-код Spark вызывает RocksDB через JNI (Java Native Interface).
JNI
RocksDB NativeMemTable (write buffer) + Block Cache (read cache) + Bloom FilterMemTable — write buffer в native memory. Block Cache — LRU кэш для read-heavy доступа. Bloom Filter ускоряет negative lookups.
flush
Local SST Files/tmp/spark-rocksdb-{queryId}/{partitionId}/*.sstFlush из MemTable в SST-файлы на локальный диск. Compaction сливает несколько SST в один.
checkpoint (commit)
HDFS/S3 Checkpointstate/{operatorId}/{partitionId}/{version}.zip (или changelog)При commit SST-файлы или changelog загружаются в долговечное хранилище. Это единственный момент I/O в HDFS.

Как включить RocksDB backend

spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)

Или через SparkSession builder:

spark = SparkSession.builder \
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
    .getOrCreate()
WARNING

Смена провайдера между батчами несовместима с существующим state. Существующий checkpoint нельзя перечитать другим провайдером без специальной миграции. Если нужно переключить backend для работающего запроса — смотри секцию «Миграция между backend’ами» в конце урока.

Snapshot checkpointing: проблема до SPARK-43421

До Spark 3.5 / SPARK-43421, RocksDBStateStoreProvider при каждом commit() делал:

  1. Pause RocksDB (останавливал compaction и flush)
  2. Создавал RocksDB checkpoint (ссылки на текущие SST-файлы)
  3. Запаковывал все SST-файлы в zip-архив
  4. Загружал zip в HDFS/S3
  5. Resume RocksDB

Шаги 3-4 были синхронными и могли занимать десятки секунд для большого state — это напрямую увеличивало длительность батча. Размер zip = размер всего state, даже если изменились 0.1% ключей.

SPARK-43421: Changelog Checkpointing

SPARK-43421 (реализован в Spark 3.5, улучшен в 4.0) вводит принципиально иной подход — changelog checkpointing:

Вместо snapshot всего state при каждом commit, Spark синхронно пишет только лог изменений (changelog) за этот батч. Полный snapshot создаётся асинхронно в фоне и нужен только для ускорения восстановления (без него пришлось бы replay весь changelog с начала).

# Включить changelog checkpointing
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Формат changelog-файла:

{version}.changelog:
[header: version, numEntries]
[entry: key_bytes, value_bytes, operation_type(PUT=1/DELETE=2)]
...

Changelog содержит только записи, изменившиеся в этом батче. Если из 100M ключей обновилось 10K, changelog = 10K записей, а не 100M.

Trade-off changelog checkpointing:

АспектSnapshot modeChangelog mode
Commit latencyO(state size) — медленноO(batch changes) — быстро
Recovery latencyO(last_snapshot + recent_deltas)O(all_changelogs_since_snapshot) — медленнее
HDFS storageПолные snapshotsChangelogs + periodic snapshots
Compaction neededНетДа (changelogs накапливаются)

Для долгоживущих запросов с частыми обновлениями малого числа ключей changelog mode может ускорить commit в 10-50 раз.

Column families в RocksDB (Spark 4.0)

В Spark 4.0, особенно с transformWithState, RocksDB instance может использовать несколько column families — изолированных namespace’ов с независимыми настройками компрессии и bloom-фильтров:

// Конфиг: включить column families (обязательно для transformWithState)
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.useColumnFamilies", "true")

Каждый ValueState, ListState, MapState в transformWithState получает отдельный column family. Это позволяет:

  • Удалить все данные одного state-переменного без сканирования всего state (drop column family)
  • Настраивать bloom filter и compaction отдельно для каждого типа state
  • Эффективно реализовывать TTL по column family

Тюнинг RocksDB: ключевые параметры

# Размер block cache (LRU для SST-блоков). По умолчанию: 8 MB на partition
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB", "128")

# Размер write buffer (MemTable). По умолчанию: 64 MB
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB", "128")

# Максимум write buffers перед flush. По умолчанию: 2
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber", "4")

# Bloom filter: bits per key. По умолчанию: 10
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.bloomFilterBitsPerKey", "10")

# Включить statistics (для мониторинга, небольшой overhead)
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows", "true")

# Сжатие SST-файлов. По умолчанию: LZ4
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compression", "lz4")

# Compaction style. По умолчанию: LEVEL
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compactionStyle", "LEVEL")

Практические рекомендации по тюнингу:

  • Высокая write latency: увеличьте writeBufferSizeMB и maxWriteBufferNumber — данные дольше будут в memtable до flush.
  • Высокая read latency (cache miss): увеличьте blockCacheSizeMB. Для read-heavy workload — 256-512 MB на partition.
  • Bloom filter не работает (много ложных обращений к диску): увеличьте bloomFilterBitsPerKey до 15-20 (больший false-positive rate -> меньше I/O, но больше памяти).
  • Большой backlog при старте: это RocksDB делает compaction старых SST — disableRocksdbMetrics=false поможет увидеть статистику.

Метрики RocksDB в Spark UI

При включённом trackTotalNumberOfRows:

progress = query.lastProgress
for op in progress["stateOperators"]:
    custom = op.get("customMetrics", {})
    print(f"rocksdbGetLatency: {custom.get('rocksdbGetLatency', 'N/A')} μs")
    print(f"rocksdbPutLatency: {custom.get('rocksdbPutLatency', 'N/A')} μs")
    print(f"rocksdbTotalBytesRead: {custom.get('rocksdbTotalBytesRead', 'N/A')}")
    print(f"rocksdbTotalBytesWritten: {custom.get('rocksdbTotalBytesWritten', 'N/A')}")
    print(f"rocksdbTotalCompactionBytesWritten: {custom.get('rocksdbTotalCompactionBytesWritten', 'N/A')}")

Высокое rocksdbTotalCompactionBytesWritten при малом rocksdbTotalBytesWritten от приложения указывает на write amplification — compaction перезаписывает данные намного больше, чем приложение пишет. Можно снизить через compactionStyle=UNIVERSAL для write-heavy workload.

TIP

Для production-мониторинга RocksDB метрики интегрируются с Prometheus через Spark metrics system. Добавьте в metrics.properties: *.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet. Метрики вида executor.statestore.rocksdb.* будут доступны на порту 4040.

Миграция между backend’ами

Сценарий: running запрос с HDFSBackedStateStoreProvider, state занимает 50 GB, GC-проблемы. Нужно перейти на RocksDB.

Прямой смены провайдера без потери state не поддерживается (данные несовместимы). Варианты:

Вариант 1 — Full restart с чистым checkpoint:

  • Остановить запрос
  • Удалить checkpoint directory
  • Запустить заново с RocksDBStateStoreProvider
  • Недостаток: потеря всего накопленного state. Подходит, если state можно восстановить из истории (например, при deduplication по ключу — повторная обработка данных за последние N дней)

Вариант 2 — State migration tool (экспериментально в Spark 4.0):

  • Использует StateDataSourceReader для чтения существующего state как DataFrame
  • Записывает в новый checkpoint с RocksDB backend
  • Подходит для простых state-схем (ValueState с известным форматом)
# Читаем существующий state (Spark 4.0+)
existing_state = spark.read \
    .format("statestore") \
    .option("path", "/checkpoint/state") \
    .option("operatorId", "0") \
    .load()

existing_state.show()
# Используем для инициализации нового запроса через initialState

Вариант 3 — Blue/green deployment:

  • Запустить параллельный новый запрос с RocksDB с чистым checkpoint
  • Новый запрос читает данные из Kafka с начала retention (если достаточно)
  • После «прогрева» переключить трафик с старого на новый
  • Старый запрос остановить

Попробуй сам

Сравните производительность HDFSBacked и RocksDB backends на одном запросе:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time

def run_with_backend(provider_class, checkpoint_dir, duration_sec=30):
    spark = SparkSession.builder \
        .appName(f"backend-benchmark-{provider_class.split('.')[-1]}") \
        .config("spark.sql.streaming.stateStore.providerClass", provider_class) \
        .config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true") \
        .getOrCreate()

    df = (spark.readStream
        .format("rate")
        .option("rowsPerSecond", 1000)
        .load())

    # Stateful deduplication: state содержит все виденные value
    query = (df.dropDuplicates(["value"])
        .writeStream
        .format("memory")
        .queryName(f"bench_{provider_class.split('.')[-1][:6]}")
        .option("checkpointLocation", checkpoint_dir)
        .trigger(processingTime="5 seconds")
        .start())

    time.sleep(duration_sec)
    
    metrics = []
    for _ in range(3):
        if query.lastProgress:
            metrics.append(query.lastProgress["durationMs"])
        time.sleep(5)
    
    query.stop()
    return metrics

# HDFS backend
hdfs_metrics = run_with_backend(
    "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider",
    "/tmp/bench-hdfs"
)

# RocksDB backend
rocksdb_metrics = run_with_backend(
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider", 
    "/tmp/bench-rocksdb"
)

print("HDFS backend batch durations:", hdfs_metrics)
print("RocksDB backend batch durations:", rocksdb_metrics)

При росте числа уникальных ключей вы увидите, что addBatch дольше у HDFSBacked из-за GC, тогда как RocksDB остаётся стабильным.

Проверка знанийKnowledge check
Changelog checkpointing включён. Streaming-запрос работал 1000 батчей с RocksDB backend. Executor умер, и нужно восстановить state на версии 1000. Последний async snapshot был загружен на версии 950. В HDFS есть: 950.snapshot и changelogs 951...1000. Опишите алгоритм восстановления и почему asynchronous snapshot так важен для changelog mode.
ОтветAnswer
Алгоритм восстановления: (1) загрузить 950.snapshot — это полный RocksDB checkpoint с SST-файлами, записывается в локальный RocksDB на новом executor'е; (2) последовательно применить changelogs 951, 952, ..., 1000 — каждый changelog содержит PUT/DELETE операции, которые воспроизводятся на RocksDB; (3) после применения всех changelogs RocksDB находится в состоянии версии 1000, можно продолжать обработку. Асинхронный snapshot критически важен именно потому, что changelog mode не загружает полный snapshot при каждом commit — только changelog. Без snapshot пришлось бы replay ВСЕ 1000 changelogs с нуля, что для большого state может занять часы. Snapshot создаёт «точку опоры», сокращая количество changelogs для replay. Именно поэтому interval создания snapshot'ов (конфиг snapshotInterval) — ключевой параметр тюнинга: слишком редкий snapshot = долгое восстановление; слишком частый = нагрузка на HDFS и I/O.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. До SPARK-43421 (changelog checkpointing) RocksDB backend при каждом commit() синхронно загружал полный snapshot в HDFS. Почему это было узким местом для streaming-запросов с большим state, даже если в конкретном батче обновились лишь 0.1% ключей?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6