Skip to content
Learning Platform
Advanced
45 minutes
PySpark Spark Structured Streaming Kafka Watermarks

Prerequisites:

  • module-5/04-pyflink-stateful-processing

PySpark Structured Streaming с Kafka

Вы изучили PyFlink для потоковой обработки CDC событий. Но в экосистеме больших данных есть еще один мощный инструмент — Apache Spark с его Structured Streaming API. Когда использовать PySpark вместо PyFlink? Как настроить Spark для чтения из Kafka? Как обрабатывать late-arriving события? В этом уроке мы разберем PySpark Structured Streaming для CDC processing at scale.

И PySpark, и PyFlink решают задачу stream processing, но с разными философиями.

Ключевые различия

PyFlink Philosophy
Pure StreamingEvent-by-event processing
Low LatencyMillisecond response
Exactly-OnceStrong guarantees
Complex CEPPattern matching
PySpark Philosophy
Unified Batch + StreamingSame API for both
Micro-batchingSecond-scale latency
Data Lake NativeParquet, Delta, Iceberg
ML IntegrationSpark MLlib ecosystem

Таблица выбора:

КритерийPyFlinkPySpark
Latency требованияменее 100ms1-10 seconds
Exactly-once в Kafka✅ Native support❌ At-least-once only
Интеграция с MLLimited✅ MLlib, feature stores
Batch + Stream unifiedSeparate APIs✅ Same DataFrame API
Data lake writesPossible✅ Native (Parquet, Delta)
Complex event patterns✅ CEP libraryBasic windowing only
Operational complexityFlink clusterSpark cluster

Правило выбора:

  • PyFlink: Когда нужна низкая latency, exactly-once guarantees, complex event processing
  • PySpark: Когда нужна интеграция с data lake, ML pipelines, unified batch/streaming logic

Production истина: Большинство CDC → data lake пайплайнов используют PySpark. Большинство real-time alerting систем — PyFlink.

Проверка знаний
PySpark Structured Streaming использует micro-batch модель. Почему это означает, что PySpark не может обеспечить exactly-once доставку в Kafka sink, в отличие от PyFlink?
Ответ
PySpark micro-batch обрабатывает данные порциями: read batch -> process -> write. Для Kafka sink PySpark не поддерживает Kafka transactional producer API, поэтому при failure и retry одного micro-batch дублирующие сообщения могут попасть в output topic (at-least-once). PyFlink работает event-by-event и использует Kafka transactional producer для atomic commit: чтение offset + обработка + запись в output topic -- одна транзакция. Для записи в файлы (Parquet, Delta) PySpark обеспечивает idempotent writes (тот же batch пишет те же файлы), поэтому для data lake сценариев PySpark вполне подходит.

Structured Streaming основы

Structured Streaming — это Spark API для stream processing, использующий DataFrame abstraction.

Концептуальная модель

Structured Streaming Conceptual Model

Unbounded table abstraction для streaming data

Input Stream
Kafka TopicUnbounded table
readStream
Spark Processing
DataFrame API
TriggerMicro-batch intervals
writeStream
Output Stream
SinkParquet, Kafka, Console
Key Concept:

Structured Streaming использует DataFrame API для унификации batch и streaming processing. Один код работает для обоих режимов.

Ключевые концепции:

  1. DataFrame API — те же операции, что и для batch (select, filter, groupBy)
  2. Micro-batching — stream обрабатывается небольшими batch’ами (default: as fast as possible)
  3. Continuous processing — experimental mode для lower latency
  4. Checkpoints — fault tolerance через state snapshots

Micro-batch vs Continuous:

  • Micro-batch (default): Каждый batch как отдельная транзакция. Latency ~100ms+. Проще отлаживать.
  • Continuous: Event-by-event processing. Latency ~1ms. Экспериментальный режим.

Совет: Для CDC processing используйте micro-batch. Continuous mode нестабилен в Spark 4.x.

Настройка SparkSession

Все Spark приложения начинаются с SparkSession.

from pyspark.sql import SparkSession

# Create SparkSession with streaming configuration
spark = SparkSession.builder \
    .appName("CDC-Streaming-Pipeline") \
    .config("spark.sql.streaming.checkpointLocation", "/data/checkpoints/cdc") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

