Управление Offset
Offset — это порядковый номер сообщения в партиции, который определяет, что consumer уже прочитал. Kafka хранит закоммиченные offsets в служебном топике __consumer_offsets. От правильной стратегии управления offset зависит, получат ли потребители сообщения ровно один раз, хотя бы один раз или не более одного раза.
auto.offset.reset: старт без закоммиченного offset
Когда consumer впервые присоединяется к группе или создаётся новая consumer group для существующего топика, в __consumer_offsets нет записей для этой группы. Параметр auto_offset_reset определяет, откуда начать чтение.
earliest
auto_offset_reset='earliest': consumer читает с самого начала партиции — с offset 0 (или с первого доступного offset, если часть данных удалена retention-политикой). Используется когда важно обработать всю историю топика.latest
auto_offset_reset='latest': consumer читает только новые сообщения, записанные после его старта. Пропускает всю существующую историю. Используется для real-time обработки где исторические данные не нужны.none → exception (нет committed offset)
none: если нет закоммиченного offset — выбрасывает исключение. Используется в production для обнаружения ошибок конфигурации — consumer не должен стартовать без явного указания начального offset.earliest — безопасный выбор при разработке: вы всегда увидите все данные, записанные в топик. latest — для production real-time сервисов, где историческая обработка нежелательна. Не путайте auto_offset_reset с управлением offset при повторном запуске — он применяется только когда нет закоммиченного offset для данной consumer group.
enable.auto.commit: автоматическая фиксация offset
По умолчанию Kafka автоматически коммитит offset каждые auto.commit.interval.ms миллисекунд (5000 мс по умолчанию). Это удобно, но несёт риск потери сообщений.
poll() → msg1000…msg1099
Шаг 1: Consumer получает сообщения msg1000-msg1099 через poll(). enable_auto_commit=True.auto-commit → offset=1100
Шаг 2: Auto-commit интервал истекает. Kafka автоматически коммитит offset=1100 в __consumer_offsets. Это происходит внутри следующего вызова poll().Обработано: msg1000…msg1050
Шаг 3: Consumer обрабатывает msg1000-msg1050, затем падает (crash).Рестарт → читает с offset=1100
Шаг 4: После рестарта consumer читает с закоммиченного offset=1100. Сообщения msg1051-msg1099 пропущены навсегда — они были закоммичены, но не обработаны.ПОТЕРЯ: msg1051-msg1099 пропущены
Результат: msg1051-msg1099 потеряны. Offset был закоммичен авто-коммитом до завершения их обработки. Это at-most-once семантика — каждое сообщение обрабатывается не более одного раза, но возможна потеря.# Авто-коммит (по умолчанию):
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
group_id='my-group',
enable_auto_commit=True, # по умолчанию True
auto_commit_interval_ms=5000, # каждые 5 секунд
)
Ручной commit: commitSync и commitAsync
Для контроля над семантикой доставки используется ручной commit с enable_auto_commit=False.
- records = consumer.poll(timeout_ms=1000)
- Обработать каждое сообщение
- consumer.commit() # commitSync
- Следующий poll() с нового offset
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
group_id='analytics',
auto_offset_reset='earliest',
enable_auto_commit=False, # отключаем авто-коммит
)
consumer.subscribe(['orders'])
while True:
records = consumer.poll(timeout_ms=1000)
for tp, messages in records.items():
for msg in messages:
# Обработка сообщения
process_order(msg.value)
# Коммитим только после успешной обработки всего батча
consumer.commit() # commitSync — блокирует
commitSync vs commitAsync:
| Метод | Блокировка | Надёжность | Latency |
|---|---|---|---|
consumer.commit() (sync) | Да — ждёт ACK от брокера | Высокая — retry при ошибке | Выше |
commitAsync() | Нет — неблокирующий | Ниже — нет авто-retry | Ниже |
В kafka_sim.py consumer.commit() — это синхронный commit. В реальном kafka-python commitAsync(callback) принимает callback, вызываемый после подтверждения. Для максимальной надёжности в production используйте commitSync в блоке finally, чтобы commit выполнился даже при исключении.
At-least-once vs At-most-once
Выбор стратегии commit определяет гарантии доставки:
At-most-once
At-most-once: коммитим offset ДО обработки. Сообщение никогда не обрабатывается повторно. При падении между commit и обработкой — сообщение теряется. Подходит для метрик, где потеря не критична.commit() → process()
Паттерн: commit() → process(). Если process() падает — сообщение уже закоммичено и потеряно. Авто-коммит ведёт себя аналогично.At-least-once
At-least-once: коммитим offset ПОСЛЕ обработки. Сообщение всегда будет обработано минимум один раз. При падении до commit — сообщение будет перечитано и обработано повторно (дубликат). Для дедупликации нужна идемпотентная обработка.process() → commit()
Паттерн: process() → commit(). Если consumer падает между process() и commit() — сообщение обрабатывается снова после рестарта. Требует идемпотентности бизнес-логики (проверка на дубликаты).Exactly-once семантика в Kafka достигается через транзакции на стороне продюсера (enable.idempotence + transactional.id) и consume-transform-produce паттерн в Kafka Streams. Чистый consumer с ручным commit обеспечивает at-least-once при правильной реализации.
Seek: сброс offset для replay
consumer.seek(partition, offset) позволяет переместить позицию чтения в произвольное место партиции. Это мощный инструмент для повторной обработки событий.
from kafka import TopicPartition
tp = TopicPartition('events', 0)
consumer.assign([tp])
# Перемотать на начало партиции
consumer.seek(tp, 0)
# Читать 100 сообщений, начиная с offset 500
consumer.seek(tp, 500)
records = consumer.poll(timeout_ms=1000, max_records=100)
Seek необходим при реализации event sourcing replay, исправлении данных после обнаружения ошибки в обработчике, или для точного воспроизведения конкретного инцидента. Seek работает только после assign() — нельзя использовать с subscribe() до получения назначения партиций (первый poll() после subscribe инициирует назначение).
Итог
Управление offset — это компромисс между надёжностью и производительностью. auto_offset_reset определяет стартовую точку для новой группы. enable_auto_commit удобен, но создаёт риск потери при crash. Ручной commit (enable_auto_commit=False) + commit() после обработки — стандарт для at-least-once гарантий. seek() даёт полный контроль для replay-сценариев.