Перейти к содержанию
Learning Platform
Средний
25 минут
Debezium Event Structure JSON Python

Требуемые знания:

  • module-1/05-python-consumer

Структура CDC событий Debezium

В этом уроке мы разберем анатомию CDC-событий Debezium и напишем переиспользуемую функцию для их парсинга.

Envelope-формат Debezium

Каждое CDC-событие обернуто в envelope (конверт) — стандартную структуру с метаданными:

{
  "schema": {
    "type": "struct",
    "fields": [ ... ]
  },
  "payload": {
    "before": null,
    "after": {
      "id": 3,
      "name": "Алексей Козлов",
      "email": "[email protected]",
      "created_at": 1706745600000000
    },
    "source": {
      "version": "2.5.4.Final",
      "connector": "postgresql",
      "name": "inventory",
      "ts_ms": 1706745600123,
      "snapshot": "false",
      "db": "inventory",
      "schema": "public",
      "table": "customers",
      "txId": 1234,
      "lsn": 23456789
    },
    "op": "c",
    "ts_ms": 1706745600456,
    "transaction": null
  }
}

Разберем ключевые компоненты:

schema (опционально)

Содержит JSON Schema описание структуры payload. Полезно для Schema Registry, но для базовой работы можно игнорировать.

payload — основная часть события

ПолеНазначение
beforeСостояние строки ДО изменения
afterСостояние строки ПОСЛЕ изменения
sourceМетаданные об источнике (база, таблица, позиция в логе)
opТип операции: r, c, u, d
ts_msTimestamp обработки Debezium (миллисекунды)
transactionИнформация о транзакции (опционально)

Типы операций (op)

КодНазваниеbeforeafterКогда
rread (snapshot)nullданныеНачальный snapshot
ccreate (INSERT)nullданныеВставка новой строки
uupdate (UPDATE)старые данныеновые данныеОбновление строки
ddelete (DELETE)старые данныеnullУдаление строки

Визуализация

r (snapshot)
null
данные
c (create)
null
данные
u (update)
старое
новое
d (delete)
данные
null
Проверка знаний
При DELETE-операции какое поле содержит данные удаленной строки -- before или after? Почему?
Ответ
Поле before содержит данные удаленной строки, а after равно null, потому что строка больше не существует после удаления. Для INSERT -- наоборот: before равно null (строки не было), after содержит новые данные. Для UPDATE доступны оба поля: before -- старое состояние, after -- новое.

Ключевая ошибка: null в before/after

Самая частая ошибка начинающих — обращение к after без проверки на null:

# НЕПРАВИЛЬНО - упадет на DELETE
customer_id = event['payload']['after']['id']  # after = null для DELETE!

Правильный подход — проверять тип операции:

# ПРАВИЛЬНО - обрабатываем все случаи
payload = event['payload']
op = payload['op']

if op == 'r':  # Snapshot
    data = payload['after']  # after есть
elif op == 'c':  # Create (INSERT)
    data = payload['after']  # after есть
elif op == 'u':  # Update
    old_data = payload['before']  # before есть
    new_data = payload['after']   # after есть
elif op == 'd':  # Delete
    data = payload['before']  # before есть, after = null!

Метаданные источника (source)

Поле source содержит информацию о происхождении события:

"source": {
  "version": "2.5.4.Final",
  "connector": "postgresql",
  "name": "inventory",
  "ts_ms": 1706745600123,
  "snapshot": "false",
  "db": "inventory",
  "schema": "public",
  "table": "customers",
  "txId": 1234,
  "lsn": 23456789
}
ПолеТипОписание
versionstringВерсия Debezium
connectorstringТип коннектора (postgresql, mysql, mongodb…)
namestringЛогическое имя сервера (topic.prefix)
ts_msnumberTimestamp изменения в БД (миллисекунды)
snapshotstring"true" = snapshot, "false" = live, "last" = последнее событие snapshot
dbstringИмя базы данных
schemastringИмя схемы (для PostgreSQL)
tablestringИмя таблицы
txIdnumberID транзакции в PostgreSQL
lsnnumberLog Sequence Number — позиция в WAL

Важные поля для обработки

ts_ms (source) — когда изменение произошло в базе данных. Используйте для event time processing.

snapshot — определяет, это начальная загрузка или live-изменение:

  • "true" — событие из snapshot
  • "false" — live-изменение
  • "last" — последнее событие snapshot (редко используется)

lsn — позиция в WAL. Используется для точного восстановления позиции при перезапуске.

