Перейти к содержанию
Learning Platform
Продвинутый
45 минут
Python confluent-kafka Exactly-Once Error Handling Transactions

Требуемые знания:

  • module-1/05-python-consumer

Продвинутые паттерны Python Consumer

В Module 1 мы написали базовый Python consumer с простой конфигурацией. Это отлично работает для обучения и прототипирования, но для production-систем требуются более продвинутые паттерны.

От tutorial к production

Базовый consumer из Module 1:

  • Автоматический commit offset
  • Простая обработка ошибок (print и continue)
  • At-most-once семантика (данные могут потеряться при сбое)

Production-ready consumer:

  • Контролируемая семантика доставки (at-least-once или exactly-once)
  • Различение fatal и non-fatal ошибок
  • Graceful shutdown и управление rebalancing
  • Мониторинг и observability

В этом уроке мы разберем паттерны, которые делают consumer готовым к production.


At-Least-Once vs Exactly-Once семантика

Kafka предоставляет три гарантии доставки сообщений:

At-Most-Once (максимум один раз)

  • Сообщение может быть потеряно, но никогда не обработано дважды
  • Commit offset до обработки
  • Используется редко (логи, метрики, где потеря приемлема)

At-Least-Once (минимум один раз)

  • Сообщение никогда не теряется, но может быть обработано дважды
  • Commit offset после обработки
  • Самая распространенная семантика для CDC
  • Требует идемпотентной обработки на стороне приложения

Exactly-Once (ровно один раз)

  • Сообщение обработано ровно один раз (никакой потери, никаких дубликатов)
  • Использует транзакционный API Kafka
  • Применяется для критичных операций (финансы, платежи, inventory)
At-Least-Once

Commit offset после обработки

Получить сообщение
Обработать
Commit offset
Crash?
До commit
Повторная обработка
После commit
OK
Exactly-OnceRecommended

Транзакционный API для атомарности

Получить сообщение
Begin Transaction
Обработать
Produce результат
Commit offset в транзакции
Commit Transaction
Crash?
До commit
Rollback всей транзакции
После commit
OK атомарно

Когда использовать какую семантику:

СемантикаКогда использоватьПример
At-Least-OnceИдемпотентная обработка возможна (upsert в БД, обновление cache)Синхронизация данных в поисковую систему, обновление materialized view
Exactly-OnceКритичная корректность, идемпотентность невозможна (счетчики, финансы)Обработка платежей, списание со счета, инвентаризация товаров
At-Most-OnceПотеря данных приемлема, дубликаты недопустимыЛогирование метрик, отправка email-уведомлений

Конфигурация для At-Least-Once

Для at-least-once семантики нужно гарантировать, что offset коммитится только после успешной обработки.

Паттерн: Manual Offset Store

from confluent_kafka import Consumer, KafkaException

config = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'cdc-processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,          # Авто-commit включен
    'enable.auto.offset.store': False    # Manual store для контроля
}

