Learning Platform
Глоссарий Troubleshooting
Урок 08.05 · 16 мин
Продвинутый
Tumbling WindowSliding WindowSession Windowwindow()session_window()

Оконные функции в стриминге

Оконные функции (windowing) группируют потоковые события по временным интервалам. Каждое окно — это группа событий с event_time в заданном диапазоне. Spark поддерживает три типа окон.

Tumbling Windows (неперекрывающиеся)

Tumbling window — фиксированный интервал без перекрытий. Каждое событие попадает ровно в одно окно.

Время:    10:00    10:05    10:10    10:15    10:20
          |--------|--------|--------|--------|
          Window 1  Window 2  Window 3  Window 4
          5 мин     5 мин     5 мин     5 мин
from pyspark.sql.functions import window, sum as _sum, count, col, from_json
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

schema = StructType() \
    .add("user_id", StringType()) \
    .add("amount", DoubleType()) \
    .add("event_time", TimestampType())

# Source
raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "transactions") \
    .load()

events = raw.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Tumbling window: 5-минутные неперекрывающиеся окна
tumbling = events \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window("event_time", "5 minutes"),
        "user_id"
    ) \
    .agg(
        _sum("amount").alias("total"),
        count("*").alias("txn_count")
    )

query = tumbling.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/tumbling-demo") \
    .start("/data/gold/user_totals_5min")

Tumbling windows — самый распространённый тип. Используйте для: почасовых/дневных агрегатов, метрик за период, batch-style агрегации на streaming данных.

Sliding Windows (перекрывающиеся)

Sliding window имеет два параметра: размер окна и интервал сдвига (slide). Окна перекрываются, и одно событие может попасть в несколько окон.

Размер окна: 10 мин, Slide: 5 мин

Время:    10:00    10:05    10:10    10:15    10:20
          |----------------|
                   |----------------|
                            |----------------|
          Window 1          Window 2          Window 3
          [10:00-10:10)     [10:05-10:15)     [10:10-10:20)

Событие с event_time = 10:07 попадёт и в Window 1, и в Window 2.

# Sliding window: 10-минутное окно, сдвиг каждые 5 минут
sliding = events \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window("event_time", "10 minutes", "5 minutes"),
        "user_id"
    ) \
    .agg(
        _sum("amount").alias("total"),
        count("*").alias("txn_count")
    )

query = sliding.writeStream \
    .outputMode("update") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/sliding-demo") \
    .start("/data/gold/user_totals_sliding")
WARNING

Anti-pattern: sliding window с slide значительно меньше window. Если window = 1 час, slide = 1 секунда, каждое событие дублируется в 3600 окнах. State store взрывается: num_windows = window_size / slide_interval. Правило: slide >= window / 10. Для 1-часового окна минимальный slide = 6 минут.

Используйте sliding windows для: скользящих средних, trending metrics, alerting (среднее за последние 10 минут обновляется каждые 2 минуты).

Session Windows (динамические)

Session window группирует события по gap (паузе) между ними. Окно закрывается, когда между событиями нет активности дольше заданного gap. Размер окна динамический — зависит от данных.

Gap: 5 мин

Событие:  A(10:00)  B(10:03)  C(10:04)     D(10:12)  E(10:14)
          |------ Сессия 1 -------|          |-- Сессия 2 --|
          [10:00 - 10:09)                    [10:12 - 10:19)
                     gap < 5min                gap < 5min
                               gap = 8min > 5min → новая сессия
from pyspark.sql.functions import session_window

# Session window: gap 5 минут
sessions = events \
    .withWatermark("event_time", "15 minutes") \
    .groupBy(
        session_window("event_time", "5 minutes"),
        "user_id"
    ) \
    .agg(
        _sum("amount").alias("session_total"),
        count("*").alias("event_count")
    )

query = sessions.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/session-demo") \
    .start("/data/gold/user_sessions")
TIP

session_window() появился в Spark 3.2. До этого session windows реализовывались через flatMapGroupsWithState вручную (десятки строк кода). session_window() делает то же самое одной строкой. Watermark для session windows рекомендуется >= 2x gap, чтобы учесть опоздавшие события, расширяющие сессию.

