Stateful операции: mapGroupsWithState
Зачем нужны произвольные stateful операции?
Встроенные агрегации (groupBy().agg()) покрывают стандартные сценарии: суммы, средние, count по окнам. Но есть задачи, где нужна произвольная логика состояния:
- Sessionization — отслеживание пользовательских сессий с custom логикой (не просто gap-based)
- Pattern detection — поиск последовательности событий (login -> fail -> fail -> lock)
- Deduplication — удаление дублей по custom ключу в streaming
- Running aggregates — кумулятивные метрики с custom merge logic
- Alerting — отправка alert когда метрика превышает порог N раз подряд
Для этих задач Spark предоставляет mapGroupsWithState и flatMapGroupsWithState.
mapGroupsWithState API
mapGroupsWithState группирует события по ключу и для каждой группы вызывает user-defined функцию с доступом к сохранённому состоянию:
from pyspark.sql.streaming import GroupState, GroupStateTimeout
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType
from typing import Iterator, Tuple
# Определяем тип состояния
class UserSession:
def __init__(self, user_id: str, start_time: str, event_count: int, total_amount: float):
self.user_id = user_id
self.start_time = start_time
self.event_count = event_count
self.total_amount = total_amount
# Определяем тип вывода
session_schema = StructType([
StructField("user_id", StringType()),
StructField("session_start", StringType()),
StructField("event_count", LongType()),
StructField("total_amount", DoubleType()),
StructField("session_status", StringType())
])
Функция обновления состояния получает три аргумента:
def update_session(
key: Tuple[str], # ключ группы (user_id)
events: Iterator[Row], # новые события для этого ключа
state: GroupState # текущее состояние (может быть пустым)
):
"""Обновление пользовательской сессии."""
# 1. Получить текущее состояние или создать новое
if state.exists:
session = state.get
else:
session = (key[0], None, 0, 0.0) # (user_id, start, count, total)
user_id, start_time, event_count, total_amount = session
# 2. Обработать новые события
for event in events:
if start_time is None:
start_time = str(event.event_time)
event_count += 1
total_amount += event.amount
# 3. Проверить timeout
if state.hasTimedOut:
# Сессия завершена -- вернуть результат и удалить state
state.remove()
return (user_id, start_time, event_count, total_amount, "closed")
# 4. Обновить state с timeout
state.update((user_id, start_time, event_count, total_amount))
state.setTimeoutDuration("10 minutes") # processing time timeout
return (user_id, start_time, event_count, total_amount, "active")
Применение к streaming DataFrame:
# Source
events = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "user-events") \
.load() \
.select(from_json(col("value").cast("string"), event_schema).alias("data")) \
.select("data.*")
# mapGroupsWithState
sessions = events \
.groupBy("user_id") \
.applyInPandasWithState(
update_session,
outputStructType=session_schema,
stateStructType=state_schema,
outputMode="update",
timeoutConf=GroupStateTimeout.ProcessingTimeTimeout
)
query = sessions.writeStream \
.outputMode("update") \
.format("delta") \
.option("checkpointLocation", "/checkpoints/sessions") \
.start("/data/gold/user_sessions")
Scala vs Python API. В Scala mapGroupsWithState вызывается напрямую. В PySpark (Spark 3.4+) эквивалент — applyInPandasWithState, который принимает pandas UDF. Логика идентична, но синтаксис отличается.
GroupState: операции
| Метод | Описание |
|---|---|
state.exists | Есть ли сохранённое состояние для ключа |
state.get | Получить текущее состояние |
state.update(newState) | Обновить состояние |
state.remove() | Удалить состояние (финализация) |
state.hasTimedOut | Сработал ли timeout |
state.setTimeoutDuration("10 min") | Processing time timeout |
state.setTimeoutTimestamp(ts) | Event time timeout |
State Timeouts: processing time vs event time
Timeout определяет, когда вызвать функцию для группы без новых событий:
Processing Time Timeout
# Timeout через 10 минут реального времени без событий
GroupStateTimeout.ProcessingTimeTimeout
state.setTimeoutDuration("10 minutes")
Простой, но ненадёжный: зависит от clock executor. Если micro-batch задерживается на 15 минут, timeout может сработать раньше или позже ожидаемого.
Event Time Timeout
# Timeout когда watermark проходит заданный timestamp
GroupStateTimeout.EventTimeTimeout
state.setTimeoutTimestamp(last_event_time + timedelta(minutes=10))
Надёжный: привязан к event time и watermark. Timeout срабатывает когда watermark > timeout_timestamp. Требует withWatermark() в pipeline.
# Event time timeout -- рекомендуется для production
sessions = events \
.withWatermark("event_time", "15 minutes") \
.groupBy("user_id") \
.applyInPandasWithState(
update_session,
outputStructType=session_schema,
stateStructType=state_schema,
outputMode="update",
timeoutConf=GroupStateTimeout.EventTimeTimeout
)
flatMapGroupsWithState: несколько выходных строк
mapGroupsWithState возвращает одну строку на группу. flatMapGroupsWithState возвращает Iterator — ноль, одну или несколько строк:
def detect_fraud_pattern(
key: Tuple[str],
events: Iterator[Row],
state: GroupState
) -> Iterator[Row]:
"""Детекция паттерна: 3+ failed login за 5 минут."""
# Получить историю попыток
if state.exists:
attempts = list(state.get)
else:
attempts = []
# Добавить новые события
for event in events:
attempts.append({
"time": event.event_time,
"status": event.login_status
})
# Удалить старые (> 5 мин от последнего)
if attempts:
cutoff = attempts[-1]["time"] - timedelta(minutes=5)
attempts = [a for a in attempts if a["time"] >= cutoff]
# Проверить паттерн
failed = [a for a in attempts if a["status"] == "failed"]
if len(failed) >= 3:
# Генерируем alert
yield Row(
user_id=key[0],
alert_type="brute_force",
failed_count=len(failed),
window_start=str(failed[0]["time"]),
window_end=str(failed[-1]["time"])
)
# Обновить state
state.update(attempts)
state.setTimeoutDuration("10 minutes")
State Store Backends
Stateful операции сохраняют state в state store. Spark поддерживает два backend:
| Backend | Характеристики | Когда использовать |
|---|---|---|
| HDFS (default) | State в памяти + checkpoint на HDFS | Малый state (< 1 ГБ) |
| RocksDB | State на локальном SSD + checkpoint | Большой state (> 1 ГБ) |
# Включить RocksDB state store
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)
RocksDB для production. Default HDFS backend хранит весь state в памяти JVM. При большом количестве ключей (миллионы пользователей) это приводит к GC pressure и OOM. RocksDB хранит state на SSD и использует блочный кэш, потребляя на порядок меньше heap memory. С Spark 3.2+ RocksDB — рекомендуемый backend для production.
Anti-pattern: unbounded state без timeout. Если не установить timeout и не вызывать state.remove(), state store растёт бесконечно. Каждый уникальный ключ (user_id) добавляет запись, которая никогда не удаляется. При миллионах пользователей state store достигнет десятков ГБ и вызовет OOM даже с RocksDB. Всегда устанавливайте timeout или явно удаляйте state.
Полный pipeline: user session tracking
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
spark = SparkSession.builder \
.appName("SessionTracking") \
.config("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
.getOrCreate()
event_schema = StructType() \
.add("user_id", StringType()) \
.add("page", StringType()) \
.add("action", StringType()) \
.add("event_time", TimestampType())
# Kafka source
raw = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "clickstream") \
.load()
events = raw.select(
from_json(col("value").cast("string"), event_schema).alias("data")
).select("data.*")
# Stateful session tracking с event time timeout
sessions = events \
.withWatermark("event_time", "20 minutes") \
.groupBy("user_id") \
.applyInPandasWithState(
track_user_session,
outputStructType=session_output_schema,
stateStructType=session_state_schema,
outputMode="append",
timeoutConf=GroupStateTimeout.EventTimeTimeout
)
# Delta sink
query = sessions.writeStream \
.outputMode("append") \
.format("delta") \
.option("checkpointLocation", "/checkpoints/clickstream-sessions") \
.trigger(processingTime="30 seconds") \
.start("/data/gold/clickstream_sessions")
query.awaitTermination()
Как stateful-стриминг хранит состояние внутри — на уровне исходников — в курсе Apache Spark Internals:
Spark Internals: StateStoreЧто дальше?
В следующем уроке мы перейдём к преемнику этого API — transformWithState (Arbitrary State API v2 в Spark 4.0): он решает основные ограничения mapGroupsWithState, поддерживает несколько state-переменных, композитные типы и декларативный TTL.