CDC: потребление событий Debezium
Архитектура CDC pipeline
Change Data Capture (CDC) — паттерн захвата изменений в базе данных (INSERT, UPDATE, DELETE) и передачи их как потока событий. Debezium — самый популярный open-source CDC connector для PostgreSQL, MySQL, MongoDB и других баз.
Полный CDC pipeline:
Database → Debezium → Kafka → Spark → Delta Lake
Debezium читает WAL (Write-Ahead Log) базы данных — это тот же механизм, который база использует для репликации. Это означает:
- Нет нагрузки на source DB — читается только лог, не таблица
- Capture всех изменений — включая DELETE
- Низкая задержка — события появляются в Kafka за миллисекунды
Подробная настройка Debezium — это отдельная большая тема. Если вы ещё не работали с Debezium, рекомендуем начать с нашего курса:
Debezium: Single Message TransformsТам подробно разбирается настройка source connectors, SMT (Single Message Transforms) и schema registry.
Формат событий Debezium
Каждое CDC событие Debezium содержит envelope с полями:
{
"before": {
"id": 1001,
"name": "Alice",
"email": "[email protected]",
"amount": 100.0
},
"after": {
"id": 1001,
"name": "Alice",
"email": "[email protected]",
"amount": 150.0
},
"op": "u",
"ts_ms": 1705312800000,
"source": {
"connector": "postgresql",
"db": "orders_db",
"schema": "public",
"table": "customers"
}
}
| Поле | Описание | Значения |
|---|---|---|
before | Состояние строки до изменения | null для INSERT |
after | Состояние строки после изменения | null для DELETE |
op | Тип операции | c (create), u (update), d (delete), r (snapshot read) |
ts_ms | Timestamp изменения (миллисекунды) | Unix epoch ms |
source | Метаданные источника | database, schema, table, LSN/binlog position |
Парсинг CDC событий в Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, when
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType
spark = SparkSession.builder \
.appName("CDC-Debezium-Pipeline") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Schema для Debezium envelope
row_schema = StructType([
StructField("id", LongType()),
StructField("name", StringType()),
StructField("email", StringType()),
StructField("amount", DoubleType())
])
envelope_schema = StructType([
StructField("before", row_schema),
StructField("after", row_schema),
StructField("op", StringType()),
StructField("ts_ms", LongType())
])
# Kafka source -- читаем CDC топик Debezium
raw = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "dbserver1.public.customers") \
.option("startingOffsets", "earliest") \
.load()
# Парсинг JSON envelope
cdc_events = raw.select(
from_json(col("value").cast("string"), envelope_schema).alias("cdc")
).select(
col("cdc.op").alias("operation"),
col("cdc.before").alias("before"),
col("cdc.after").alias("after"),
col("cdc.ts_ms").alias("change_timestamp")
)
Обработка INSERT / UPDATE / DELETE
Каждый тип операции требует своей обработки:
# Разбор операций
processed = cdc_events.select(
# Для INSERT и UPDATE -- берём after
# Для DELETE -- берём before (after = null)
when(col("operation").isin("c", "u", "r"), col("after.id"))
.otherwise(col("before.id")).alias("id"),
when(col("operation").isin("c", "u", "r"), col("after.name"))
.otherwise(col("before.name")).alias("name"),
when(col("operation").isin("c", "u", "r"), col("after.email"))
.otherwise(col("before.email")).alias("email"),
when(col("operation").isin("c", "u", "r"), col("after.amount"))
.otherwise(col("before.amount")).alias("amount"),
col("operation"),
col("change_timestamp")
)
Merge в Delta Lake (Upsert паттерн)
Ключевой паттерн CDC -> Delta Lake: merge (upsert). Каждый micro-batch мержится в целевую таблицу:
from delta.tables import DeltaTable
target_path = "/data/gold/customers"
def merge_cdc_batch(batch_df, batch_id):
"""Merge CDC micro-batch в Delta Lake."""
if batch_df.isEmpty():
return
# Дедупликация внутри batch (последнее изменение побеждает)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc
w = Window.partitionBy("id").orderBy(desc("change_timestamp"))
deduped = batch_df.withColumn("rn", row_number().over(w)) \
.filter(col("rn") == 1).drop("rn")
# Создать таблицу при первом запуске
if not DeltaTable.isDeltaTable(spark, target_path):
deduped.filter(col("operation") != "d") \
.select("id", "name", "email", "amount") \
.write.format("delta").save(target_path)
return
# Merge: INSERT, UPDATE, DELETE
target = DeltaTable.forPath(spark, target_path)
target.alias("target").merge(
deduped.alias("source"),
"target.id = source.id"
).whenMatchedDelete(
condition="source.operation = 'd'"
).whenMatchedUpdate(
condition="source.operation IN ('u', 'c', 'r')",
set={
"name": "source.name",
"email": "source.email",
"amount": "source.amount"
}
).whenNotMatchedInsert(
condition="source.operation != 'd'",
values={
"id": "source.id",
"name": "source.name",
"email": "source.email",
"amount": "source.amount"
}
).execute()
# foreachBatch -- merge каждый micro-batch
query = processed.writeStream \
.foreachBatch(merge_cdc_batch) \
.option("checkpointLocation", "/checkpoints/cdc-customers") \
.trigger(processingTime="30 seconds") \
.start()
query.awaitTermination()
foreachBatch + DeltaTable.merge() — стандартный production паттерн для CDC. foreachBatch даёт доступ к batch как обычному DataFrame, а merge() выполняет атомарный upsert с поддержкой DELETE. Дедупликация внутри batch обязательна — один id может измениться несколько раз за один micro-batch.
Подробнее о Debezium
Для глубокого понимания CDC рекомендуем изучить наш курс по Debezium. Два ключевых модуля:
Debezium + PySpark Structured Streaming CDC ETL/ELT паттерныПолный production pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, when, current_timestamp
from delta.tables import DeltaTable
spark = SparkSession.builder \
.appName("CDC-Production-Pipeline") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Kafka CDC source
raw = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "dbserver1.public.orders") \
.option("startingOffsets", "earliest") \
.option("maxOffsetsPerTrigger", 50000) \
.load()
# Parse Debezium envelope
cdc = raw.select(
from_json(col("value").cast("string"), envelope_schema).alias("cdc")
).select(
col("cdc.op").alias("op"),
col("cdc.before"),
col("cdc.after"),
col("cdc.ts_ms")
)
# Extract fields
orders = cdc.select(
when(col("op").isin("c", "u", "r"), col("after.order_id"))
.otherwise(col("before.order_id")).alias("order_id"),
when(col("op").isin("c", "u", "r"), col("after.customer_id"))
.otherwise(col("before.customer_id")).alias("customer_id"),
when(col("op").isin("c", "u", "r"), col("after.total"))
.otherwise(col("before.total")).alias("total"),
col("op"),
col("ts_ms")
)
# Merge into Delta Lake via foreachBatch
orders.writeStream \
.foreachBatch(merge_orders_batch) \
.option("checkpointLocation", "/checkpoints/cdc-orders") \
.trigger(processingTime="30 seconds") \
.start() \
.awaitTermination()
Anti-pattern: игнорирование tombstone (DELETE) событий. В CDC потоке op = "d" означает удаление строки. Если не обрабатывать DELETE, целевая таблица будет содержать удалённые записи — рассинхронизация с source database. Всегда включайте whenMatchedDelete в merge или фильтруйте удалённые записи в downstream processing.
Что дальше?
В Модуле 10 мы перейдём к Lakehouse форматам — Delta Lake, Apache Iceberg, Apache Hudi и Apache Paimon. Вы увидите, как эти форматы обеспечивают ACID транзакции поверх объектного хранилища и как они интегрируются со Structured Streaming.