consumer = Consumer(config)
consumer.subscribe(['dbserver1.public.orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue

        if msg.error():
            # Обработка ошибок (см. следующий раздел)
            continue

        try:
            # Обработка сообщения
            process_cdc_event(msg.value())

            # Сохраняем offset ТОЛЬКО после успешной обработки
            consumer.store_offsets(msg)

        except Exception as e:
            # Обработка не удалась - НЕ сохраняем offset
            # При следующем запуске consumer повторит обработку
            print(f"Processing failed: {e}")
            continue

finally:
    consumer.close()

Объяснение конфигурации:

ПараметрЗначениеПочему
enable.auto.commitTrueАвтоматический commit в фоне каждые 5 секунд (по умолчанию)
enable.auto.offset.storeFalseКлючевой момент: offset сохраняется только вручную через store_offsets()

Как это работает:

  1. Consumer получает сообщение
  2. Приложение обрабатывает сообщение
  3. Если обработка успешна → store_offsets(msg) сохраняет offset
  4. Фоновый поток авто-commit периодически коммитит все сохраненные offset
  5. Если crash происходит до store_offsets() → при перезапуске consumer повторит обработку

Гарантия: Сообщение никогда не будет потеряно (но может быть обработано дважды при сбое).

Проверка знаний
В чем ключевая разница между enable.auto.commit = False и enable.auto.offset.store = False для реализации at-least-once семантики?
Ответ
enable.auto.commit = False полностью отключает автоматический commit оффсетов, и вы должны вызывать commit вручную. enable.auto.offset.store = False оставляет авто-commit включённым, но контролирует, какие оффсеты попадут в store через явный вызов store_offsets(). Второй подход проще для at-least-once: фоновый поток коммитит оффсеты автоматически, а вы контролируете только момент сохранения оффсета после успешной обработки. Первый подход (auto.commit=False) используется для exactly-once с транзакционным API.

Exactly-Once с транзакционным API

Для exactly-once семантики используется транзакционный API Kafka, который объединяет чтение, обработку и запись в одну атомарную операцию.

Паттерн: Transactional Consumer-Producer

from confluent_kafka import Consumer, Producer, KafkaException

# ==============================================================================
# Конфигурация transactional Producer
# ==============================================================================
producer_config = {
    'bootstrap.servers': 'kafka:9092',
    'transactional.id': 'cdc-transformer-1',  # Уникальный ID для каждого instance
    'enable.idempotence': True                 # Обязательно для транзакций
}

producer = Producer(producer_config)
producer.init_transactions()  # Инициализация транзакционного API

# ==============================================================================
# Конфигурация read-committed Consumer
# ==============================================================================
consumer_config = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'cdc-processor',
    'isolation.level': 'read_committed',  # Читать только committed сообщения
    'enable.auto.commit': False            # Ручной commit через транзакции
}

consumer = Consumer(consumer_config)
consumer.subscribe(['dbserver1.public.orders'])

# ==============================================================================
# Транзакционный цикл обработки
# ==============================================================================
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue

        if msg.error():
            raise KafkaException(msg.error())

        # Начало транзакции
        producer.begin_transaction()

        try:
            # 1. Обработка CDC события
            result = transform_cdc_event(msg.value())

            # 2. Отправка результата в output topic
            producer.produce('output-topic', value=result)

            # 3. Commit offset как часть транзакции
            producer.send_offsets_to_transaction(
                consumer.position(consumer.assignment()),
                consumer.consumer_group_metadata()
            )

            # 4. Commit транзакции (атомарно)
            producer.commit_transaction()

        except Exception as e:
            # При ошибке - abort транзакции
            producer.abort_transaction()
            print(f"Transaction aborted: {e}")
            # Сообщение будет повторно обработано

finally:
    consumer.close()

Ключевые параметры:

ПараметрГдеЗначениеНазначение
transactional.idProducercdc-transformer-1Уникальный ID для идемпотентности. При перезапуске с тем же ID Kafka завершит незаконченные транзакции
enable.idempotenceProducerTrueОбязателен для транзакций. Гарантирует, что дубликаты не будут записаны
isolation.levelConsumerread_committedЧитать только committed сообщения (не видеть aborted транзакции)
enable.auto.commitConsumerFalseOffset управляется через send_offsets_to_transaction()

Гарантии exactly-once:

  1. Либо всё выполнилось (обработка + produce + offset commit), либо ничего
  2. Дубликаты невозможны (idempotence + transactional.id)
  3. Потеря данных невозможна (offset commit в транзакции)

Важно: Exactly-once работает только для Kafka → обработка → Kafka паттерна. Для записи в БД требуется дополнительная идемпотентность на уровне приложения.


ВАЖНО: kafka:9092 vs localhost:9092

Когда использовать какой адрес?

Где запускаете кодАдрес Kafka
Внутри Docker (JupyterLab, другие контейнеры)kafka:9092
На вашем компьютере (терминал Mac/Windows/Linux)localhost:9092

В примерах этого урока используется kafka:9092 (JupyterLab).

Это частая причина ошибок подключения — всегда проверяйте, откуда запускается ваш код!


Обработка ошибок: Fatal vs Non-Fatal

Consumer может встретить два класса ошибок:

Non-Fatal Errors (можно продолжить)

  • Partition EOF (конец раздела — это не ошибка, а информация)
  • Временные сбои сети (reconnect автоматически)
  • Rate limiting (retry с backoff)

Fatal Errors (необходимо остановить consumer)

  • Authentication failure (неверные credentials)
  • Authorization failure (нет прав на чтение топика)
  • Unknown topic or partition
  • Broker failures (все брокеры недоступны)
from confluent_kafka import Consumer, KafkaException, KafkaError

consumer = Consumer(config)
consumer.subscribe(['dbserver1.public.orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue

        # ==============================================================================
        # Проверка типа ошибки
        # ==============================================================================
        if msg.error():
            error_code = msg.error().code()

            # Non-fatal: конец раздела (это нормально)
            if error_code == KafkaError._PARTITION_EOF:
                continue

            # Fatal: проверяем флаг fatal()
            if msg.error().fatal():
                print(f"FATAL ERROR: {msg.error()}")
                raise KafkaException(msg.error())

            # Non-fatal: логируем и продолжаем
            print(f"Non-fatal error: {msg.error()}")
            continue

        # Обработка сообщения
        try:
            process_message(msg)
            consumer.store_offsets(msg)
        except Exception as e:
            print(f"Processing error: {e}")
            continue

except KeyboardInterrupt:
    print("Shutting down...")
finally:
    consumer.close()

Объяснение:

ОшибкаКодfatal()Действие
_PARTITION_EOF-191FalseДостигнут конец раздела. Continue.
_TRANSPORT-195FalseСетевая проблема. Retry автоматически.
_ALL_BROKERS_DOWN-187TrueВсе брокеры недоступны. Raise exception.
_AUTHENTICATION-169TrueНеверная аутентификация. Raise exception.
_TOPIC_AUTHORIZATION_FAILED29TrueНет прав на топик. Raise exception.

Рекомендация:

  • Всегда проверяйте msg.error().fatal()
  • Fatal errors → завершение приложения (не пытаться retry)
  • Non-fatal errors → логирование и продолжение работы
  • Мониторинг non-fatal ошибок для выявления проблем инфраструктуры
Проверка знаний
Почему при получении _PARTITION_EOF ошибки consumer должен продолжить работу, а не остановиться? Ведь EOF обычно означает конец данных.
Ответ
_PARTITION_EOF означает, что consumer прочитал все доступные сообщения в партиции на данный момент. Это не ошибка и не конец потока -- в streaming-сценарии новые CDC события будут продолжать поступать. Consumer должен вызвать poll() снова и ожидать новых данных. Это отличается от batch-обработки файлов, где EOF действительно означает завершение. В Kafka partition бесконечен -- _PARTITION_EOF это информационное сообщение, а не сигнал об остановке.

Rebalancing и max.poll.interval.ms

Частая проблема: Consumer выгоняется из группы во время обработки тяжелых сообщений.

Что такое rebalancing?

Consumer group координируется через Group Coordinator. Если consumer не вызывает poll() в течение max.poll.interval.ms, он считается “мертвым” и выгоняется из группы.

Rebalancing: max.poll.interval.ms exceeded

Consumer выгоняется из группы при превышении max.poll.interval.ms

Consumer
Group Coordinator
Kafka
poll() - получить сообщениеBatch сообщенийОбработка (3 минуты)poll() - следующее сообщениеПроверка: max.poll.interval.ms = 5 минутpoll() после 6 минут (превышен таймаут)❌ Rebalancing triggeredПартиции переназначены другим consumers
Решение проблемы:
  • Уменьшите max.poll.records (меньше сообщений за один poll)
  • Увеличьте max.poll.interval.ms (больше времени на обработку)
  • Оптимизируйте обработку (уменьшите время на сообщение)
  • Offload тяжелой работы в асинхронные задачи (Celery, RQ)

Параметры управления rebalancing:

ПараметрДефолтОписание
max.poll.interval.ms300000 (5 минут)Максимальное время между poll() вызовами
max.poll.records500Максимум сообщений за один poll()
session.timeout.ms45000 (45 секунд)Таймаут heartbeat (независимо от poll())

Решение: ограничение batch size

config = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'cdc-processor',
    'max.poll.interval.ms': 300000,  # 5 минут (дефолт)
    'max.poll.records': 100,         # Ограничение: максимум 100 сообщений за poll()
    'enable.auto.offset.store': False
}

consumer = Consumer(config)
consumer.subscribe(['dbserver1.public.orders'])

# Если обработка одного сообщения занимает 1 секунду,
# то 100 сообщений = 100 секунд < 300 секунд (max.poll.interval.ms)
# Rebalancing не произойдет

Рекомендации:

  • Если обработка одного сообщения занимает > 5 секунд → уменьшите max.poll.records
  • Если обработка < 1 секунды → увеличьте max.poll.records для throughput
  • Если требуется > 5 минут → увеличьте max.poll.interval.ms (но не более 30 минут)

Альтернатива: Offload тяжелой работы в асинхронные задачи (Celery, RQ), а consumer только кладет задачи в очередь.


Лабораторная работа: Resilient Consumer

Создайте production-ready consumer с обработкой ошибок и at-least-once семантикой.

Задание

  1. Откройте JupyterLab: http://localhost:8888
  2. Создайте новый notebook
  3. Реализуйте consumer с:
    • At-least-once семантикой (enable.auto.offset.store=False)
    • Обработкой fatal/non-fatal ошибок
    • Graceful shutdown (KeyboardInterrupt)
  4. Протестируйте устойчивость:
    • Запустите consumer
    • Во время работы остановите Kafka: docker compose stop kafka
    • Наблюдайте non-fatal ошибки в логах
    • Запустите Kafka: docker compose start kafka
    • Убедитесь, что consumer восстановился автоматически

Шаблон решения

from confluent_kafka import Consumer, KafkaException, KafkaError
import json

# ==============================================================================
# Конфигурация: At-Least-Once с manual offset store
# ==============================================================================
config = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'resilient-consumer-lab',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
    'enable.auto.offset.store': False,  # Manual offset store
    'max.poll.records': 50              # Ограничение batch size
}

consumer = Consumer(config)
consumer.subscribe(['dbserver1.public.orders'])

print("=" * 70)
print("Resilient Consumer запущен")
print("Топик: dbserver1.public.orders")
print("Семантика: At-Least-Once")
print("=" * 70)
print("\nПопробуйте остановить Kafka во время работы:")
print("  docker compose stop kafka")
print("Затем запустите обратно:")
print("  docker compose start kafka")
print("\nНаблюдайте за поведением consumer при сбоях\n")

processed_count = 0

try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        # ==============================================================================
        # Обработка ошибок
        # ==============================================================================
        if msg.error():
            error_code = msg.error().code()

            # Non-fatal: конец раздела
            if error_code == KafkaError._PARTITION_EOF:
                continue

            # Fatal errors
            if msg.error().fatal():
                print(f"\n❌ FATAL ERROR: {msg.error()}")
                print("Consumer останавливается из-за критической ошибки")
                raise KafkaException(msg.error())

            # Non-fatal errors (сетевые проблемы и т.д.)
            print(f"⚠️  Non-fatal error: {msg.error()}")
            continue

        # ==============================================================================
        # Обработка сообщения
        # ==============================================================================
        try:
            event = json.loads(msg.value().decode('utf-8'))
            payload = event.get('payload', {})
            op = payload.get('op', '?')

            # Имитация обработки
            # В реальном приложении здесь была бы запись в БД, вызов API и т.д.
            processed_count += 1
            print(f"[{processed_count}] Обработано: op={op}")

            # ВАЖНО: offset сохраняется ТОЛЬКО после успешной обработки
            consumer.store_offsets(msg)

        except Exception as e:
            # Обработка не удалась - offset НЕ сохраняется
            # При перезапуске consumer повторит обработку этого сообщения
            print(f"❌ Processing error: {e}")
            continue

except KeyboardInterrupt:
    print("\n\n🛑 Получен сигнал остановки")
finally:
    print(f"Всего обработано: {processed_count} сообщений")
    print("Закрытие consumer...")
    consumer.close()
    print("✅ Consumer закрыт корректно")

Проверка работы

Шаг 1: Запустите consumer в JupyterLab

Шаг 2: В отдельном терминале сгенерируйте события

cd labs/
docker compose exec postgres psql -U postgres -d inventory -c "
INSERT INTO orders (customer_id, product_id, quantity)
SELECT 1, 1, 1 FROM generate_series(1, 20);
"

Шаг 3: Остановите Kafka во время работы

docker compose stop kafka

Наблюдайте сообщения об ошибках в consumer. Consumer не крашится, а продолжает retry.

Шаг 4: Запустите Kafka обратно

docker compose start kafka

Consumer автоматически восстановится и продолжит обработку с последнего сохраненного offset.


Что мы узнали

  1. At-Least-Once семантика: enable.auto.offset.store=False + consumer.store_offsets() после обработки
  2. Exactly-Once семантика: Transactional API с isolation.level=read_committed
  3. Fatal vs Non-Fatal ошибки: Используйте msg.error().fatal() для различения
  4. Rebalancing: Управляйте через max.poll.interval.ms и max.poll.records
  5. Production-ready: Resilient consumer автоматически восстанавливается при сбоях

Что дальше?

В следующем уроке мы интегрируем CDC события с Pandas DataFrame для batch анализа и трансформаций.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какой параметр конфигурации confluent-kafka consumer является ключевым для реализации at-least-once семантики?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс