Источники и приемники данных
Structured Streaming поддерживает несколько встроенных sources (откуда читать) и sinks (куда писать). Каждая комбинация source + sink определяет гарантии доставки и допустимые output modes.
Sources: откуда читать данные
Kafka Source (production)
Kafka — основной production source для Structured Streaming. Каждое сообщение Kafka представляется строкой с полями key, value, topic, partition, offset, timestamp:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
spark = SparkSession.builder.appName("KafkaSource").getOrCreate()
# Чтение из Kafka
raw = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "transactions") \
.option("startingOffsets", "latest") \
.option("maxOffsetsPerTrigger", 10000) \
.load()
# raw schema: key (binary), value (binary), topic, partition, offset, timestamp
# Парсинг JSON из value
schema = StructType() \
.add("user_id", StringType()) \
.add("amount", DoubleType()) \
.add("event_time", TimestampType())
parsed = raw.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
| Опция | Описание | Пример |
|---|---|---|
subscribe | Конкретные топики | "topic1,topic2" |
subscribePattern | Regex паттерн | "events-.*" |
startingOffsets | Начальная позиция | "latest", "earliest", JSON |
maxOffsetsPerTrigger | Лимит записей на batch | 10000 |
kafka.group.id | Consumer group | "spark-pipeline-01" |
File Source (batch-to-streaming)
File source мониторит директорию на новые файлы. Каждый новый файл обрабатывается как micro-batch:
# CSV файлы, появляющиеся в директории
df = spark.readStream \
.format("csv") \
.schema(schema) \
.option("header", True) \
.option("maxFilesPerTrigger", 10) \
.load("/data/incoming/csv/")
# Parquet файлы
df = spark.readStream \
.format("parquet") \
.schema(schema) \
.load("/data/incoming/parquet/")
File source не переобрабатывает файлы. Spark отслеживает обработанные файлы в checkpoint. Если файл удалён и добавлен заново — он не будет обработан повторно. File source подходит для сценария “landing zone”: внешняя система кладёт файлы, Spark забирает.
Rate Source (тестирование)
Rate source генерирует строки с заданной скоростью — идеален для тестирования pipeline без внешних зависимостей:
# Генерация 100 строк в секунду
test_df = spark.readStream \
.format("rate") \
.option("rowsPerSecond", 100) \
.option("numPartitions", 4) \
.load()
# Schema: timestamp (TimestampType), value (LongType)
Socket Source (только разработка)
# Чтение строк из TCP socket
lines = spark.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
Socket source не имеет fault tolerance. Данные, потерянные при сбое, невосстановимы. Используйте только для быстрых экспериментов в notebook.
Sinks: куда писать результат
Delta Lake Sink (production рекомендация)
Delta Lake — лучший production sink для Structured Streaming благодаря ACID транзакциям, exactly-once гарантиям и поддержке time travel:
from pyspark.sql.functions import window, sum as _sum
# Полный pipeline: Kafka -> transform -> Delta Lake
raw = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "orders") \
.load()
orders = raw.select(
from_json(col("value").cast("string"), order_schema).alias("data")
).select("data.*")
# Агрегация по 5-минутным окнам
windowed = orders \
.withWatermark("order_time", "10 minutes") \
.groupBy(
window("order_time", "5 minutes"),
"region"
) \
.agg(_sum("amount").alias("total"))
# Запись в Delta Lake
query = windowed.writeStream \
.outputMode("update") \
.format("delta") \
.option("checkpointLocation", "/checkpoints/orders-agg") \
.start("/data/gold/orders_by_region")
query.awaitTermination()
Kafka Sink (event forwarding)
# Запись обратно в Kafka (key + value обязательны)
df.select(
col("user_id").cast("string").alias("key"),
to_json(struct("*")).alias("value")
).writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("topic", "enriched-events") \
.option("checkpointLocation", "/checkpoints/enriched") \
.start()
File Sink (Parquet/JSON/CSV)
# Запись в Parquet файлы с партиционированием
df.writeStream \
.format("parquet") \
.partitionBy("date") \
.option("path", "/data/output/events/") \
.option("checkpointLocation", "/checkpoints/events-parquet") \
.start()
Console и Memory Sinks (только разработка)
# Console -- вывод в stdout (debug)
query = df.writeStream \
.format("console") \
.option("truncate", False) \
.option("numRows", 20) \
.start()
# Memory -- результат в in-memory таблице (debug)
query = df.writeStream \
.format("memory") \
.queryName("debug_table") \
.start()
# Доступ через SQL
spark.sql("SELECT * FROM debug_table").show()
Anti-pattern: memory/console sinks в production. Console sink блокирует pipeline выводом в stdout. Memory sink хранит все результаты в памяти driver — OOM при больших объёмах. Оба не имеют fault tolerance. В production используйте Delta Lake, Kafka или File sinks.
foreachBatch: Custom Sinks
foreachBatch даёт доступ к каждому micro-batch как обычному DataFrame. Это позволяет писать в любую систему:
def write_to_postgres(batch_df, batch_id):
"""Запись micro-batch в PostgreSQL."""
batch_df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://db:5432/analytics") \
.option("dbtable", "events") \
.option("user", "spark") \
.option("password", "secret") \
.mode("append") \
.save()
query = df.writeStream \
.foreachBatch(write_to_postgres) \
.option("checkpointLocation", "/checkpoints/postgres-sink") \
.start()
foreachBatch поддерживает любые batch write операции: JDBC, REST API, файлы, кэши. Но exactly-once требует идемпотентной записи или транзакций на стороне sink.
foreachBatch + Delta merge. Комбинация foreachBatch с DeltaTable.merge() — мощный паттерн для upsert в streaming: каждый micro-batch мержится в целевую таблицу, обновляя существующие строки и вставляя новые.
Совместимость Source + Sink + Output Mode
Не все комбинации допустимы:
| Source | Sink | Append | Complete | Update |
|---|---|---|---|---|
| Kafka | Delta | Да | Нет* | Да |
| Kafka | Kafka | Да | Нет | Да |
| File | File | Да | Нет | Нет |
| Rate | Console | Да | Да | Да |
* Complete mode записывает все строки при каждом batch — нежелательно для больших таблиц.
Что дальше?
В следующем уроке мы разберём output modes — append, complete и update. Вы поймёте, почему выбор неправильного mode вызывает AnalysisException, и как построить decision tree для выбора.
Kafka Consumer API: основы Kafka Share Groups (KIP-932)