Learning Platform
Глоссарий Troubleshooting
Урок 08.08 · 14 мин
Продвинутый
CDCDebeziumChange Data CaptureKafkaDelta Lake

CDC: потребление событий Debezium

Архитектура CDC pipeline

Change Data Capture (CDC) — паттерн захвата изменений в базе данных (INSERT, UPDATE, DELETE) и передачи их как потока событий. Debezium — самый популярный open-source CDC connector для PostgreSQL, MySQL, MongoDB и других баз.

Полный CDC pipeline:

CDC Pipeline

Database → Debezium → Kafka → Spark → Delta Lake

Database(source)WAL/binlog
DebeziumConnectorCDC events
Kafka(events)JSON/Avro
SparkStructuredStreamingParse + merge
Delta Lake(target)ACID table

Debezium читает WAL (Write-Ahead Log) базы данных — это тот же механизм, который база использует для репликации. Это означает:

  • Нет нагрузки на source DB — читается только лог, не таблица
  • Capture всех изменений — включая DELETE
  • Низкая задержка — события появляются в Kafka за миллисекунды
TIP

Подробная настройка Debezium — это отдельная большая тема. Если вы ещё не работали с Debezium, рекомендуем начать с нашего курса:

Debezium: Single Message Transforms

Там подробно разбирается настройка source connectors, SMT (Single Message Transforms) и schema registry.

Формат событий Debezium

Каждое CDC событие Debezium содержит envelope с полями:

{
  "before": {
    "id": 1001,
    "name": "Alice",
    "email": "[email protected]",
    "amount": 100.0
  },
  "after": {
    "id": 1001,
    "name": "Alice",
    "email": "[email protected]",
    "amount": 150.0
  },
  "op": "u",
  "ts_ms": 1705312800000,
  "source": {
    "connector": "postgresql",
    "db": "orders_db",
    "schema": "public",
    "table": "customers"
  }
}
ПолеОписаниеЗначения
beforeСостояние строки до измененияnull для INSERT
afterСостояние строки после измененияnull для DELETE
opТип операцииc (create), u (update), d (delete), r (snapshot read)
ts_msTimestamp изменения (миллисекунды)Unix epoch ms
sourceМетаданные источникаdatabase, schema, table, LSN/binlog position

Парсинг CDC событий в Spark

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, when
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

spark = SparkSession.builder \
    .appName("CDC-Debezium-Pipeline") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Schema для Debezium envelope
row_schema = StructType([
    StructField("id", LongType()),
    StructField("name", StringType()),
    StructField("email", StringType()),
    StructField("amount", DoubleType())
])

envelope_schema = StructType([
    StructField("before", row_schema),
    StructField("after", row_schema),
    StructField("op", StringType()),
    StructField("ts_ms", LongType())
])

# Kafka source -- читаем CDC топик Debezium
raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "dbserver1.public.customers") \
    .option("startingOffsets", "earliest") \
    .load()

# Парсинг JSON envelope
cdc_events = raw.select(
    from_json(col("value").cast("string"), envelope_schema).alias("cdc")
).select(
    col("cdc.op").alias("operation"),
    col("cdc.before").alias("before"),
    col("cdc.after").alias("after"),
    col("cdc.ts_ms").alias("change_timestamp")
)

Обработка INSERT / UPDATE / DELETE

Каждый тип операции требует своей обработки:

# Разбор операций
processed = cdc_events.select(
    # Для INSERT и UPDATE -- берём after
    # Для DELETE -- берём before (after = null)
    when(col("operation").isin("c", "u", "r"), col("after.id"))
        .otherwise(col("before.id")).alias("id"),
    when(col("operation").isin("c", "u", "r"), col("after.name"))
        .otherwise(col("before.name")).alias("name"),
    when(col("operation").isin("c", "u", "r"), col("after.email"))
        .otherwise(col("before.email")).alias("email"),
    when(col("operation").isin("c", "u", "r"), col("after.amount"))
        .otherwise(col("before.amount")).alias("amount"),
    col("operation"),
    col("change_timestamp")
)

Merge в Delta Lake (Upsert паттерн)

Ключевой паттерн CDC -> Delta Lake: merge (upsert). Каждый micro-batch мержится в целевую таблицу:

from delta.tables import DeltaTable

target_path = "/data/gold/customers"

