Prerequisites:
- module-1/05-python-consumer
Продвинутые паттерны Python Consumer
В Module 1 мы написали базовый Python consumer с простой конфигурацией. Это отлично работает для обучения и прототипирования, но для production-систем требуются более продвинутые паттерны.
От tutorial к production
Базовый consumer из Module 1:
- Автоматический commit offset
- Простая обработка ошибок (print и continue)
- At-most-once семантика (данные могут потеряться при сбое)
Production-ready consumer:
- Контролируемая семантика доставки (at-least-once или exactly-once)
- Различение fatal и non-fatal ошибок
- Graceful shutdown и управление rebalancing
- Мониторинг и observability
В этом уроке мы разберем паттерны, которые делают consumer готовым к production.
At-Least-Once vs Exactly-Once семантика
Kafka предоставляет три гарантии доставки сообщений:
At-Most-Once (максимум один раз)
- Сообщение может быть потеряно, но никогда не обработано дважды
- Commit offset до обработки
- Используется редко (логи, метрики, где потеря приемлема)
At-Least-Once (минимум один раз)
- Сообщение никогда не теряется, но может быть обработано дважды
- Commit offset после обработки
- Самая распространенная семантика для CDC
- Требует идемпотентной обработки на стороне приложения
Exactly-Once (ровно один раз)
- Сообщение обработано ровно один раз (никакой потери, никаких дубликатов)
- Использует транзакционный API Kafka
- Применяется для критичных операций (финансы, платежи, inventory)
Commit offset после обработки
Транзакционный API для атомарности
Когда использовать какую семантику:
| Семантика | Когда использовать | Пример |
|---|---|---|
| At-Least-Once | Идемпотентная обработка возможна (upsert в БД, обновление cache) | Синхронизация данных в поисковую систему, обновление materialized view |
| Exactly-Once | Критичная корректность, идемпотентность невозможна (счетчики, финансы) | Обработка платежей, списание со счета, инвентаризация товаров |
| At-Most-Once | Потеря данных приемлема, дубликаты недопустимы | Логирование метрик, отправка email-уведомлений |
Конфигурация для At-Least-Once
Для at-least-once семантики нужно гарантировать, что offset коммитится только после успешной обработки.
Паттерн: Manual Offset Store
from confluent_kafka import Consumer, KafkaException
config = {
'bootstrap.servers': 'kafka:9092',
'group.id': 'cdc-processor',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True, # Авто-commit включен
'enable.auto.offset.store': False # Manual store для контроля
}
consumer = Consumer(config)
consumer.subscribe(['dbserver1.public.orders'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
# Обработка ошибок (см. следующий раздел)
continue
try:
# Обработка сообщения
process_cdc_event(msg.value())
# Сохраняем offset ТОЛЬКО после успешной обработки
consumer.store_offsets(msg)
except Exception as e:
# Обработка не удалась - НЕ сохраняем offset
# При следующем запуске consumer повторит обработку
print(f"Processing failed: {e}")
continue
finally:
consumer.close()
Объяснение конфигурации:
| Параметр | Значение | Почему |
|---|---|---|
enable.auto.commit | True | Автоматический commit в фоне каждые 5 секунд (по умолчанию) |
enable.auto.offset.store | False | Ключевой момент: offset сохраняется только вручную через store_offsets() |
Как это работает:
- Consumer получает сообщение
- Приложение обрабатывает сообщение
- Если обработка успешна →
store_offsets(msg)сохраняет offset - Фоновый поток авто-commit периодически коммитит все сохраненные offset
- Если crash происходит до
store_offsets()→ при перезапуске consumer повторит обработку
Гарантия: Сообщение никогда не будет потеряно (но может быть обработано дважды при сбое).
Проверка знанийВ чем ключевая разница между enable.auto.commit = False и enable.auto.offset.store = False для реализации at-least-once семантики?
Exactly-Once с транзакционным API
Для exactly-once семантики используется транзакционный API Kafka, который объединяет чтение, обработку и запись в одну атомарную операцию.
Паттерн: Transactional Consumer-Producer
from confluent_kafka import Consumer, Producer, KafkaException
# ==============================================================================
# Конфигурация transactional Producer
# ==============================================================================
producer_config = {
'bootstrap.servers': 'kafka:9092',
'transactional.id': 'cdc-transformer-1', # Уникальный ID для каждого instance
'enable.idempotence': True # Обязательно для транзакций
}
producer = Producer(producer_config)
producer.init_transactions() # Инициализация транзакционного API
# ==============================================================================
# Конфигурация read-committed Consumer
# ==============================================================================
consumer_config = {
'bootstrap.servers': 'kafka:9092',
'group.id': 'cdc-processor',
'isolation.level': 'read_committed', # Читать только committed сообщения
'enable.auto.commit': False # Ручной commit через транзакции
}
consumer = Consumer(consumer_config)
consumer.subscribe(['dbserver1.public.orders'])
# ==============================================================================
# Транзакционный цикл обработки
# ==============================================================================
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
# Начало транзакции
producer.begin_transaction()
try:
# 1. Обработка CDC события
result = transform_cdc_event(msg.value())
# 2. Отправка результата в output topic
producer.produce('output-topic', value=result)
# 3. Commit offset как часть транзакции
producer.send_offsets_to_transaction(
consumer.position(consumer.assignment()),
consumer.consumer_group_metadata()
)
# 4. Commit транзакции (атомарно)
producer.commit_transaction()
except Exception as e:
# При ошибке - abort транзакции
producer.abort_transaction()
print(f"Transaction aborted: {e}")
# Сообщение будет повторно обработано
finally:
consumer.close()
Ключевые параметры:
| Параметр | Где | Значение | Назначение |
|---|---|---|---|
transactional.id | Producer | cdc-transformer-1 | Уникальный ID для идемпотентности. При перезапуске с тем же ID Kafka завершит незаконченные транзакции |
enable.idempotence | Producer | True | Обязателен для транзакций. Гарантирует, что дубликаты не будут записаны |
isolation.level | Consumer | read_committed | Читать только committed сообщения (не видеть aborted транзакции) |
enable.auto.commit | Consumer | False | Offset управляется через send_offsets_to_transaction() |
Гарантии exactly-once:
- Либо всё выполнилось (обработка + produce + offset commit), либо ничего
- Дубликаты невозможны (idempotence + transactional.id)
- Потеря данных невозможна (offset commit в транзакции)
Важно: Exactly-once работает только для Kafka → обработка → Kafka паттерна. Для записи в БД требуется дополнительная идемпотентность на уровне приложения.
ВАЖНО: kafka:9092 vs localhost:9092
Когда использовать какой адрес?
Где запускаете код Адрес Kafka Внутри Docker (JupyterLab, другие контейнеры) kafka:9092На вашем компьютере (терминал Mac/Windows/Linux) localhost:9092В примерах этого урока используется
kafka:9092(JupyterLab).Это частая причина ошибок подключения — всегда проверяйте, откуда запускается ваш код!
Обработка ошибок: Fatal vs Non-Fatal
Consumer может встретить два класса ошибок:
Non-Fatal Errors (можно продолжить)
- Partition EOF (конец раздела — это не ошибка, а информация)
- Временные сбои сети (reconnect автоматически)
- Rate limiting (retry с backoff)
Fatal Errors (необходимо остановить consumer)
- Authentication failure (неверные credentials)
- Authorization failure (нет прав на чтение топика)
- Unknown topic or partition
- Broker failures (все брокеры недоступны)
from confluent_kafka import Consumer, KafkaException, KafkaError
consumer = Consumer(config)
consumer.subscribe(['dbserver1.public.orders'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
# ==============================================================================
# Проверка типа ошибки
# ==============================================================================
if msg.error():
error_code = msg.error().code()
# Non-fatal: конец раздела (это нормально)
if error_code == KafkaError._PARTITION_EOF:
continue
# Fatal: проверяем флаг fatal()
if msg.error().fatal():
print(f"FATAL ERROR: {msg.error()}")
raise KafkaException(msg.error())
# Non-fatal: логируем и продолжаем
print(f"Non-fatal error: {msg.error()}")
continue
# Обработка сообщения
try:
process_message(msg)
consumer.store_offsets(msg)
except Exception as e:
print(f"Processing error: {e}")
continue
except KeyboardInterrupt:
print("Shutting down...")
finally:
consumer.close()
Объяснение:
| Ошибка | Код | fatal() | Действие |
|---|---|---|---|
_PARTITION_EOF | -191 | False | Достигнут конец раздела. Continue. |
_TRANSPORT | -195 | False | Сетевая проблема. Retry автоматически. |
_ALL_BROKERS_DOWN | -187 | True | Все брокеры недоступны. Raise exception. |
_AUTHENTICATION | -169 | True | Неверная аутентификация. Raise exception. |
_TOPIC_AUTHORIZATION_FAILED | 29 | True | Нет прав на топик. Raise exception. |
Рекомендация:
- Всегда проверяйте
msg.error().fatal() - Fatal errors → завершение приложения (не пытаться retry)
- Non-fatal errors → логирование и продолжение работы
- Мониторинг non-fatal ошибок для выявления проблем инфраструктуры
Проверка знанийПочему при получении _PARTITION_EOF ошибки consumer должен продолжить работу, а не остановиться? Ведь EOF обычно означает конец данных.
Rebalancing и max.poll.interval.ms
Частая проблема: Consumer выгоняется из группы во время обработки тяжелых сообщений.
Что такое rebalancing?
Consumer group координируется через Group Coordinator. Если consumer не вызывает poll() в течение max.poll.interval.ms, он считается “мертвым” и выгоняется из группы.
Consumer выгоняется из группы при превышении max.poll.interval.ms
- Уменьшите max.poll.records (меньше сообщений за один poll)
- Увеличьте max.poll.interval.ms (больше времени на обработку)
- Оптимизируйте обработку (уменьшите время на сообщение)
- Offload тяжелой работы в асинхронные задачи (Celery, RQ)
Параметры управления rebalancing:
| Параметр | Дефолт | Описание |
|---|---|---|
max.poll.interval.ms | 300000 (5 минут) | Максимальное время между poll() вызовами |
max.poll.records | 500 | Максимум сообщений за один poll() |
session.timeout.ms | 45000 (45 секунд) | Таймаут heartbeat (независимо от poll()) |
Решение: ограничение batch size
config = {
'bootstrap.servers': 'kafka:9092',
'group.id': 'cdc-processor',
'max.poll.interval.ms': 300000, # 5 минут (дефолт)
'max.poll.records': 100, # Ограничение: максимум 100 сообщений за poll()
'enable.auto.offset.store': False
}
consumer = Consumer(config)
consumer.subscribe(['dbserver1.public.orders'])
# Если обработка одного сообщения занимает 1 секунду,
# то 100 сообщений = 100 секунд < 300 секунд (max.poll.interval.ms)
# Rebalancing не произойдет
Рекомендации:
- Если обработка одного сообщения занимает > 5 секунд → уменьшите
max.poll.records - Если обработка < 1 секунды → увеличьте
max.poll.recordsдля throughput - Если требуется > 5 минут → увеличьте
max.poll.interval.ms(но не более 30 минут)
Альтернатива: Offload тяжелой работы в асинхронные задачи (Celery, RQ), а consumer только кладет задачи в очередь.
Лабораторная работа: Resilient Consumer
Создайте production-ready consumer с обработкой ошибок и at-least-once семантикой.
Задание
- Откройте JupyterLab: http://localhost:8888
- Создайте новый notebook
- Реализуйте consumer с:
- At-least-once семантикой (
enable.auto.offset.store=False) - Обработкой fatal/non-fatal ошибок
- Graceful shutdown (KeyboardInterrupt)
- At-least-once семантикой (
- Протестируйте устойчивость:
- Запустите consumer
- Во время работы остановите Kafka:
docker compose stop kafka - Наблюдайте non-fatal ошибки в логах
- Запустите Kafka:
docker compose start kafka - Убедитесь, что consumer восстановился автоматически
Шаблон решения
from confluent_kafka import Consumer, KafkaException, KafkaError
import json
# ==============================================================================
# Конфигурация: At-Least-Once с manual offset store
# ==============================================================================
config = {
'bootstrap.servers': 'kafka:9092',
'group.id': 'resilient-consumer-lab',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True,
'enable.auto.offset.store': False, # Manual offset store
'max.poll.records': 50 # Ограничение batch size
}
consumer = Consumer(config)
consumer.subscribe(['dbserver1.public.orders'])
print("=" * 70)
print("Resilient Consumer запущен")
print("Топик: dbserver1.public.orders")
print("Семантика: At-Least-Once")
print("=" * 70)
print("\nПопробуйте остановить Kafka во время работы:")
print(" docker compose stop kafka")
print("Затем запустите обратно:")
print(" docker compose start kafka")
print("\nНаблюдайте за поведением consumer при сбоях\n")
processed_count = 0
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
# ==============================================================================
# Обработка ошибок
# ==============================================================================
if msg.error():
error_code = msg.error().code()
# Non-fatal: конец раздела
if error_code == KafkaError._PARTITION_EOF:
continue
# Fatal errors
if msg.error().fatal():
print(f"\n❌ FATAL ERROR: {msg.error()}")
print("Consumer останавливается из-за критической ошибки")
raise KafkaException(msg.error())
# Non-fatal errors (сетевые проблемы и т.д.)
print(f"⚠️ Non-fatal error: {msg.error()}")
continue
# ==============================================================================
# Обработка сообщения
# ==============================================================================
try:
event = json.loads(msg.value().decode('utf-8'))
payload = event.get('payload', {})
op = payload.get('op', '?')
# Имитация обработки
# В реальном приложении здесь была бы запись в БД, вызов API и т.д.
processed_count += 1
print(f"[{processed_count}] Обработано: op={op}")
# ВАЖНО: offset сохраняется ТОЛЬКО после успешной обработки
consumer.store_offsets(msg)
except Exception as e:
# Обработка не удалась - offset НЕ сохраняется
# При перезапуске consumer повторит обработку этого сообщения
print(f"❌ Processing error: {e}")
continue
except KeyboardInterrupt:
print("\n\n🛑 Получен сигнал остановки")
finally:
print(f"Всего обработано: {processed_count} сообщений")
print("Закрытие consumer...")
consumer.close()
print("✅ Consumer закрыт корректно")
Проверка работы
Шаг 1: Запустите consumer в JupyterLab
Шаг 2: В отдельном терминале сгенерируйте события
cd labs/
docker compose exec postgres psql -U postgres -d inventory -c "
INSERT INTO orders (customer_id, product_id, quantity)
SELECT 1, 1, 1 FROM generate_series(1, 20);
"
Шаг 3: Остановите Kafka во время работы
docker compose stop kafka
Наблюдайте сообщения об ошибках в consumer. Consumer не крашится, а продолжает retry.
Шаг 4: Запустите Kafka обратно
docker compose start kafka
Consumer автоматически восстановится и продолжит обработку с последнего сохраненного offset.
Что мы узнали
- At-Least-Once семантика:
enable.auto.offset.store=False+consumer.store_offsets()после обработки - Exactly-Once семантика: Transactional API с
isolation.level=read_committed - Fatal vs Non-Fatal ошибки: Используйте
msg.error().fatal()для различения - Rebalancing: Управляйте через
max.poll.interval.msиmax.poll.records - Production-ready: Resilient consumer автоматически восстанавливается при сбоях
Что дальше?
В следующем уроке мы интегрируем CDC события с Pandas DataFrame для batch анализа и трансформаций.
Check Your Understanding
Finished the lesson?
Mark it as complete to track your progress