Structured Streaming: основы
Эволюция потоковой обработки в Spark
До Spark 2.0 единственным API для потоковой обработки был DStream (Discretized Stream). DStream работал на уровне RDD — каждый micro-batch создавал новый RDD, и вы писали трансформации в терминах RDD API. Проблемы DStream:
- Отдельный API от batch DataFrame — код нельзя переиспользовать
- Нет event-time processing — только processing time
- Нет exactly-once гарантий без сложного ручного кода
- Нет оптимизации Catalyst — RDD не проходит через оптимизатор
# LEGACY: DStream API (НЕ используйте в новых проектах)
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, batchDuration=1)
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
counts.pprint()
ssc.start()
Structured Streaming (Spark 2.0+) полностью заменил DStream. Ключевая идея: потоковые данные — это unbounded table (бесконечная таблица), к которой применяются те же DataFrame/SQL операции, что и к batch-данным.
Micro-Batch модель
Structured Streaming обрабатывает данные micro-batch-ами. Каждый micro-batch:
- Spark проверяет источник на новые данные
- Создаёт инкрементальный DataFrame с новыми строками
- Применяет трансформации (те же, что для batch)
- Записывает результат в sink
- Фиксирует offset в checkpoint
Unbounded Input Table → DataFrame API → Result Table
rows 1-100Batch 1
rows 101-250Batch 2
rows 251-400Batch 3 …
rows 401+
Это не просто синтаксический сахар — Catalyst optimizer применяет одни и те же оптимизации (predicate pushdown, column pruning, join reordering) и к batch, и к streaming запросам.
Полный pipeline: readStream -> transform -> writeStream
Каждый streaming pipeline в Spark состоит из трёх частей:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, count
from pyspark.sql.types import StructType, StringType, TimestampType
spark = SparkSession.builder.appName("StreamingFundamentals").getOrCreate()
schema = StructType() \
.add("user_id", StringType()) \
.add("event_type", StringType()) \
.add("event_time", TimestampType())
# 1. SOURCE: readStream (unbounded input)
raw = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "user-events") \
.option("startingOffsets", "latest") \
.load()
# 2. TRANSFORM: обычные DataFrame операции
events = raw.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
counts = events.groupBy("event_type").agg(count("*").alias("total"))
# 3. SINK: writeStream (output)
query = counts.writeStream \
.outputMode("complete") \
.format("console") \
.option("checkpointLocation", "/checkpoints/user-events") \
.trigger(processingTime="10 seconds") \
.start()
query.awaitTermination()
readStream vs read: замените spark.read на spark.readStream и df.write на df.writeStream — и ваш batch pipeline становится streaming. Трансформации в середине идентичны. Это главное преимущество unified API.
Trigger режимы
Trigger определяет когда Spark запускает следующий micro-batch:
| Trigger | Описание | Когда использовать |
|---|---|---|
processingTime("10 seconds") | Каждые N секунд | Production: стабильная задержка |
once() | Один batch, затем stop | Backfill: разовая обработка накопленных данных |
availableNow() | Все доступные данные batch-ами, затем stop | Инкрементальный ETL (замена once() с Spark 3.3) |
continuous("1 second") | Continuous processing (experimental) | Ultra-low latency (<1ms), ограниченные операции |
# Processing time -- стандартный production trigger
query = df.writeStream \
.trigger(processingTime="30 seconds") \
.start()
# Available now -- инкрементальный ETL в Airflow/cron
query = df.writeStream \
.trigger(availableNow=True) \
.start()
# Once -- legacy, используйте availableNow вместо
query = df.writeStream \
.trigger(once=True) \
.start()
Anti-pattern: использование DStream API. DStream — legacy API с Spark 2.0. Он не поддерживает Catalyst optimization, event-time processing и exactly-once semantics. Всегда используйте Structured Streaming (readStream/writeStream). Если вы видите from pyspark.streaming import StreamingContext в коде — это сигнал для рефакторинга.
Жизненный цикл StreamingQuery
writeStream.start() возвращает объект StreamingQuery — ваш интерфейс управления потоком:
# Запуск
query = df.writeStream.format("delta").start("/output/")
# Мониторинг
print(query.id) # UUID запроса (стабильный между перезапусками)
print(query.runId) # UUID текущего запуска
print(query.status) # текущее состояние
print(query.lastProgress) # метрики последнего batch
print(query.isActive) # работает ли
# Ожидание завершения (блокирующий вызов)
query.awaitTermination()
# Остановка
query.stop()
# Обработка исключений
try:
query.awaitTermination()
except Exception as e:
print(f"Query failed: {e}")
# alert, retry logic
Fault Tolerance через Checkpoints
Structured Streaming гарантирует exactly-once обработку через механизм checkpoints:
- Offset log — какие данные уже прочитаны из source
- State store — текущее состояние агрегаций (для stateful операций)
- Commit log — какие batch успешно записаны в sink
query = df.writeStream \
.option("checkpointLocation", "/checkpoints/my-pipeline") \
.start()
При перезапуске Spark:
- Читает checkpoint — определяет, где остановились
- Перечитывает данные с последнего committed offset
- Восстанавливает state store
- Продолжает обработку без потерь и дублей
checkpointLocation обязателен для production. Без checkpoint Spark начинает обработку с начала при каждом перезапуске. Checkpoint directory должен быть на надёжном хранилище (HDFS, S3) — не на локальном диске executor.
Что дальше?
В следующем уроке мы разберём источники и приёмники данных — какие sources и sinks поддерживает Structured Streaming, как настроить Kafka source, и почему Delta Lake — лучший production sink.
Kafka: брокеры, топики и партиции