StateStore: абстракция состояния
Агрегации, дедупликация, stream-stream join, arbitrary stateful operations — все они требуют хранить состояние между батчами. Хранить его в RDD или DataFrame нельзя — они immutable и перевычисляются заново. Хранить в обычной HashMap нельзя — она не переживает рестарты executor’а. Spark решает это через StateStore: версионированное, fault-tolerant хранилище ключ-значение, которое исполнитель держит в памяти, но синхронизирует с долговечным хранилищем (HDFS/S3).
Интерфейс StateStore
StateStore (sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala) — это trait, определяющий контракт версионированного K/V-хранилища для одной партиции:
trait StateStore {
// Идентификатор этого стора
def id: StateStoreId
// Версия данных в этом сторе
def version: Long
// Получить значение по ключу
def get(key: UnsafeRow): UnsafeRow
// Обновить / добавить
def put(key: UnsafeRow, value: UnsafeRow): Unit
// Удалить ключ
def remove(key: UnsafeRow): Unit
// Итерация по всем парам
def iterator(): Iterator[UnsafeRowPair]
// Зафиксировать изменения, вернуть новую версию
def commit(): Long
// Откатить изменения (при ошибке)
def abort(): Unit
// Метрики: число ключей, размер данных
def metrics: StateStoreMetrics
}
Ключевой момент: ключи и значения — UnsafeRow, что означает off-heap двоичное представление Spark Row. Сериализация/десериализация происходит снаружи store’а — сам store работает с сырыми байтами.
StateStoreId — уникальный идентификатор: (checkpointRootLocation, operatorId, partitionId, storeName). Один streaming-запрос с 200 партициями и двумя stateful-операторами создаёт 400 StateStore instance.
StateStoreProvider: фабрика и lifecycle
StateStoreProvider — trait, управляющий жизненным циклом StateStore для одной партиции:
trait StateStoreProvider {
// Инициализация провайдера
def init(stateStoreId: StateStoreId,
keySchema: StructType,
valueSchema: StructType,
keyStateEncoderSpec: KeyStateEncoderSpec,
useColumnFamilies: Boolean,
storeConfs: StateStoreConf,
hadoopConf: Configuration): Unit
// Получить write-версию для указанной версии
def getStore(version: Long): StateStore
// Получить read-only версию
def getReadStore(version: Long): ReadStateStore
// Фоновая задача: snapshot, purge
def doMaintenance(): Unit
// Освобождение ресурсов
def close(): Unit
}
Выбор реализации контролируется конфигом:
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
# или
# "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
HDFSBackedStateStoreProvider: реализация по умолчанию
HDFSBackedStateStoreProvider — дефолтный провайдер. Он хранит state в памяти как ConcurrentHashMap[UnsafeRow, UnsafeRow] и синхронизирует её с HDFS через версионированные файлы.
Структура файлов провайдера в checkpoint:
state/
└── {operatorId}/
└── {partitionId}/
├── 1.delta -- изменения между версиями 0 и 1
├── 2.delta -- изменения между версиями 1 и 2
├── ...
├── 10.delta
├── 10.snapshot -- полный снимок на версии 10
├── 11.delta
└── 12.delta
Delta файлы хранят только изменения: какие ключи были добавлены, обновлены или удалены с момента предыдущей версии. Формат: LZ4-сжатый бинарный поток пар (UnsafeRow key, UnsafeRow value) с маркером удаления.
Snapshot файлы хранят полное состояние на конкретную версию. Это полный дамп in-memory map на момент создания.
Delta-файлы — инкрементальные изменения. Snapshot — полный снимок. Maintenance task периодически создаёт snapshot'ы и удаляет старые delta.
Версионирование: как это работает
Каждый раз, когда StateStoreSaveExec завершает батч, он вызывает store.commit(). Это атомарно:
- Все in-memory изменения (новые и удалённые ключи с момента
getStore(version)) сериализуются и пишутся в{version+1}.delta - In-memory map обновляется: теперь она отражает версию
version+1 commit()возвращаетversion+1
При следующем батче StateStoreRestoreExec вызывает getStore(currentVersion). Если эта версия уже в кэше (loadedMaps) — никакой I/O. Если нет — загрузка с диска.
Восстановление версии с диска:
Алгоритм загрузки версии V:
1. Найти последний snapshot <= V: файл S.snapshot
2. Загрузить S.snapshot в ConcurrentHashMap
3. Последовательно применить delta-файлы S+1.delta, S+2.delta, ..., V.delta
4. Результат — состояние на версию V
Если нужна версия 15, и есть snapshot на 10 и delta 11-15 — читаем snapshot 10, применяем 5 delta-файлов. Без snapshot пришлось бы читать все delta с начала.
loadedMaps: кэш версий в памяти
HDFSBackedStateStoreProvider держит кэш версий state в памяти:
// Упрощённо
private val loadedMaps = new mutable.HashMap[Long, ConcurrentHashMap[UnsafeRow, UnsafeRow]]
По умолчанию кэшируются последние 2 версии. Это покрывает типичный паттерн: текущий батч (N) и предыдущий (N-1). Конфиг:
# Количество версий state, которые держатся в памяти на executor
spark.conf.set("spark.sql.streaming.stateStore.maxVersionsToRetainInMemory", "2")
Пока executor жив, обращения к state не требуют I/O — вся работа идёт через loadedMaps. HDFS нужен только при первой загрузке (старт или failover).
Вся state в HDFSBackedStateStoreProvider хранится в JVM heap. При большом state (миллиарды ключей, большие значения) это вызывает GC-давление и потенциальные OOM. Именно для этого случая существует RocksDB backend — он хранит state off-heap (урок 4).
Maintenance task: фоновые операции
MaintenanceTask — фоновый daemon thread, запущенный на каждом executor. Он просыпается каждые maintenanceInterval (по умолчанию 60 секунд) и выполняет:
# Интервал maintenance task
spark.conf.set("spark.sql.streaming.stateStore.maintenanceInterval", "60s")
# Минимум delta-файлов для создания snapshot
spark.conf.set("spark.sql.streaming.stateStore.minDeltasForSnapshot", "10")
Что делает maintenance task:
-
Snapshot creation: если число delta-файлов с последнего snapshot превысило
minDeltasForSnapshot(10), создаёт новый snapshot. Snapshot = сериализация текущей in-memory map в{version}.snapshot. -
Purge old files: после создания snapshot удаляет delta-файлы старше предыдущего snapshot. Например, если создан snapshot 20 и существует snapshot 10, удаляются файлы 1.delta…9.delta.
-
Coordinator reporting: сообщает
StateStoreCoordinator, что данный executor жив и владеет определёнными партициями state.
Maintenance task работает асинхронно относительно micro-batch execution. Это значит, что при создании snapshot executor продолжает обрабатывать батчи. Snapshot создаётся от текущей версии in-memory map — это безопасно, так как snapshot’у нужна только read-консистентность, а ConcurrentHashMap её обеспечивает.
StateStoreCoordinator: трекер размещения
StateStoreCoordinator — это RPC endpoint (ActorRef), работающий на driver. Его задача: знать, какой executor владеет state какой партиции, и использовать эти знания для задание планирования (task locality).
// На driver
class StateStoreCoordinator extends ThreadSafeRpcEndpoint {
// Executor -> Set[StateStoreId]
private val instances = new mutable.HashMap[String, Set[StateStoreId]]
def reportActiveInstance(storeId: StateStoreId, host: String, executorId: String): Unit
def getLocation(storeId: StateStoreId): Option[String]
}
Когда Spark планирует задачу для партиции N, он запрашивает у StateStoreCoordinator, на каком executor-е живёт state партиции N. Если executor жив — задача назначается туда (NODE_LOCAL или PROCESS_LOCAL locality). Это критично для performance: без locality задача попадёт на случайный executor, который сначала должен загрузить state с HDFS.
Метрики StateStore в Spark UI
Вкладка «Structured Streaming» в Spark UI показывает метрики state per-operator:
# Программный доступ к state metrics
progress = query.lastProgress
for op in progress["stateOperators"]:
print(f"Operator {op['operatorName']}:")
print(f" numRowsTotal: {op['numRowsTotal']}") # размер state
print(f" numRowsUpdated: {op['numRowsUpdated']}") # изменений в батче
print(f" numRowsDroppedByWatermark: {op['numRowsDroppedByWatermark']}")
print(f" memoryUsedBytes: {op['memoryUsedBytes']}") # heap-потребление
print(f" numShufflePartitions: {op['numShufflePartitions']}")
Ключевые сигналы:
numRowsTotalрастёт бесконтрольно -> нет watermark или TTL, state накапливаетсяmemoryUsedBytesблизко к heap executor’а -> пора переходить на RocksDB backendnumRowsDroppedByWatermark= 0 при event-time агрегации -> watermark не работает
Попробуй сам
Исследуйте файлы StateStore напрямую:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count
import time, os
spark = SparkSession.builder \
.appName("state-store-inspect") \
.config("spark.sql.streaming.stateStore.maintenanceInterval", "10s") \
.config("spark.sql.streaming.stateStore.minDeltasForSnapshot", "3") \
.getOrCreate()
CHECKPOINT = "/tmp/state-inspect-checkpoint"
df = (spark.readStream
.format("rate")
.option("rowsPerSecond", 20)
.load())
# Windowed aggregation создаёт state
from pyspark.sql.functions import col, expr
query = (df
.withWatermark("timestamp", "10 seconds")
.groupBy(window("timestamp", "30 seconds"))
.count()
.writeStream
.format("memory")
.queryName("state_demo")
.option("checkpointLocation", CHECKPOINT)
.outputMode("update")
.trigger(processingTime="2 seconds")
.start())
# Ждём несколько батчей + maintenance
time.sleep(40)
query.stop()
# Смотрим state файлы
state_dir = f"{CHECKPOINT}/state/0/0" # operator 0, partition 0
if os.path.exists(state_dir):
files = sorted(os.listdir(state_dir))
print("State files:")
for f in files:
size = os.path.getsize(f"{state_dir}/{f}")
print(f" {f}: {size} bytes")
Ожидаемый вывод (после ~20 батчей и maintenance task):
State files:
1.delta: 156 bytes
2.delta: 164 bytes
3.delta: 171 bytes
3.snapshot: 2048 bytes <- maintenance создал snapshot на версии 3
4.delta: 89 bytes
5.delta: 112 bytes