Перейти к содержанию
Learning Platform
Средний
35 минут
Python Pandas DataFrame CDC Data Analysis

Требуемые знания:

  • module-5/01-advanced-python-consumer

Интеграция CDC событий с Pandas

После того как мы научились создавать production-ready consumer, следующий шаг — преобразование потока CDC событий в удобный для анализа формат. Pandas DataFrame — стандартный инструмент для табличной обработки данных в Python.

Зачем Pandas для CDC данных?

Типичные задачи data engineering с CDC:

1. Batch Analysis

Накопить N событий из Kafka, преобразовать в DataFrame, выполнить агрегации:

  • Сколько операций каждого типа (INSERT/UPDATE/DELETE)?
  • Какие таблицы изменяются чаще всего?
  • Распределение событий по времени

2. Exploratory Data Analysis (EDA)

Исследование структуры и паттернов в CDC данных:

  • Какие поля чаще всего изменяются?
  • Есть ли корреляции между изменениями разных таблиц?
  • Визуализация временных паттернов

3. ETL Transformations

Преобразование CDC событий перед загрузкой в data warehouse:

  • Фильтрация ненужных полей
  • Расчет derived metrics (time since last update, change rate)
  • Join с dimension tables

4. Feature Engineering

Подготовка real-time features для ML моделей:

  • Rolling window aggregations (30-day order count)
  • Recency features (days since last purchase)
  • Change detection (price increased > 10%)

Pandas предоставляет:

  • Удобный API для всех этих операций
  • Высокую производительность (NumPy backend)
  • Интеграцию с visualization (matplotlib, seaborn)
  • Arrow-совместимость для zero-copy interchange

Pandas 3.0: Важные изменения

⚠️ КРИТИЧЕСКОЕ ОБНОВЛЕНИЕ

Pandas 3.0.0 вышел 21 января 2026 с breaking changes, которые влияют на обработку CDC данных.

В лабораторном окружении курса установлен pandas 3.0.0+. Если вы работаете с более старым кодом или примерами из интернета (до 2026 года), код может не работать.

Изменение 1: Строковый тип данных

Раньше (pandas < 3.0):

df['name'].dtype  # dtype('O')  - object dtype

Теперь (pandas 3.0+):

df['name'].dtype  # StringDtype  - str dtype

Влияние на CDC:

  • Проверки df['column'].dtype == 'object' больше не работают
  • Используйте df['column'].dtype == 'str' для строковых колонок
  • CDC поля с текстом (email, name, status) теперь имеют тип str

Изменение 2: Copy-on-Write (CoW)

Раньше (pandas < 3.0):

# Chained assignment работал
df[df['total'] > 100]['status'] = 'high-value'

Теперь (pandas 3.0+):

# ❌ Chained assignment НЕ РАБОТАЕТ (silent failure)
df[df['total'] > 100]['status'] = 'high-value'

# ✅ Правильный способ
df.loc[df['total'] > 100, 'status'] = 'high-value'

Влияние на CDC:

  • Используйте только .loc[] для модификации DataFrame
  • Chained assignment теперь создает view, а не copy — изменения не применяются к исходному DataFrame
  • Преимущество: меньше копирований, выше производительность

Изменение 3: Datetime Resolution

Раньше (pandas < 3.0):

# Nanosecond resolution (ns)
pd.to_datetime(df['ts_ms'], unit='ms')  # dtype: datetime64[ns]

Теперь (pandas 3.0+):

# Microsecond resolution (us) by default
pd.to_datetime(df['ts_ms'], unit='ms')  # dtype: datetime64[us]

Влияние на CDC:

  • CDC timestamp поле ts_ms (milliseconds) корректно обрабатывается
  • Если код полагается на nanosecond precision, используйте .as_unit('ns')
  • Для большинства CDC задач microsecond precision достаточно

Сравнительная таблица

АспектPandas < 3.0Pandas 3.0+
Строковый dtypeobjectstr (StringDtype)
Модификация DataFrameChained assignment работалТолько .loc[]
Datetime resolutionNanosecond (ns)Microsecond (us)
Copy behaviorCopy by defaultCopy-on-Write (lazy)
PerformanceBaseline10-30% быстрее (CoW)