# For local testing (single machine)
spark = SparkSession.builder \
    .appName("CDC-Local-Test") \
    .master("local[*]") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints") \
    .getOrCreate()

Критически важные config:

  • spark.sql.streaming.checkpointLocation — директория для fault recovery. ОБЯЗАТЕЛЬНО для production.
  • spark.sql.shuffle.partitions — количество partitions для shuffle операций. Default 200 слишком много для small clusters.
  • masterlocal[*] для локального режима, yarn или k8s:// для clusters.

Предупреждение: После создания checkpoint НЕЛЬЗЯ менять shuffle.partitions. Spark не сможет восстановить state.

Определение CDC схемы

PySpark требует явную схему для parsing JSON. Для Debezium envelope нужна nested schema.

from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType,
    DecimalType, LongType, TimestampType
)

# Define schema for orders table CDC events
cdc_schema = StructType([
    StructField("payload", StructType([
        # before state (nullable for INSERT)
        StructField("before", StructType([
            StructField("id", IntegerType()),
            StructField("customer_id", IntegerType()),
            StructField("total", DecimalType(10, 2)),
            StructField("status", StringType())
        ])),
        # after state (nullable for DELETE)
        StructField("after", StructType([
            StructField("id", IntegerType()),
            StructField("customer_id", IntegerType()),
            StructField("total", DecimalType(10, 2)),
            StructField("status", StringType())
        ])),
        # operation type: c, u, d, r
        StructField("op", StringType()),
        # timestamp in milliseconds
        StructField("ts_ms", LongType()),
        # source metadata
        StructField("source", StructType([
            StructField("db", StringType()),
            StructField("table", StringType()),
            StructField("lsn", LongType())
        ]))
    ]))
])

Критически важно:

  • Schema must match exactly — если поле total в базе NUMERIC, используйте DecimalType, не DoubleType
  • Nullable fieldsbefore для INSERT, after для DELETE должны быть nullable
  • Nested structurepayload.after.id требует вложенный StructType

Если schema не совпадаетfrom_json() вернет null без ошибки. Silent data loss!

Совет для production: Генерируйте schema автоматически из Debezium schema или используйте Schema Registry.

Чтение из Kafka

Используйте readStream.format("kafka") для streaming Kafka reads.

from pyspark.sql.functions import from_json, col, to_timestamp

# Read from Kafka topic
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "dbserver1.public.orders") \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .load()

# Kafka returns: key, value, topic, partition, offset, timestamp
# value is binary, must cast to string for JSON parsing
parsed_df = kafka_df.select(
    from_json(col("value").cast("string"), cdc_schema).alias("data"),
    col("timestamp").alias("kafka_timestamp")
).select("data.payload.*", "kafka_timestamp")

# Extract after state for inserts and updates
current_df = parsed_df \
    .filter(col("op").isin("c", "u", "r")) \
    .select(
        col("after.id").alias("id"),
        col("after.customer_id").alias("customer_id"),
        col("after.total").alias("total"),
        col("after.status").alias("status"),
        col("op"),
        to_timestamp(col("ts_ms") / 1000).alias("event_time"),
        col("kafka_timestamp")
    )

# Show schema
current_df.printSchema()

Kafka options explained:

  • bootstrap.servers — Kafka broker addresses (comma-separated list)
  • subscribe — topic name (use subscribePattern for regex matching)
  • startingOffsetsearliest (from beginning), latest (new messages only)
  • failOnDataLossfalse для production (Kafka могла удалить старые offsets)

Важно: value column в Kafka DataFrame — это BinaryType. Обязательно .cast("string") перед from_json().

Warning: Если from_json() не может распарсить → возвращает null. Всегда проверяйте data IS NOT NULL после parsing.

Watermarks и агрегации

Watermarking — это механизм для handling late-arriving events в streaming aggregations.

Проблема: Late Data

Watermark для Late Events

PySpark отбрасывает события позже watermark threshold

10:00
10:01
Event at 10:01 (arrives 10:01)
10:03
Event at 10:03 (arrives 10:03)
10:05
Watermark threshold
10:10
Event at 10:02 (arrives 10:10!)❌ Dropped (too late)
Watermark Configuration:

.withWatermark("event_time", "10 minutes")