def merge_cdc_batch(batch_df, batch_id):
    """Merge CDC micro-batch в Delta Lake."""

    if batch_df.isEmpty():
        return

    # Дедупликация внутри batch (последнее изменение побеждает)
    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number, desc

    w = Window.partitionBy("id").orderBy(desc("change_timestamp"))
    deduped = batch_df.withColumn("rn", row_number().over(w)) \
        .filter(col("rn") == 1).drop("rn")

    # Создать таблицу при первом запуске
    if not DeltaTable.isDeltaTable(spark, target_path):
        deduped.filter(col("operation") != "d") \
            .select("id", "name", "email", "amount") \
            .write.format("delta").save(target_path)
        return

    # Merge: INSERT, UPDATE, DELETE
    target = DeltaTable.forPath(spark, target_path)

    target.alias("target").merge(
        deduped.alias("source"),
        "target.id = source.id"
    ).whenMatchedDelete(
        condition="source.operation = 'd'"
    ).whenMatchedUpdate(
        condition="source.operation IN ('u', 'c', 'r')",
        set={
            "name": "source.name",
            "email": "source.email",
            "amount": "source.amount"
        }
    ).whenNotMatchedInsert(
        condition="source.operation != 'd'",
        values={
            "id": "source.id",
            "name": "source.name",
            "email": "source.email",
            "amount": "source.amount"
        }
    ).execute()

# foreachBatch -- merge каждый micro-batch
query = processed.writeStream \
    .foreachBatch(merge_cdc_batch) \
    .option("checkpointLocation", "/checkpoints/cdc-customers") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()
TIP

foreachBatch + DeltaTable.merge() — стандартный production паттерн для CDC. foreachBatch даёт доступ к batch как обычному DataFrame, а merge() выполняет атомарный upsert с поддержкой DELETE. Дедупликация внутри batch обязательна — один id может измениться несколько раз за один micro-batch.

Подробнее о Debezium

Для глубокого понимания CDC рекомендуем изучить наш курс по Debezium. Два ключевых модуля:

Debezium + PySpark Structured Streaming CDC ETL/ELT паттерны

Полный production pipeline

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, when, current_timestamp
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("CDC-Production-Pipeline") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Kafka CDC source
raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "dbserver1.public.orders") \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", 50000) \
    .load()

# Parse Debezium envelope
cdc = raw.select(
    from_json(col("value").cast("string"), envelope_schema).alias("cdc")
).select(
    col("cdc.op").alias("op"),
    col("cdc.before"),
    col("cdc.after"),
    col("cdc.ts_ms")
)

# Extract fields
orders = cdc.select(
    when(col("op").isin("c", "u", "r"), col("after.order_id"))
        .otherwise(col("before.order_id")).alias("order_id"),
    when(col("op").isin("c", "u", "r"), col("after.customer_id"))
        .otherwise(col("before.customer_id")).alias("customer_id"),
    when(col("op").isin("c", "u", "r"), col("after.total"))
        .otherwise(col("before.total")).alias("total"),
    col("op"),
    col("ts_ms")
)

# Merge into Delta Lake via foreachBatch
orders.writeStream \
    .foreachBatch(merge_orders_batch) \
    .option("checkpointLocation", "/checkpoints/cdc-orders") \
    .trigger(processingTime="30 seconds") \
    .start() \
    .awaitTermination()
WARNING

Anti-pattern: игнорирование tombstone (DELETE) событий. В CDC потоке op = "d" означает удаление строки. Если не обрабатывать DELETE, целевая таблица будет содержать удалённые записи — рассинхронизация с source database. Всегда включайте whenMatchedDelete в merge или фильтруйте удалённые записи в downstream processing.

Проверка знанийKnowledge check
В Debezium CDC событии поле op='u'. Какие поля before и after содержат, и какое состояние строки вы используете для записи в Delta Lake?
ОтветAnswer
При op='u' (update): before содержит состояние строки ДО изменения, after содержит состояние ПОСЛЕ изменения. Для записи в Delta Lake используем after -- это актуальное состояние строки после обновления. before может быть полезен для аудита (что именно изменилось) или SCD Type 2 (history tracking), но для основной target таблицы всегда берём after.
Проверка знанийKnowledge check
Почему дедупликация внутри micro-batch обязательна при CDC -> Delta merge?
ОтветAnswer
В одном micro-batch могут быть несколько изменений одного id: INSERT -> UPDATE -> UPDATE. Без дедупликации merge получит конфликт -- несколько source строк match одной target строке, что вызовет DeltaMergeMatchedMultipleRowError. Дедупликация оставляет только последнее изменение (по ts_ms) для каждого id, гарантируя корректный merge. Используем window function с row_number() ORDER BY ts_ms DESC, берём rn=1.

Что дальше?

В Модуле 10 мы перейдём к Lakehouse форматам — Delta Lake, Apache Iceberg, Apache Hudi и Apache Paimon. Вы увидите, как эти форматы обеспечивают ACID транзакции поверх объектного хранилища и как они интегрируются со Structured Streaming.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В Debezium envelope-формате, какие значения полей before и after указывают на DELETE-операцию?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8