Проверка знаний
В чем разница между payload.ts_ms и source.ts_ms в Debezium-событии? Какое значение следует использовать для event time processing?
Ответ
source.ts_ms -- это время, когда изменение произошло в базе данных (event time). payload.ts_ms -- это время, когда Debezium обработал событие (processing time). Для event time processing в data pipeline всегда используйте source.ts_ms, потому что оно отражает реальный момент изменения данных, а не задержку обработки.

Полная функция парсинга CDC событий

Эта функция корректно обрабатывает все четыре типа операций:

def parse_cdc_event(raw_event):
    """
    Парсинг Debezium CDC события в структурированный формат.

    Обрабатывает все типы операций:
    - 'r' (read/snapshot): Начальная загрузка существующих данных
    - 'c' (create): INSERT — новая строка
    - 'u' (update): UPDATE — изменение строки
    - 'd' (delete): DELETE — удаление строки

    Args:
        raw_event: dict из json.loads(message.value())

    Returns:
        dict с полями:
        - operation: человекочитаемое имя операции
        - operation_code: оригинальный код (r/c/u/d)
        - before: состояние до изменения (null для r, c)
        - after: состояние после изменения (null для d)
        - is_snapshot: True если событие из начального snapshot
        - table: полное имя таблицы (schema.table)
        - database: имя базы данных
        - timestamp_ms: время обработки Debezium
        - transaction_id: ID транзакции PostgreSQL
        - lsn: Log Sequence Number (позиция в WAL)
    """
    payload = raw_event.get('payload', {})
    source = payload.get('source', {})

    op = payload.get('op')
    op_names = {
        'r': 'snapshot',  # Read - начальный snapshot
        'c': 'create',    # Create - INSERT
        'u': 'update',    # Update - UPDATE
        'd': 'delete'     # Delete - DELETE
    }

    return {
        'operation': op_names.get(op, f'unknown({op})'),
        'operation_code': op,
        'before': payload.get('before'),  # null для r, c
        'after': payload.get('after'),    # null для d
        'is_snapshot': source.get('snapshot') == 'true',
        'table': f"{source.get('schema')}.{source.get('table')}",
        'database': source.get('db'),
        'timestamp_ms': payload.get('ts_ms'),
        'transaction_id': source.get('txId'),
        'lsn': source.get('lsn')
    }

Использование функции парсинга

Полный пример consumer с обработкой всех типов операций:

from confluent_kafka import Consumer
import json


def parse_cdc_event(raw_event):
    """
    Парсинг Debezium CDC события в структурированный формат.

    Обрабатывает все типы операций:
    - 'r' (read/snapshot): Начальная загрузка существующих данных
    - 'c' (create): INSERT — новая строка
    - 'u' (update): UPDATE — изменение строки
    - 'd' (delete): DELETE — удаление строки

    Args:
        raw_event: dict из json.loads(message.value())

    Returns:
        dict с полями operation, operation_code, before, after,
        is_snapshot, table, database, timestamp_ms, transaction_id, lsn
    """
    payload = raw_event.get('payload', {})
    source = payload.get('source', {})

    op = payload.get('op')
    op_names = {
        'r': 'snapshot',
        'c': 'create',
        'u': 'update',
        'd': 'delete'
    }

    return {
        'operation': op_names.get(op, f'unknown({op})'),
        'operation_code': op,
        'before': payload.get('before'),
        'after': payload.get('after'),
        'is_snapshot': source.get('snapshot') == 'true',
        'table': f"{source.get('schema')}.{source.get('table')}",
        'database': source.get('db'),
        'timestamp_ms': payload.get('ts_ms'),
        'transaction_id': source.get('txId'),
        'lsn': source.get('lsn')
    }


# Конфигурация consumer
# ВАЖНО: kafka:9092 для JupyterLab, localhost:9092 для хоста
config = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'cdc-parser-demo',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True
}

consumer = Consumer(config)
consumer.subscribe(['inventory.public.customers'])

print("CDC Parser запущен...")
print("Обрабатываем события с inventory.public.customers\n")

