Skip to content
Learning Platform
Intermediate
30 minutes
Python confluent-kafka Consumer Event Processing

Prerequisites:

  • module-1/04-first-connector

Потребление CDC событий на Python

В предыдущем уроке мы использовали kafka-console-consumer для просмотра CDC-событий. Теперь пора перейти к production-ready Python коду, который можно использовать в реальных data pipelines.

Почему confluent-kafka, а не kafka-python?

Для работы с Kafka из Python существует два основных клиента:

БиблиотекаОсноваПроизводительностьПоддержка
confluent-kafkalibrdkafka (C)~10x быстрееОфициальная Confluent
kafka-pythonЧистый PythonМедленнееCommunity

Мы используем confluent-kafka потому что:

  1. Производительность: Построена на librdkafka — высокопроизводительной C-библиотеке
  2. Официальная поддержка: Разрабатывается Confluent (создатели Kafka)
  3. Schema Registry: Встроенная интеграция для последующих модулей
  4. ARM64 native: Начиная с версии 2.13.0+ поддерживает Apple Silicon нативно

В нашем лабораторном окружении confluent-kafka>=2.13.0 уже установлен в JupyterLab контейнере.


ВАЖНО: kafka:9092 vs localhost:9092

Когда использовать какой адрес?

Где запускаете кодАдрес Kafka
Внутри Docker (JupyterLab, другие контейнеры)kafka:9092
На вашем компьютере (терминал Mac/Windows/Linux)localhost:9092

В этом уроке мы работаем в JupyterLab (внутри Docker сети), поэтому используем kafka:9092.

Если бы вы запускали Python скрипт на своем компьютере (не в контейнере), нужен был бы localhost:9092.

Это частая причина ошибок подключения — всегда проверяйте, откуда запускается ваш код!

Проверка знаний
Python-скрипт запущен в контейнере JupyterLab (внутри Docker-сети) и подключается к kafka по адресу localhost:9092. Почему возникает ошибка подключения?
Ответ
Внутри Docker-контейнера localhost указывает на сам контейнер JupyterLab, а не на брокер Kafka. На порту 9092 внутри JupyterLab никто не слушает, поэтому подключение получит тайм-аут. Правильный адрес -- kafka:9092, где kafka -- имя сервиса в Docker Compose, которое разрешается через встроенный DNS Docker-сети.

Запуск JupyterLab

Откройте в браузере: http://localhost:8888

JupyterLab запущен в нашем лабораторном окружении. Библиотека confluent-kafka уже установлена — ничего дополнительно устанавливать не нужно.

Создайте новый notebook: File > New > Notebook (выберите Python 3 kernel).

Базовая конфигурация Consumer

from confluent_kafka import Consumer
import json

# Конфигурация consumer
# ВАЖНО: 'kafka:9092' работает только внутри Docker сети (JupyterLab)!
# Для запуска с хоста используйте 'localhost:9092'
config = {
    'bootstrap.servers': 'kafka:9092',  # Docker internal hostname
    'group.id': 'cdc-tutorial-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True
}

consumer = Consumer(config)
consumer.subscribe(['inventory.public.customers'])

Объяснение конфигурации

ПараметрЗначениеНазначение
bootstrap.serverskafka:9092Адрес Kafka брокера. Используем kafka — имя сервиса в Docker Compose
group.idcdc-tutorial-groupИдентификатор consumer group для отслеживания offset
auto.offset.resetearliestГде начать чтение, если offset не найден: earliest = сначала, latest = только новые
enable.auto.commitTrueАвтоматический commit offset после чтения

О consumer groups

Consumer group — это механизм координации нескольких consumer’ов, читающих из одного топика. Kafka сохраняет offset (позицию чтения) для каждой группы.

  • При первом запуске группы auto.offset.reset определяет стартовую позицию
  • При последующих запусках consumer продолжает с сохраненного offset
  • Разные группы читают топик независимо

Базовый цикл чтения

print("Ожидание CDC событий... (Ctrl+C или Interrupt kernel для остановки)")

try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            print(f"Ошибка: {msg.error()}")
            continue

        # Парсинг и вывод события
        event = json.loads(msg.value().decode('utf-8'))
        print(f"Получено событие: {event}")

except KeyboardInterrupt:
    print("Остановка...")
finally:
    consumer.close()

Полный рабочий пример

Скопируйте весь код ниже в одну ячейку JupyterLab и запустите:

from confluent_kafka import Consumer
import json

# ==============================================================================
# КОНФИГУРАЦИЯ
# ==============================================================================
# ВАЖНО: Выберите правильный адрес!
# - 'kafka:9092'     - если код запускается В JupyterLab (внутри Docker)
# - 'localhost:9092' - если код запускается НА ВАШЕМ КОМПЬЮТЕРЕ (не в Docker)
# ==============================================================================
config = {
    'bootstrap.servers': 'kafka:9092',  # Мы в JupyterLab = внутри Docker
    'group.id': 'cdc-tutorial-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True
}

consumer = Consumer(config)
consumer.subscribe(['inventory.public.customers'])