События с event_time более чем на 10 минут старше текущего max(event_time) будут отброшены.

Без watermark: Spark не знает, когда можно finalize результат окна. Late events drop silently.

С watermark: Spark ждет дополнительное время для late events.

from pyspark.sql.functions import window, count, sum as _sum, avg

# Configure watermark: wait 10 minutes for late data
watermarked_df = current_df \
    .withWatermark("event_time", "10 minutes")

# Tumbling window aggregation (5-minute windows)
result_df = watermarked_df \
    .groupBy(
        window(col("event_time"), "5 minutes"),
        col("customer_id")
    ) \
    .agg(
        count("*").alias("order_count"),
        _sum("total").alias("total_revenue"),
        avg("total").alias("avg_order_value")
    )

# Result schema:
# window: {start: timestamp, end: timestamp}
# customer_id: int
# order_count: long
# total_revenue: decimal
# avg_order_value: decimal

Watermark logic:

  • Watermark = max(event_time) - threshold
  • События с event_time < watermark drop
  • Окна с end < watermark finalize

Пример: Если watermark = 10 minutes, и текущий max event_time = 10:15:

  • Watermark = 10:05
  • Window 10:00-10:05 finalized (no more updates)
  • Events с timestamp < 10:05 dropped

Выбор watermark threshold:

ThresholdКогда использовать
1 minuteReal-time dashboards, low latency priority
10 minutesBalanced (типичный для CDC)
1 hourBatch-like processing, high completeness priority

Production WARNING: Pitfall 4 из RESEARCH.md — отсутствие watermark = silent data loss. Всегда конфигурируйте .withWatermark() перед aggregations.

Checkpoint и восстановление

Checkpoints обеспечивают exactly-once processing и fault recovery.

Checkpoint Architecture

Micro-batch vs Continuous Processing

Выбор режима обработки влияет на latency и stability

Micro-batch (Default)Recommended
Каждый batch как отдельная транзакция

Checkpoint между batches для fault tolerance

Latency ~100ms+

Configurable через trigger interval

Стабильный и проверенный режим

Recommended для CDC → data lake pipelines

Continuous (Experimental)
Event-by-event processing

No batching overhead

Latency ~1ms

Ultra-low latency processing

Экспериментальный режим

Not recommended для production

Рекомендация для CDC:

Используйте micro-batch mode с trigger interval 30-60 seconds. Continuous mode нестабилен и не нужен для CDC → data lake use case.

Что сохраняется в checkpoint:

  1. Kafka offsets — последний обработанный offset для каждого partition
  2. Aggregation state — state для stateful операций (groupBy, window)
  3. Metadata — query configuration, schema

Конфигурация:

# Option 1: Global checkpoint location
spark = SparkSession.builder \
    .config("spark.sql.streaming.checkpointLocation", "/data/checkpoints/global") \
    .getOrCreate()

# Option 2: Per-query checkpoint location (recommended)
query = result_df \
    .writeStream \
    .format("parquet") \
    .option("path", "/data/lake/orders_aggregated") \
    .option("checkpointLocation", "/data/checkpoints/orders_query") \
    .outputMode("update") \
    .start()

Checkpoint best practices:

  1. Unique directory per query — НИКОГДА не share checkpoint dirs между queries
  2. Persistent storage — используйте HDFS, S3, Azure Blob (не /tmp)
  3. Sufficient space — state может быть большим для long windows
  4. Never change shuffle partitions — после checkpoint нельзя менять spark.sql.shuffle.partitions

Production WARNING: Pitfall 3 из RESEARCH.md — shared checkpoint dirs приводят к corruption.

Проверка знаний
Почему после создания checkpoint в PySpark нельзя изменить spark.sql.shuffle.partitions? Что произойдёт при попытке?
Ответ
Checkpoint сохраняет state, распределённый по определённому количеству shuffle partitions. Каждая partition хранит свою часть aggregation state (например, часть customer_id для GROUP BY). При изменении количества partitions Spark не может перераспределить сохранённый state -- mapping между ключами и partitions изменился. Spark выбросит ошибку при попытке восстановления: state из 200 partitions нельзя загрузить в 4 partitions. Решение: установить shuffle.partitions до первого запуска и не менять, или создать новый checkpoint (с потерей state).

Output Modes: append, update, complete

