Learning Platform
Глоссарий Troubleshooting
Урок 04.01 · 20 мин
Средний
KafkaConsumersubscribepollConsumerRecordassign

Consumer API

KafkaConsumer — это основной интерфейс для чтения сообщений из Kafka. В отличие от push-модели (как в RabbitMQ, где брокер доставляет сообщения), Kafka использует pull-модель: consumer сам запрашивает данные от брокера, контролируя темп чтения. Это решение оказалось ключевым для масштабируемости — брокер не отслеживает скорость каждого потребителя.


KafkaConsumer: конфигурация и создание

Конструктор KafkaConsumer принимает несколько ключевых параметров, определяющих поведение при чтении.

Параметры KafkaConsumer

bootstrap_servers

bootstrap_servers: список брокеров для начального подключения. Consumer получает метаданные кластера от одного из них и далее работает напрямую с лидерами партиций.

group_id

group_id: идентификатор consumer group. Все consumer с одним group_id совместно читают топик — каждая партиция назначается ровно одному consumer группы. Без group_id consumer работает независимо.

auto_offset_reset

auto_offset_reset: поведение при отсутствии закоммиченного offset. 'earliest' — читать с начала партиции (offset 0). 'latest' — читать только новые сообщения, написанные после старта consumer.

enable_auto_commit

enable_auto_commit: автоматически фиксировать offset каждые auto.commit.interval.ms (по умолчанию 5000 мс). Удобно, но рискованно — committed offset может опередить реально обработанные сообщения.

max_poll_records

max_poll_records: максимальное количество записей, возвращаемых за один вызов poll(). Влияет на latency и throughput. Рекомендуется 100-500 для большинства сценариев.

session_timeout_ms

session_timeout_ms: если брокер не получает heartbeat от consumer за это время, считает его мёртвым и инициирует ребалансировку. По умолчанию 45000 мс в Kafka 4.0.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='analytics',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    # group.protocol=consumer для KIP-848 (Kafka 4.0+)
)
NOTE

В Kafka 4.0 для использования нового протокола ребалансировки KIP-848 добавьте group_protocol='consumer' (или group.protocol=consumer в properties-файле). В Kafka 4.2 это поведение включено по умолчанию.


subscribe vs assign: два режима подписки

Consumer может получать сообщения одним из двух способов. Выбор определяет, кто управляет назначением партиций.

subscribe vs assign

subscribe(topics=[…])

subscribe(topics): group-managed режим. Consumer регистрируется в consumer group. Брокер (group coordinator) назначает партиции автоматически через протокол ребалансировки. При добавлении нового consumer партиции перераспределяются.
group coordinator

Автоматическое назначение партиций

Автоматическое назначение: 6 партиций на 3 consumer = по 2 партиции каждому. При падении одного — его партиции перераспределяются среди оставшихся.

assign(partitions=[…])

assign(partitions): manual режим. Consumer явно указывает, какие TopicPartition читать. Нет участия в consumer group — offset всё ещё отслеживается, но ребалансировка не происходит. Используется для точного контроля: Kafka Streams, специализированные consumer.
прямое назначение

Фиксированные партиции

Ручное управление: consumer читает только те партиции, которые указаны явно через TopicPartition(topic, partition). Нет ребалансировки — consumer не взаимодействует с group coordinator.
from kafka import KafkaConsumer, TopicPartition

# Режим 1: subscribe — group-managed (рекомендуется)
consumer.subscribe(topics=['events', 'orders'])

# Режим 2: assign — ручное назначение конкретных партиций
consumer.assign(partitions=[
    TopicPartition('events', 0),
    TopicPartition('events', 1),
])

Когда использовать assign: при реализации специализированного consumer (Kafka Streams, consumer, который должен всегда читать конкретную партицию) или в тестовом коде. В большинстве production-сценариев используется subscribe.


poll() loop: сердце consumer

Метод poll() — это единственный способ получить сообщения от Kafka. Он блокирует вызывающий поток на время до timeout_ms миллисекунд, ожидая данных.

Poll Loop — жизненный цикл consumer
  1. Создать KafkaConsumer
KafkaConsumer создаётся с конфигурацией. Соединение с брокером устанавливается лениво — при первом вызове poll() или subscribe().
  1. subscribe(topics)
subscribe(topics) регистрирует consumer в group coordinator. Первый poll() инициирует JoinGroup (legacy) или ConsumerGroupHeartbeat (KIP-848) запрос и получает назначение партиций.
  1. records = consumer.poll(timeout_ms=1000)
poll(timeout_ms, max_records): отправляет FetchRequest брокерам-лидерам всех назначенных партиций. Ждёт ответа до timeout_ms мс. Возвращает dict: {TopicPartition: [ConsumerRecord, ...]}. Пустой dict если нет новых сообщений.
  1. Обработать records