Рекомендация: В этом курсе мы сразу учим pandas 3.0 паттернам. Если встречаете старый код, обновите его.

Проверка знаний
Почему Copy-on-Write в pandas 3.0 делает chained assignment (df[condition]["col"] = value) опасным? Что произойдёт с данными?
Ответ
В pandas 3.0 Copy-on-Write включён по умолчанию. При chained assignment df[condition] возвращает view (не copy), и присваивание ["col"] = value создаёт новый объект, не связанный с исходным DataFrame. В результате изменение молча не применяется -- данные не обновляются, ошибка не возникает. Правильный подход: использовать df.loc[condition, "col"] = value, который модифицирует DataFrame напрямую. Это частая причина багов при миграции с pandas 2.x на 3.0.

Структура Debezium CDC события

Прежде чем преобразовывать в DataFrame, вспомним структуру CDC события:

Структура Debezium CDC события

Hierarchical envelope формат с полями before, after, op, ts_ms, source

CDC Event JSON
schema(описание структуры)
payload(данные события)
Поля payload
before: {...}
id, customer_id, total, ...
after: {...}
id, customer_id, total, ...
op: 'c'|'u'|'d'|'r'
c=INSERTu=UPDATEd=DELETEr=READ
ts_ms: 1738425600000
(epoch millis)
source: {...}
db, table, lsn, connector, ...
Критически важно для Pandas обработки:
  • INSERT (op='c'): before=null, используйте after для данных
  • UPDATE (op='u'): оба поля заполнены, сравните для diff
  • DELETE (op='d'): after=null, используйте before для данных!
  • READ (op='r'): snapshot data, before=null, используйте after

Ключевые поля для DataFrame

ПолеТипОписание
payload.opStringТип операции: 'c' (create), 'u' (update), 'd' (delete), 'r' (snapshot read)
payload.beforeObjectСостояние строки до изменения (null для INSERT)
payload.afterObjectСостояние строки после изменения (null для DELETE)
payload.ts_msIntegerTimestamp события в миллисекундах (epoch)
payload.source.dbStringИмя базы данных
payload.source.tableStringИмя таблицы

Операции и наличие before/after

ОперацияopbeforeafterКогда
Snapshot'r'nullНачальная загрузка существующих данных
INSERT'c'nullНовая запись добавлена
UPDATE'u'Запись изменена
DELETE'd'nullЗапись удалена

Критически важно: DELETE события имеют after=null, используйте поле before для получения данных!


Простое преобразование в DataFrame

Паттерн: Manual Flattening

import pandas as pd
import json
from confluent_kafka import Consumer

def cdc_events_to_dataframe(messages):
    """
    Преобразование списка CDC сообщений в Pandas DataFrame.

    Args:
        messages: Список confluent_kafka.Message объектов

    Returns:
        pd.DataFrame с колонками:
            - op: тип операции (c, u, d, r)
            - ts: datetime события
            - source_db: имя базы данных
            - source_table: имя таблицы
            - before_*: поля before (с префиксом)
            - after_*: поля after (с префиксом)
    """
    records = []

    for msg in messages:
        # Парсинг JSON
        event = json.loads(msg.value().decode('utf-8'))
        payload = event.get('payload', {})

        # Базовые поля
        record = {
            'op': payload.get('op'),
            'ts_ms': payload.get('ts_ms'),
            'source_db': payload.get('source', {}).get('db'),
            'source_table': payload.get('source', {}).get('table'),
        }

        # Добавление before полей с префиксом
        before = payload.get('before')
        if before is not None:
            for key, val in before.items():
                record[f'before_{key}'] = val

        # Добавление after полей с префиксом
        after = payload.get('after')
        if after is not None:
            for key, val in after.items():
                record[f'after_{key}'] = val

        records.append(record)

    # Создание DataFrame
    df = pd.DataFrame(records)

    # Преобразование timestamp в datetime (pandas 3.0 = microsecond resolution)
    if 'ts_ms' in df.columns:
        df['ts'] = pd.to_datetime(df['ts_ms'], unit='ms')

    return df

