Learning Platform
Глоссарий Troubleshooting
Урок 08.01 · 14 мин
Средний
Structured StreamingMicro-BatchContinuous ProcessingTriggerStreamingQuery

Structured Streaming: основы

Эволюция потоковой обработки в Spark

До Spark 2.0 единственным API для потоковой обработки был DStream (Discretized Stream). DStream работал на уровне RDD — каждый micro-batch создавал новый RDD, и вы писали трансформации в терминах RDD API. Проблемы DStream:

  • Отдельный API от batch DataFrame — код нельзя переиспользовать
  • Нет event-time processing — только processing time
  • Нет exactly-once гарантий без сложного ручного кода
  • Нет оптимизации Catalyst — RDD не проходит через оптимизатор
# LEGACY: DStream API (НЕ используйте в новых проектах)
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, batchDuration=1)
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
counts.pprint()
ssc.start()

Structured Streaming (Spark 2.0+) полностью заменил DStream. Ключевая идея: потоковые данные — это unbounded table (бесконечная таблица), к которой применяются те же DataFrame/SQL операции, что и к batch-данным.

Micro-Batch модель

Structured Streaming обрабатывает данные micro-batch-ами. Каждый micro-batch:

  1. Spark проверяет источник на новые данные
  2. Создаёт инкрементальный DataFrame с новыми строками
  3. Применяет трансформации (те же, что для batch)
  4. Записывает результат в sink
  5. Фиксирует offset в checkpoint
Micro-batch модель

Unbounded Input Table → DataFrame API → Result Table

Unbounded Input Table
Batch 0
rows 1-100
Batch 1
rows 101-250
Batch 2
rows 251-400
Batch 3 …
rows 401+
каждый batch
DataFrame APIfilter / join / aggCatalyst Optimizer
Result Table(sink output)

Это не просто синтаксический сахар — Catalyst optimizer применяет одни и те же оптимизации (predicate pushdown, column pruning, join reordering) и к batch, и к streaming запросам.

Полный pipeline: readStream -> transform -> writeStream

Каждый streaming pipeline в Spark состоит из трёх частей:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, count
from pyspark.sql.types import StructType, StringType, TimestampType

spark = SparkSession.builder.appName("StreamingFundamentals").getOrCreate()

schema = StructType() \
    .add("user_id", StringType()) \
    .add("event_type", StringType()) \
    .add("event_time", TimestampType())

# 1. SOURCE: readStream (unbounded input)
raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest") \
    .load()

# 2. TRANSFORM: обычные DataFrame операции
events = raw.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

counts = events.groupBy("event_type").agg(count("*").alias("total"))

# 3. SINK: writeStream (output)
query = counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("checkpointLocation", "/checkpoints/user-events") \
    .trigger(processingTime="10 seconds") \
    .start()

query.awaitTermination()
TIP

readStream vs read: замените spark.read на spark.readStream и df.write на df.writeStream — и ваш batch pipeline становится streaming. Трансформации в середине идентичны. Это главное преимущество unified API.

Trigger режимы

Trigger определяет когда Spark запускает следующий micro-batch:

TriggerОписаниеКогда использовать
processingTime("10 seconds")Каждые N секундProduction: стабильная задержка
once()Один batch, затем stopBackfill: разовая обработка накопленных данных
availableNow()Все доступные данные batch-ами, затем stopИнкрементальный ETL (замена once() с Spark 3.3)
continuous("1 second")Continuous processing (experimental)Ultra-low latency (<1ms), ограниченные операции
# Processing time -- стандартный production trigger
query = df.writeStream \
    .trigger(processingTime="30 seconds") \
    .start()

# Available now -- инкрементальный ETL в Airflow/cron
query = df.writeStream \
    .trigger(availableNow=True) \
    .start()

# Once -- legacy, используйте availableNow вместо
query = df.writeStream \
    .trigger(once=True) \
    .start()
WARNING

Anti-pattern: использование DStream API. DStream — legacy API с Spark 2.0. Он не поддерживает Catalyst optimization, event-time processing и exactly-once semantics. Всегда используйте Structured Streaming (readStream/writeStream). Если вы видите from pyspark.streaming import StreamingContext в коде — это сигнал для рефакторинга.

Жизненный цикл StreamingQuery

writeStream.start() возвращает объект StreamingQuery — ваш интерфейс управления потоком:

# Запуск
query = df.writeStream.format("delta").start("/output/")

# Мониторинг
print(query.id)              # UUID запроса (стабильный между перезапусками)
print(query.runId)           # UUID текущего запуска
print(query.status)          # текущее состояние
print(query.lastProgress)    # метрики последнего batch
print(query.isActive)        # работает ли

# Ожидание завершения (блокирующий вызов)
query.awaitTermination()

# Остановка
query.stop()

# Обработка исключений
try:
    query.awaitTermination()
except Exception as e:
    print(f"Query failed: {e}")
    # alert, retry logic

Fault Tolerance через Checkpoints

Structured Streaming гарантирует exactly-once обработку через механизм checkpoints:

  • Offset log — какие данные уже прочитаны из source
  • State store — текущее состояние агрегаций (для stateful операций)
  • Commit log — какие batch успешно записаны в sink
query = df.writeStream \
    .option("checkpointLocation", "/checkpoints/my-pipeline") \
    .start()

При перезапуске Spark:

  1. Читает checkpoint — определяет, где остановились
  2. Перечитывает данные с последнего committed offset
  3. Восстанавливает state store
  4. Продолжает обработку без потерь и дублей
TIP

checkpointLocation обязателен для production. Без checkpoint Spark начинает обработку с начала при каждом перезапуске. Checkpoint directory должен быть на надёжном хранилище (HDFS, S3) — не на локальном диске executor.

Проверка знанийKnowledge check
Почему Structured Streaming заменил DStream API? Назовите минимум 3 ограничения DStream.
ОтветAnswer
DStream имел 4 ключевых ограничения: (1) отдельный RDD API, не совместимый с DataFrame batch кодом; (2) отсутствие event-time processing -- только processing time; (3) отсутствие exactly-once гарантий без сложного ручного кода; (4) отсутствие Catalyst optimization -- RDD не проходит через оптимизатор. Structured Streaming решает все эти проблемы: unified DataFrame API, встроенная поддержка event time и watermarks, exactly-once через checkpoints, и полная оптимизация Catalyst.
Проверка знанийKnowledge check
Чем trigger availableNow отличается от trigger once? Когда использовать каждый?
ОтветAnswer
trigger(once=True) обрабатывает все доступные данные за один micro-batch, что может вызвать OOM при большом объёме. trigger(availableNow=True) (Spark 3.3+) разбивает все доступные данные на несколько micro-batch и обрабатывает их последовательно, затем останавливается. availableNow безопаснее для инкрементального ETL в Airflow/cron, так как контролирует размер каждого batch. once считается legacy -- используйте availableNow.

Что дальше?

В следующем уроке мы разберём источники и приёмники данных — какие sources и sinks поддерживает Structured Streaming, как настроить Kafka source, и почему Delta Lake — лучший production sink.

Kafka: брокеры, топики и партиции

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какая концептуальная модель лежит в основе Structured Streaming и заменяет DStream-подход?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8