Обработка: итерация по records.items(). Каждая запись — ConsumerRecord с полями topic, partition, offset, key, value, timestamp.
  1. Commit offset
Commit offset: после обработки фиксируем прогресс. Auto commit коммитит автоматически каждые 5 сек. Manual commit — явный вызов consumer.commit().
повторить
  1. Повторить с шага 3
Цикл повторяется бесконечно. Единственное условие завершения — явный break или KeyboardInterrupt. После завершения цикла — close() для корректного выхода из consumer group.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='analytics',
    auto_offset_reset='earliest',
)
consumer.subscribe(['events'])

try:
    while True:
        # poll() возвращает dict {TopicPartition: [ConsumerRecord]}
        records = consumer.poll(timeout_ms=1000, max_records=100)

        for tp, messages in records.items():
            for msg in messages:
                # Обработка каждого сообщения
                print(f'Topic: {msg.topic}, Partition: {msg.partition}, '
                      f'Offset: {msg.offset}, Value: {msg.value}')

        # Offset коммитится автоматически (enable_auto_commit=True)
finally:
    consumer.close()
WARNING

Вызывать poll() нужно регулярно — не реже чем раз в max.poll.interval.ms (по умолчанию 5 минут в Kafka 4.0). Если consumer не вызывает poll() дольше этого интервала, группа считает его мёртвым и инициирует ребалансировку. Длительная обработка одного батча сообщений — классическая причина неожиданных ребалансировок.


ConsumerRecord: поля сообщения

Каждое сообщение возвращается как объект ConsumerRecord со следующими полями:

ConsumerRecord — поля
topicИмя топика, из которого прочитано сообщение. Строка. Используется при подписке на несколько топиков для роутинга обработки.
partitionНомер партиции (целое число, начиная с 0). Вместе с offset однозначно идентифицирует позицию сообщения в топике.
offsetПорядковый номер сообщения внутри партиции. Монотонно возрастает. Используется для commit: после обработки offset N коммитируем N+1 (следующий для чтения).
keyКлюч сообщения (bytes или None). Используется для партиционирования: сообщения с одинаковым ключом гарантированно попадают в одну партицию. None — round-robin распределение.
valueТело сообщения (bytes или None). Основной payload. Требует десериализации на стороне consumer — JSON, Avro, Protobuf или custom формат.
timestampUnix timestamp в миллисекундах. Может быть CreateTime (время записи продюсером) или LogAppendTime (время записи брокером), в зависимости от message.timestamp.type топика.
for tp, messages in records.items():
    for record in messages:
        print(f"Топик: {record.topic}")
        print(f"Партиция: {record.partition}, Offset: {record.offset}")
        print(f"Ключ: {record.key}")
        print(f"Значение: {record.value}")
        print(f"Timestamp: {record.timestamp}")

Жизненный цикл consumer

Правильная инициализация и завершение работы consumer — критически важны для корректного управления группой:

  1. СоздатьKafkaConsumer(bootstrap_servers=..., group_id=...)
  2. Подписатьсяsubscribe(topics) или assign(partitions)
  3. Poll в цикле — основная работа, регулярно вызывать poll()
  4. Обработать — бизнес-логика для каждого ConsumerRecord
  5. Commit — зафиксировать прогресс (авто или ручной)
  6. Closeconsumer.close() для корректного выхода из consumer group и освобождения ресурсов
TIP

consumer.close() важен. Он отправляет LeaveGroup запрос брокеру, что позволяет немедленно инициировать ребалансировку и перераспределить партиции оставшимся consumer. Без close() (например, при kill -9) ребалансировка произойдёт только после истечения session.timeout.ms.


Итог

Consumer API строится вокруг простой модели: создать → подписаться → poll в цикле → обработать → закрыть. poll() — единственный способ получить данные. subscribe() делегирует назначение партиций группе, assign() даёт полный контроль. ConsumerRecord содержит всю метаинформацию, необходимую для обработки и управления offset.

Проверка знанийKnowledge check
В чём принципиальное отличие consumer.subscribe(topics) от consumer.assign(partitions)?
ОтветAnswer
subscribe(topics) регистрирует consumer в consumer group — group coordinator автоматически назначает партиции и перераспределяет их при ребалансировке. assign(partitions) даёт consumer прямое управление конкретными TopicPartition без участия в группе и без ребалансировки. subscribe используется в большинстве production-сценариев; assign — для специализированных случаев (Kafka Streams, точное управление партициями).

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Consumer вызывает consumer.subscribe(['orders']). Через некоторое время группа перебалансируется и consumer получает Partition 2 и Partition 3. Что произойдёт, если этот же consumer добавит ещё один топик через subscribe(['orders', 'payments'])?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7