Пример использования

# Настройка consumer
config = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'pandas-integration-lab',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False
}

consumer = Consumer(config)
consumer.subscribe(['dbserver1.public.orders'])

# Сбор 100 сообщений
messages = []
while len(messages) < 100:
    msg = consumer.poll(timeout=1.0)
    if msg and not msg.error():
        messages.append(msg)

consumer.close()

# Преобразование в DataFrame
df = cdc_events_to_dataframe(messages)

print(df.head())
print(f"\nShape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")

Вывод:

   op        ts_ms  source_db source_table  before_id  before_customer_id  ...
0   r  1738425600  inventory       orders        NaN                 NaN  ...
1   c  1738425601  inventory       orders        NaN                 NaN  ...
2   u  1738425602  inventory       orders       42.0                12.0  ...
3   d  1738425603  inventory       orders       43.0                15.0  ...

Shape: (100, 15)
Columns: ['op', 'ts_ms', 'source_db', 'source_table', 'before_id', ..., 'after_total']

json_normalize для сложных структур

Для CDC событий с глубокой вложенностью или сложными схемами используйте pd.json_normalize().

Паттерн: json_normalize

import pandas as pd
import json

def cdc_events_to_dataframe_normalized(messages):
    """
    Преобразование CDC событий в DataFrame с автоматическим flatten через json_normalize.

    Подходит для сложных схем с nested objects и arrays.
    """
    events = [json.loads(msg.value().decode('utf-8')) for msg in messages]

    # Normalize на уровне payload
    df = pd.json_normalize(
        events,
        sep='_',       # Separator для nested полей: source.db → source_db
        max_level=3    # Максимальная глубина flatten (обычно 2-3 достаточно)
    )

    # Преобразование timestamp
    if 'payload_ts_ms' in df.columns:
        df['ts'] = pd.to_datetime(df['payload_ts_ms'], unit='ms')

    return df

Параметры json_normalize:

ПараметрЗначениеОписание
sep'_'Разделитель для nested полей (source.dbsource_db)
max_level2-3Глубина flatten. CDC обычно 2-3 уровня (payload → before/after → fields)
record_pathNoneПуть к массиву для unnest (используется редко для CDC)

Когда использовать json_normalize?

СценарийРекомендация
Простая схема (orders: id, customer_id, total)Manual flattening (больше контроля)
Сложная схема (nested JSON fields, arrays, 4+ levels)json_normalize (автоматический flatten)
Изменяющаяся схема (добавляются новые поля)json_normalize (автоматически добавляет колонки)
Performance-критично (millions events)Manual flattening (меньше overhead)

Рекомендация для CDC: Начните с manual flattening (больше контроля). Переходите на json_normalize при усложнении схемы.


ВАЖНО: kafka:9092 vs localhost:9092

Когда использовать какой адрес?

Где запускаете кодАдрес Kafka
Внутри Docker (JupyterLab, другие контейнеры)kafka:9092
На вашем компьютере (терминал Mac/Windows/Linux)localhost:9092

В примерах этого урока используется kafka:9092 (JupyterLab).


Обработка разных типов операций

Критическая ошибка: DELETE events

# ❌ НЕПРАВИЛЬНО - упадет на DELETE операциях
df['customer_id'] = df.apply(lambda row: row['payload']['after']['customer_id'], axis=1)
# KeyError: 'after' is None for DELETE events

# ✅ ПРАВИЛЬНО - использовать after для c/u, before для d
def extract_customer_id(row):
    payload = row['payload']
    op = payload.get('op')

    if op == 'd':
        # DELETE - используем before
        return payload.get('before', {}).get('customer_id')
    else:
        # CREATE, UPDATE, READ - используем after
        return payload.get('after', {}).get('customer_id')

df['customer_id'] = df.apply(extract_customer_id, axis=1)

Фильтрация по типу операции

# Только INSERT операции
inserts_df = df[df['op'] == 'c'].copy()

# Только UPDATE операции
updates_df = df[df['op'] == 'u'].copy()

# Только DELETE операции
deletes_df = df[df['op'] == 'd'].copy()

