Требуемые знания:
- module-5/01-advanced-python-consumer
Интеграция CDC событий с Pandas
После того как мы научились создавать production-ready consumer, следующий шаг — преобразование потока CDC событий в удобный для анализа формат. Pandas DataFrame — стандартный инструмент для табличной обработки данных в Python.
Зачем Pandas для CDC данных?
Типичные задачи data engineering с CDC:
1. Batch Analysis
Накопить N событий из Kafka, преобразовать в DataFrame, выполнить агрегации:
- Сколько операций каждого типа (INSERT/UPDATE/DELETE)?
- Какие таблицы изменяются чаще всего?
- Распределение событий по времени
2. Exploratory Data Analysis (EDA)
Исследование структуры и паттернов в CDC данных:
- Какие поля чаще всего изменяются?
- Есть ли корреляции между изменениями разных таблиц?
- Визуализация временных паттернов
3. ETL Transformations
Преобразование CDC событий перед загрузкой в data warehouse:
- Фильтрация ненужных полей
- Расчет derived metrics (time since last update, change rate)
- Join с dimension tables
4. Feature Engineering
Подготовка real-time features для ML моделей:
- Rolling window aggregations (30-day order count)
- Recency features (days since last purchase)
- Change detection (price increased > 10%)
Pandas предоставляет:
- Удобный API для всех этих операций
- Высокую производительность (NumPy backend)
- Интеграцию с visualization (matplotlib, seaborn)
- Arrow-совместимость для zero-copy interchange
Pandas 3.0: Важные изменения
⚠️ КРИТИЧЕСКОЕ ОБНОВЛЕНИЕ
Pandas 3.0.0 вышел 21 января 2026 с breaking changes, которые влияют на обработку CDC данных.
В лабораторном окружении курса установлен pandas 3.0.0+. Если вы работаете с более старым кодом или примерами из интернета (до 2026 года), код может не работать.
Изменение 1: Строковый тип данных
Раньше (pandas < 3.0):
df['name'].dtype # dtype('O') - object dtype
Теперь (pandas 3.0+):
df['name'].dtype # StringDtype - str dtype
Влияние на CDC:
- Проверки
df['column'].dtype == 'object'больше не работают - Используйте
df['column'].dtype == 'str'для строковых колонок - CDC поля с текстом (
email,name,status) теперь имеют типstr
Изменение 2: Copy-on-Write (CoW)
Раньше (pandas < 3.0):
# Chained assignment работал
df[df['total'] > 100]['status'] = 'high-value'
Теперь (pandas 3.0+):
# ❌ Chained assignment НЕ РАБОТАЕТ (silent failure)
df[df['total'] > 100]['status'] = 'high-value'
# ✅ Правильный способ
df.loc[df['total'] > 100, 'status'] = 'high-value'
Влияние на CDC:
- Используйте только
.loc[]для модификации DataFrame - Chained assignment теперь создает view, а не copy — изменения не применяются к исходному DataFrame
- Преимущество: меньше копирований, выше производительность
Изменение 3: Datetime Resolution
Раньше (pandas < 3.0):
# Nanosecond resolution (ns)
pd.to_datetime(df['ts_ms'], unit='ms') # dtype: datetime64[ns]
Теперь (pandas 3.0+):
# Microsecond resolution (us) by default
pd.to_datetime(df['ts_ms'], unit='ms') # dtype: datetime64[us]
Влияние на CDC:
- CDC timestamp поле
ts_ms(milliseconds) корректно обрабатывается - Если код полагается на nanosecond precision, используйте
.as_unit('ns') - Для большинства CDC задач microsecond precision достаточно
Сравнительная таблица
| Аспект | Pandas < 3.0 | Pandas 3.0+ |
|---|---|---|
| Строковый dtype | object | str (StringDtype) |
| Модификация DataFrame | Chained assignment работал | Только .loc[] |
| Datetime resolution | Nanosecond (ns) | Microsecond (us) |
| Copy behavior | Copy by default | Copy-on-Write (lazy) |
| Performance | Baseline | 10-30% быстрее (CoW) |
Рекомендация: В этом курсе мы сразу учим pandas 3.0 паттернам. Если встречаете старый код, обновите его.
Проверка знанийПочему Copy-on-Write в pandas 3.0 делает chained assignment (df[condition]["col"] = value) опасным? Что произойдёт с данными?
Структура Debezium CDC события
Прежде чем преобразовывать в DataFrame, вспомним структуру CDC события:
Hierarchical envelope формат с полями before, after, op, ts_ms, source
- INSERT (op='c'): before=null, используйте after для данных
- UPDATE (op='u'): оба поля заполнены, сравните для diff
- DELETE (op='d'): after=null, используйте before для данных!
- READ (op='r'): snapshot data, before=null, используйте after
Ключевые поля для DataFrame
| Поле | Тип | Описание |
|---|---|---|
payload.op | String | Тип операции: 'c' (create), 'u' (update), 'd' (delete), 'r' (snapshot read) |
payload.before | Object | Состояние строки до изменения (null для INSERT) |
payload.after | Object | Состояние строки после изменения (null для DELETE) |
payload.ts_ms | Integer | Timestamp события в миллисекундах (epoch) |
payload.source.db | String | Имя базы данных |
payload.source.table | String | Имя таблицы |
Операции и наличие before/after
| Операция | op | before | after | Когда |
|---|---|---|---|---|
| Snapshot | 'r' | null | ✅ | Начальная загрузка существующих данных |
| INSERT | 'c' | null | ✅ | Новая запись добавлена |
| UPDATE | 'u' | ✅ | ✅ | Запись изменена |
| DELETE | 'd' | ✅ | null | Запись удалена |
Критически важно: DELETE события имеют after=null, используйте поле before для получения данных!
Простое преобразование в DataFrame
Паттерн: Manual Flattening
import pandas as pd
import json
from confluent_kafka import Consumer
def cdc_events_to_dataframe(messages):
"""
Преобразование списка CDC сообщений в Pandas DataFrame.
Args:
messages: Список confluent_kafka.Message объектов
Returns:
pd.DataFrame с колонками:
- op: тип операции (c, u, d, r)
- ts: datetime события
- source_db: имя базы данных
- source_table: имя таблицы
- before_*: поля before (с префиксом)
- after_*: поля after (с префиксом)
"""
records = []
for msg in messages:
# Парсинг JSON
event = json.loads(msg.value().decode('utf-8'))
payload = event.get('payload', {})
# Базовые поля
record = {
'op': payload.get('op'),
'ts_ms': payload.get('ts_ms'),
'source_db': payload.get('source', {}).get('db'),
'source_table': payload.get('source', {}).get('table'),
}
# Добавление before полей с префиксом
before = payload.get('before')
if before is not None:
for key, val in before.items():
record[f'before_{key}'] = val
# Добавление after полей с префиксом
after = payload.get('after')
if after is not None:
for key, val in after.items():
record[f'after_{key}'] = val
records.append(record)
# Создание DataFrame
df = pd.DataFrame(records)
# Преобразование timestamp в datetime (pandas 3.0 = microsecond resolution)
if 'ts_ms' in df.columns:
df['ts'] = pd.to_datetime(df['ts_ms'], unit='ms')
return df
Пример использования
# Настройка consumer
config = {
'bootstrap.servers': 'kafka:9092',
'group.id': 'pandas-integration-lab',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False
}
consumer = Consumer(config)
consumer.subscribe(['dbserver1.public.orders'])
# Сбор 100 сообщений
messages = []
while len(messages) < 100:
msg = consumer.poll(timeout=1.0)
if msg and not msg.error():
messages.append(msg)
consumer.close()
# Преобразование в DataFrame
df = cdc_events_to_dataframe(messages)
print(df.head())
print(f"\nShape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")
Вывод:
op ts_ms source_db source_table before_id before_customer_id ...
0 r 1738425600 inventory orders NaN NaN ...
1 c 1738425601 inventory orders NaN NaN ...
2 u 1738425602 inventory orders 42.0 12.0 ...
3 d 1738425603 inventory orders 43.0 15.0 ...
Shape: (100, 15)
Columns: ['op', 'ts_ms', 'source_db', 'source_table', 'before_id', ..., 'after_total']
json_normalize для сложных структур
Для CDC событий с глубокой вложенностью или сложными схемами используйте pd.json_normalize().
Паттерн: json_normalize
import pandas as pd
import json
def cdc_events_to_dataframe_normalized(messages):
"""
Преобразование CDC событий в DataFrame с автоматическим flatten через json_normalize.
Подходит для сложных схем с nested objects и arrays.
"""
events = [json.loads(msg.value().decode('utf-8')) for msg in messages]
# Normalize на уровне payload
df = pd.json_normalize(
events,
sep='_', # Separator для nested полей: source.db → source_db
max_level=3 # Максимальная глубина flatten (обычно 2-3 достаточно)
)
# Преобразование timestamp
if 'payload_ts_ms' in df.columns:
df['ts'] = pd.to_datetime(df['payload_ts_ms'], unit='ms')
return df
Параметры json_normalize:
| Параметр | Значение | Описание |
|---|---|---|
sep | '_' | Разделитель для nested полей (source.db → source_db) |
max_level | 2-3 | Глубина flatten. CDC обычно 2-3 уровня (payload → before/after → fields) |
record_path | None | Путь к массиву для unnest (используется редко для CDC) |
Когда использовать json_normalize?
| Сценарий | Рекомендация |
|---|---|
| Простая схема (orders: id, customer_id, total) | Manual flattening (больше контроля) |
| Сложная схема (nested JSON fields, arrays, 4+ levels) | json_normalize (автоматический flatten) |
| Изменяющаяся схема (добавляются новые поля) | json_normalize (автоматически добавляет колонки) |
| Performance-критично (millions events) | Manual flattening (меньше overhead) |
Рекомендация для CDC: Начните с manual flattening (больше контроля). Переходите на json_normalize при усложнении схемы.
ВАЖНО: kafka:9092 vs localhost:9092
Когда использовать какой адрес?
Где запускаете код Адрес Kafka Внутри Docker (JupyterLab, другие контейнеры) kafka:9092На вашем компьютере (терминал Mac/Windows/Linux) localhost:9092В примерах этого урока используется
kafka:9092(JupyterLab).
Обработка разных типов операций
Критическая ошибка: DELETE events
# ❌ НЕПРАВИЛЬНО - упадет на DELETE операциях
df['customer_id'] = df.apply(lambda row: row['payload']['after']['customer_id'], axis=1)
# KeyError: 'after' is None for DELETE events
# ✅ ПРАВИЛЬНО - использовать after для c/u, before для d
def extract_customer_id(row):
payload = row['payload']
op = payload.get('op')
if op == 'd':
# DELETE - используем before
return payload.get('before', {}).get('customer_id')
else:
# CREATE, UPDATE, READ - используем after
return payload.get('after', {}).get('customer_id')
df['customer_id'] = df.apply(extract_customer_id, axis=1)
Фильтрация по типу операции
# Только INSERT операции
inserts_df = df[df['op'] == 'c'].copy()
# Только UPDATE операции
updates_df = df[df['op'] == 'u'].copy()
# Только DELETE операции
deletes_df = df[df['op'] == 'd'].copy()
# CREATE + UPDATE (активные записи, исключая DELETE)
active_df = df[df['op'].isin(['c', 'u', 'r'])].copy()
print(f"Inserts: {len(inserts_df)}")
print(f"Updates: {len(updates_df)}")
print(f"Deletes: {len(deletes_df)}")
Вычисление изменений (UPDATE diff)
# Для UPDATE операций - вычислить, что изменилось
updates_df = df[df['op'] == 'u'].copy()
# Pandas 3.0: используем .loc[] для модификации
updates_df.loc[:, 'total_change'] = (
updates_df['after_total'] - updates_df['before_total']
)
updates_df.loc[:, 'status_changed'] = (
updates_df['after_status'] != updates_df['before_status']
)
# Найти UPDATE операции с изменением total > 100
significant_updates = updates_df[updates_df['total_change'].abs() > 100]
print(f"Significant updates: {len(significant_updates)}")
print(significant_updates[['after_id', 'before_total', 'after_total', 'total_change']])
Проверка знанийПочему при преобразовании CDC событий в DataFrame нельзя всегда использовать поле after для извлечения данных? Какой тип операции это сломает?
Лабораторная работа: CDC Analytics
Создайте notebook, который собирает CDC события, строит DataFrame и выполняет аналитику.
Задание
- Откройте JupyterLab: http://localhost:8888
- Создайте новый notebook:
module5/02_pandas_integration_lab.ipynb - Реализуйте:
- Consumer для сбора 100 CDC событий
- Преобразование в DataFrame
- Анализ: события по типам операций
- Анализ: события в минуту (time-based)
- (Опционально) Визуализация с matplotlib
Решение
# ==============================================================================
# ЧАСТЬ 1: Сбор CDC событий
# ==============================================================================
from confluent_kafka import Consumer
import pandas as pd
import json
config = {
'bootstrap.servers': 'kafka:9092',
'group.id': 'pandas-analytics-lab',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False
}
consumer = Consumer(config)
consumer.subscribe(['dbserver1.public.orders'])
print("Сбор 100 CDC событий...")
messages = []
while len(messages) < 100:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
print(f"Error: {msg.error()}")
continue
messages.append(msg)
if len(messages) % 10 == 0:
print(f" Собрано: {len(messages)}/100")
consumer.close()
print(f"✅ Собрано {len(messages)} сообщений\n")
# ==============================================================================
# ЧАСТЬ 2: Преобразование в DataFrame
# ==============================================================================
def cdc_events_to_dataframe(messages):
"""Преобразование CDC сообщений в DataFrame."""
records = []
for msg in messages:
event = json.loads(msg.value().decode('utf-8'))
payload = event.get('payload', {})
record = {
'op': payload.get('op'),
'ts_ms': payload.get('ts_ms'),
'source_db': payload.get('source', {}).get('db'),
'source_table': payload.get('source', {}).get('table'),
}
# Before fields
before = payload.get('before')
if before is not None:
for key, val in before.items():
record[f'before_{key}'] = val
# After fields
after = payload.get('after')
if after is not None:
for key, val in after.items():
record[f'after_{key}'] = val
records.append(record)
df = pd.DataFrame(records)
# Convert timestamp
if 'ts_ms' in df.columns:
df['ts'] = pd.to_datetime(df['ts_ms'], unit='ms')
return df
df = cdc_events_to_dataframe(messages)
print(f"DataFrame shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}\n")
print(df.head())
# ==============================================================================
# ЧАСТЬ 3: Анализ по типам операций
# ==============================================================================
print("\n" + "=" * 70)
print("АНАЛИЗ: События по типам операций")
print("=" * 70)
op_counts = df['op'].value_counts()
op_names = {
'r': 'Snapshot (READ)',
'c': 'CREATE (INSERT)',
'u': 'UPDATE',
'd': 'DELETE'
}
for op, count in op_counts.items():
op_name = op_names.get(op, f'Unknown ({op})')
percentage = (count / len(df)) * 100
print(f"{op_name:20} {count:5} ({percentage:5.1f}%)")
# ==============================================================================
# ЧАСТЬ 4: Анализ по времени
# ==============================================================================
print("\n" + "=" * 70)
print("АНАЛИЗ: События в минуту")
print("=" * 70)
# Группировка по минутам
df['minute'] = df['ts'].dt.floor('1min')
events_per_minute = df.groupby('minute').size()
print(events_per_minute)
# Статистика
print(f"\nСреднее событий в минуту: {events_per_minute.mean():.1f}")
print(f"Максимум событий в минуту: {events_per_minute.max()}")
print(f"Минимум событий в минуту: {events_per_minute.min()}")
# ==============================================================================
# ЧАСТЬ 5 (Опционально): Визуализация
# ==============================================================================
import matplotlib.pyplot as plt
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# График 1: События по типам операций
op_counts_renamed = op_counts.rename(index=op_names)
op_counts_renamed.plot(kind='bar', ax=axes[0], color='steelblue')
axes[0].set_title('События по типам операций')
axes[0].set_xlabel('Тип операции')
axes[0].set_ylabel('Количество')
axes[0].tick_params(axis='x', rotation=45)
# График 2: События по времени
events_per_minute.plot(kind='line', ax=axes[1], marker='o', color='coral')
axes[1].set_title('События в минуту')
axes[1].set_xlabel('Время')
axes[1].set_ylabel('Количество событий')
axes[1].tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
print("\n✅ Анализ завершен!")
Генерация тестовых данных
Перед запуском notebook сгенерируйте CDC события в терминале:
cd labs/
# Генерация INSERT событий
docker compose exec postgres psql -U postgres -d inventory -c "
INSERT INTO orders (customer_id, product_id, quantity)
SELECT
(random() * 10 + 1)::int,
(random() * 5 + 1)::int,
(random() * 10 + 1)::int
FROM generate_series(1, 50);
"
# Генерация UPDATE событий
docker compose exec postgres psql -U postgres -d inventory -c "
UPDATE orders
SET quantity = quantity + 1
WHERE id IN (SELECT id FROM orders ORDER BY RANDOM() LIMIT 20);
"
# Генерация DELETE событий
docker compose exec postgres psql -U postgres -d inventory -c "
DELETE FROM orders
WHERE id IN (SELECT id FROM orders ORDER BY RANDOM() LIMIT 10);
"
Ожидаемый результат
АНАЛИЗ: События по типам операций
======================================================================
Snapshot (READ) 30 ( 30.0%)
CREATE (INSERT) 50 ( 50.0%)
UPDATE 15 ( 15.0%)
DELETE 5 ( 5.0%)
АНАЛИЗ: События в минуту
======================================================================
minute
2026-02-01 07:10:00 12
2026-02-01 07:11:00 45
2026-02-01 07:12:00 43
Среднее событий в минуту: 33.3
Максимум событий в минуту: 45
Минимум событий в минуту: 12
Что мы узнали
- Pandas 3.0 breaking changes:
strdtype вместоobject, Copy-on-Write требует.loc[], microsecond datetime resolution - Manual flattening: Контролируемое преобразование CDC событий в DataFrame с before/after префиксами
- json_normalize: Автоматический flatten для сложных nested структур
- Операции CDC: DELETE события имеют
after=null, используйтеbeforeдля данных - Analytics patterns: Группировка по типам операций, time-based analysis, change detection
Что дальше?
В следующих уроках Module 5 мы перейдем к stream processing с PyFlink и PySpark для real-time обработки CDC данных с stateful aggregations и windowing.
Проверьте понимание
Закончили урок?
Отметьте его как пройденный, чтобы отслеживать свой прогресс