Watermarks и обработка опоздавших данных
Event Time vs Processing Time
В потоковой обработке каждое событие имеет два времени:
- Event time — когда событие произошло (timestamp в самих данных)
- Processing time — когда событие обработано Spark (время прибытия)
Почему это различие критично? Данные приходят неупорядоченными. Мобильное приложение отправляет событие в 10:05, но из-за задержки сети оно приходит в Spark в 10:12. Если агрегировать по processing time, событие попадёт в окно 10:10-10:15, а не 10:05-10:10 где ему место.
Event time: 10:00 10:01 10:02 10:03 10:04 10:05
| | | | | |
↓ ↓ ↓ ↓ ↓ ↓
Processing time: 10:00 10:01 10:03 10:02 10:07 10:12
↑ ↑ ↑
reorder 5min late 7min late
Structured Streaming обрабатывает по event time — каждое событие попадает в правильное окно по своему timestamp. Но возникает проблема: как долго ждать опоздавшие данные?
Watermark: граница ожидания
Watermark — это движущаяся граница, определяющая максимально допустимое опоздание. Формула:
watermark = max(event_time) - threshold
Где max(event_time) — максимальный event time, увиденный во всех обработанных данных, а threshold — настраиваемая задержка.
Все события с event_time < watermark считаются опоздавшими и отбрасываются. Это позволяет Spark:
- Финализировать окна — когда watermark проходит верхнюю границу окна, результат не изменится
- Очищать state — старые данные удаляются из state store, предотвращая OOM
- Использовать append mode — финализированные окна безопасно записывать
withWatermark() API
from pyspark.sql.functions import window, sum as _sum, count, col
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
schema = StructType() \
.add("user_id", StringType()) \
.add("amount", DoubleType()) \
.add("event_time", TimestampType())
# Source
raw = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "transactions") \
.load()
events = raw.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Watermark: допускаем опоздание до 10 минут
windowed = events \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window("event_time", "5 minutes"),
"user_id"
) \
.agg(
_sum("amount").alias("total"),
count("*").alias("txn_count")
)
# Append mode -- записываем только финализированные окна
query = windowed.writeStream \
.outputMode("append") \
.format("delta") \
.option("checkpointLocation", "/checkpoints/watermark-demo") \
.start("/data/gold/user_transactions")
withWatermark() должен быть на том же столбце, что и window(). Watermark определяется по столбцу event_time, и оконная агрегация тоже использует event_time. Если столбцы не совпадают, Spark не сможет связать watermark с окнами и очистить state.
Как watermark продвигается
Watermark обновляется после каждого micro-batch на основе данных в этом batch:
Batch 0: events с event_time [10:00, 10:05]
max_event_time = 10:05
watermark = 10:05 - 10min = 09:55
→ все события 09:55+ принимаются
Batch 1: events с event_time [10:03, 10:12]
max_event_time = 10:12
watermark = 10:12 - 10min = 10:02
→ событие с event_time 10:01 ОТБРОШЕНО (< 10:02)
→ событие с event_time 10:03 принято (>= 10:02)
Batch 2: events с event_time [10:08, 10:15]
max_event_time = 10:15
watermark = 10:15 - 10min = 10:05
→ окно [10:00-10:05) ФИНАЛИЗИРОВАНО
→ state для этого окна очищен
Watermark никогда не движется назад — это монотонно возрастающая граница.
Визуализация: влияние watermark delay
Интерактивная диаграмма ниже показывает, как настройка watermark delay влияет на принятие и отбрасывание событий. Двигайте слайдер, чтобы увидеть:
- При delay = 0s — все опоздавшие события отбрасываются, минимальный state
- При delay = 60s — все события принимаются, но state растёт
Event time timeline (x-axis) -- зелёные = принятые, красные = отброшенные:
watermark = max(event_time) - delay = 80s - 10s = 70s
События с event_time < 70s отбрасываются как опоздавшие
Tradeoff: полнота vs задержка
Выбор watermark delay — это компромисс:
| Маленький delay (1-5 мин) | Большой delay (30-60 мин) |
|---|---|
| Быстрая финализация окон | Медленная финализация |
| Больше отброшенных событий | Меньше отброшенных событий |
| Маленький state store | Большой state store |
| Низкая задержка результатов | Высокая задержка результатов |
| Подходит для: real-time dashboard | Подходит для: точные отчёты |
Как выбрать? Анализируйте реальные данные:
# Измерьте реальное опоздание данных
from pyspark.sql.functions import current_timestamp
latency = events.select(
(current_timestamp().cast("long") - col("event_time").cast("long")).alias("latency_seconds")
)
# P99 latency = watermark delay
# Если 99% событий приходят в течение 8 минут -- delay = 10 минут
latency.select(
percentile_approx("latency_seconds", 0.99).alias("p99_latency")
).show()
Anti-pattern: агрегация без watermark в append mode. Без watermark Spark не может финализировать окна, поэтому state store растёт бесконечно. Даже если Spark не выбрасывает AnalysisException (например, в update mode), state store со временем вызовет OOM. Всегда добавляйте watermark при оконных агрегациях.
Watermark + Output Mode
Поведение watermark зависит от output mode:
| Output Mode | Когда записывается | Late data |
|---|---|---|
| Append | Когда watermark проходит окно | Отбрасывается после watermark |
| Update | Каждый batch (изменённые строки) | Обновляет результат до watermark |
| Complete | Каждый batch (все строки) | Обновляет результат до watermark |
В append mode watermark критичен — без него агрегации невозможны. В update/complete mode watermark опционален, но рекомендован для ограничения state store.
Полный production pipeline с watermark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, sum as _sum, count
spark = SparkSession.builder \
.appName("WatermarkPipeline") \
.config("spark.sql.streaming.stateStore.stateSchemaCheck", "false") \
.getOrCreate()
schema = StructType() \
.add("sensor_id", StringType()) \
.add("temperature", DoubleType()) \
.add("event_time", TimestampType())
# Kafka source
raw = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "sensor-data") \
.option("startingOffsets", "latest") \
.load()
sensors = raw.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Watermark: 10 минут допустимого опоздания
# Window: 5-минутные окна для средней температуры
avg_temp = sensors \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window("event_time", "5 minutes"),
"sensor_id"
) \
.agg(
_sum("temperature").alias("temp_sum"),
count("*").alias("readings")
)
# Delta sink в append mode -- финализированные окна
query = avg_temp.writeStream \
.outputMode("append") \
.format("delta") \
.option("checkpointLocation", "/checkpoints/sensor-avg") \
.trigger(processingTime="30 seconds") \
.start("/data/gold/sensor_averages")
query.awaitTermination()
Что дальше?
В следующем уроке мы разберём оконные функции — tumbling, sliding и session windows. Вы увидите, как окна работают совместно с watermarks для bounded stateful processing.