Session windows незаменимы для: пользовательских сессий (web analytics), user journey tracking, clickstream analysis, fraud detection (серия транзакций).

Столбец window в результате

window() и session_window() добавляют структурный столбец window с полями start и end:

# Результат содержит:
# window: {start: "2024-01-15 10:00:00", end: "2024-01-15 10:05:00"}
# user_id: "user_123"
# total: 450.0
# txn_count: 5

# Можно извлечь границы окна:
result = tumbling.select(
    col("window.start").alias("window_start"),
    col("window.end").alias("window_end"),
    "user_id",
    "total"
)

Комбинация окон с watermarks

Watermark определяет, когда окно финализируется:

# Watermark = 10 мин, Window = 5 мин
# Окно [10:00-10:05) финализируется когда:
# max(event_time) >= 10:05 + 10min = 10:15
# То есть через 10 минут после закрытия окна

events \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(window("event_time", "5 minutes")) \
    .count()

Без watermark state store хранит все окна бессрочно. С watermark старые окна удаляются:

State store размер:
  Без watermark:  [10:00] [10:05] [10:10] ... [23:55]  → 288 окон за день!
  С watermark 10min: [10:55] [11:00] [11:05]           → только 3 окна

Полный pipeline: каждый тип окна

# Tumbling: точные 5-минутные агрегаты
tumbling_query = events \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(window("event_time", "5 minutes")) \
    .agg(_sum("amount").alias("total")) \
    .writeStream.outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/tumbling") \
    .start("/data/gold/5min_totals")

# Sliding: скользящее среднее за 30 мин, обновление каждые 5 мин
sliding_query = events \
    .withWatermark("event_time", "30 minutes") \
    .groupBy(window("event_time", "30 minutes", "5 minutes")) \
    .agg(_sum("amount").alias("rolling_total")) \
    .writeStream.outputMode("update") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/sliding") \
    .start("/data/gold/30min_rolling")

# Session: пользовательские сессии с gap 10 мин
session_query = events \
    .withWatermark("event_time", "20 minutes") \
    .groupBy(session_window("event_time", "10 minutes"), "user_id") \
    .agg(
        count("*").alias("events_in_session"),
        _sum("amount").alias("session_total")
    ) \
    .writeStream.outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/session") \
    .start("/data/gold/user_sessions")
Проверка знанийKnowledge check
Tumbling window = 10 мин, sliding window = 10 мин с slide 2 мин. Сколько окон содержит событие с event_time = 10:07 в каждом случае?
ОтветAnswer
В tumbling window событие 10:07 попадает в ровно 1 окно: [10:00-10:10). В sliding window с slide 2 мин событие попадает в 5 окон: [10:00-10:10), [10:02-10:12), [10:04-10:14), [10:06-10:16), и ещё зависит от расчёта. Формула: num_windows = window_size / slide = 10/2 = 5 окон максимум. Каждое событие дублируется в window_size/slide окнах, что увеличивает state store в 5 раз.
Проверка знанийKnowledge check
Пользователь заходит на сайт в 10:00, кликает в 10:02, 10:04, затем уходит. Возвращается в 10:15, кликает в 10:16. Session gap = 5 минут. Сколько сессий создаст session_window?
ОтветAnswer
Две сессии. Первая сессия: события 10:00, 10:02, 10:04 -- gap между соседними <= 5 минут. Сессия: [10:00, ~10:09). Вторая сессия: события 10:15, 10:16 -- gap от предыдущей сессии = 11 минут (10:04 → 10:15 > 5 мин gap), поэтому новая сессия. Сессия: [10:15, ~10:21). Session windows динамически определяют границы на основе реальной активности пользователя.

Что дальше?

В следующем уроке мы разберём stateful операцииmapGroupsWithState и flatMapGroupsWithState для произвольной логики состояния, выходящей за рамки стандартных агрегаций.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Чем отличается tumbling window от sliding window в Structured Streaming?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8