Prerequisites:
- module-1/04-first-connector
Потребление CDC событий на Python
В предыдущем уроке мы использовали kafka-console-consumer для просмотра CDC-событий. Теперь пора перейти к production-ready Python коду, который можно использовать в реальных data pipelines.
Почему confluent-kafka, а не kafka-python?
Для работы с Kafka из Python существует два основных клиента:
| Библиотека | Основа | Производительность | Поддержка |
|---|---|---|---|
| confluent-kafka | librdkafka (C) | ~10x быстрее | Официальная Confluent |
| kafka-python | Чистый Python | Медленнее | Community |
Мы используем confluent-kafka потому что:
- Производительность: Построена на librdkafka — высокопроизводительной C-библиотеке
- Официальная поддержка: Разрабатывается Confluent (создатели Kafka)
- Schema Registry: Встроенная интеграция для последующих модулей
- ARM64 native: Начиная с версии 2.13.0+ поддерживает Apple Silicon нативно
В нашем лабораторном окружении confluent-kafka>=2.13.0 уже установлен в JupyterLab контейнере.
ВАЖНО: kafka:9092 vs localhost:9092
Когда использовать какой адрес?
Где запускаете код Адрес Kafka Внутри Docker (JupyterLab, другие контейнеры) kafka:9092На вашем компьютере (терминал Mac/Windows/Linux) localhost:9092В этом уроке мы работаем в JupyterLab (внутри Docker сети), поэтому используем
kafka:9092.Если бы вы запускали Python скрипт на своем компьютере (не в контейнере), нужен был бы
localhost:9092.Это частая причина ошибок подключения — всегда проверяйте, откуда запускается ваш код!
Проверка знанийPython-скрипт запущен в контейнере JupyterLab (внутри Docker-сети) и подключается к kafka по адресу localhost:9092. Почему возникает ошибка подключения?
Запуск JupyterLab
Откройте в браузере: http://localhost:8888
JupyterLab запущен в нашем лабораторном окружении. Библиотека confluent-kafka уже установлена — ничего дополнительно устанавливать не нужно.
Создайте новый notebook: File > New > Notebook (выберите Python 3 kernel).
Базовая конфигурация Consumer
from confluent_kafka import Consumer
import json
# Конфигурация consumer
# ВАЖНО: 'kafka:9092' работает только внутри Docker сети (JupyterLab)!
# Для запуска с хоста используйте 'localhost:9092'
config = {
'bootstrap.servers': 'kafka:9092', # Docker internal hostname
'group.id': 'cdc-tutorial-group',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True
}
consumer = Consumer(config)
consumer.subscribe(['inventory.public.customers'])
Объяснение конфигурации
| Параметр | Значение | Назначение |
|---|---|---|
bootstrap.servers | kafka:9092 | Адрес Kafka брокера. Используем kafka — имя сервиса в Docker Compose |
group.id | cdc-tutorial-group | Идентификатор consumer group для отслеживания offset |
auto.offset.reset | earliest | Где начать чтение, если offset не найден: earliest = сначала, latest = только новые |
enable.auto.commit | True | Автоматический commit offset после чтения |
О consumer groups
Consumer group — это механизм координации нескольких consumer’ов, читающих из одного топика. Kafka сохраняет offset (позицию чтения) для каждой группы.
- При первом запуске группы
auto.offset.resetопределяет стартовую позицию - При последующих запусках consumer продолжает с сохраненного offset
- Разные группы читают топик независимо
Базовый цикл чтения
print("Ожидание CDC событий... (Ctrl+C или Interrupt kernel для остановки)")
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
print(f"Ошибка: {msg.error()}")
continue
# Парсинг и вывод события
event = json.loads(msg.value().decode('utf-8'))
print(f"Получено событие: {event}")
except KeyboardInterrupt:
print("Остановка...")
finally:
consumer.close()
Полный рабочий пример
Скопируйте весь код ниже в одну ячейку JupyterLab и запустите:
from confluent_kafka import Consumer
import json
# ==============================================================================
# КОНФИГУРАЦИЯ
# ==============================================================================
# ВАЖНО: Выберите правильный адрес!
# - 'kafka:9092' - если код запускается В JupyterLab (внутри Docker)
# - 'localhost:9092' - если код запускается НА ВАШЕМ КОМПЬЮТЕРЕ (не в Docker)
# ==============================================================================
config = {
'bootstrap.servers': 'kafka:9092', # Мы в JupyterLab = внутри Docker
'group.id': 'cdc-tutorial-group',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True
}
consumer = Consumer(config)
consumer.subscribe(['inventory.public.customers'])
print("=" * 60)
print("CDC Consumer запущен")
print("Топик: inventory.public.customers")
print("Broker: kafka:9092 (Docker internal)")
print("=" * 60)
print("\nОжидание событий...")
print("Вставьте/измените/удалите записи в PostgreSQL для генерации событий")
print("Нажмите Ctrl+C или остановите kernel для завершения\n")
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
try:
event = json.loads(msg.value().decode('utf-8'))
payload = event.get('payload', {})
# Определение типа операции
op = payload.get('op', '?')
op_names = {
'r': 'SNAPSHOT', # Read - начальный snapshot
'c': 'CREATE', # Create - INSERT
'u': 'UPDATE', # Update - UPDATE
'd': 'DELETE' # Delete - DELETE
}
op_name = op_names.get(op, f'UNKNOWN({op})')
# Получение данных (after для create/update, before для delete)
data = payload.get('after') or payload.get('before')
print(f"[{op_name}] {data}")
except (json.JSONDecodeError, UnicodeDecodeError) as e:
print(f"Ошибка декодирования: {e}")
continue
except KeyboardInterrupt:
print("\n\nОстановка consumer...")
finally:
consumer.close()
print("Consumer закрыт.")
Тестирование consumer
Шаг 1: Запустите код в JupyterLab
Скопируйте полный пример выше в ячейку и запустите (Shift+Enter).
Шаг 2: В отдельном терминале выполните SQL
Откройте терминал на вашем компьютере и выполните:
# INSERT - создание новой записи
cd labs/
docker compose exec postgres psql -U postgres -d inventory -c "
INSERT INTO customers (name, email) VALUES ('Тестовый Пользователь', '[email protected]');
"
В JupyterLab вы увидите:
[CREATE] {'id': 4, 'name': 'Тестовый Пользователь', 'email': '[email protected]', 'created_at': ...}
Шаг 3: Протестируйте UPDATE
docker compose exec postgres psql -U postgres -d inventory -c "
UPDATE customers SET name = 'Обновленный Пользователь' WHERE email = '[email protected]';
"
Результат:
[UPDATE] {'id': 4, 'name': 'Обновленный Пользователь', 'email': '[email protected]', 'created_at': ...}
Шаг 4: Протестируйте DELETE
docker compose exec postgres psql -U postgres -d inventory -c "
DELETE FROM customers WHERE email = '[email protected]';
"
Результат:
[DELETE] {'id': 4, 'name': 'Обновленный Пользователь', 'email': '[email protected]', 'created_at': ...}
Поток данных
Разбор poll() timeout
msg = consumer.poll(timeout=1.0)
Параметр timeout определяет, сколько секунд ждать сообщение:
| Значение | Поведение | Когда использовать |
|---|---|---|
timeout=0 | Немедленный возврат (не ждать) | Неблокирующий режим |
timeout=1.0 | Ждать до 1 секунды | Интерактивные демонстрации |
timeout=0.1 | Ждать 100мс | Production high-throughput |
timeout=None | Ждать бесконечно | Редко используется |
Если сообщений нет:
poll()возвращаетNoneпо истечении timeout- Это нормальное поведение, не ошибка
- Просто продолжайте цикл (
continue)
Рекомендации:
- Для демонстраций и обучения:
timeout=1.0 - Для production с высокой нагрузкой:
timeout=0.1или меньше - Меньший timeout = выше CPU usage, но быстрее реакция
Проверка знанийЧто произойдет, если consumer использует poll(timeout=0) в бесконечном цикле? Какой компромисс возникает?
Обработка ошибок
Наш код обрабатывает два типа ошибок:
1. Ошибки Kafka
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
Примеры: разрыв соединения, неизвестный топик, проблемы с брокером.
2. Ошибки парсинга JSON
try:
event = json.loads(msg.value().decode('utf-8'))
except (json.JSONDecodeError, UnicodeDecodeError) as e:
print(f"Ошибка декодирования: {e}")
continue
Примеры: поврежденное сообщение, неожиданный формат (например, Avro вместо JSON).
Что мы узнали
- confluent-kafka — production-ready библиотека для работы с Kafka из Python
- kafka:9092 vs localhost:9092 — внутри Docker используем
kafka, снаружи —localhost - Consumer group — механизм отслеживания позиции чтения
- poll() timeout — контролирует баланс между responsiveness и CPU usage
- Обработка ошибок — проверяем
msg.error()и оборачиваем JSON parsing в try/except
Что дальше?
В следующем уроке мы разберем структуру CDC-событий Debezium и напишем функцию для парсинга envelope-формата с обработкой всех типов операций.
Check Your Understanding
Finished the lesson?
Mark it as complete to track your progress