Output mode определяет, какие rows публикуются в sink при каждом batch.

Append Mode

Когда использовать: Append-only streams (no aggregations или finalized windows только)

query = current_df \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/data/lake/orders_raw") \
    .start()

Поведение: Только new rows публикуются. Для aggregations публикуются только finalized windows (после watermark).

Ограничения: Нельзя использовать для aggregations без watermark.

Update Mode

Когда использовать: Aggregations с continuous updates.

query = result_df \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

Поведение: Только changed rows публикуются (new + updated).

Применение: Kafka sink для aggregated metrics, real-time dashboards.

Complete Mode

Когда использовать: Small result tables (top-N queries, global aggregations).

top_customers = current_df \
    .groupBy("customer_id") \
    .agg(_sum("total").alias("lifetime_value")) \
    .orderBy(col("lifetime_value").desc()) \
    .limit(10)

query = top_customers \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

Поведение: Весь result table публикуется при каждом batch.

WARNING: Не используйте complete mode для unbounded results. Memory exhaustion!

Лабораторная работа: Streaming Aggregation

Цель: Создать streaming pipeline для расчета revenue per customer в 5-minute windows.

Задание:

  1. Прочитать CDC события из dbserver1.public.orders
  2. Извлечь after state для INSERT и UPDATE операций
  3. Рассчитать total_revenue и order_count per customer в 5-minute tumbling windows
  4. Настроить watermark 10 minutes для late data
  5. Вывести результаты в console для проверки

Starter code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp, window, count, sum as _sum
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, LongType

# TODO: Create SparkSession with checkpoint configuration

# TODO: Define CDC schema for orders table

# TODO: Read from Kafka topic dbserver1.public.orders

# TODO: Parse JSON and extract after state

# TODO: Configure watermark (10 minutes)

# TODO: Perform windowed aggregation (5-minute windows, group by customer_id)

# TODO: Write to console with update mode

# TODO: Await termination

Expected output (console):

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------+-----------+------------+-------------+
|window            |customer_id|order_count|total_revenue|
+------------------+-----------+------------+-------------+
|{10:00, 10:05}    |101        |3           |450.00       |
|{10:00, 10:05}    |102        |1           |120.50       |
|{10:05, 10:10}    |101        |2           |300.00       |
+------------------+-----------+------------+-------------+

Подсказки:

  • Используйте .writeStream.format("console").outputMode("update")
  • Проверьте schema после from_json() через .printSchema()
  • Late events (arriving after watermark) будут drop — это нормально

Что дальше?

Вы научились настраивать PySpark Structured Streaming для чтения CDC событий из Kafka, определять схемы для parsing, применять watermarks для handling late data, и выполнять windowed aggregations.

Но возникает вопрос: Куда писать результаты? В console хорошо для testing, но production требует data lake (Parquet, Delta Lake) или external systems. Как спроектировать ETL pipeline от CDC источника до data warehouse?

В следующем уроке мы изучим ETL/ELT patterns — как организовать CDC data flow от Kafka в data lake с правильной partitioning strategy, metadata tracking, и exactly-once semantics.

Ключевые выводы

  1. PySpark Structured Streaming использует DataFrame API для unified batch/streaming processing
  2. Micro-batching обеспечивает latency ~1-10 seconds (vs PyFlink менее 100ms)
  3. PySpark подходит для data lake integration, ML pipelines, unified workflows
  4. PyFlink подходит для low-latency, exactly-once, complex event processing
  5. readStream.format("kafka") читает из Kafka topics с автоматическим offset management
  6. Explicit schema обязательна для from_json() parsing
  7. Watermarks критически важны для aggregations — без них late events drop silently
  8. Checkpoints обеспечивают fault recovery и exactly-once processing
  9. Output modes: append (new rows), update (changed rows), complete (full table)
  10. Never share checkpoint directories между queries — приводит к corruption
  11. Never change shuffle.partitions после создания checkpoint
  12. PySpark предоставляет at-least-once для Kafka sink (не exactly-once)

Check Your Understanding

Score: 0 of 0
Conceptual
Question 1 of 4. Какое ключевое преимущество PySpark Structured Streaming перед PyFlink при построении CDC pipeline в data lake?

Finished the lesson?

Mark it as complete to track your progress