Learning Platform
Глоссарий Troubleshooting
Урок 08.03 · 12 мин
Средний
Append ModeComplete ModeUpdate ModeAggregationOutput Mode Selection

Режимы вывода: append, complete, update

Output mode определяет, какие строки из Result Table записываются в sink при каждом micro-batch. Выбор неправильного mode вызывает AnalysisException — Spark отклоняет запрос ещё до запуска.

Три режима вывода

Append Mode (по умолчанию)

В append mode записываются только новые строки, добавленные в Result Table с последнего batch. Строки, однажды записанные, никогда не изменяются.

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

spark = SparkSession.builder.appName("AppendMode").getOrCreate()

schema = StructType() \
    .add("event_id", StringType()) \
    .add("user_id", StringType()) \
    .add("amount", DoubleType()) \
    .add("event_time", TimestampType())

# Source
raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "events") \
    .load()

events = raw.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Append: только новые строки (без агрегаций)
query = events \
    .filter(col("amount") > 100) \
    .writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/append-demo") \
    .start("/data/output/high_value_events")

Ограничение: append mode не работает с агрегациями без watermark. Почему? Без watermark Spark не знает, когда агрегация “финализирована” — новые данные могут изменить результат. Spark отклоняет такой запрос:

# ОШИБКА: AnalysisException
# Append output mode not supported when there are streaming aggregations
# without watermark
counts = events.groupBy("user_id").count()
query = counts.writeStream.outputMode("append").start()  # AnalysisException!

С watermark append mode допускает агрегации — Spark записывает результат окна, когда watermark проходит его границу:

# РАБОТАЕТ: агрегация + watermark = append mode ok
counts = events \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window("event_time", "5 minutes"),
        "user_id"
    ).count()

query = counts.writeStream.outputMode("append").start()  # OK

Complete Mode

В complete mode все строки Result Table записываются в sink при каждом batch. Предыдущий результат полностью перезаписывается.

# Complete: вся таблица на каждом batch
# Подходит для агрегаций с малым числом строк результата
totals = events.groupBy("user_id").agg(
    sum("amount").alias("total_amount"),
    count("*").alias("event_count")
)

query = totals.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("checkpointLocation", "/checkpoints/complete-demo") \
    .start()

# Каждый batch выводит полную таблицу:
# Batch 0: {user_1: 100, user_2: 50}
# Batch 1: {user_1: 250, user_2: 50, user_3: 75}  -- всё заново
# Batch 2: {user_1: 310, user_2: 120, user_3: 75}  -- всё заново

Ограничение: complete mode требует агрегации. Без агрегации Result Table растёт бесконечно — Spark должен хранить все данные в памяти, что приводит к OOM.

# ОШИБКА: Complete output mode not supported for non-aggregation queries
query = events.writeStream.outputMode("complete").start()  # AnalysisException!

Update Mode

В update mode записываются только изменённые строки — новые или обновлённые с последнего batch. Неизменённые строки не записываются.

# Update: только изменённые строки
totals = events.groupBy("user_id").agg(
    sum("amount").alias("total_amount")
)

query = totals.writeStream \
    .outputMode("update") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/update-demo") \
    .start("/data/output/user_totals")

# Batch 0: {user_1: 100, user_2: 50}              -- обе новые
# Batch 1: {user_1: 250, user_3: 75}               -- user_1 обновлён, user_3 новый
# Batch 2: {user_1: 310, user_2: 120}              -- user_1 и user_2 обновлены

Update mode — компромисс между append (только новые) и complete (всё). Он эффективнее complete для больших Result Table, потому что записывает только дельту.

Один поток, три режима

Рассмотрим один и тот же pipeline — подсчёт событий по типу — в трёх режимах:

# Общая трансформация
event_counts = events \
    .withWatermark("event_time", "10 minutes") \
    .groupBy("event_type") \
    .count()
BatchВходные данныеAppendCompleteUpdate
0click:3, view:2(ничего — окно не закрыто)click:3, view:2click:3, view:2
1click:1, buy:1(ничего)click:4, view:2, buy:1click:4, buy:1
2(watermark advances)click:4, view:2click:4, view:2, buy:1(ничего)

