Learning Platform
Глоссарий Troubleshooting
Урок 08.02 · 14 мин
Средний
Kafka SourceFile SourceRate SourceConsole SinkDelta SinkForeach Sink

Источники и приемники данных

Structured Streaming поддерживает несколько встроенных sources (откуда читать) и sinks (куда писать). Каждая комбинация source + sink определяет гарантии доставки и допустимые output modes.

Sources: откуда читать данные

Kafka Source (production)

Kafka — основной production source для Structured Streaming. Каждое сообщение Kafka представляется строкой с полями key, value, topic, partition, offset, timestamp:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

spark = SparkSession.builder.appName("KafkaSource").getOrCreate()

# Чтение из Kafka
raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "transactions") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()

# raw schema: key (binary), value (binary), topic, partition, offset, timestamp
# Парсинг JSON из value
schema = StructType() \
    .add("user_id", StringType()) \
    .add("amount", DoubleType()) \
    .add("event_time", TimestampType())

parsed = raw.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
ОпцияОписаниеПример
subscribeКонкретные топики"topic1,topic2"
subscribePatternRegex паттерн"events-.*"
startingOffsetsНачальная позиция"latest", "earliest", JSON
maxOffsetsPerTriggerЛимит записей на batch10000
kafka.group.idConsumer group"spark-pipeline-01"

File Source (batch-to-streaming)

File source мониторит директорию на новые файлы. Каждый новый файл обрабатывается как micro-batch:

# CSV файлы, появляющиеся в директории
df = spark.readStream \
    .format("csv") \
    .schema(schema) \
    .option("header", True) \
    .option("maxFilesPerTrigger", 10) \
    .load("/data/incoming/csv/")

# Parquet файлы
df = spark.readStream \
    .format("parquet") \
    .schema(schema) \
    .load("/data/incoming/parquet/")
TIP

File source не переобрабатывает файлы. Spark отслеживает обработанные файлы в checkpoint. Если файл удалён и добавлен заново — он не будет обработан повторно. File source подходит для сценария “landing zone”: внешняя система кладёт файлы, Spark забирает.

Rate Source (тестирование)

Rate source генерирует строки с заданной скоростью — идеален для тестирования pipeline без внешних зависимостей:

# Генерация 100 строк в секунду
test_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 100) \
    .option("numPartitions", 4) \
    .load()

# Schema: timestamp (TimestampType), value (LongType)

Socket Source (только разработка)

# Чтение строк из TCP socket
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
WARNING

Socket source не имеет fault tolerance. Данные, потерянные при сбое, невосстановимы. Используйте только для быстрых экспериментов в notebook.

Sinks: куда писать результат

Delta Lake Sink (production рекомендация)

Delta Lake — лучший production sink для Structured Streaming благодаря ACID транзакциям, exactly-once гарантиям и поддержке time travel:

from pyspark.sql.functions import window, sum as _sum

# Полный pipeline: Kafka -> transform -> Delta Lake
raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "orders") \
    .load()

orders = raw.select(
    from_json(col("value").cast("string"), order_schema).alias("data")
).select("data.*")

# Агрегация по 5-минутным окнам
windowed = orders \
    .withWatermark("order_time", "10 minutes") \
    .groupBy(
        window("order_time", "5 minutes"),
        "region"
    ) \
    .agg(_sum("amount").alias("total"))

# Запись в Delta Lake
query = windowed.writeStream \
    .outputMode("update") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/orders-agg") \
    .start("/data/gold/orders_by_region")

query.awaitTermination()

Kafka Sink (event forwarding)

# Запись обратно в Kafka (key + value обязательны)
df.select(
    col("user_id").cast("string").alias("key"),
    to_json(struct("*")).alias("value")
).writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "enriched-events") \
    .option("checkpointLocation", "/checkpoints/enriched") \
    .start()

File Sink (Parquet/JSON/CSV)

# Запись в Parquet файлы с партиционированием
df.writeStream \
    .format("parquet") \
    .partitionBy("date") \
    .option("path", "/data/output/events/") \
    .option("checkpointLocation", "/checkpoints/events-parquet") \
    .start()

Console и Memory Sinks (только разработка)

# Console -- вывод в stdout (debug)
query = df.writeStream \
    .format("console") \
    .option("truncate", False) \
    .option("numRows", 20) \
    .start()

# Memory -- результат в in-memory таблице (debug)
query = df.writeStream \
    .format("memory") \
    .queryName("debug_table") \
    .start()

# Доступ через SQL
spark.sql("SELECT * FROM debug_table").show()
WARNING

Anti-pattern: memory/console sinks в production. Console sink блокирует pipeline выводом в stdout. Memory sink хранит все результаты в памяти driver — OOM при больших объёмах. Оба не имеют fault tolerance. В production используйте Delta Lake, Kafka или File sinks.

foreachBatch: Custom Sinks

foreachBatch даёт доступ к каждому micro-batch как обычному DataFrame. Это позволяет писать в любую систему:

def write_to_postgres(batch_df, batch_id):
    """Запись micro-batch в PostgreSQL."""
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://db:5432/analytics") \
        .option("dbtable", "events") \
        .option("user", "spark") \
        .option("password", "secret") \
        .mode("append") \
        .save()

query = df.writeStream \
    .foreachBatch(write_to_postgres) \
    .option("checkpointLocation", "/checkpoints/postgres-sink") \
    .start()

foreachBatch поддерживает любые batch write операции: JDBC, REST API, файлы, кэши. Но exactly-once требует идемпотентной записи или транзакций на стороне sink.

TIP

foreachBatch + Delta merge. Комбинация foreachBatch с DeltaTable.merge() — мощный паттерн для upsert в streaming: каждый micro-batch мержится в целевую таблицу, обновляя существующие строки и вставляя новые.

Совместимость Source + Sink + Output Mode

Не все комбинации допустимы:

SourceSinkAppendCompleteUpdate
KafkaDeltaДаНет*Да
KafkaKafkaДаНетДа
FileFileДаНетНет
RateConsoleДаДаДа

* Complete mode записывает все строки при каждом batch — нежелательно для больших таблиц.

Проверка знанийKnowledge check
Почему Delta Lake рекомендуется как production sink для Structured Streaming? Назовите минимум 3 причины.
ОтветAnswer
Delta Lake как streaming sink даёт: (1) ACID транзакции -- атомарная запись каждого micro-batch, нет частично записанных данных; (2) exactly-once гарантии совместно с checkpoints; (3) time travel -- возможность прочитать данные на любой момент времени; (4) schema enforcement -- защита от некорректных данных; (5) OPTIMIZE и Z-ORDER для оптимизации чтения; (6) поддержка update output mode для агрегаций.
Проверка знанийKnowledge check
Когда следует использовать foreachBatch вместо встроенного sink? Приведите пример.
ОтветAnswer
foreachBatch нужен, когда целевая система не имеет встроенного streaming sink в Spark. Примеры: запись в PostgreSQL/MySQL через JDBC, отправка в REST API, кастомная логика дедупликации, merge/upsert в Delta Lake. foreachBatch принимает обычный DataFrame (batch_df) и batch_id, позволяя использовать любые batch write операции. Для exactly-once нужна идемпотентность: использовать batch_id для дедупликации или транзакции на стороне получателя.

Что дальше?

В следующем уроке мы разберём output modes — append, complete и update. Вы поймёте, почему выбор неправильного mode вызывает AnalysisException, и как построить decision tree для выбора.

Kafka Consumer API: основы Kafka Share Groups (KIP-932)

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. При настройке Kafka source в Structured Streaming обязательны два параметра. Какие?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8