Learning Platform
Глоссарий Troubleshooting
Урок 08.06 · 16 мин
Продвинутый
mapGroupsWithStateflatMapGroupsWithStateGroupStateState TimeoutArbitrary Stateful Processing

Stateful операции: mapGroupsWithState

Зачем нужны произвольные stateful операции?

Встроенные агрегации (groupBy().agg()) покрывают стандартные сценарии: суммы, средние, count по окнам. Но есть задачи, где нужна произвольная логика состояния:

  • Sessionization — отслеживание пользовательских сессий с custom логикой (не просто gap-based)
  • Pattern detection — поиск последовательности событий (login -> fail -> fail -> lock)
  • Deduplication — удаление дублей по custom ключу в streaming
  • Running aggregates — кумулятивные метрики с custom merge logic
  • Alerting — отправка alert когда метрика превышает порог N раз подряд

Для этих задач Spark предоставляет mapGroupsWithState и flatMapGroupsWithState.

mapGroupsWithState API

mapGroupsWithState группирует события по ключу и для каждой группы вызывает user-defined функцию с доступом к сохранённому состоянию:

from pyspark.sql.streaming import GroupState, GroupStateTimeout
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType
from typing import Iterator, Tuple

# Определяем тип состояния
class UserSession:
    def __init__(self, user_id: str, start_time: str, event_count: int, total_amount: float):
        self.user_id = user_id
        self.start_time = start_time
        self.event_count = event_count
        self.total_amount = total_amount

# Определяем тип вывода
session_schema = StructType([
    StructField("user_id", StringType()),
    StructField("session_start", StringType()),
    StructField("event_count", LongType()),
    StructField("total_amount", DoubleType()),
    StructField("session_status", StringType())
])

Функция обновления состояния получает три аргумента:

def update_session(
    key: Tuple[str],           # ключ группы (user_id)
    events: Iterator[Row],     # новые события для этого ключа
    state: GroupState           # текущее состояние (может быть пустым)
):
    """Обновление пользовательской сессии."""

    # 1. Получить текущее состояние или создать новое
    if state.exists:
        session = state.get
    else:
        session = (key[0], None, 0, 0.0)  # (user_id, start, count, total)

    user_id, start_time, event_count, total_amount = session

    # 2. Обработать новые события
    for event in events:
        if start_time is None:
            start_time = str(event.event_time)
        event_count += 1
        total_amount += event.amount

    # 3. Проверить timeout
    if state.hasTimedOut:
        # Сессия завершена -- вернуть результат и удалить state
        state.remove()
        return (user_id, start_time, event_count, total_amount, "closed")

    # 4. Обновить state с timeout
    state.update((user_id, start_time, event_count, total_amount))
    state.setTimeoutDuration("10 minutes")  # processing time timeout

    return (user_id, start_time, event_count, total_amount, "active")

Применение к streaming DataFrame:

# Source
events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user-events") \
    .load() \
    .select(from_json(col("value").cast("string"), event_schema).alias("data")) \
    .select("data.*")

# mapGroupsWithState
sessions = events \
    .groupBy("user_id") \
    .applyInPandasWithState(
        update_session,
        outputStructType=session_schema,
        stateStructType=state_schema,
        outputMode="update",
        timeoutConf=GroupStateTimeout.ProcessingTimeTimeout
    )

query = sessions.writeStream \
    .outputMode("update") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/sessions") \
    .start("/data/gold/user_sessions")
TIP

Scala vs Python API. В Scala mapGroupsWithState вызывается напрямую. В PySpark (Spark 3.4+) эквивалент — applyInPandasWithState, который принимает pandas UDF. Логика идентична, но синтаксис отличается.

GroupState: операции

МетодОписание
state.existsЕсть ли сохранённое состояние для ключа
state.getПолучить текущее состояние
state.update(newState)Обновить состояние
state.remove()Удалить состояние (финализация)
state.hasTimedOutСработал ли timeout
state.setTimeoutDuration("10 min")Processing time timeout
state.setTimeoutTimestamp(ts)Event time timeout

State Timeouts: processing time vs event time

Timeout определяет, когда вызвать функцию для группы без новых событий:

Processing Time Timeout

# Timeout через 10 минут реального времени без событий
GroupStateTimeout.ProcessingTimeTimeout
state.setTimeoutDuration("10 minutes")

Простой, но ненадёжный: зависит от clock executor. Если micro-batch задерживается на 15 минут, timeout может сработать раньше или позже ожидаемого.

Event Time Timeout

# Timeout когда watermark проходит заданный timestamp
GroupStateTimeout.EventTimeTimeout
state.setTimeoutTimestamp(last_event_time + timedelta(minutes=10))

Надёжный: привязан к event time и watermark. Timeout срабатывает когда watermark > timeout_timestamp. Требует withWatermark() в pipeline.

# Event time timeout -- рекомендуется для production
sessions = events \
    .withWatermark("event_time", "15 minutes") \
    .groupBy("user_id") \
    .applyInPandasWithState(
        update_session,
        outputStructType=session_schema,
        stateStructType=state_schema,
        outputMode="update",
        timeoutConf=GroupStateTimeout.EventTimeTimeout
    )