В append mode результат появляется только когда watermark закрывает окно. В complete mode — полная таблица каждый раз. В update — только изменённые строки.

Decision Tree: выбор Output Mode

Есть ли агрегации (groupBy, count, sum)?
├── НЕТ (map, filter, select)
│   └── Append mode (единственный вариант)

└── ДА
    ├── Нужны ли все строки каждый раз?
    │   ├── ДА → Complete mode
    │   │   (малый результат, dashboard, memory sink)
    │   │
    │   └── НЕТ
    │       ├── Есть watermark?
    │       │   ├── ДА → Append mode
    │       │   │   (файловый sink, идемпотентность)
    │       │   │
    │       │   └── НЕТ → Update mode
    │       │       (Delta/Kafka sink, низкая латентность)
    │       │
    │       └── Update mode
    │           (наиболее универсальный для агрегаций)
TIP

Практическое правило. Без агрегаций — append. С агрегациями для dashboard/мониторинга — complete. С агрегациями для production sink (Delta, Kafka) — update. Если записываете агрегаты в файловый sink — append + watermark (файлы не поддерживают перезапись строк).

Совместимость операций и режимов

ОперацияAppendCompleteUpdate
select, filter, mapДаНетДа
groupBy().agg() без watermarkНетДаДа
groupBy().agg() с watermarkДаДаДа
mapGroupsWithStateЗависитНетДа
flatMapGroupsWithStateДа/UpdateНетДа/Append
WARNING

Anti-pattern: append mode с агрегациями без watermark. Это самая частая ошибка. Spark выбрасывает AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark. Решение: добавьте .withWatermark() перед агрегацией или используйте update/complete mode.

Complete pipeline: все три режима

from pyspark.sql.functions import window, sum as _sum, count

# Source
raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "sales") \
    .load()

sales = raw.select(
    from_json(col("value").cast("string"), sales_schema).alias("data")
).select("data.*")

# Windowed aggregation
hourly = sales \
    .withWatermark("sale_time", "1 hour") \
    .groupBy(
        window("sale_time", "1 hour"),
        "store_id"
    ) \
    .agg(
        _sum("amount").alias("total"),
        count("*").alias("txn_count")
    )

# Append: финализированные окна -> Parquet (архив)
hourly.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/data/archive/hourly_sales/") \
    .option("checkpointLocation", "/checkpoints/sales-archive") \
    .start()

# Update: изменённые строки -> Delta (оперативные данные)
hourly.writeStream \
    .outputMode("update") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/sales-operational") \
    .start("/data/gold/hourly_sales")

# Complete: полная таблица -> memory (dashboard)
hourly.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("sales_dashboard") \
    .start()
Проверка знанийKnowledge check
Почему append mode с aggregation без watermark вызывает AnalysisException?
ОтветAnswer
В append mode Spark записывает строку один раз и никогда не обновляет. Без watermark агрегация никогда не 'финализируется' -- новые данные могут прийти в любой момент и изменить результат. Spark не может определить, когда безопасно записать результат, поэтому отклоняет запрос. С watermark Spark знает: когда watermark проходит границу окна, поздние данные для этого окна уже не придут, и результат можно записать в append mode.
Проверка знанийKnowledge check
Вы строите real-time dashboard, показывающий суммарные продажи по магазинам. Результат содержит ~50 строк. Какой output mode выбрать и почему?
ОтветAnswer
Complete mode. Dashboard показывает текущее состояние всех магазинов, поэтому нужна полная таблица при каждом обновлении. 50 строк -- малый объём, перезапись всей таблицы не создаёт overhead. Complete mode гарантирует, что dashboard всегда видит актуальные данные по всем магазинам, а не только по изменившимся.

Что дальше?

В следующем уроке мы разберём watermarks и обработку опоздавших данных — ключевой механизм для event-time processing в Structured Streaming. Вы увидите, как watermark определяет, какие данные считать “опоздавшими”, и как настроить баланс между полнотой и задержкой.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 6. Data engineer пишет streaming агрегацию events.groupBy("user_id").count() и выбирает outputMode("append") без watermark. Что произойдёт?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8