try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            print(f"Error: {msg.error()}")
            continue

        try:
            raw = json.loads(msg.value().decode('utf-8'))
            event = parse_cdc_event(raw)

            print(f"[{event['operation'].upper()}] {event['table']}")

            # Обработка каждого типа операции
            op = event['operation_code']

            if op == 'r':  # Snapshot (начальная загрузка)
                print(f"  Snapshot данные: {event['after']}")
                print(f"  (Это существующая запись при старте коннектора)")

            elif op == 'c':  # Create (INSERT)
                print(f"  Новая запись: {event['after']}")

            elif op == 'u':  # Update
                print(f"  Было: {event['before']}")
                print(f"  Стало: {event['after']}")

            elif op == 'd':  # Delete
                print(f"  Удалено: {event['before']}")

            else:
                print(f"  Неизвестная операция: {op}")

            # Дополнительная информация
            if event['is_snapshot']:
                print(f"  [SNAPSHOT]")
            print(f"  Transaction ID: {event['transaction_id']}, LSN: {event['lsn']}")
            print()

        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"Ошибка декодирования: {e}")
            continue

except KeyboardInterrupt:
    print("\nОстановка...")
finally:
    consumer.close()
    print("Consumer закрыт.")

Различие snapshot и streaming событий

При первом запуске коннектора Debezium выполняет snapshot — чтение всех существующих данных:

[SNAPSHOT] public.customers
  Snapshot данные: {'id': 1, 'name': 'Иван Петров', ...}
  (Это существующая запись при старте коннектора)
  [SNAPSHOT]
  Transaction ID: None, LSN: None

[SNAPSHOT] public.customers
  Snapshot данные: {'id': 2, 'name': 'Мария Сидорова', ...}
  (Это существующая запись при старте коннектора)
  [SNAPSHOT]
  Transaction ID: None, LSN: None

После завершения snapshot события становятся streaming (live):

[CREATE] public.customers
  Новая запись: {'id': 3, 'name': 'Алексей Козлов', ...}
  Transaction ID: 1234, LSN: 23456789

Ключевое отличие:

  • Snapshot события: op='r', source.snapshot='true', нет txId/lsn
  • Streaming события: op = c/u/d, source.snapshot='false', есть txId/lsn

Зачем это различать?

В data pipelines часто нужно:

  1. Обрабатывать snapshot отдельно — например, bulk load в data warehouse
  2. Фильтровать snapshot — если нужны только новые изменения
  3. Дедупликация — snapshot может пересекаться с уже загруженными данными

Работа с timestamps

Debezium использует миллисекунды для timestamps:

from datetime import datetime

def format_event_time(event):
    """Форматирование времени события."""
    ts_ms = event['timestamp_ms']
    if ts_ms is None:
        return "N/A (snapshot)"

    dt = datetime.fromtimestamp(ts_ms / 1000)
    return dt.strftime('%Y-%m-%d %H:%M:%S')


# Использование
event = parse_cdc_event(raw)
print(f"Время события: {format_event_time(event)}")

Два timestamp в событии:

  • payload.ts_ms — когда Debezium обработал событие
  • source.ts_ms — когда изменение произошло в базе данных

Для event time processing используйте source.ts_ms (время в базе), не payload.ts_ms (время обработки).

Анатомия события

Структура CDC Event
CDC Event
schema
{ type, fields }
payload
before
состояние ДО
after
состояние ПОСЛЕ
op
r | c | u | d
ts_ms
timestamp Debezium
source
db
schema
table
lsn
txId

Итоги модуля 1

Поздравляем! Вы завершили первый модуль курса. Вот что мы изучили:

Урок 1: CDC Fundamentals

  • Что такое Change Data Capture
  • Преимущества log-based CDC над polling

Урок 2: Debezium Architecture

  • Компоненты Debezium
  • Роль Kafka Connect

Урок 3: Lab Setup

  • Docker Compose окружение
  • Сервисы: PostgreSQL, Kafka, Connect, JupyterLab

Урок 4: First Connector

  • Развертывание PostgreSQL коннектора
  • Kafka Connect REST API
  • Replication slots

Урок 5: Python Consumer

  • confluent-kafka библиотека
  • Consumer groups и offset tracking
  • kafka:9092 vs localhost:9092

Урок 6: Event Structure (этот урок)

  • Envelope формат Debezium
  • Типы операций: r, c, u, d
  • Функция parse_cdc_event
  • Snapshot vs streaming

Что дальше?

В Модуле 2 мы глубже погрузимся в PostgreSQL:

  • Конфигурация WAL для CDC (wal_level, max_replication_slots)
  • Replication slots — мониторинг и управление
  • Особенности работы с Amazon Aurora
  • TOAST columns и большие данные
  • Schema changes во время CDC

Теперь у вас есть прочный фундамент для работы с CDC и Debezium!

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что определяет поле op в payload Debezium CDC события?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс