Требуемые знания:
- module-5/04-pyflink-stateful-processing
PySpark Structured Streaming с Kafka
Вы изучили PyFlink для потоковой обработки CDC событий. Но в экосистеме больших данных есть еще один мощный инструмент — Apache Spark с его Structured Streaming API. Когда использовать PySpark вместо PyFlink? Как настроить Spark для чтения из Kafka? Как обрабатывать late-arriving события? В этом уроке мы разберем PySpark Structured Streaming для CDC processing at scale.
PySpark vs PyFlink: когда что использовать
И PySpark, и PyFlink решают задачу stream processing, но с разными философиями.
Ключевые различия
Таблица выбора:
| Критерий | PyFlink | PySpark |
|---|---|---|
| Latency требования | менее 100ms | 1-10 seconds |
| Exactly-once в Kafka | ✅ Native support | ❌ At-least-once only |
| Интеграция с ML | Limited | ✅ MLlib, feature stores |
| Batch + Stream unified | Separate APIs | ✅ Same DataFrame API |
| Data lake writes | Possible | ✅ Native (Parquet, Delta) |
| Complex event patterns | ✅ CEP library | Basic windowing only |
| Operational complexity | Flink cluster | Spark cluster |
Правило выбора:
- PyFlink: Когда нужна низкая latency, exactly-once guarantees, complex event processing
- PySpark: Когда нужна интеграция с data lake, ML pipelines, unified batch/streaming logic
Production истина: Большинство CDC → data lake пайплайнов используют PySpark. Большинство real-time alerting систем — PyFlink.
Проверка знанийPySpark Structured Streaming использует micro-batch модель. Почему это означает, что PySpark не может обеспечить exactly-once доставку в Kafka sink, в отличие от PyFlink?
Structured Streaming основы
Structured Streaming — это Spark API для stream processing, использующий DataFrame abstraction.
Концептуальная модель
Unbounded table abstraction для streaming data
Structured Streaming использует DataFrame API для унификации batch и streaming processing. Один код работает для обоих режимов.
Ключевые концепции:
- DataFrame API — те же операции, что и для batch (
select,filter,groupBy) - Micro-batching — stream обрабатывается небольшими batch’ами (default: as fast as possible)
- Continuous processing — experimental mode для lower latency
- Checkpoints — fault tolerance через state snapshots
Micro-batch vs Continuous:
- Micro-batch (default): Каждый batch как отдельная транзакция. Latency ~100ms+. Проще отлаживать.
- Continuous: Event-by-event processing. Latency ~1ms. Экспериментальный режим.
Совет: Для CDC processing используйте micro-batch. Continuous mode нестабилен в Spark 4.x.
Настройка SparkSession
Все Spark приложения начинаются с SparkSession.
from pyspark.sql import SparkSession
# Create SparkSession with streaming configuration
spark = SparkSession.builder \
.appName("CDC-Streaming-Pipeline") \
.config("spark.sql.streaming.checkpointLocation", "/data/checkpoints/cdc") \
.config("spark.sql.shuffle.partitions", "4") \
.getOrCreate()
# For local testing (single machine)
spark = SparkSession.builder \
.appName("CDC-Local-Test") \
.master("local[*]") \
.config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints") \
.getOrCreate()
Критически важные config:
spark.sql.streaming.checkpointLocation— директория для fault recovery. ОБЯЗАТЕЛЬНО для production.spark.sql.shuffle.partitions— количество partitions для shuffle операций. Default 200 слишком много для small clusters.master—local[*]для локального режима,yarnилиk8s://для clusters.
Предупреждение: После создания checkpoint НЕЛЬЗЯ менять shuffle.partitions. Spark не сможет восстановить state.
Определение CDC схемы
PySpark требует явную схему для parsing JSON. Для Debezium envelope нужна nested schema.
from pyspark.sql.types import (
StructType, StructField, StringType, IntegerType,
DecimalType, LongType, TimestampType
)
# Define schema for orders table CDC events
cdc_schema = StructType([
StructField("payload", StructType([
# before state (nullable for INSERT)
StructField("before", StructType([
StructField("id", IntegerType()),
StructField("customer_id", IntegerType()),
StructField("total", DecimalType(10, 2)),
StructField("status", StringType())
])),
# after state (nullable for DELETE)
StructField("after", StructType([
StructField("id", IntegerType()),
StructField("customer_id", IntegerType()),
StructField("total", DecimalType(10, 2)),
StructField("status", StringType())
])),
# operation type: c, u, d, r
StructField("op", StringType()),
# timestamp in milliseconds
StructField("ts_ms", LongType()),
# source metadata
StructField("source", StructType([
StructField("db", StringType()),
StructField("table", StringType()),
StructField("lsn", LongType())
]))
]))
])
Критически важно:
- Schema must match exactly — если поле
totalв базеNUMERIC, используйтеDecimalType, неDoubleType - Nullable fields —
beforeдля INSERT,afterдля DELETE должны быть nullable - Nested structure —
payload.after.idтребует вложенныйStructType
Если schema не совпадает → from_json() вернет null без ошибки. Silent data loss!
Совет для production: Генерируйте schema автоматически из Debezium schema или используйте Schema Registry.
Чтение из Kafka
Используйте readStream.format("kafka") для streaming Kafka reads.
from pyspark.sql.functions import from_json, col, to_timestamp
# Read from Kafka topic
kafka_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "dbserver1.public.orders") \
.option("startingOffsets", "earliest") \
.option("failOnDataLoss", "false") \
.load()
# Kafka returns: key, value, topic, partition, offset, timestamp
# value is binary, must cast to string for JSON parsing
parsed_df = kafka_df.select(
from_json(col("value").cast("string"), cdc_schema).alias("data"),
col("timestamp").alias("kafka_timestamp")
).select("data.payload.*", "kafka_timestamp")
# Extract after state for inserts and updates
current_df = parsed_df \
.filter(col("op").isin("c", "u", "r")) \
.select(
col("after.id").alias("id"),
col("after.customer_id").alias("customer_id"),
col("after.total").alias("total"),
col("after.status").alias("status"),
col("op"),
to_timestamp(col("ts_ms") / 1000).alias("event_time"),
col("kafka_timestamp")
)
# Show schema
current_df.printSchema()
Kafka options explained:
bootstrap.servers— Kafka broker addresses (comma-separated list)subscribe— topic name (usesubscribePatternfor regex matching)startingOffsets—earliest(from beginning),latest(new messages only)failOnDataLoss—falseдля production (Kafka могла удалить старые offsets)
Важно: value column в Kafka DataFrame — это BinaryType. Обязательно .cast("string") перед from_json().
Warning: Если from_json() не может распарсить → возвращает null. Всегда проверяйте data IS NOT NULL после parsing.
Watermarks и агрегации
Watermarking — это механизм для handling late-arriving events в streaming aggregations.
Проблема: Late Data
PySpark отбрасывает события позже watermark threshold
.withWatermark("event_time", "10 minutes")
События с event_time более чем на 10 минут старше текущего max(event_time) будут отброшены.
Без watermark: Spark не знает, когда можно finalize результат окна. Late events drop silently.
С watermark: Spark ждет дополнительное время для late events.
from pyspark.sql.functions import window, count, sum as _sum, avg
# Configure watermark: wait 10 minutes for late data
watermarked_df = current_df \
.withWatermark("event_time", "10 minutes")
# Tumbling window aggregation (5-minute windows)
result_df = watermarked_df \
.groupBy(
window(col("event_time"), "5 minutes"),
col("customer_id")
) \
.agg(
count("*").alias("order_count"),
_sum("total").alias("total_revenue"),
avg("total").alias("avg_order_value")
)
# Result schema:
# window: {start: timestamp, end: timestamp}
# customer_id: int
# order_count: long
# total_revenue: decimal
# avg_order_value: decimal
Watermark logic:
- Watermark = max(event_time) - threshold
- События с
event_time < watermarkdrop - Окна с
end < watermarkfinalize
Пример: Если watermark = 10 minutes, и текущий max event_time = 10:15:
- Watermark =
10:05 - Window
10:00-10:05finalized (no more updates) - Events с timestamp <
10:05dropped
Выбор watermark threshold:
| Threshold | Когда использовать |
|---|---|
| 1 minute | Real-time dashboards, low latency priority |
| 10 minutes | Balanced (типичный для CDC) |
| 1 hour | Batch-like processing, high completeness priority |
Production WARNING: Pitfall 4 из RESEARCH.md — отсутствие watermark = silent data loss. Всегда конфигурируйте
.withWatermark()перед aggregations.
Checkpoint и восстановление
Checkpoints обеспечивают exactly-once processing и fault recovery.
Checkpoint Architecture
Выбор режима обработки влияет на latency и stability
Checkpoint между batches для fault tolerance
Configurable через trigger interval
Recommended для CDC → data lake pipelines
No batching overhead
Ultra-low latency processing
Not recommended для production
Используйте micro-batch mode с trigger interval 30-60 seconds. Continuous mode нестабилен и не нужен для CDC → data lake use case.
Что сохраняется в checkpoint:
- Kafka offsets — последний обработанный offset для каждого partition
- Aggregation state — state для stateful операций (groupBy, window)
- Metadata — query configuration, schema
Конфигурация:
# Option 1: Global checkpoint location
spark = SparkSession.builder \
.config("spark.sql.streaming.checkpointLocation", "/data/checkpoints/global") \
.getOrCreate()
# Option 2: Per-query checkpoint location (recommended)
query = result_df \
.writeStream \
.format("parquet") \
.option("path", "/data/lake/orders_aggregated") \
.option("checkpointLocation", "/data/checkpoints/orders_query") \
.outputMode("update") \
.start()
Checkpoint best practices:
- Unique directory per query — НИКОГДА не share checkpoint dirs между queries
- Persistent storage — используйте HDFS, S3, Azure Blob (не
/tmp) - Sufficient space — state может быть большим для long windows
- Never change shuffle partitions — после checkpoint нельзя менять
spark.sql.shuffle.partitions
Production WARNING: Pitfall 3 из RESEARCH.md — shared checkpoint dirs приводят к corruption.
Проверка знанийПочему после создания checkpoint в PySpark нельзя изменить spark.sql.shuffle.partitions? Что произойдёт при попытке?
Output Modes: append, update, complete
Output mode определяет, какие rows публикуются в sink при каждом batch.
Append Mode
Когда использовать: Append-only streams (no aggregations или finalized windows только)
query = current_df \
.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "/data/lake/orders_raw") \
.start()
Поведение: Только new rows публикуются. Для aggregations публикуются только finalized windows (после watermark).
Ограничения: Нельзя использовать для aggregations без watermark.
Update Mode
Когда использовать: Aggregations с continuous updates.
query = result_df \
.writeStream \
.outputMode("update") \
.format("console") \
.start()
Поведение: Только changed rows публикуются (new + updated).
Применение: Kafka sink для aggregated metrics, real-time dashboards.
Complete Mode
Когда использовать: Small result tables (top-N queries, global aggregations).
top_customers = current_df \
.groupBy("customer_id") \
.agg(_sum("total").alias("lifetime_value")) \
.orderBy(col("lifetime_value").desc()) \
.limit(10)
query = top_customers \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
Поведение: Весь result table публикуется при каждом batch.
WARNING: Не используйте complete mode для unbounded results. Memory exhaustion!
Лабораторная работа: Streaming Aggregation
Цель: Создать streaming pipeline для расчета revenue per customer в 5-minute windows.
Задание:
- Прочитать CDC события из
dbserver1.public.orders - Извлечь
afterstate для INSERT и UPDATE операций - Рассчитать
total_revenueиorder_countper customer в 5-minute tumbling windows - Настроить watermark 10 minutes для late data
- Вывести результаты в console для проверки
Starter code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp, window, count, sum as _sum
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, LongType
# TODO: Create SparkSession with checkpoint configuration
# TODO: Define CDC schema for orders table
# TODO: Read from Kafka topic dbserver1.public.orders
# TODO: Parse JSON and extract after state
# TODO: Configure watermark (10 minutes)
# TODO: Perform windowed aggregation (5-minute windows, group by customer_id)
# TODO: Write to console with update mode
# TODO: Await termination
Expected output (console):
-------------------------------------------
Batch: 1
-------------------------------------------
+------------------+-----------+------------+-------------+
|window |customer_id|order_count|total_revenue|
+------------------+-----------+------------+-------------+
|{10:00, 10:05} |101 |3 |450.00 |
|{10:00, 10:05} |102 |1 |120.50 |
|{10:05, 10:10} |101 |2 |300.00 |
+------------------+-----------+------------+-------------+
Подсказки:
- Используйте
.writeStream.format("console").outputMode("update") - Проверьте schema после
from_json()через.printSchema() - Late events (arriving after watermark) будут drop — это нормально
Что дальше?
Вы научились настраивать PySpark Structured Streaming для чтения CDC событий из Kafka, определять схемы для parsing, применять watermarks для handling late data, и выполнять windowed aggregations.
Но возникает вопрос: Куда писать результаты? В console хорошо для testing, но production требует data lake (Parquet, Delta Lake) или external systems. Как спроектировать ETL pipeline от CDC источника до data warehouse?
В следующем уроке мы изучим ETL/ELT patterns — как организовать CDC data flow от Kafka в data lake с правильной partitioning strategy, metadata tracking, и exactly-once semantics.
Ключевые выводы
- PySpark Structured Streaming использует DataFrame API для unified batch/streaming processing
- Micro-batching обеспечивает latency ~1-10 seconds (vs PyFlink менее 100ms)
- PySpark подходит для data lake integration, ML pipelines, unified workflows
- PyFlink подходит для low-latency, exactly-once, complex event processing
readStream.format("kafka")читает из Kafka topics с автоматическим offset management- Explicit schema обязательна для
from_json()parsing - Watermarks критически важны для aggregations — без них late events drop silently
- Checkpoints обеспечивают fault recovery и exactly-once processing
- Output modes:
append(new rows),update(changed rows),complete(full table) - Never share checkpoint directories между queries — приводит к corruption
- Never change
shuffle.partitionsпосле создания checkpoint - PySpark предоставляет at-least-once для Kafka sink (не exactly-once)
Проверьте понимание
Закончили урок?
Отметьте его как пройденный, чтобы отслеживать свой прогресс