# CREATE + UPDATE (активные записи, исключая DELETE)
active_df = df[df['op'].isin(['c', 'u', 'r'])].copy()

print(f"Inserts: {len(inserts_df)}")
print(f"Updates: {len(updates_df)}")
print(f"Deletes: {len(deletes_df)}")

Вычисление изменений (UPDATE diff)

# Для UPDATE операций - вычислить, что изменилось
updates_df = df[df['op'] == 'u'].copy()

# Pandas 3.0: используем .loc[] для модификации
updates_df.loc[:, 'total_change'] = (
    updates_df['after_total'] - updates_df['before_total']
)

updates_df.loc[:, 'status_changed'] = (
    updates_df['after_status'] != updates_df['before_status']
)

# Найти UPDATE операции с изменением total > 100
significant_updates = updates_df[updates_df['total_change'].abs() > 100]

print(f"Significant updates: {len(significant_updates)}")
print(significant_updates[['after_id', 'before_total', 'after_total', 'total_change']])
Проверка знаний
Почему при преобразовании CDC событий в DataFrame нельзя всегда использовать поле after для извлечения данных? Какой тип операции это сломает?
Ответ
DELETE операции (op="d") имеют after=null, потому что после удаления записи нет состояния "после". Данные удалённой строки находятся в поле before. Если код извлекает данные только из after, DELETE события приведут к KeyError или NullPointerException. Правильный подход: для операций c (create), u (update), r (read/snapshot) использовать after, а для d (delete) -- before. Это критическая особенность Debezium envelope, которую необходимо учитывать в любом CDC-to-DataFrame pipeline.

Лабораторная работа: CDC Analytics

Создайте notebook, который собирает CDC события, строит DataFrame и выполняет аналитику.

Задание

  1. Откройте JupyterLab: http://localhost:8888
  2. Создайте новый notebook: module5/02_pandas_integration_lab.ipynb
  3. Реализуйте:
    • Consumer для сбора 100 CDC событий
    • Преобразование в DataFrame
    • Анализ: события по типам операций
    • Анализ: события в минуту (time-based)
  4. (Опционально) Визуализация с matplotlib

Решение

# ==============================================================================
# ЧАСТЬ 1: Сбор CDC событий
# ==============================================================================
from confluent_kafka import Consumer
import pandas as pd
import json

config = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'pandas-analytics-lab',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False
}

consumer = Consumer(config)
consumer.subscribe(['dbserver1.public.orders'])

print("Сбор 100 CDC событий...")
messages = []
while len(messages) < 100:
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Error: {msg.error()}")
        continue
    messages.append(msg)
    if len(messages) % 10 == 0:
        print(f"  Собрано: {len(messages)}/100")

consumer.close()
print(f"✅ Собрано {len(messages)} сообщений\n")

# ==============================================================================
# ЧАСТЬ 2: Преобразование в DataFrame
# ==============================================================================
def cdc_events_to_dataframe(messages):
    """Преобразование CDC сообщений в DataFrame."""
    records = []

    for msg in messages:
        event = json.loads(msg.value().decode('utf-8'))
        payload = event.get('payload', {})

        record = {
            'op': payload.get('op'),
            'ts_ms': payload.get('ts_ms'),
            'source_db': payload.get('source', {}).get('db'),
            'source_table': payload.get('source', {}).get('table'),
        }

        # Before fields
        before = payload.get('before')
        if before is not None:
            for key, val in before.items():
                record[f'before_{key}'] = val

        # After fields
        after = payload.get('after')
        if after is not None:
            for key, val in after.items():
                record[f'after_{key}'] = val

        records.append(record)

    df = pd.DataFrame(records)

    # Convert timestamp
    if 'ts_ms' in df.columns:
        df['ts'] = pd.to_datetime(df['ts_ms'], unit='ms')

    return df

df = cdc_events_to_dataframe(messages)
print(f"DataFrame shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}\n")
print(df.head())

# ==============================================================================
# ЧАСТЬ 3: Анализ по типам операций
# ==============================================================================
print("\n" + "=" * 70)
print("АНАЛИЗ: События по типам операций")
print("=" * 70)

