Learning Platform
Глоссарий Troubleshooting
Урок 10.03 · 30 мин
Продвинутый
StateStoreHDFSBackedStateStoreProviderStateStoreCoordinatorDelta FilesSnapshot FilesVersioning

StateStore: абстракция состояния

Агрегации, дедупликация, stream-stream join, arbitrary stateful operations — все они требуют хранить состояние между батчами. Хранить его в RDD или DataFrame нельзя — они immutable и перевычисляются заново. Хранить в обычной HashMap нельзя — она не переживает рестарты executor’а. Spark решает это через StateStore: версионированное, fault-tolerant хранилище ключ-значение, которое исполнитель держит в памяти, но синхронизирует с долговечным хранилищем (HDFS/S3).

Интерфейс StateStore

StateStore (sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala) — это trait, определяющий контракт версионированного K/V-хранилища для одной партиции:

trait StateStore {
  // Идентификатор этого стора
  def id: StateStoreId

  // Версия данных в этом сторе
  def version: Long

  // Получить значение по ключу
  def get(key: UnsafeRow): UnsafeRow

  // Обновить / добавить
  def put(key: UnsafeRow, value: UnsafeRow): Unit

  // Удалить ключ
  def remove(key: UnsafeRow): Unit

  // Итерация по всем парам
  def iterator(): Iterator[UnsafeRowPair]

  // Зафиксировать изменения, вернуть новую версию
  def commit(): Long

  // Откатить изменения (при ошибке)
  def abort(): Unit

  // Метрики: число ключей, размер данных
  def metrics: StateStoreMetrics
}

Ключевой момент: ключи и значения — UnsafeRow, что означает off-heap двоичное представление Spark Row. Сериализация/десериализация происходит снаружи store’а — сам store работает с сырыми байтами.

StateStoreId — уникальный идентификатор: (checkpointRootLocation, operatorId, partitionId, storeName). Один streaming-запрос с 200 партициями и двумя stateful-операторами создаёт 400 StateStore instance.

StateStoreProvider: фабрика и lifecycle

StateStoreProvider — trait, управляющий жизненным циклом StateStore для одной партиции:

trait StateStoreProvider {
  // Инициализация провайдера
  def init(stateStoreId: StateStoreId,
           keySchema: StructType,
           valueSchema: StructType,
           keyStateEncoderSpec: KeyStateEncoderSpec,
           useColumnFamilies: Boolean,
           storeConfs: StateStoreConf,
           hadoopConf: Configuration): Unit

  // Получить write-версию для указанной версии
  def getStore(version: Long): StateStore

  // Получить read-only версию
  def getReadStore(version: Long): ReadStateStore

  // Фоновая задача: snapshot, purge
  def doMaintenance(): Unit

  // Освобождение ресурсов
  def close(): Unit
}

Выбор реализации контролируется конфигом:

spark.conf.set("spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
    # или
    # "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"

HDFSBackedStateStoreProvider: реализация по умолчанию

HDFSBackedStateStoreProvider — дефолтный провайдер. Он хранит state в памяти как ConcurrentHashMap[UnsafeRow, UnsafeRow] и синхронизирует её с HDFS через версионированные файлы.

Структура файлов провайдера в checkpoint:

state/
└── {operatorId}/
    └── {partitionId}/
        ├── 1.delta    -- изменения между версиями 0 и 1
        ├── 2.delta    -- изменения между версиями 1 и 2
        ├── ...
        ├── 10.delta
        ├── 10.snapshot  -- полный снимок на версии 10
        ├── 11.delta
        └── 12.delta

Delta файлы хранят только изменения: какие ключи были добавлены, обновлены или удалены с момента предыдущей версии. Формат: LZ4-сжатый бинарный поток пар (UnsafeRow key, UnsafeRow value) с маркером удаления.

Snapshot файлы хранят полное состояние на конкретную версию. Это полный дамп in-memory map на момент создания.

Файлы HDFSBackedStateStoreProvider

Delta-файлы — инкрементальные изменения. Snapshot — полный снимок. Maintenance task периодически создаёт snapshot'ы и удаляет старые delta.

Версия 1-9: только delta-файлы1.delta, 2.delta, ..., 9.deltaКаждый delta содержит только изменения: PUT(key, value) и DELETE(key) для этой версии.
Версия 10: snapshot + delta10.snapshot (полный дамп), 10.delta (изменения к этой версии)Snapshot создаётся maintenance task'ом когда delta файлов больше minDeltasForSnapshot (по умолчанию 10).
Версия 11-20: снова delta11.delta, 12.delta, ..., 20.deltaПосле snapshot снова накапливаются delta. Следующий snapshot будет на версии 20.
Purge: удалены файлы 1-8После создания snapshot на 10, файлы 1.delta...9.delta можно удалитьMaintenance task удаляет файлы старше последнего snapshot. Восстановление с версии 10: загружаем 10.snapshot, применяем 10+.delta.

Версионирование: как это работает

Каждый раз, когда StateStoreSaveExec завершает батч, он вызывает store.commit(). Это атомарно:

  1. Все in-memory изменения (новые и удалённые ключи с момента getStore(version)) сериализуются и пишутся в {version+1}.delta
  2. In-memory map обновляется: теперь она отражает версию version+1
  3. commit() возвращает version+1

При следующем батче StateStoreRestoreExec вызывает getStore(currentVersion). Если эта версия уже в кэше (loadedMaps) — никакой I/O. Если нет — загрузка с диска.

Восстановление версии с диска:

Алгоритм загрузки версии V:
1. Найти последний snapshot <= V: файл S.snapshot
2. Загрузить S.snapshot в ConcurrentHashMap
3. Последовательно применить delta-файлы S+1.delta, S+2.delta, ..., V.delta
4. Результат — состояние на версию V

Если нужна версия 15, и есть snapshot на 10 и delta 11-15 — читаем snapshot 10, применяем 5 delta-файлов. Без snapshot пришлось бы читать все delta с начала.

loadedMaps: кэш версий в памяти

HDFSBackedStateStoreProvider держит кэш версий state в памяти:

// Упрощённо
private val loadedMaps = new mutable.HashMap[Long, ConcurrentHashMap[UnsafeRow, UnsafeRow]]

По умолчанию кэшируются последние 2 версии. Это покрывает типичный паттерн: текущий батч (N) и предыдущий (N-1). Конфиг:

# Количество версий state, которые держатся в памяти на executor
spark.conf.set("spark.sql.streaming.stateStore.maxVersionsToRetainInMemory", "2")

Пока executor жив, обращения к state не требуют I/O — вся работа идёт через loadedMaps. HDFS нужен только при первой загрузке (старт или failover).

WARNING

Вся state в HDFSBackedStateStoreProvider хранится в JVM heap. При большом state (миллиарды ключей, большие значения) это вызывает GC-давление и потенциальные OOM. Именно для этого случая существует RocksDB backend — он хранит state off-heap (урок 4).

Maintenance task: фоновые операции

MaintenanceTask — фоновый daemon thread, запущенный на каждом executor. Он просыпается каждые maintenanceInterval (по умолчанию 60 секунд) и выполняет:

# Интервал maintenance task
spark.conf.set("spark.sql.streaming.stateStore.maintenanceInterval", "60s")

# Минимум delta-файлов для создания snapshot
spark.conf.set("spark.sql.streaming.stateStore.minDeltasForSnapshot", "10")

Что делает maintenance task:

  1. Snapshot creation: если число delta-файлов с последнего snapshot превысило minDeltasForSnapshot (10), создаёт новый snapshot. Snapshot = сериализация текущей in-memory map в {version}.snapshot.

  2. Purge old files: после создания snapshot удаляет delta-файлы старше предыдущего snapshot. Например, если создан snapshot 20 и существует snapshot 10, удаляются файлы 1.delta…9.delta.

  3. Coordinator reporting: сообщает StateStoreCoordinator, что данный executor жив и владеет определёнными партициями state.

NOTE

Maintenance task работает асинхронно относительно micro-batch execution. Это значит, что при создании snapshot executor продолжает обрабатывать батчи. Snapshot создаётся от текущей версии in-memory map — это безопасно, так как snapshot’у нужна только read-консистентность, а ConcurrentHashMap её обеспечивает.

StateStoreCoordinator: трекер размещения

StateStoreCoordinator — это RPC endpoint (ActorRef), работающий на driver. Его задача: знать, какой executor владеет state какой партиции, и использовать эти знания для задание планирования (task locality).

// На driver
class StateStoreCoordinator extends ThreadSafeRpcEndpoint {
  // Executor -> Set[StateStoreId]
  private val instances = new mutable.HashMap[String, Set[StateStoreId]]

  def reportActiveInstance(storeId: StateStoreId, host: String, executorId: String): Unit
  def getLocation(storeId: StateStoreId): Option[String]
}

Когда Spark планирует задачу для партиции N, он запрашивает у StateStoreCoordinator, на каком executor-е живёт state партиции N. Если executor жив — задача назначается туда (NODE_LOCAL или PROCESS_LOCAL locality). Это критично для performance: без locality задача попадёт на случайный executor, который сначала должен загрузить state с HDFS.

Метрики StateStore в Spark UI

Вкладка «Structured Streaming» в Spark UI показывает метрики state per-operator:

# Программный доступ к state metrics
progress = query.lastProgress
for op in progress["stateOperators"]:
    print(f"Operator {op['operatorName']}:")
    print(f"  numRowsTotal: {op['numRowsTotal']}")          # размер state
    print(f"  numRowsUpdated: {op['numRowsUpdated']}")      # изменений в батче
    print(f"  numRowsDroppedByWatermark: {op['numRowsDroppedByWatermark']}")
    print(f"  memoryUsedBytes: {op['memoryUsedBytes']}")    # heap-потребление
    print(f"  numShufflePartitions: {op['numShufflePartitions']}")

Ключевые сигналы:

  • numRowsTotal растёт бесконтрольно -> нет watermark или TTL, state накапливается
  • memoryUsedBytes близко к heap executor’а -> пора переходить на RocksDB backend
  • numRowsDroppedByWatermark = 0 при event-time агрегации -> watermark не работает

Попробуй сам

Исследуйте файлы StateStore напрямую:

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count
import time, os

spark = SparkSession.builder \
    .appName("state-store-inspect") \
    .config("spark.sql.streaming.stateStore.maintenanceInterval", "10s") \
    .config("spark.sql.streaming.stateStore.minDeltasForSnapshot", "3") \
    .getOrCreate()

CHECKPOINT = "/tmp/state-inspect-checkpoint"

df = (spark.readStream
    .format("rate")
    .option("rowsPerSecond", 20)
    .load())

# Windowed aggregation создаёт state
from pyspark.sql.functions import col, expr
query = (df
    .withWatermark("timestamp", "10 seconds")
    .groupBy(window("timestamp", "30 seconds"))
    .count()
    .writeStream
    .format("memory")
    .queryName("state_demo")
    .option("checkpointLocation", CHECKPOINT)
    .outputMode("update")
    .trigger(processingTime="2 seconds")
    .start())

# Ждём несколько батчей + maintenance
time.sleep(40)
query.stop()

# Смотрим state файлы
state_dir = f"{CHECKPOINT}/state/0/0"  # operator 0, partition 0
if os.path.exists(state_dir):
    files = sorted(os.listdir(state_dir))
    print("State files:")
    for f in files:
        size = os.path.getsize(f"{state_dir}/{f}")
        print(f"  {f}: {size} bytes")

Ожидаемый вывод (после ~20 батчей и maintenance task):

State files:
  1.delta: 156 bytes
  2.delta: 164 bytes
  3.delta: 171 bytes
  3.snapshot: 2048 bytes   <- maintenance создал snapshot на версии 3
  4.delta: 89 bytes
  5.delta: 112 bytes
Проверка знанийKnowledge check
Executor с HDFSBackedStateStoreProvider умер на версии 25 state. Maintenance task последний раз создал snapshot на версии 20. В checkpoint directory есть файлы: 20.snapshot, 21.delta, 22.delta, 23.delta, 24.delta, 25.delta. Сколько файлов нужно прочитать при восстановлении до версии 25, и какой файл является наиболее дорогостоящим для чтения?
ОтветAnswer
При восстановлении до версии 25: читаем 20.snapshot (полный дамп всего state) плюс 5 delta-файлов (21.delta...25.delta). Итого 6 файлов. Самый дорогостоящий — 20.snapshot, потому что он содержит полное состояние, тогда как каждый delta содержит только изменения одного батча. Размер snapshot пропорционален числу уникальных ключей в state, а каждый delta — числу изменений за один батч (обычно намного меньше). Именно поэтому maintenance task создаёт snapshot'ы: чем дальше от последнего snapshot, тем больше delta нужно применить при восстановлении. Без snapshot пришлось бы читать все delta с версии 1, что может быть дорого для долгоживущих запросов.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. HDFSBackedStateStoreProvider хранит delta и snapshot файлы. Snapshot создаётся maintenance task'ом при условии minDeltasForSnapshot=10. Запрос работал 200 батчей. Каков максимально возможный номер последнего snapshot (по умолчанию minDeltasForSnapshot=10)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6