Consumer API
KafkaConsumer — это основной интерфейс для чтения сообщений из Kafka. В отличие от push-модели (как в RabbitMQ, где брокер доставляет сообщения), Kafka использует pull-модель: consumer сам запрашивает данные от брокера, контролируя темп чтения. Это решение оказалось ключевым для масштабируемости — брокер не отслеживает скорость каждого потребителя.
KafkaConsumer: конфигурация и создание
Конструктор KafkaConsumer принимает несколько ключевых параметров, определяющих поведение при чтении.
bootstrap_servers
bootstrap_servers: список брокеров для начального подключения. Consumer получает метаданные кластера от одного из них и далее работает напрямую с лидерами партиций.group_id
group_id: идентификатор consumer group. Все consumer с одним group_id совместно читают топик — каждая партиция назначается ровно одному consumer группы. Без group_id consumer работает независимо.auto_offset_reset
auto_offset_reset: поведение при отсутствии закоммиченного offset. 'earliest' — читать с начала партиции (offset 0). 'latest' — читать только новые сообщения, написанные после старта consumer.enable_auto_commit
enable_auto_commit: автоматически фиксировать offset каждые auto.commit.interval.ms (по умолчанию 5000 мс). Удобно, но рискованно — committed offset может опередить реально обработанные сообщения.max_poll_records
max_poll_records: максимальное количество записей, возвращаемых за один вызов poll(). Влияет на latency и throughput. Рекомендуется 100-500 для большинства сценариев.session_timeout_ms
session_timeout_ms: если брокер не получает heartbeat от consumer за это время, считает его мёртвым и инициирует ребалансировку. По умолчанию 45000 мс в Kafka 4.0.from kafka import KafkaConsumer
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
group_id='analytics',
auto_offset_reset='earliest',
enable_auto_commit=True,
# group.protocol=consumer для KIP-848 (Kafka 4.0+)
)
В Kafka 4.0 для использования нового протокола ребалансировки KIP-848 добавьте group_protocol='consumer' (или group.protocol=consumer в properties-файле). В Kafka 4.2 это поведение включено по умолчанию.
subscribe vs assign: два режима подписки
Consumer может получать сообщения одним из двух способов. Выбор определяет, кто управляет назначением партиций.
subscribe(topics=[…])
subscribe(topics): group-managed режим. Consumer регистрируется в consumer group. Брокер (group coordinator) назначает партиции автоматически через протокол ребалансировки. При добавлении нового consumer партиции перераспределяются.Автоматическое назначение партиций
Автоматическое назначение: 6 партиций на 3 consumer = по 2 партиции каждому. При падении одного — его партиции перераспределяются среди оставшихся.assign(partitions=[…])
assign(partitions): manual режим. Consumer явно указывает, какие TopicPartition читать. Нет участия в consumer group — offset всё ещё отслеживается, но ребалансировка не происходит. Используется для точного контроля: Kafka Streams, специализированные consumer.Фиксированные партиции
Ручное управление: consumer читает только те партиции, которые указаны явно через TopicPartition(topic, partition). Нет ребалансировки — consumer не взаимодействует с group coordinator.from kafka import KafkaConsumer, TopicPartition
# Режим 1: subscribe — group-managed (рекомендуется)
consumer.subscribe(topics=['events', 'orders'])
# Режим 2: assign — ручное назначение конкретных партиций
consumer.assign(partitions=[
TopicPartition('events', 0),
TopicPartition('events', 1),
])
Когда использовать assign: при реализации специализированного consumer (Kafka Streams, consumer, который должен всегда читать конкретную партицию) или в тестовом коде. В большинстве production-сценариев используется subscribe.
poll() loop: сердце consumer
Метод poll() — это единственный способ получить сообщения от Kafka. Он блокирует вызывающий поток на время до timeout_ms миллисекунд, ожидая данных.
- Создать KafkaConsumer
- subscribe(topics)
- records = consumer.poll(timeout_ms=1000)
- Обработать records
- Commit offset
- Повторить с шага 3
from kafka import KafkaConsumer
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
group_id='analytics',
auto_offset_reset='earliest',
)
consumer.subscribe(['events'])
try:
while True:
# poll() возвращает dict {TopicPartition: [ConsumerRecord]}
records = consumer.poll(timeout_ms=1000, max_records=100)
for tp, messages in records.items():
for msg in messages:
# Обработка каждого сообщения
print(f'Topic: {msg.topic}, Partition: {msg.partition}, '
f'Offset: {msg.offset}, Value: {msg.value}')
# Offset коммитится автоматически (enable_auto_commit=True)
finally:
consumer.close()
Вызывать poll() нужно регулярно — не реже чем раз в max.poll.interval.ms (по умолчанию 5 минут в Kafka 4.0). Если consumer не вызывает poll() дольше этого интервала, группа считает его мёртвым и инициирует ребалансировку. Длительная обработка одного батча сообщений — классическая причина неожиданных ребалансировок.
ConsumerRecord: поля сообщения
Каждое сообщение возвращается как объект ConsumerRecord со следующими полями:
for tp, messages in records.items():
for record in messages:
print(f"Топик: {record.topic}")
print(f"Партиция: {record.partition}, Offset: {record.offset}")
print(f"Ключ: {record.key}")
print(f"Значение: {record.value}")
print(f"Timestamp: {record.timestamp}")
Жизненный цикл consumer
Правильная инициализация и завершение работы consumer — критически важны для корректного управления группой:
- Создать —
KafkaConsumer(bootstrap_servers=..., group_id=...) - Подписаться —
subscribe(topics)илиassign(partitions) - Poll в цикле — основная работа, регулярно вызывать
poll() - Обработать — бизнес-логика для каждого
ConsumerRecord - Commit — зафиксировать прогресс (авто или ручной)
- Close —
consumer.close()для корректного выхода из consumer group и освобождения ресурсов
consumer.close() важен. Он отправляет LeaveGroup запрос брокеру, что позволяет немедленно инициировать ребалансировку и перераспределить партиции оставшимся consumer. Без close() (например, при kill -9) ребалансировка произойдёт только после истечения session.timeout.ms.
Итог
Consumer API строится вокруг простой модели: создать → подписаться → poll в цикле → обработать → закрыть. poll() — единственный способ получить данные. subscribe() делегирует назначение партиций группе, assign() даёт полный контроль. ConsumerRecord содержит всю метаинформацию, необходимую для обработки и управления offset.