op_counts = df['op'].value_counts()
op_names = {
    'r': 'Snapshot (READ)',
    'c': 'CREATE (INSERT)',
    'u': 'UPDATE',
    'd': 'DELETE'
}

for op, count in op_counts.items():
    op_name = op_names.get(op, f'Unknown ({op})')
    percentage = (count / len(df)) * 100
    print(f"{op_name:20} {count:5} ({percentage:5.1f}%)")

# ==============================================================================
# ЧАСТЬ 4: Анализ по времени
# ==============================================================================
print("\n" + "=" * 70)
print("АНАЛИЗ: События в минуту")
print("=" * 70)

# Группировка по минутам
df['minute'] = df['ts'].dt.floor('1min')
events_per_minute = df.groupby('minute').size()

print(events_per_minute)

# Статистика
print(f"\nСреднее событий в минуту: {events_per_minute.mean():.1f}")
print(f"Максимум событий в минуту: {events_per_minute.max()}")
print(f"Минимум событий в минуту: {events_per_minute.min()}")

# ==============================================================================
# ЧАСТЬ 5 (Опционально): Визуализация
# ==============================================================================
import matplotlib.pyplot as plt

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# График 1: События по типам операций
op_counts_renamed = op_counts.rename(index=op_names)
op_counts_renamed.plot(kind='bar', ax=axes[0], color='steelblue')
axes[0].set_title('События по типам операций')
axes[0].set_xlabel('Тип операции')
axes[0].set_ylabel('Количество')
axes[0].tick_params(axis='x', rotation=45)

# График 2: События по времени
events_per_minute.plot(kind='line', ax=axes[1], marker='o', color='coral')
axes[1].set_title('События в минуту')
axes[1].set_xlabel('Время')
axes[1].set_ylabel('Количество событий')
axes[1].tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

print("\n✅ Анализ завершен!")

Генерация тестовых данных

Перед запуском notebook сгенерируйте CDC события в терминале:

cd labs/

# Генерация INSERT событий
docker compose exec postgres psql -U postgres -d inventory -c "
INSERT INTO orders (customer_id, product_id, quantity)
SELECT
    (random() * 10 + 1)::int,
    (random() * 5 + 1)::int,
    (random() * 10 + 1)::int
FROM generate_series(1, 50);
"

# Генерация UPDATE событий
docker compose exec postgres psql -U postgres -d inventory -c "
UPDATE orders
SET quantity = quantity + 1
WHERE id IN (SELECT id FROM orders ORDER BY RANDOM() LIMIT 20);
"

# Генерация DELETE событий
docker compose exec postgres psql -U postgres -d inventory -c "
DELETE FROM orders
WHERE id IN (SELECT id FROM orders ORDER BY RANDOM() LIMIT 10);
"

Ожидаемый результат

АНАЛИЗ: События по типам операций
======================================================================
Snapshot (READ)      30 ( 30.0%)
CREATE (INSERT)      50 ( 50.0%)
UPDATE               15 ( 15.0%)
DELETE                5 (  5.0%)

АНАЛИЗ: События в минуту
======================================================================
minute
2026-02-01 07:10:00    12
2026-02-01 07:11:00    45
2026-02-01 07:12:00    43

Среднее событий в минуту: 33.3
Максимум событий в минуту: 45
Минимум событий в минуту: 12

Что мы узнали

  1. Pandas 3.0 breaking changes: str dtype вместо object, Copy-on-Write требует .loc[], microsecond datetime resolution
  2. Manual flattening: Контролируемое преобразование CDC событий в DataFrame с before/after префиксами
  3. json_normalize: Автоматический flatten для сложных nested структур
  4. Операции CDC: DELETE события имеют after=null, используйте before для данных
  5. Analytics patterns: Группировка по типам операций, time-based analysis, change detection

Что дальше?

В следующих уроках Module 5 мы перейдем к stream processing с PyFlink и PySpark для real-time обработки CDC данных с stateful aggregations и windowing.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какое критическое изменение в Pandas 3.0 влияет на обработку CDC данных при модификации DataFrame?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс