Learning Platform
Глоссарий Troubleshooting
Урок 04.02 · 20 мин
Средний
Offsetauto.offset.resetenable.auto.commitcommitSynccommitAsync

Управление Offset

Offset — это порядковый номер сообщения в партиции, который определяет, что consumer уже прочитал. Kafka хранит закоммиченные offsets в служебном топике __consumer_offsets. От правильной стратегии управления offset зависит, получат ли потребители сообщения ровно один раз, хотя бы один раз или не более одного раза.


auto.offset.reset: старт без закоммиченного offset

Когда consumer впервые присоединяется к группе или создаётся новая consumer group для существующего топика, в __consumer_offsets нет записей для этой группы. Параметр auto_offset_reset определяет, откуда начать чтение.

auto.offset.reset — поведение при старте

earliest

auto_offset_reset='earliest': consumer читает с самого начала партиции — с offset 0 (или с первого доступного offset, если часть данных удалена retention-политикой). Используется когда важно обработать всю историю топика.
начинает с offset 0
[msg0] → [msg1] → ... → [msgN]Партиция: [msg0, msg1, msg2, ..., msg999, msg1000 (new)]. earliest → начинает с msg0. Обработает всю историю топика до текущего момента.

latest

auto_offset_reset='latest': consumer читает только новые сообщения, записанные после его старта. Пропускает всю существующую историю. Используется для real-time обработки где исторические данные не нужны.
начинает с конца
Пропускает историю → только новыеПартиция: [msg0...msg999 (ignored)] → [msg1000 (first read)]. latest → ждёт только новые сообщения. Историческая часть игнорируется.

none → exception (нет committed offset)

none: если нет закоммиченного offset — выбрасывает исключение. Используется в production для обнаружения ошибок конфигурации — consumer не должен стартовать без явного указания начального offset.
TIP

earliest — безопасный выбор при разработке: вы всегда увидите все данные, записанные в топик. latest — для production real-time сервисов, где историческая обработка нежелательна. Не путайте auto_offset_reset с управлением offset при повторном запуске — он применяется только когда нет закоммиченного offset для данной consumer group.


enable.auto.commit: автоматическая фиксация offset

По умолчанию Kafka автоматически коммитит offset каждые auto.commit.interval.ms миллисекунд (5000 мс по умолчанию). Это удобно, но несёт риск потери сообщений.

Риск авто-коммита

poll() → msg1000…msg1099

Шаг 1: Consumer получает сообщения msg1000-msg1099 через poll(). enable_auto_commit=True.

auto-commit → offset=1100

Шаг 2: Auto-commit интервал истекает. Kafka автоматически коммитит offset=1100 в __consumer_offsets. Это происходит внутри следующего вызова poll().
consumer падает

Обработано: msg1000…msg1050

Шаг 3: Consumer обрабатывает msg1000-msg1050, затем падает (crash).

Рестарт → читает с offset=1100

Шаг 4: После рестарта consumer читает с закоммиченного offset=1100. Сообщения msg1051-msg1099 пропущены навсегда — они были закоммичены, но не обработаны.

ПОТЕРЯ: msg1051-msg1099 пропущены

Результат: msg1051-msg1099 потеряны. Offset был закоммичен авто-коммитом до завершения их обработки. Это at-most-once семантика — каждое сообщение обрабатывается не более одного раза, но возможна потеря.
# Авто-коммит (по умолчанию):
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    enable_auto_commit=True,          # по умолчанию True
    auto_commit_interval_ms=5000,     # каждые 5 секунд
)

Ручной commit: commitSync и commitAsync

Для контроля над семантикой доставки используется ручной commit с enable_auto_commit=False.

Ручной commit — последовательность
  1. records = consumer.poll(timeout_ms=1000)
poll(): получаем батч сообщений. enable_auto_commit=False — Kafka не коммитит автоматически.
  1. Обработать каждое сообщение
Полная обработка: бизнес-логика, запись в БД, вызов API. Только после успешной обработки — commit. Если здесь падение, сообщения будут прочитаны снова после рестарта.
  1. consumer.commit() # commitSync
consumer.commit(): синхронный commit. Блокирует до получения подтверждения от брокера. Гарантирует, что offset зафиксирован перед следующим poll(). Медленнее, но надёжнее.
  1. Следующий poll() с нового offset