flatMapGroupsWithState: несколько выходных строк

mapGroupsWithState возвращает одну строку на группу. flatMapGroupsWithState возвращает Iterator — ноль, одну или несколько строк:

def detect_fraud_pattern(
    key: Tuple[str],
    events: Iterator[Row],
    state: GroupState
) -> Iterator[Row]:
    """Детекция паттерна: 3+ failed login за 5 минут."""

    # Получить историю попыток
    if state.exists:
        attempts = list(state.get)
    else:
        attempts = []

    # Добавить новые события
    for event in events:
        attempts.append({
            "time": event.event_time,
            "status": event.login_status
        })

    # Удалить старые (> 5 мин от последнего)
    if attempts:
        cutoff = attempts[-1]["time"] - timedelta(minutes=5)
        attempts = [a for a in attempts if a["time"] >= cutoff]

    # Проверить паттерн
    failed = [a for a in attempts if a["status"] == "failed"]
    if len(failed) >= 3:
        # Генерируем alert
        yield Row(
            user_id=key[0],
            alert_type="brute_force",
            failed_count=len(failed),
            window_start=str(failed[0]["time"]),
            window_end=str(failed[-1]["time"])
        )

    # Обновить state
    state.update(attempts)
    state.setTimeoutDuration("10 minutes")

State Store Backends

Stateful операции сохраняют state в state store. Spark поддерживает два backend:

BackendХарактеристикиКогда использовать
HDFS (default)State в памяти + checkpoint на HDFSМалый state (< 1 ГБ)
RocksDBState на локальном SSD + checkpointБольшой state (> 1 ГБ)
# Включить RocksDB state store
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)
TIP

RocksDB для production. Default HDFS backend хранит весь state в памяти JVM. При большом количестве ключей (миллионы пользователей) это приводит к GC pressure и OOM. RocksDB хранит state на SSD и использует блочный кэш, потребляя на порядок меньше heap memory. С Spark 3.2+ RocksDB — рекомендуемый backend для production.

WARNING

Anti-pattern: unbounded state без timeout. Если не установить timeout и не вызывать state.remove(), state store растёт бесконечно. Каждый уникальный ключ (user_id) добавляет запись, которая никогда не удаляется. При миллионах пользователей state store достигнет десятков ГБ и вызовет OOM даже с RocksDB. Всегда устанавливайте timeout или явно удаляйте state.

Полный pipeline: user session tracking

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("SessionTracking") \
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
    .getOrCreate()

event_schema = StructType() \
    .add("user_id", StringType()) \
    .add("page", StringType()) \
    .add("action", StringType()) \
    .add("event_time", TimestampType())

# Kafka source
raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "clickstream") \
    .load()

events = raw.select(
    from_json(col("value").cast("string"), event_schema).alias("data")
).select("data.*")

# Stateful session tracking с event time timeout
sessions = events \
    .withWatermark("event_time", "20 minutes") \
    .groupBy("user_id") \
    .applyInPandasWithState(
        track_user_session,
        outputStructType=session_output_schema,
        stateStructType=session_state_schema,
        outputMode="append",
        timeoutConf=GroupStateTimeout.EventTimeTimeout
    )

# Delta sink
query = sessions.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/clickstream-sessions") \
    .trigger(processingTime="30 seconds") \
    .start("/data/gold/clickstream_sessions")

query.awaitTermination()
Проверка знанийKnowledge check
Чем mapGroupsWithState отличается от flatMapGroupsWithState? Когда использовать каждый?
ОтветAnswer
mapGroupsWithState возвращает ровно одну строку на каждый вызов (на каждый ключ за micro-batch). flatMapGroupsWithState возвращает Iterator -- ноль, одну или несколько строк. Используйте mapGroupsWithState для: обновления session state (одна сессия = одна строка). Используйте flatMapGroupsWithState для: pattern detection (может не генерировать alert = 0 строк), exploding sessions (может разбить одну сессию на фазы = N строк), фильтрации (может отбросить невалидные группы = 0 строк).
Проверка знанийKnowledge check
Почему event time timeout предпочтительнее processing time timeout для production?
ОтветAnswer
Processing time timeout зависит от реального clock executor и от того, как часто Spark запускает micro-batch. Если batch задерживается (backpressure, перезапуск), timeout может сработать значительно позже ожидаемого. Event time timeout привязан к watermark -- он срабатывает когда watermark (определяемый по данным) проходит заданный timestamp. Это делает поведение детерминированным и воспроизводимым: одни и те же данные всегда дают одинаковый результат, независимо от задержек обработки.

Как stateful-стриминг хранит состояние внутри — на уровне исходников — в курсе Apache Spark Internals:

Spark Internals: StateStore

Что дальше?

В следующем уроке мы перейдём к преемнику этого API — transformWithState (Arbitrary State API v2 в Spark 4.0): он решает основные ограничения mapGroupsWithState, поддерживает несколько state-переменных, композитные типы и декларативный TTL.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Когда следует использовать mapGroupsWithState вместо встроенных агрегаций (groupBy + count/sum)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8