RocksDB state backend
Когда state вашего streaming-запроса перестаёт помещаться в heap executor’а, HDFSBackedStateStoreProvider превращается в проблему: GC-паузы, OOM-ошибки, непредсказуемые latency. Решение — RocksDBStateStoreProvider, который хранит state в embedded LSM-tree базе данных off-heap, используя нативный RocksDB. В этом уроке разберём архитектуру, changelog checkpointing (самое большое улучшение производительности для RocksDB в Spark), тюнинг и особенности при миграции.
Почему HDFSBacked недостаточен при большом state
HDFSBackedStateStoreProvider хранит всё в JVM heap (ConcurrentHashMap). Это хорошо для малого state (миллионы ключей), но при десятках и сотнях миллионов ключей:
- GC давление: все ключи и значения — объекты в heap. Full GC при нескольких сотнях гигабайт занимает секунды, что прерывает батчи.
- Memory overhead: каждый Java-объект несёт overhead ~16 байт. HashMap-entry добавляет ещё ~32 байта. Для state с 10-байтовым ключом реальное потребление памяти в 4-5 раз выше.
- Snapshot latency: создание snapshot = синхронная копия всей ConcurrentHashMap. При 50 GB state это занимает несколько секунд с остановкой потока.
RocksDBStateStoreProvider решает все три проблемы: данные в native memory (off-heap), нет Java-объектов, нет GC-давления, snapshot — асинхронная операция RocksDB.
RocksDB: ключевые концепты
RocksDB — это embedded LSM-tree (Log-Structured Merge-Tree) key-value storage от Meta/Facebook. Ключевые свойства:
- Данные off-heap: RocksDB использует native memory для block cache и memtable, не нагружая JVM GC
- LSM-tree: записи идут в memtable (in-memory буфер), затем flush в SST-файлы (Sorted String Table) на диск; чтение = memtable + bloom filter + SST files
- Compaction: фоновые потоки RocksDB периодически сливают SST-файлы, удаляя tombstone’ы и дублирующиеся версии ключей
- Column families: изолированные namespace’ы внутри одной RocksDB instance (в Spark 4.0: отдельный column family для каждого state variable в transformWithState)
В Spark каждая partition имеет свой экземпляр RocksDB, открытый на локальной файловой системе executor’а (обычно /tmp/spark-rocksdb-...).
RocksDB занимает local disk executor'а. Checkpoint (SST файлы) синхронизируется с HDFS/S3 только при commit.
Как включить RocksDB backend
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)
Или через SparkSession builder:
spark = SparkSession.builder \
.config("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
.getOrCreate()
Смена провайдера между батчами несовместима с существующим state. Существующий checkpoint нельзя перечитать другим провайдером без специальной миграции. Если нужно переключить backend для работающего запроса — смотри секцию «Миграция между backend’ами» в конце урока.
Snapshot checkpointing: проблема до SPARK-43421
До Spark 3.5 / SPARK-43421, RocksDBStateStoreProvider при каждом commit() делал:
- Pause RocksDB (останавливал compaction и flush)
- Создавал RocksDB checkpoint (ссылки на текущие SST-файлы)
- Запаковывал все SST-файлы в zip-архив
- Загружал zip в HDFS/S3
- Resume RocksDB
Шаги 3-4 были синхронными и могли занимать десятки секунд для большого state — это напрямую увеличивало длительность батча. Размер zip = размер всего state, даже если изменились 0.1% ключей.
SPARK-43421: Changelog Checkpointing
SPARK-43421 (реализован в Spark 3.5, улучшен в 4.0) вводит принципиально иной подход — changelog checkpointing:
Вместо snapshot всего state при каждом commit, Spark синхронно пишет только лог изменений (changelog) за этот батч. Полный snapshot создаётся асинхронно в фоне и нужен только для ускорения восстановления (без него пришлось бы replay весь changelog с начала).
# Включить changelog checkpointing
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
Формат changelog-файла:
{version}.changelog:
[header: version, numEntries]
[entry: key_bytes, value_bytes, operation_type(PUT=1/DELETE=2)]
...
Changelog содержит только записи, изменившиеся в этом батче. Если из 100M ключей обновилось 10K, changelog = 10K записей, а не 100M.
Trade-off changelog checkpointing:
| Аспект | Snapshot mode | Changelog mode |
|---|---|---|
| Commit latency | O(state size) — медленно | O(batch changes) — быстро |
| Recovery latency | O(last_snapshot + recent_deltas) | O(all_changelogs_since_snapshot) — медленнее |
| HDFS storage | Полные snapshots | Changelogs + periodic snapshots |
| Compaction needed | Нет | Да (changelogs накапливаются) |
Для долгоживущих запросов с частыми обновлениями малого числа ключей changelog mode может ускорить commit в 10-50 раз.
Column families в RocksDB (Spark 4.0)
В Spark 4.0, особенно с transformWithState, RocksDB instance может использовать несколько column families — изолированных namespace’ов с независимыми настройками компрессии и bloom-фильтров:
// Конфиг: включить column families (обязательно для transformWithState)
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.useColumnFamilies", "true")
Каждый ValueState, ListState, MapState в transformWithState получает отдельный column family. Это позволяет:
- Удалить все данные одного state-переменного без сканирования всего state (drop column family)
- Настраивать bloom filter и compaction отдельно для каждого типа state
- Эффективно реализовывать TTL по column family
Тюнинг RocksDB: ключевые параметры
# Размер block cache (LRU для SST-блоков). По умолчанию: 8 MB на partition
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB", "128")
# Размер write buffer (MemTable). По умолчанию: 64 MB
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB", "128")
# Максимум write buffers перед flush. По умолчанию: 2
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber", "4")
# Bloom filter: bits per key. По умолчанию: 10
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.bloomFilterBitsPerKey", "10")
# Включить statistics (для мониторинга, небольшой overhead)
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows", "true")
# Сжатие SST-файлов. По умолчанию: LZ4
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compression", "lz4")
# Compaction style. По умолчанию: LEVEL
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compactionStyle", "LEVEL")
Практические рекомендации по тюнингу:
- Высокая write latency: увеличьте
writeBufferSizeMBиmaxWriteBufferNumber— данные дольше будут в memtable до flush. - Высокая read latency (cache miss): увеличьте
blockCacheSizeMB. Для read-heavy workload — 256-512 MB на partition. - Bloom filter не работает (много ложных обращений к диску): увеличьте
bloomFilterBitsPerKeyдо 15-20 (больший false-positive rate -> меньше I/O, но больше памяти). - Большой backlog при старте: это RocksDB делает compaction старых SST —
disableRocksdbMetrics=falseпоможет увидеть статистику.
Метрики RocksDB в Spark UI
При включённом trackTotalNumberOfRows:
progress = query.lastProgress
for op in progress["stateOperators"]:
custom = op.get("customMetrics", {})
print(f"rocksdbGetLatency: {custom.get('rocksdbGetLatency', 'N/A')} μs")
print(f"rocksdbPutLatency: {custom.get('rocksdbPutLatency', 'N/A')} μs")
print(f"rocksdbTotalBytesRead: {custom.get('rocksdbTotalBytesRead', 'N/A')}")
print(f"rocksdbTotalBytesWritten: {custom.get('rocksdbTotalBytesWritten', 'N/A')}")
print(f"rocksdbTotalCompactionBytesWritten: {custom.get('rocksdbTotalCompactionBytesWritten', 'N/A')}")
Высокое rocksdbTotalCompactionBytesWritten при малом rocksdbTotalBytesWritten от приложения указывает на write amplification — compaction перезаписывает данные намного больше, чем приложение пишет. Можно снизить через compactionStyle=UNIVERSAL для write-heavy workload.
Для production-мониторинга RocksDB метрики интегрируются с Prometheus через Spark metrics system. Добавьте в metrics.properties: *.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet. Метрики вида executor.statestore.rocksdb.* будут доступны на порту 4040.
Миграция между backend’ами
Сценарий: running запрос с HDFSBackedStateStoreProvider, state занимает 50 GB, GC-проблемы. Нужно перейти на RocksDB.
Прямой смены провайдера без потери state не поддерживается (данные несовместимы). Варианты:
Вариант 1 — Full restart с чистым checkpoint:
- Остановить запрос
- Удалить checkpoint directory
- Запустить заново с
RocksDBStateStoreProvider - Недостаток: потеря всего накопленного state. Подходит, если state можно восстановить из истории (например, при deduplication по ключу — повторная обработка данных за последние N дней)
Вариант 2 — State migration tool (экспериментально в Spark 4.0):
- Использует
StateDataSourceReaderдля чтения существующего state как DataFrame - Записывает в новый checkpoint с RocksDB backend
- Подходит для простых state-схем (ValueState с известным форматом)
# Читаем существующий state (Spark 4.0+)
existing_state = spark.read \
.format("statestore") \
.option("path", "/checkpoint/state") \
.option("operatorId", "0") \
.load()
existing_state.show()
# Используем для инициализации нового запроса через initialState
Вариант 3 — Blue/green deployment:
- Запустить параллельный новый запрос с RocksDB с чистым checkpoint
- Новый запрос читает данные из Kafka с начала retention (если достаточно)
- После «прогрева» переключить трафик с старого на новый
- Старый запрос остановить
Попробуй сам
Сравните производительность HDFSBacked и RocksDB backends на одном запросе:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time
def run_with_backend(provider_class, checkpoint_dir, duration_sec=30):
spark = SparkSession.builder \
.appName(f"backend-benchmark-{provider_class.split('.')[-1]}") \
.config("spark.sql.streaming.stateStore.providerClass", provider_class) \
.config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true") \
.getOrCreate()
df = (spark.readStream
.format("rate")
.option("rowsPerSecond", 1000)
.load())
# Stateful deduplication: state содержит все виденные value
query = (df.dropDuplicates(["value"])
.writeStream
.format("memory")
.queryName(f"bench_{provider_class.split('.')[-1][:6]}")
.option("checkpointLocation", checkpoint_dir)
.trigger(processingTime="5 seconds")
.start())
time.sleep(duration_sec)
metrics = []
for _ in range(3):
if query.lastProgress:
metrics.append(query.lastProgress["durationMs"])
time.sleep(5)
query.stop()
return metrics
# HDFS backend
hdfs_metrics = run_with_backend(
"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider",
"/tmp/bench-hdfs"
)
# RocksDB backend
rocksdb_metrics = run_with_backend(
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
"/tmp/bench-rocksdb"
)
print("HDFS backend batch durations:", hdfs_metrics)
print("RocksDB backend batch durations:", rocksdb_metrics)
При росте числа уникальных ключей вы увидите, что addBatch дольше у HDFSBacked из-за GC, тогда как RocksDB остаётся стабильным.