Требуемые знания:
- module-5/05-pyspark-structured-streaming
ETL/ELT паттерны с CDC данными
Вы настроили PySpark для чтения CDC событий из Kafka. Но куда писать результаты? Как организовать data flow от transactional database через CDC stream в analytical data warehouse? ETL и ELT — это два фундаментальных подхода к построению data pipelines. В этом уроке мы изучим, как применить эти паттерны к CDC данным, какие challenges возникают, и как их решать в production.
ETL vs ELT: современный подход
ETL (Extract, Transform, Load) и ELT (Extract, Load, Transform) — это архитектурные паттерны для data integration.
Traditional ETL
Extract → Transform → Load с трансформациями перед загрузкой
ETL трансформирует данные перед загрузкой в warehouse. Проблема: нет raw data для re-processing, сложность изменений (новая метрика = переделка ETL).
ETL паттерн:
- Extract — читаем данные из source
- Transform — применяем бизнес-логику, очистка, агрегации
- Load — загружаем готовые данные в warehouse
Плюсы:
- Warehouse хранит только clean data
- Меньше storage requirements
- Controlled data quality
Минусы:
- ETL bottleneck — трансформации могут быть медленными
- Потеря raw data — нет возможности re-process
- Сложность изменений — новая метрика → переделка ETL
Modern ELT
Extract → Load → Transform с трансформациями в warehouse
ELT загружает raw data в lake, трансформации в warehouse. Преимущества: raw data для re-processing, гибкость (новая метрика = новый SQL query), warehouse выполняет трансформации.
ELT паттерн:
- Extract — CDC capture changes
- Load — записываем raw events в data lake
- Transform — трансформации выполняются в warehouse (SQL, dbt)
Плюсы:
- Raw data сохранен — можно re-process
- Warehouse выполняет трансформации — использует свою compute power
- Гибкость — новая метрика = новый SQL query
Минусы:
- Больше storage (храним raw + transformed)
- Data quality проверки downstream
Modern trend: ELT становится стандартом с появлением cloud data warehouses (Snowflake, BigQuery, Redshift) и cheap storage (S3, Azure Blob).
CDC в контексте ETL/ELT
CDC меняет игру: Вместо periodic full dumps мы получаем continuous incremental updates.
CDC enables both patterns:
- ETL with CDC: Transform в Kafka Streams/Flink → load clean data
- ELT with CDC: Load raw CDC events → transform в warehouse
В этом уроке: Фокус на ELT with CDC — как спроектировать data lake для CDC events.
Проверка знанийПочему ELT подход лучше ETL для CDC данных, если бизнес-требования могут измениться? Какое ключевое преимущество даёт сохранение raw CDC событий?
CDC как источник для Data Lake
Data Lake — это centralized repository для хранения raw data в native format (Parquet, ORC, Avro).
Advantages of CDC → Data Lake
Один CDC stream → несколько output layers
All change events
+ metadata columns
One row per entity
Latest values only
All historical values
With timestamps
CDC stream записывается как raw events (append-only). Downstream processing создает snapshot (current state) и history (audit trail). Один source → три output layers для разных use cases.
Преимущества:
- Incremental updates — только изменения, не full table dumps
- Change history — полная история изменений для audit
- Audit trail — кто, когда, что изменил (via CDC metadata)
- Time travel — можно восстановить состояние на любую дату
- Low latency — real-time updates вместо batch loads
Challenges:
- Operation types — INSERT, UPDATE, DELETE требуют разной обработки
- Ordering — CDC события могут arrive out-of-order
- Schema evolution — schema changes в source database
- Compaction — history растет unbounded без cleanup
Target Formats
| Format | Когда использовать | Особенности |
|---|---|---|
| Parquet | Append-only history, simple queries | Columnar, good compression, immutable |
| Delta Lake | ACID updates, time travel, upserts | ACID transactions, MERGE support, versioning |
| Apache Iceberg | Schema evolution, partition evolution | Netflix/Apple developed, advanced features |
Для CDC: Delta Lake или Iceberg рекомендуются для production (ACID upserts). Parquet подходит для append-only scenarios.
Паттерн: Append-Only History
Идея: Сохранять все CDC события как append-only log. Never update or delete.
Architecture
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, current_timestamp, to_date, lit
spark = SparkSession.builder \
.appName("CDC-Append-Only-History") \
.config("spark.sql.streaming.checkpointLocation", "/data/checkpoints/history") \
.getOrCreate()
# Read CDC stream
cdc_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "dbserver1.public.orders") \
.load()
# Parse and add metadata
parsed_df = cdc_df.select(
from_json(col("value").cast("string"), cdc_schema).alias("data")
).select("data.payload.*")
# Add metadata columns
history_df = parsed_df.select(
col("after.*"), # or before.* for deletes
col("op").alias("_operation"), # c, u, d, r
col("ts_ms").alias("_cdc_timestamp"),
current_timestamp().alias("_processed_at"),
col("source.db").alias("_source_db"),
col("source.table").alias("_source_table")
).withColumn(
"_processed_date",
to_date(col("_processed_at"))
)
# Write to Parquet with date partitioning
query = history_df \
.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "/data/lake/orders_history") \
.option("checkpointLocation", "/data/checkpoints/orders_history") \
.partitionBy("_processed_date") \
.trigger(processingTime="30 seconds") \
.start()
query.awaitTermination()
Metadata columns:
_operation— тип операции (c=create, u=update, d=delete, r=read/snapshot)_cdc_timestamp— timestamp от database transaction commit_processed_at— timestamp когда Spark обработал событие_source_db/_source_table— source metadata для multi-table pipelines
Partitioning strategy:
/data/lake/orders_history/
├── _processed_date=2026-01-01/
│ ├── part-00000.parquet
│ └── part-00001.parquet
├── _processed_date=2026-01-02/
│ └── part-00000.parquet
Зачем партиционирование по дате:
- Efficient pruning для time-range queries
- Easy cleanup старых partitions
- Simplified backfill logic
Querying history:
-- Get all changes for specific order
SELECT * FROM orders_history
WHERE id = 12345
ORDER BY _cdc_timestamp;
-- Get state at specific point in time
SELECT * FROM (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY id
ORDER BY _cdc_timestamp DESC
) as rn
FROM orders_history
WHERE _cdc_timestamp <= '2026-01-15 10:00:00'
)
WHERE rn = 1 AND _operation != 'd';
Production tip: Append-only history идеально для compliance (GDPR audit trail), но растет unbounded. Нужна retention policy (например, delete partitions > 2 years).
Паттерн: Separate Streams by Operation
Идея: Разделить CDC stream на отдельные потоки по типу операции. INSERT → append table, UPDATE → upsert logic, DELETE → soft delete.
Architecture
Добавление metadata columns для downstream processing
Metadata columns критически важны для troubleshooting, lag monitoring, правильной обработки operation types, и multi-table pipelines. Всегда добавляйте их при записи CDC data в data lake.
Implementation:
from pyspark.sql.functions import when
# Read and parse CDC stream
parsed_df = kafka_df.select(
from_json(col("value").cast("string"), cdc_schema).alias("data")
).select("data.payload.*")
# Separate by operation type
inserts = parsed_df \
.filter(col("op") == "c") \
.select("after.*") \
.withColumn("created_at", current_timestamp())
updates = parsed_df \
.filter(col("op") == "u") \
.select("after.*") \
.withColumn("updated_at", current_timestamp())
deletes = parsed_df \
.filter(col("op") == "d") \
.select(
col("before.id"),
current_timestamp().alias("deleted_at")
)
# Write inserts to append-only table
inserts_query = inserts \
.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "/data/lake/orders_inserts") \
.option("checkpointLocation", "/data/checkpoints/inserts") \
.start()
# Write updates to separate location for MERGE processing
updates_query = updates \
.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "/data/lake/orders_updates") \
.option("checkpointLocation", "/data/checkpoints/updates") \
.start()
# Write deletes for soft-delete processing
deletes_query = deletes \
.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "/data/lake/orders_deletes") \
.option("checkpointLocation", "/data/checkpoints/deletes") \
.start()
Зачем разделять потоки:
- Different processing logic — INSERT просто append, UPDATE требует upsert, DELETE требует soft-delete
- Different schemas — DELETE имеет только
beforefields - Optimization — можно настроить разные partitioning/compression strategies
- Monitoring — отдельные metrics для каждого operation type
Downstream processing:
- Inserts: Просто append в финальную таблицу
- Updates: Batch MERGE job (hourly/daily) для применения updates
- Deletes: Batch job для soft-delete (set
deleted_attimestamp)
Паттерн: Merge/Upsert с Delta Lake
Delta Lake предоставляет ACID transactions и MERGE command для atomic upserts.
Delta Lake MERGE
from delta.tables import DeltaTable
# Initialize Delta Lake
spark.sparkContext.setSystemProperty("spark.databricks.delta.retentionDurationCheck.enabled", "false")
# Read CDC stream
cdc_stream = parsed_df.select(
col("after.*"),
col("op")
).filter(col("op").isin("c", "u", "d"))
# Function to merge batch into Delta table
def merge_to_delta(batch_df, batch_id):
"""Merge CDC batch into Delta Lake table."""
# Target Delta table path
delta_path = "/data/lake/orders_delta"
# Create table if doesn't exist
if not DeltaTable.isDeltaTable(spark, delta_path):
batch_df.limit(0).write.format("delta").save(delta_path)
delta_table = DeltaTable.forPath(spark, delta_path)
# MERGE logic
delta_table.alias("target").merge(
batch_df.alias("source"),
"target.id = source.id"
).whenMatchedDelete(
condition="source.op = 'd'"
).whenMatchedUpdateAll(
condition="source.op = 'u'"
).whenNotMatchedInsertAll(
condition="source.op = 'c'"
).execute()
# Apply merge using foreachBatch
query = cdc_stream \
.writeStream \
.foreachBatch(merge_to_delta) \
.option("checkpointLocation", "/data/checkpoints/delta_merge") \
.trigger(processingTime="1 minute") \
.start()
MERGE logic explained:
whenMatchedDelete— еслиidсуществует иop = 'd'→ DELETE rowwhenMatchedUpdateAll— еслиidсуществует иop = 'u'→ UPDATE all columnswhenNotMatchedInsertAll— еслиidне существует иop = 'c'→ INSERT new row
Advantages:
- ACID guarantees — atomic merge operations
- Time travel — можно query historical versions
- Schema evolution — automatic schema merging
- Optimizations — Z-ordering, data skipping
Performance tip: MERGE операции дорогие. Используйте batching (trigger каждую минуту, не каждое событие).
Проверка знанийПочему Delta Lake MERGE предпочтительнее append-only Parquet для CDC данных, если нужен актуальный snapshot таблицы? Какое ограничение Parquet это создаёт?
Streaming to Parquet
Parquet — это columnar storage format, оптимизированный для analytical queries.
Configuration
from pyspark.sql.functions import date_format
# Read CDC stream
cdc_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "dbserver1.public.orders") \
.load()
parsed_df = cdc_df.select(
from_json(col("value").cast("string"), cdc_schema).alias("data")
).select("data.payload.*")
# Extract current state (after field for c/u, before for d)
current_state = parsed_df.select(
when(col("op").isin("c", "u", "r"), col("after.id")).otherwise(col("before.id")).alias("id"),
when(col("op").isin("c", "u", "r"), col("after.customer_id")).otherwise(col("before.customer_id")).alias("customer_id"),
when(col("op").isin("c", "u", "r"), col("after.total")).otherwise(None).alias("total"),
when(col("op").isin("c", "u", "r"), col("after.status")).otherwise(None).alias("status"),
col("op").alias("_operation"),
col("ts_ms").alias("_cdc_timestamp"),
current_timestamp().alias("_processed_at")
).withColumn(
"_year_month",
date_format(col("_processed_at"), "yyyy-MM")
)
# Write to Parquet with partitioning
query = current_state \
.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "/data/lake/orders") \
.option("checkpointLocation", "/data/checkpoints/orders_parquet") \
.option("compression", "snappy") \
.partitionBy("_year_month") \
.trigger(processingTime="30 seconds") \
.start()
Partitioning strategies:
| Strategy | Pattern | When to use |
|---|---|---|
| By date | _year_month=2026-01 | Time-range queries, retention policies |
| By table | _table=orders | Multi-table CDC pipeline |
| By operation | _operation=c | Separate analytics for inserts/updates |
| By dimension | region=EU | Geographic partitioning |
Trigger options:
# Process as fast as possible (default)
.trigger(processingTime="0 seconds")
# Process every 30 seconds
.trigger(processingTime="30 seconds")
# Process once and stop (batch-like)
.trigger(once=True)
# Process all available data (Spark 3.3+)
.trigger(availableNow=True)
Compression codecs:
snappy— fast compression/decompression, moderate compression ratio (default)gzip— high compression ratio, slowerzstd— balanced, good for modern workloads
Production recommendation:
snappyдля hot data (recent),zstdдля cold data (archival).
Exactly-Once в PySpark Streaming
Critical limitation: PySpark Structured Streaming предоставляет at-least-once semantics для Kafka sink.
PySpark Guarantees
Разделение CDC stream по типу операции
Разные операции требуют разной обработки downstream. INSERT — простой append, UPDATE — MERGE/upsert, DELETE — soft-delete. Разделение упрощает downstream logic и позволяет настроить separate partitioning, compression, monitoring для каждого типа.
What PySpark provides:
- Kafka source → Spark: Exactly-once (via checkpointed offsets)
- Spark → Parquet/Delta: Idempotent writes (same batch writes same files)
- Spark → Kafka: At-least-once only (no transactional writes)
Why at-least-once to Kafka:
- PySpark не поддерживает Kafka transactional producer API
- При failure и retry → duplicate messages в output Kafka topic
- PyFlink поддерживает exactly-once to Kafka через transactional writes
Solutions for Exactly-Once
Option 1: Idempotent consumers
# Downstream consumer должен быть idempotent
# Example: Database upsert вместо insert
def process_message(msg):
# INSERT ... ON CONFLICT UPDATE
db.execute("""
INSERT INTO orders (id, customer_id, total)
VALUES (?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
customer_id = EXCLUDED.customer_id,
total = EXCLUDED.total
""", (msg.id, msg.customer_id, msg.total))
Option 2: Deduplication with state store
# Store processed message IDs in Redis/database
def is_duplicate(message_id):
return redis.sismember("processed_messages", message_id)
def mark_processed(message_id):
redis.sadd("processed_messages", message_id)
redis.expire("processed_messages", 86400) # 24h TTL
Option 3: Use PyFlink instead
If exactly-once to Kafka is critical → use PyFlink with transactional sink.
Production insight: Для CDC → data lake пайплайнов at-least-once обычно acceptable (idempotent writes). Для CDC → Kafka → другой consumer рекомендуется PyFlink.
Мониторинг streaming pipeline
Streaming query metrics доступны через StreamingQueryListener API.
Key Metrics
from pyspark.sql.streaming import StreamingQueryListener
class MetricsListener(StreamingQueryListener):
def onQueryProgress(self, event):
progress = event.progress
print(f"Batch: {progress.batchId}")
print(f"Input rows: {progress.numInputRows}")
print(f"Processed rows/sec: {progress.processedRowsPerSecond}")
print(f"Batch duration: {progress.batchDuration}ms")
# Check for lag
if progress.processedRowsPerSecond < 100:
print("WARNING: Processing lag detected!")
def onQueryStarted(self, event):
print(f"Query started: {event.id}")
def onQueryTerminated(self, event):
print(f"Query terminated: {event.id}")
# Register listener
spark.streams.addListener(MetricsListener())
Metrics to monitor:
inputRowsPerSecond— incoming data rateprocessedRowsPerSecond— processing throughputbatchDuration— time to process each micro-batch- Lag = input rate - processed rate — if lag growing → scaling needed
Alerting thresholds:
alerts:
- name: Processing Lag
condition: processedRowsPerSecond < inputRowsPerSecond
for: 5m
severity: warning
- name: Batch Duration High
condition: batchDuration > 60000 # 60 seconds
for: 3m
severity: critical
Лабораторная работа: CDC to Data Lake
Цель: Создать production-like ETL pipeline от CDC stream до Parquet data lake.
Задание:
- Прочитать CDC события из
dbserver1.public.orders - Добавить metadata columns:
_operation,_cdc_timestamp,_processed_at - Партиционировать по
_processed_date(date from_processed_at) - Записать в Parquet format с compression
snappy - Trigger: каждые 30 секунд
- Query результаты через Spark SQL для проверки
Starter code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
from_json, col, current_timestamp, to_date, date_format
)
# TODO: Create SparkSession
# TODO: Define CDC schema
# TODO: Read from Kafka
# TODO: Parse JSON and add metadata columns
# TODO: Write to Parquet with partitioning
# TODO: Query Parquet files to verify
# spark.read.parquet("/data/lake/orders").show()
Expected directory structure:
/data/lake/orders/
├── _processed_date=2026-02-01/
│ ├── part-00000-xxx.snappy.parquet
│ └── part-00001-xxx.snappy.parquet
└── _processed_date=2026-02-02/
└── part-00000-xxx.snappy.parquet
Verification query:
df = spark.read.parquet("/data/lake/orders")
# Check schema
df.printSchema()
# Count by operation type
df.groupBy("_operation").count().show()
# Check partitions
df.groupBy("_processed_date").count().show()
# Query specific order history
df.filter(col("id") == 12345).orderBy("_cdc_timestamp").show()
Что дальше?
Вы изучили ETL vs ELT подходы, паттерны для CDC → data lake integration, и практические аспекты работы с Parquet и Delta Lake.
Но остается вопрос: Как использовать CDC данные для real-time machine learning? Как извлекать features из streaming CDC events? Как обновлять feature store для online predictions?
В следующем уроке мы изучим Real-time Feature Engineering — как построить ML pipeline поверх CDC stream для fraud detection, recommendation systems, и real-time personalization.
Ключевые выводы
- ETL трансформирует данные перед загрузкой, ELT загружает raw и трансформирует в warehouse
- Modern trend: ELT с cheap storage и powerful cloud warehouses
- CDC enables incremental ELT — continuous updates вместо batch dumps
- Append-only history паттерн сохраняет все CDC события для audit trail
- Separate streams by operation позволяет применять разную логику для INSERT/UPDATE/DELETE
- Delta Lake MERGE предоставляет ACID upserts для CDC data
- Parquet partitioning критически важно для query performance и retention policies
- Metadata columns (
_operation,_cdc_timestamp,_processed_at) essential для troubleshooting - PySpark provides at-least-once для Kafka sink (не exactly-once)
- Idempotent consumers или PyFlink needed для exactly-once end-to-end
- Monitor
processedRowsPerSecondдля detection processing lag - Trigger configuration влияет на latency vs efficiency tradeoff
Проверьте понимание
Закончили урок?
Отметьте его как пройденный, чтобы отслеживать свой прогресс