Следующий poll() начнёт чтение с нового offset. При падении между шагом 2 и 3 — сообщения будут повторно доставлены. Это at-least-once семантика.
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='analytics',
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # отключаем авто-коммит
)
consumer.subscribe(['orders'])

while True:
    records = consumer.poll(timeout_ms=1000)
    for tp, messages in records.items():
        for msg in messages:
            # Обработка сообщения
            process_order(msg.value)

    # Коммитим только после успешной обработки всего батча
    consumer.commit()  # commitSync — блокирует

commitSync vs commitAsync:

МетодБлокировкаНадёжностьLatency
consumer.commit() (sync)Да — ждёт ACK от брокераВысокая — retry при ошибкеВыше
commitAsync()Нет — неблокирующийНиже — нет авто-retryНиже
NOTE

В kafka_sim.py consumer.commit() — это синхронный commit. В реальном kafka-python commitAsync(callback) принимает callback, вызываемый после подтверждения. Для максимальной надёжности в production используйте commitSync в блоке finally, чтобы commit выполнился даже при исключении.


At-least-once vs At-most-once

Выбор стратегии commit определяет гарантии доставки:

Семантики доставки

At-most-once

At-most-once: коммитим offset ДО обработки. Сообщение никогда не обрабатывается повторно. При падении между commit и обработкой — сообщение теряется. Подходит для метрик, где потеря не критична.

commit() → process()

Паттерн: commit() → process(). Если process() падает — сообщение уже закоммичено и потеряно. Авто-коммит ведёт себя аналогично.

At-least-once

At-least-once: коммитим offset ПОСЛЕ обработки. Сообщение всегда будет обработано минимум один раз. При падении до commit — сообщение будет перечитано и обработано повторно (дубликат). Для дедупликации нужна идемпотентная обработка.

process() → commit()

Паттерн: process() → commit(). Если consumer падает между process() и commit() — сообщение обрабатывается снова после рестарта. Требует идемпотентности бизнес-логики (проверка на дубликаты).

Exactly-once семантика в Kafka достигается через транзакции на стороне продюсера (enable.idempotence + transactional.id) и consume-transform-produce паттерн в Kafka Streams. Чистый consumer с ручным commit обеспечивает at-least-once при правильной реализации.


Seek: сброс offset для replay

consumer.seek(partition, offset) позволяет переместить позицию чтения в произвольное место партиции. Это мощный инструмент для повторной обработки событий.

from kafka import TopicPartition

tp = TopicPartition('events', 0)
consumer.assign([tp])

# Перемотать на начало партиции
consumer.seek(tp, 0)

# Читать 100 сообщений, начиная с offset 500
consumer.seek(tp, 500)
records = consumer.poll(timeout_ms=1000, max_records=100)
TIP

Seek необходим при реализации event sourcing replay, исправлении данных после обнаружения ошибки в обработчике, или для точного воспроизведения конкретного инцидента. Seek работает только после assign() — нельзя использовать с subscribe() до получения назначения партиций (первый poll() после subscribe инициирует назначение).


Итог

Управление offset — это компромисс между надёжностью и производительностью. auto_offset_reset определяет стартовую точку для новой группы. enable_auto_commit удобен, но создаёт риск потери при crash. Ручной commit (enable_auto_commit=False) + commit() после обработки — стандарт для at-least-once гарантий. seek() даёт полный контроль для replay-сценариев.

Проверка знанийKnowledge check
Consumer настроен с enable_auto_commit=True и auto_commit_interval_ms=5000. Consumer получает батч из 100 сообщений через poll(), начинает их обрабатывать — и через 2 секунды (ДО истечения 5-секундного интервала авто-коммита) падает после обработки первых 30 сообщений. Какие сообщения будут повторно обработаны после перезапуска?
ОтветAnswer
Все 100 сообщений из батча будут прочитаны снова. Поскольку consumer упал до истечения auto_commit_interval_ms (5 сек), авто-коммит не произошёл. Последний закоммиченный offset остался на значении, предшествующем этому батчу. После рестарта consumer прочитает все 100 сообщений заново. Сообщения 1-30 будут обработаны повторно (дубликаты). Это at-least-once поведение при авто-коммите — он не гарантирует защиту от повторной обработки при crash.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Consumer настроен с enable_auto_commit=False. После poll() consumer обработал все сообщения и вызвал consumer.commit(). Затем consumer пал. После рестарта consumer вызывает poll() снова. Какие сообщения он получит?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7