Learning Platform
Глоссарий Troubleshooting
Урок 08.04 · 16 мин
Продвинутый
WatermarkEvent TimeProcessing TimeLate DatawithWatermark

Watermarks и обработка опоздавших данных

Event Time vs Processing Time

В потоковой обработке каждое событие имеет два времени:

  • Event time — когда событие произошло (timestamp в самих данных)
  • Processing time — когда событие обработано Spark (время прибытия)

Почему это различие критично? Данные приходят неупорядоченными. Мобильное приложение отправляет событие в 10:05, но из-за задержки сети оно приходит в Spark в 10:12. Если агрегировать по processing time, событие попадёт в окно 10:10-10:15, а не 10:05-10:10 где ему место.

Event time:       10:00  10:01  10:02  10:03  10:04  10:05
                    |      |      |      |      |      |
                    ↓      ↓      ↓      ↓      ↓      ↓
Processing time:  10:00  10:01  10:03  10:02  10:07  10:12
                                  ↑             ↑      ↑
                               reorder     5min late  7min late

Structured Streaming обрабатывает по event time — каждое событие попадает в правильное окно по своему timestamp. Но возникает проблема: как долго ждать опоздавшие данные?

Watermark: граница ожидания

Watermark — это движущаяся граница, определяющая максимально допустимое опоздание. Формула:

watermark = max(event_time) - threshold

Где max(event_time) — максимальный event time, увиденный во всех обработанных данных, а threshold — настраиваемая задержка.

Все события с event_time < watermark считаются опоздавшими и отбрасываются. Это позволяет Spark:

  1. Финализировать окна — когда watermark проходит верхнюю границу окна, результат не изменится
  2. Очищать state — старые данные удаляются из state store, предотвращая OOM
  3. Использовать append mode — финализированные окна безопасно записывать

withWatermark() API

from pyspark.sql.functions import window, sum as _sum, count, col
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

schema = StructType() \
    .add("user_id", StringType()) \
    .add("amount", DoubleType()) \
    .add("event_time", TimestampType())

# Source
raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "transactions") \
    .load()

events = raw.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Watermark: допускаем опоздание до 10 минут
windowed = events \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window("event_time", "5 minutes"),
        "user_id"
    ) \
    .agg(
        _sum("amount").alias("total"),
        count("*").alias("txn_count")
    )

# Append mode -- записываем только финализированные окна
query = windowed.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/watermark-demo") \
    .start("/data/gold/user_transactions")
TIP

withWatermark() должен быть на том же столбце, что и window(). Watermark определяется по столбцу event_time, и оконная агрегация тоже использует event_time. Если столбцы не совпадают, Spark не сможет связать watermark с окнами и очистить state.

Как watermark продвигается

Watermark обновляется после каждого micro-batch на основе данных в этом batch:

Batch 0: events с event_time [10:00, 10:05]
  max_event_time = 10:05
  watermark = 10:05 - 10min = 09:55
  → все события 09:55+ принимаются

Batch 1: events с event_time [10:03, 10:12]
  max_event_time = 10:12
  watermark = 10:12 - 10min = 10:02
  → событие с event_time 10:01 ОТБРОШЕНО (< 10:02)
  → событие с event_time 10:03 принято (>= 10:02)

Batch 2: events с event_time [10:08, 10:15]
  max_event_time = 10:15
  watermark = 10:15 - 10min = 10:05
  → окно [10:00-10:05) ФИНАЛИЗИРОВАНО
  → state для этого окна очищен

Watermark никогда не движется назад — это монотонно возрастающая граница.

Визуализация: влияние watermark delay

Интерактивная диаграмма ниже показывает, как настройка watermark delay влияет на принятие и отбрасывание событий. Двигайте слайдер, чтобы увидеть:

  • При delay = 0s — все опоздавшие события отбрасываются, минимальный state
  • При delay = 60s — все события принимаются, но state растёт
Streaming Watermarks: обработка опоздавших данных
10s
0s (drop all late)30s60s (accept all)

Event time timeline (x-axis) -- зелёные = принятые, красные = отброшенные:

10:0010:00:2010:00:4010:01:0010:01:20
1
2
4
3
6
5
14
8
7
10
9
12
11
13
WM
y-axis: lateness (processing_time - event_time)
Accepted8/14
Dropped6/14
Watermark70s
Max event time80s