print("=" * 60)
print("CDC Consumer запущен")
print("Топик: inventory.public.customers")
print("Broker: kafka:9092 (Docker internal)")
print("=" * 60)
print("\nОжидание событий...")
print("Вставьте/измените/удалите записи в PostgreSQL для генерации событий")
print("Нажмите Ctrl+C или остановите kernel для завершения\n")

try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue

        try:
            event = json.loads(msg.value().decode('utf-8'))
            payload = event.get('payload', {})

            # Определение типа операции
            op = payload.get('op', '?')
            op_names = {
                'r': 'SNAPSHOT',  # Read - начальный snapshot
                'c': 'CREATE',    # Create - INSERT
                'u': 'UPDATE',    # Update - UPDATE
                'd': 'DELETE'     # Delete - DELETE
            }
            op_name = op_names.get(op, f'UNKNOWN({op})')

            # Получение данных (after для create/update, before для delete)
            data = payload.get('after') or payload.get('before')

            print(f"[{op_name}] {data}")

        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"Ошибка декодирования: {e}")
            continue

except KeyboardInterrupt:
    print("\n\nОстановка consumer...")
finally:
    consumer.close()
    print("Consumer закрыт.")

Тестирование consumer

Шаг 1: Запустите код в JupyterLab

Скопируйте полный пример выше в ячейку и запустите (Shift+Enter).

Шаг 2: В отдельном терминале выполните SQL

Откройте терминал на вашем компьютере и выполните:

# INSERT - создание новой записи
cd labs/
docker compose exec postgres psql -U postgres -d inventory -c "
INSERT INTO customers (name, email) VALUES ('Тестовый Пользователь', '[email protected]');
"

В JupyterLab вы увидите:

[CREATE] {'id': 4, 'name': 'Тестовый Пользователь', 'email': '[email protected]', 'created_at': ...}

Шаг 3: Протестируйте UPDATE

docker compose exec postgres psql -U postgres -d inventory -c "
UPDATE customers SET name = 'Обновленный Пользователь' WHERE email = '[email protected]';
"

Результат:

[UPDATE] {'id': 4, 'name': 'Обновленный Пользователь', 'email': '[email protected]', 'created_at': ...}

Шаг 4: Протестируйте DELETE

docker compose exec postgres psql -U postgres -d inventory -c "
DELETE FROM customers WHERE email = '[email protected]';
"

Результат:

[DELETE] {'id': 4, 'name': 'Обновленный Пользователь', 'email': '[email protected]', 'created_at': ...}

Поток данных

PostgreSQL
WAL
Debezium
CDC Events
Kafka Topic
poll
Python Consumer

Разбор poll() timeout

msg = consumer.poll(timeout=1.0)

Параметр timeout определяет, сколько секунд ждать сообщение:

ЗначениеПоведениеКогда использовать
timeout=0Немедленный возврат (не ждать)Неблокирующий режим
timeout=1.0Ждать до 1 секундыИнтерактивные демонстрации
timeout=0.1Ждать 100мсProduction high-throughput
timeout=NoneЖдать бесконечноРедко используется

Если сообщений нет:

  • poll() возвращает None по истечении timeout
  • Это нормальное поведение, не ошибка
  • Просто продолжайте цикл (continue)

Рекомендации:

  • Для демонстраций и обучения: timeout=1.0
  • Для production с высокой нагрузкой: timeout=0.1 или меньше
  • Меньший timeout = выше CPU usage, но быстрее реакция
Проверка знаний
Что произойдет, если consumer использует poll(timeout=0) в бесконечном цикле? Какой компромисс возникает?
Ответ
При timeout=0 метод poll() возвращается немедленно, даже если сообщений нет. В цикле while True это создает busy-wait, потребляющий 100% CPU. Компромисс: минимальная задержка реакции на новые события, но максимальное потребление ресурсов. Для production рекомендуется timeout=0.1, который обеспечивает быструю реакцию (максимум 100мс) при минимальном CPU в периоды без сообщений.

Обработка ошибок

Наш код обрабатывает два типа ошибок:

1. Ошибки Kafka

if msg.error():
    print(f"Consumer error: {msg.error()}")
    continue

Примеры: разрыв соединения, неизвестный топик, проблемы с брокером.

2. Ошибки парсинга JSON

try:
    event = json.loads(msg.value().decode('utf-8'))
except (json.JSONDecodeError, UnicodeDecodeError) as e:
    print(f"Ошибка декодирования: {e}")
    continue

Примеры: поврежденное сообщение, неожиданный формат (например, Avro вместо JSON).

Что мы узнали

  1. confluent-kafka — production-ready библиотека для работы с Kafka из Python
  2. kafka:9092 vs localhost:9092 — внутри Docker используем kafka, снаружи — localhost
  3. Consumer group — механизм отслеживания позиции чтения
  4. poll() timeout — контролирует баланс между responsiveness и CPU usage
  5. Обработка ошибок — проверяем msg.error() и оборачиваем JSON parsing в try/except

Что дальше?

В следующем уроке мы разберем структуру CDC-событий Debezium и напишем функцию для парсинга envelope-формата с обработкой всех типов операций.

Check Your Understanding

Score: 0 of 0
Conceptual
Question 1 of 4. Почему библиотека confluent-kafka рекомендуется для production-использования вместо kafka-python?

Finished the lesson?

Mark it as complete to track your progress