Оконные функции в стриминге
Оконные функции (windowing) группируют потоковые события по временным интервалам. Каждое окно — это группа событий с event_time в заданном диапазоне. Spark поддерживает три типа окон.
Tumbling Windows (неперекрывающиеся)
Tumbling window — фиксированный интервал без перекрытий. Каждое событие попадает ровно в одно окно.
Время: 10:00 10:05 10:10 10:15 10:20
|--------|--------|--------|--------|
Window 1 Window 2 Window 3 Window 4
5 мин 5 мин 5 мин 5 мин
from pyspark.sql.functions import window, sum as _sum, count, col, from_json
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
schema = StructType() \
.add("user_id", StringType()) \
.add("amount", DoubleType()) \
.add("event_time", TimestampType())
# Source
raw = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "transactions") \
.load()
events = raw.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Tumbling window: 5-минутные неперекрывающиеся окна
tumbling = events \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window("event_time", "5 minutes"),
"user_id"
) \
.agg(
_sum("amount").alias("total"),
count("*").alias("txn_count")
)
query = tumbling.writeStream \
.outputMode("append") \
.format("delta") \
.option("checkpointLocation", "/checkpoints/tumbling-demo") \
.start("/data/gold/user_totals_5min")
Tumbling windows — самый распространённый тип. Используйте для: почасовых/дневных агрегатов, метрик за период, batch-style агрегации на streaming данных.
Sliding Windows (перекрывающиеся)
Sliding window имеет два параметра: размер окна и интервал сдвига (slide). Окна перекрываются, и одно событие может попасть в несколько окон.
Размер окна: 10 мин, Slide: 5 мин
Время: 10:00 10:05 10:10 10:15 10:20
|----------------|
|----------------|
|----------------|
Window 1 Window 2 Window 3
[10:00-10:10) [10:05-10:15) [10:10-10:20)
Событие с event_time = 10:07 попадёт и в Window 1, и в Window 2.
# Sliding window: 10-минутное окно, сдвиг каждые 5 минут
sliding = events \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window("event_time", "10 minutes", "5 minutes"),
"user_id"
) \
.agg(
_sum("amount").alias("total"),
count("*").alias("txn_count")
)
query = sliding.writeStream \
.outputMode("update") \
.format("delta") \
.option("checkpointLocation", "/checkpoints/sliding-demo") \
.start("/data/gold/user_totals_sliding")
Anti-pattern: sliding window с slide значительно меньше window. Если window = 1 час, slide = 1 секунда, каждое событие дублируется в 3600 окнах. State store взрывается: num_windows = window_size / slide_interval. Правило: slide >= window / 10. Для 1-часового окна минимальный slide = 6 минут.
Используйте sliding windows для: скользящих средних, trending metrics, alerting (среднее за последние 10 минут обновляется каждые 2 минуты).
Session Windows (динамические)
Session window группирует события по gap (паузе) между ними. Окно закрывается, когда между событиями нет активности дольше заданного gap. Размер окна динамический — зависит от данных.
Gap: 5 мин
Событие: A(10:00) B(10:03) C(10:04) D(10:12) E(10:14)
|------ Сессия 1 -------| |-- Сессия 2 --|
[10:00 - 10:09) [10:12 - 10:19)
gap < 5min gap < 5min
gap = 8min > 5min → новая сессия
from pyspark.sql.functions import session_window
# Session window: gap 5 минут
sessions = events \
.withWatermark("event_time", "15 minutes") \
.groupBy(
session_window("event_time", "5 minutes"),
"user_id"
) \
.agg(
_sum("amount").alias("session_total"),
count("*").alias("event_count")
)
query = sessions.writeStream \
.outputMode("append") \
.format("delta") \
.option("checkpointLocation", "/checkpoints/session-demo") \
.start("/data/gold/user_sessions")
session_window() появился в Spark 3.2. До этого session windows реализовывались через flatMapGroupsWithState вручную (десятки строк кода). session_window() делает то же самое одной строкой. Watermark для session windows рекомендуется >= 2x gap, чтобы учесть опоздавшие события, расширяющие сессию.
Session windows незаменимы для: пользовательских сессий (web analytics), user journey tracking, clickstream analysis, fraud detection (серия транзакций).
Столбец window в результате
window() и session_window() добавляют структурный столбец window с полями start и end:
# Результат содержит:
# window: {start: "2024-01-15 10:00:00", end: "2024-01-15 10:05:00"}
# user_id: "user_123"
# total: 450.0
# txn_count: 5
# Можно извлечь границы окна:
result = tumbling.select(
col("window.start").alias("window_start"),
col("window.end").alias("window_end"),
"user_id",
"total"
)
Комбинация окон с watermarks
Watermark определяет, когда окно финализируется:
# Watermark = 10 мин, Window = 5 мин
# Окно [10:00-10:05) финализируется когда:
# max(event_time) >= 10:05 + 10min = 10:15
# То есть через 10 минут после закрытия окна
events \
.withWatermark("event_time", "10 minutes") \
.groupBy(window("event_time", "5 minutes")) \
.count()
Без watermark state store хранит все окна бессрочно. С watermark старые окна удаляются:
State store размер:
Без watermark: [10:00] [10:05] [10:10] ... [23:55] → 288 окон за день!
С watermark 10min: [10:55] [11:00] [11:05] → только 3 окна
Полный pipeline: каждый тип окна
# Tumbling: точные 5-минутные агрегаты
tumbling_query = events \
.withWatermark("event_time", "10 minutes") \
.groupBy(window("event_time", "5 minutes")) \
.agg(_sum("amount").alias("total")) \
.writeStream.outputMode("append") \
.format("delta") \
.option("checkpointLocation", "/checkpoints/tumbling") \
.start("/data/gold/5min_totals")
# Sliding: скользящее среднее за 30 мин, обновление каждые 5 мин
sliding_query = events \
.withWatermark("event_time", "30 minutes") \
.groupBy(window("event_time", "30 minutes", "5 minutes")) \
.agg(_sum("amount").alias("rolling_total")) \
.writeStream.outputMode("update") \
.format("delta") \
.option("checkpointLocation", "/checkpoints/sliding") \
.start("/data/gold/30min_rolling")
# Session: пользовательские сессии с gap 10 мин
session_query = events \
.withWatermark("event_time", "20 minutes") \
.groupBy(session_window("event_time", "10 minutes"), "user_id") \
.agg(
count("*").alias("events_in_session"),
_sum("amount").alias("session_total")
) \
.writeStream.outputMode("append") \
.format("delta") \
.option("checkpointLocation", "/checkpoints/session") \
.start("/data/gold/user_sessions")
Что дальше?
В следующем уроке мы разберём stateful операции — mapGroupsWithState и flatMapGroupsWithState для произвольной логики состояния, выходящей за рамки стандартных агрегаций.