watermark = max(event_time) - delay = 80s - 10s = 70s

События с event_time < 70s отбрасываются как опоздавшие

AcceptedDroppedWatermark line

Tradeoff: полнота vs задержка

Выбор watermark delay — это компромисс:

Маленький delay (1-5 мин)Большой delay (30-60 мин)
Быстрая финализация оконМедленная финализация
Больше отброшенных событийМеньше отброшенных событий
Маленький state storeБольшой state store
Низкая задержка результатовВысокая задержка результатов
Подходит для: real-time dashboardПодходит для: точные отчёты

Как выбрать? Анализируйте реальные данные:

# Измерьте реальное опоздание данных
from pyspark.sql.functions import current_timestamp

latency = events.select(
    (current_timestamp().cast("long") - col("event_time").cast("long")).alias("latency_seconds")
)

# P99 latency = watermark delay
# Если 99% событий приходят в течение 8 минут -- delay = 10 минут
latency.select(
    percentile_approx("latency_seconds", 0.99).alias("p99_latency")
).show()
WARNING

Anti-pattern: агрегация без watermark в append mode. Без watermark Spark не может финализировать окна, поэтому state store растёт бесконечно. Даже если Spark не выбрасывает AnalysisException (например, в update mode), state store со временем вызовет OOM. Всегда добавляйте watermark при оконных агрегациях.

Watermark + Output Mode

Поведение watermark зависит от output mode:

Output ModeКогда записываетсяLate data
AppendКогда watermark проходит окноОтбрасывается после watermark
UpdateКаждый batch (изменённые строки)Обновляет результат до watermark
CompleteКаждый batch (все строки)Обновляет результат до watermark

В append mode watermark критичен — без него агрегации невозможны. В update/complete mode watermark опционален, но рекомендован для ограничения state store.

Полный production pipeline с watermark

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, sum as _sum, count

spark = SparkSession.builder \
    .appName("WatermarkPipeline") \
    .config("spark.sql.streaming.stateStore.stateSchemaCheck", "false") \
    .getOrCreate()

schema = StructType() \
    .add("sensor_id", StringType()) \
    .add("temperature", DoubleType()) \
    .add("event_time", TimestampType())

# Kafka source
raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "sensor-data") \
    .option("startingOffsets", "latest") \
    .load()

sensors = raw.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Watermark: 10 минут допустимого опоздания
# Window: 5-минутные окна для средней температуры
avg_temp = sensors \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window("event_time", "5 minutes"),
        "sensor_id"
    ) \
    .agg(
        _sum("temperature").alias("temp_sum"),
        count("*").alias("readings")
    )

# Delta sink в append mode -- финализированные окна
query = avg_temp.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/sensor-avg") \
    .trigger(processingTime="30 seconds") \
    .start("/data/gold/sensor_averages")

query.awaitTermination()
Проверка знанийKnowledge check
Watermark = max(event_time) - threshold. Если текущий max event_time = 10:30, а threshold = 15 минут, какие события будут отброшены?
ОтветAnswer
Watermark = 10:30 - 15min = 10:15. Все события с event_time < 10:15 будут отброшены как опоздавшие. Событие с event_time = 10:14 будет отброшено. Событие с event_time = 10:15 будет принято. Watermark монотонно растёт -- если в следующем batch max event_time = 10:25 (меньше текущего 10:30), watermark не откатится назад и останется 10:15.
Проверка знанийKnowledge check
Почему агрегация без watermark приводит к OOM в production, даже если Spark не выбрасывает AnalysisException?
ОтветAnswer
Без watermark Spark не знает, когда можно удалить старые данные из state store. Для groupBy().agg() каждая группа хранится в state store бессрочно -- новые данные могут изменить результат в любой момент. Со временем state store растёт неограниченно: 1000 пользователей -> 10000 -> 1000000. При достаточном времени работы executor исчерпывает память и падает с OOM. Watermark позволяет Spark удалять state для групп, чьи окна финализированы.

Что дальше?

В следующем уроке мы разберём оконные функции — tumbling, sliding и session windows. Вы увидите, как окна работают совместно с watermarks для bounded stateful processing.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 6. Формула watermark: watermark = max(event_time) - threshold. Если max(event_time) = 10:25, threshold = 10 минут, что произойдёт с событием, имеющим event_time = 10:14?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8