Режимы вывода: append, complete, update
Output mode определяет, какие строки из Result Table записываются в sink при каждом micro-batch. Выбор неправильного mode вызывает AnalysisException — Spark отклоняет запрос ещё до запуска.
Три режима вывода
Append Mode (по умолчанию)
В append mode записываются только новые строки, добавленные в Result Table с последнего batch. Строки, однажды записанные, никогда не изменяются.
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
spark = SparkSession.builder.appName("AppendMode").getOrCreate()
schema = StructType() \
.add("event_id", StringType()) \
.add("user_id", StringType()) \
.add("amount", DoubleType()) \
.add("event_time", TimestampType())
# Source
raw = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "events") \
.load()
events = raw.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Append: только новые строки (без агрегаций)
query = events \
.filter(col("amount") > 100) \
.writeStream \
.outputMode("append") \
.format("delta") \
.option("checkpointLocation", "/checkpoints/append-demo") \
.start("/data/output/high_value_events")
Ограничение: append mode не работает с агрегациями без watermark. Почему? Без watermark Spark не знает, когда агрегация “финализирована” — новые данные могут изменить результат. Spark отклоняет такой запрос:
# ОШИБКА: AnalysisException
# Append output mode not supported when there are streaming aggregations
# without watermark
counts = events.groupBy("user_id").count()
query = counts.writeStream.outputMode("append").start() # AnalysisException!
С watermark append mode допускает агрегации — Spark записывает результат окна, когда watermark проходит его границу:
# РАБОТАЕТ: агрегация + watermark = append mode ok
counts = events \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window("event_time", "5 minutes"),
"user_id"
).count()
query = counts.writeStream.outputMode("append").start() # OK
Complete Mode
В complete mode все строки Result Table записываются в sink при каждом batch. Предыдущий результат полностью перезаписывается.
# Complete: вся таблица на каждом batch
# Подходит для агрегаций с малым числом строк результата
totals = events.groupBy("user_id").agg(
sum("amount").alias("total_amount"),
count("*").alias("event_count")
)
query = totals.writeStream \
.outputMode("complete") \
.format("console") \
.option("checkpointLocation", "/checkpoints/complete-demo") \
.start()
# Каждый batch выводит полную таблицу:
# Batch 0: {user_1: 100, user_2: 50}
# Batch 1: {user_1: 250, user_2: 50, user_3: 75} -- всё заново
# Batch 2: {user_1: 310, user_2: 120, user_3: 75} -- всё заново
Ограничение: complete mode требует агрегации. Без агрегации Result Table растёт бесконечно — Spark должен хранить все данные в памяти, что приводит к OOM.
# ОШИБКА: Complete output mode not supported for non-aggregation queries
query = events.writeStream.outputMode("complete").start() # AnalysisException!
Update Mode
В update mode записываются только изменённые строки — новые или обновлённые с последнего batch. Неизменённые строки не записываются.
# Update: только изменённые строки
totals = events.groupBy("user_id").agg(
sum("amount").alias("total_amount")
)
query = totals.writeStream \
.outputMode("update") \
.format("delta") \
.option("checkpointLocation", "/checkpoints/update-demo") \
.start("/data/output/user_totals")
# Batch 0: {user_1: 100, user_2: 50} -- обе новые
# Batch 1: {user_1: 250, user_3: 75} -- user_1 обновлён, user_3 новый
# Batch 2: {user_1: 310, user_2: 120} -- user_1 и user_2 обновлены
Update mode — компромисс между append (только новые) и complete (всё). Он эффективнее complete для больших Result Table, потому что записывает только дельту.
Один поток, три режима
Рассмотрим один и тот же pipeline — подсчёт событий по типу — в трёх режимах:
# Общая трансформация
event_counts = events \
.withWatermark("event_time", "10 minutes") \
.groupBy("event_type") \
.count()
| Batch | Входные данные | Append | Complete | Update |
|---|---|---|---|---|
| 0 | click:3, view:2 | (ничего — окно не закрыто) | click:3, view:2 | click:3, view:2 |
| 1 | click:1, buy:1 | (ничего) | click:4, view:2, buy:1 | click:4, buy:1 |
| 2 | (watermark advances) | click:4, view:2 | click:4, view:2, buy:1 | (ничего) |
В append mode результат появляется только когда watermark закрывает окно. В complete mode — полная таблица каждый раз. В update — только изменённые строки.
Decision Tree: выбор Output Mode
Есть ли агрегации (groupBy, count, sum)?
├── НЕТ (map, filter, select)
│ └── Append mode (единственный вариант)
│
└── ДА
├── Нужны ли все строки каждый раз?
│ ├── ДА → Complete mode
│ │ (малый результат, dashboard, memory sink)
│ │
│ └── НЕТ
│ ├── Есть watermark?
│ │ ├── ДА → Append mode
│ │ │ (файловый sink, идемпотентность)
│ │ │
│ │ └── НЕТ → Update mode
│ │ (Delta/Kafka sink, низкая латентность)
│ │
│ └── Update mode
│ (наиболее универсальный для агрегаций)
Практическое правило. Без агрегаций — append. С агрегациями для dashboard/мониторинга — complete. С агрегациями для production sink (Delta, Kafka) — update. Если записываете агрегаты в файловый sink — append + watermark (файлы не поддерживают перезапись строк).
Совместимость операций и режимов
| Операция | Append | Complete | Update |
|---|---|---|---|
| select, filter, map | Да | Нет | Да |
| groupBy().agg() без watermark | Нет | Да | Да |
| groupBy().agg() с watermark | Да | Да | Да |
| mapGroupsWithState | Зависит | Нет | Да |
| flatMapGroupsWithState | Да/Update | Нет | Да/Append |
Anti-pattern: append mode с агрегациями без watermark. Это самая частая ошибка. Spark выбрасывает AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark. Решение: добавьте .withWatermark() перед агрегацией или используйте update/complete mode.
Complete pipeline: все три режима
from pyspark.sql.functions import window, sum as _sum, count
# Source
raw = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "sales") \
.load()
sales = raw.select(
from_json(col("value").cast("string"), sales_schema).alias("data")
).select("data.*")
# Windowed aggregation
hourly = sales \
.withWatermark("sale_time", "1 hour") \
.groupBy(
window("sale_time", "1 hour"),
"store_id"
) \
.agg(
_sum("amount").alias("total"),
count("*").alias("txn_count")
)
# Append: финализированные окна -> Parquet (архив)
hourly.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "/data/archive/hourly_sales/") \
.option("checkpointLocation", "/checkpoints/sales-archive") \
.start()
# Update: изменённые строки -> Delta (оперативные данные)
hourly.writeStream \
.outputMode("update") \
.format("delta") \
.option("checkpointLocation", "/checkpoints/sales-operational") \
.start("/data/gold/hourly_sales")
# Complete: полная таблица -> memory (dashboard)
hourly.writeStream \
.outputMode("complete") \
.format("memory") \
.queryName("sales_dashboard") \
.start()
Что дальше?
В следующем уроке мы разберём watermarks и обработку опоздавших данных — ключевой механизм для event-time processing в Structured Streaming. Вы увидите, как watermark определяет, какие данные считать “опоздавшими”, и как настроить баланс между полнотой и задержкой.