Требуемые знания:
- module-1/05-python-consumer
Структура CDC событий Debezium
В этом уроке мы разберем анатомию CDC-событий Debezium и напишем переиспользуемую функцию для их парсинга.
Envelope-формат Debezium
Каждое CDC-событие обернуто в envelope (конверт) — стандартную структуру с метаданными:
{
"schema": {
"type": "struct",
"fields": [ ... ]
},
"payload": {
"before": null,
"after": {
"id": 3,
"name": "Алексей Козлов",
"email": "[email protected]",
"created_at": 1706745600000000
},
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "inventory",
"ts_ms": 1706745600123,
"snapshot": "false",
"db": "inventory",
"schema": "public",
"table": "customers",
"txId": 1234,
"lsn": 23456789
},
"op": "c",
"ts_ms": 1706745600456,
"transaction": null
}
}
Разберем ключевые компоненты:
schema (опционально)
Содержит JSON Schema описание структуры payload. Полезно для Schema Registry, но для базовой работы можно игнорировать.
payload — основная часть события
| Поле | Назначение |
|---|---|
before | Состояние строки ДО изменения |
after | Состояние строки ПОСЛЕ изменения |
source | Метаданные об источнике (база, таблица, позиция в логе) |
op | Тип операции: r, c, u, d |
ts_ms | Timestamp обработки Debezium (миллисекунды) |
transaction | Информация о транзакции (опционально) |
Типы операций (op)
| Код | Название | before | after | Когда |
|---|---|---|---|---|
r | read (snapshot) | null | данные | Начальный snapshot |
c | create (INSERT) | null | данные | Вставка новой строки |
u | update (UPDATE) | старые данные | новые данные | Обновление строки |
d | delete (DELETE) | старые данные | null | Удаление строки |
Визуализация
Проверка знанийПри DELETE-операции какое поле содержит данные удаленной строки -- before или after? Почему?
Ключевая ошибка: null в before/after
Самая частая ошибка начинающих — обращение к after без проверки на null:
# НЕПРАВИЛЬНО - упадет на DELETE
customer_id = event['payload']['after']['id'] # after = null для DELETE!
Правильный подход — проверять тип операции:
# ПРАВИЛЬНО - обрабатываем все случаи
payload = event['payload']
op = payload['op']
if op == 'r': # Snapshot
data = payload['after'] # after есть
elif op == 'c': # Create (INSERT)
data = payload['after'] # after есть
elif op == 'u': # Update
old_data = payload['before'] # before есть
new_data = payload['after'] # after есть
elif op == 'd': # Delete
data = payload['before'] # before есть, after = null!
Метаданные источника (source)
Поле source содержит информацию о происхождении события:
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "inventory",
"ts_ms": 1706745600123,
"snapshot": "false",
"db": "inventory",
"schema": "public",
"table": "customers",
"txId": 1234,
"lsn": 23456789
}
| Поле | Тип | Описание |
|---|---|---|
version | string | Версия Debezium |
connector | string | Тип коннектора (postgresql, mysql, mongodb…) |
name | string | Логическое имя сервера (topic.prefix) |
ts_ms | number | Timestamp изменения в БД (миллисекунды) |
snapshot | string | "true" = snapshot, "false" = live, "last" = последнее событие snapshot |
db | string | Имя базы данных |
schema | string | Имя схемы (для PostgreSQL) |
table | string | Имя таблицы |
txId | number | ID транзакции в PostgreSQL |
lsn | number | Log Sequence Number — позиция в WAL |
Важные поля для обработки
ts_ms (source) — когда изменение произошло в базе данных. Используйте для event time processing.
snapshot — определяет, это начальная загрузка или live-изменение:
"true"— событие из snapshot"false"— live-изменение"last"— последнее событие snapshot (редко используется)
lsn — позиция в WAL. Используется для точного восстановления позиции при перезапуске.
Проверка знанийВ чем разница между payload.ts_ms и source.ts_ms в Debezium-событии? Какое значение следует использовать для event time processing?
Полная функция парсинга CDC событий
Эта функция корректно обрабатывает все четыре типа операций:
def parse_cdc_event(raw_event):
"""
Парсинг Debezium CDC события в структурированный формат.
Обрабатывает все типы операций:
- 'r' (read/snapshot): Начальная загрузка существующих данных
- 'c' (create): INSERT — новая строка
- 'u' (update): UPDATE — изменение строки
- 'd' (delete): DELETE — удаление строки
Args:
raw_event: dict из json.loads(message.value())
Returns:
dict с полями:
- operation: человекочитаемое имя операции
- operation_code: оригинальный код (r/c/u/d)
- before: состояние до изменения (null для r, c)
- after: состояние после изменения (null для d)
- is_snapshot: True если событие из начального snapshot
- table: полное имя таблицы (schema.table)
- database: имя базы данных
- timestamp_ms: время обработки Debezium
- transaction_id: ID транзакции PostgreSQL
- lsn: Log Sequence Number (позиция в WAL)
"""
payload = raw_event.get('payload', {})
source = payload.get('source', {})
op = payload.get('op')
op_names = {
'r': 'snapshot', # Read - начальный snapshot
'c': 'create', # Create - INSERT
'u': 'update', # Update - UPDATE
'd': 'delete' # Delete - DELETE
}
return {
'operation': op_names.get(op, f'unknown({op})'),
'operation_code': op,
'before': payload.get('before'), # null для r, c
'after': payload.get('after'), # null для d
'is_snapshot': source.get('snapshot') == 'true',
'table': f"{source.get('schema')}.{source.get('table')}",
'database': source.get('db'),
'timestamp_ms': payload.get('ts_ms'),
'transaction_id': source.get('txId'),
'lsn': source.get('lsn')
}
Использование функции парсинга
Полный пример consumer с обработкой всех типов операций:
from confluent_kafka import Consumer
import json
def parse_cdc_event(raw_event):
"""
Парсинг Debezium CDC события в структурированный формат.
Обрабатывает все типы операций:
- 'r' (read/snapshot): Начальная загрузка существующих данных
- 'c' (create): INSERT — новая строка
- 'u' (update): UPDATE — изменение строки
- 'd' (delete): DELETE — удаление строки
Args:
raw_event: dict из json.loads(message.value())
Returns:
dict с полями operation, operation_code, before, after,
is_snapshot, table, database, timestamp_ms, transaction_id, lsn
"""
payload = raw_event.get('payload', {})
source = payload.get('source', {})
op = payload.get('op')
op_names = {
'r': 'snapshot',
'c': 'create',
'u': 'update',
'd': 'delete'
}
return {
'operation': op_names.get(op, f'unknown({op})'),
'operation_code': op,
'before': payload.get('before'),
'after': payload.get('after'),
'is_snapshot': source.get('snapshot') == 'true',
'table': f"{source.get('schema')}.{source.get('table')}",
'database': source.get('db'),
'timestamp_ms': payload.get('ts_ms'),
'transaction_id': source.get('txId'),
'lsn': source.get('lsn')
}
# Конфигурация consumer
# ВАЖНО: kafka:9092 для JupyterLab, localhost:9092 для хоста
config = {
'bootstrap.servers': 'kafka:9092',
'group.id': 'cdc-parser-demo',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True
}
consumer = Consumer(config)
consumer.subscribe(['inventory.public.customers'])
print("CDC Parser запущен...")
print("Обрабатываем события с inventory.public.customers\n")
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
print(f"Error: {msg.error()}")
continue
try:
raw = json.loads(msg.value().decode('utf-8'))
event = parse_cdc_event(raw)
print(f"[{event['operation'].upper()}] {event['table']}")
# Обработка каждого типа операции
op = event['operation_code']
if op == 'r': # Snapshot (начальная загрузка)
print(f" Snapshot данные: {event['after']}")
print(f" (Это существующая запись при старте коннектора)")
elif op == 'c': # Create (INSERT)
print(f" Новая запись: {event['after']}")
elif op == 'u': # Update
print(f" Было: {event['before']}")
print(f" Стало: {event['after']}")
elif op == 'd': # Delete
print(f" Удалено: {event['before']}")
else:
print(f" Неизвестная операция: {op}")
# Дополнительная информация
if event['is_snapshot']:
print(f" [SNAPSHOT]")
print(f" Transaction ID: {event['transaction_id']}, LSN: {event['lsn']}")
print()
except (json.JSONDecodeError, UnicodeDecodeError) as e:
print(f"Ошибка декодирования: {e}")
continue
except KeyboardInterrupt:
print("\nОстановка...")
finally:
consumer.close()
print("Consumer закрыт.")
Различие snapshot и streaming событий
При первом запуске коннектора Debezium выполняет snapshot — чтение всех существующих данных:
[SNAPSHOT] public.customers
Snapshot данные: {'id': 1, 'name': 'Иван Петров', ...}
(Это существующая запись при старте коннектора)
[SNAPSHOT]
Transaction ID: None, LSN: None
[SNAPSHOT] public.customers
Snapshot данные: {'id': 2, 'name': 'Мария Сидорова', ...}
(Это существующая запись при старте коннектора)
[SNAPSHOT]
Transaction ID: None, LSN: None
После завершения snapshot события становятся streaming (live):
[CREATE] public.customers
Новая запись: {'id': 3, 'name': 'Алексей Козлов', ...}
Transaction ID: 1234, LSN: 23456789
Ключевое отличие:
- Snapshot события:
op='r',source.snapshot='true', нетtxId/lsn - Streaming события:
op=c/u/d,source.snapshot='false', естьtxId/lsn
Зачем это различать?
В data pipelines часто нужно:
- Обрабатывать snapshot отдельно — например, bulk load в data warehouse
- Фильтровать snapshot — если нужны только новые изменения
- Дедупликация — snapshot может пересекаться с уже загруженными данными
Работа с timestamps
Debezium использует миллисекунды для timestamps:
from datetime import datetime
def format_event_time(event):
"""Форматирование времени события."""
ts_ms = event['timestamp_ms']
if ts_ms is None:
return "N/A (snapshot)"
dt = datetime.fromtimestamp(ts_ms / 1000)
return dt.strftime('%Y-%m-%d %H:%M:%S')
# Использование
event = parse_cdc_event(raw)
print(f"Время события: {format_event_time(event)}")
Два timestamp в событии:
payload.ts_ms— когда Debezium обработал событиеsource.ts_ms— когда изменение произошло в базе данных
Для event time processing используйте source.ts_ms (время в базе), не payload.ts_ms (время обработки).
Анатомия события
Итоги модуля 1
Поздравляем! Вы завершили первый модуль курса. Вот что мы изучили:
Урок 1: CDC Fundamentals
- Что такое Change Data Capture
- Преимущества log-based CDC над polling
Урок 2: Debezium Architecture
- Компоненты Debezium
- Роль Kafka Connect
Урок 3: Lab Setup
- Docker Compose окружение
- Сервисы: PostgreSQL, Kafka, Connect, JupyterLab
Урок 4: First Connector
- Развертывание PostgreSQL коннектора
- Kafka Connect REST API
- Replication slots
Урок 5: Python Consumer
- confluent-kafka библиотека
- Consumer groups и offset tracking
- kafka:9092 vs localhost:9092
Урок 6: Event Structure (этот урок)
- Envelope формат Debezium
- Типы операций: r, c, u, d
- Функция parse_cdc_event
- Snapshot vs streaming
Что дальше?
В Модуле 2 мы глубже погрузимся в PostgreSQL:
- Конфигурация WAL для CDC (wal_level, max_replication_slots)
- Replication slots — мониторинг и управление
- Особенности работы с Amazon Aurora
- TOAST columns и большие данные
- Schema changes во время CDC
Теперь у вас есть прочный фундамент для работы с CDC и Debezium!
Проверьте понимание
Закончили урок?
Отметьте его как пройденный, чтобы отслеживать свой прогресс