Producer API
Продюсер — это первое звено в цепочке Kafka. Любое приложение, которое записывает данные в Kafka, использует Producer API. Понимание внутренней механики продюсера критически важно: неправильная конфигурация ведёт к потере сообщений, дублированию или неожиданным задержкам.
Этот урок охватывает весь путь сообщения — от вызова send() в коде до записи на диск брокера.
KafkaProducer — конструктор и конфигурация
KafkaProducer — основной класс, инкапсулирующий всю логику продюсера: сериализацию, партиционирование, батчинг и сетевой ввод-вывод.
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['broker1:9092', 'broker2:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
acks='all',
retries=3,
linger_ms=10,
batch_size=16384,
)
Ключевые параметры конструктора:
bootstrap_servers — список адресов брокеров для первоначального подключения. Продюсер получает от них метаданные о кластере (все брокеры, топики, партиции, leader-партиции) и после этого подключается напрямую к нужным брокерам. Достаточно указать 2-3 брокера для отказоустойчивости — полный список не нужен.
value_serializer — функция, преобразующая объект Python в bytes. Kafka хранит только байты — нет встроенного понятия “строка” или “JSON”. Здесь: json.dumps(v).encode('utf-8') сериализует словарь в UTF-8 JSON.
key_serializer — функция для сериализации ключа. Ключ используется для партиционирования: все записи с одинаковым ключом попадают в одну и ту же партицию, что гарантирует порядок внутри ключа.
bootstrap_servers используются только для получения метаданных при первом подключении. После этого продюсер общается напрямую с leader-брокерами каждой партиции. Если один из bootstrap-брокеров недоступен, продюсер попробует следующий из списка.
Метод send() — отправка сообщений
send() — основной метод для публикации сообщения в топик. Вызов не блокирует поток — сообщение помещается во внутренний буфер (Record Accumulator) и возвращается Future-объект.
# Минимальный вызов — только топик и значение
future = producer.send('orders', value={'order_id': 'ord-123', 'amount': 500})
# С ключом — гарантирует попадание в одну и ту же партицию
future = producer.send('orders',
key='customer-42',
value={'order_id': 'ord-456', 'amount': 750})
# С явным указанием партиции
future = producer.send('orders',
value={'order_id': 'ord-789'},
partition=2)
Параметры send():
| Параметр | Тип | Описание |
|---|---|---|
topic | str | Имя топика. Обязательный параметр. |
value | bytes | Значение записи. None допустим (tombstone-запись для log compaction). |
key | bytes | Ключ для партиционирования. None = round-robin по партициям. |
partition | int | Явный номер партиции. Переопределяет ключевое партиционирование. |
Путь сообщения от send() до брокера
Вызов send()
Вызов producer.send(topic, value, key) — асинхронный. Метод помещает запись в внутренний буфер и немедленно возвращает FutureRecordMetadata. Поток приложения не блокируется.Serializer
Сериализатор вызывает value_serializer(value) и key_serializer(key). Результат — байты. Если сериализатор выбрасывает исключение, send() бросает SerializationError синхронно, до помещения в буфер.Partitioner
Partitioner определяет номер партиции: если ключ задан — murmur2-хеш от ключа, взятый по модулю числа партиций. Если ключ None — sticky partitioner (Kafka 2.4+): заполняет один batch до лимита, затем переключается. До Kafka 2.4 — round-robin.Record Accumulator
Record Accumulator — основной буфер продюсера. Содержит deque батчей для каждой TopicPartition. Новые записи добавляются в последний (tail) батч. Когда батч заполнен (batch.size байт) или истёк linger.ms — батч помечается как готовый к отправке.Sender Thread
Sender Thread (фоновый I/O-поток) — работает в фоне всё время жизни продюсера. Забирает готовые батчи из Accumulator, группирует по broker (каждый broker получает один ProduceRequest с записями для всех партиций, которые он ведёт как leader), отправляет по сети.Broker Leader
Broker получает ProduceRequest, записывает в partition log (append-only на диске), отвечает ProduceResponse с offset каждой записи. Продюсер разрешает Future с RecordMetadata или отклоняет с ошибкой.Синхронная vs асинхронная отправка
Асинхронная (рекомендуется для высокого throughput)
futures = []
for order in orders:
future = producer.send('orders', value=order)
futures.append(future)
# Дождаться отправки всех батчей
producer.flush()
# Проверить результаты
for future in futures:
record_metadata = future.get(timeout=10)
print(f"Отправлено в partition={record_metadata.partition}, "
f"offset={record_metadata.offset}")
Синхронная — через future.get()
future.get() блокирует вызывающий поток до получения ответа от брокера. Это гарантирует подтверждение каждого сообщения, но драматически снижает пропускную способность:
for order in orders:
future = producer.send('orders', value=order)
record_metadata = future.get(timeout=10) # блокирует поток!
print(f"Offset: {record_metadata.offset}")
Это антипаттерн для production-нагрузок. Каждый get() ждёт сетевого round-trip (обычно 1-10 мс). При 1000 сообщений в секунду это 1-10 секунд ожидания вместо единиц миллисекунд при асинхронном батчинге.
Callback-подход — асинхронный с обработкой ошибок
def on_send_success(record_metadata):
print(f"Topic: {record_metadata.topic}, "
f"Partition: {record_metadata.partition}, "
f"Offset: {record_metadata.offset}")
def on_send_error(exception):
print(f"Ошибка отправки: {exception}")
producer.send('orders', value={'order_id': 'ord-001'}) \
.add_callback(on_send_success) \
.add_errback(on_send_error)
Колбэки вызываются из Sender Thread, не из потока приложения. Нельзя вызывать блокирующие операции из колбэка (например, другой producer.send() с блокировкой) — это вызовет дедлок. Используйте колбэки только для логирования и метрик.
Логика выбора партиции
Kafka определяет партицию по следующим правилам (в порядке приоритета):
- Явная партиция (
partition=N) — запись идёт строго в партицию N. - Ключ задан — номер партиции =
murmur2(key) % num_partitions. Детерминированно: один ключ всегда попадает в одну партицию. - Ключ не задан — sticky partitioner (Kafka 2.4+): продюсер «прилипает» к одной партиции, заполняя батч, затем переключается на следующую.
# Демонстрация детерминированного партиционирования
producer.send('orders', key='customer-42', value={'amount': 100})
producer.send('orders', key='customer-42', value={'amount': 200})
# Оба сообщения попадут в одну и ту же партицию — порядок гарантирован
Если топик имеет N партиций и ключ фиксирован, то при увеличении числа партиций (например, с 6 до 12) старые ключи попадут в другие партиции. Это нарушает порядок для исторических данных. Увеличивать число партиций в production надо с осторожностью.
Жизненный цикл продюсера
# 1. Создание
producer = KafkaProducer(bootstrap_servers=['broker:9092'])
# 2. Отправка сообщений (можно многократно)
for event in event_stream:
producer.send('events', value=event)
# 3. flush() — гарантирует отправку всех буферизованных батчей
# Блокирует до получения ответов от брокеров или timeout
producer.flush(timeout=30)
# 4. close() — flush + освобождение ресурсов (сетевые соединения, потоки)
producer.close(timeout=30)
Всегда вызывайте producer.close() при завершении работы. Без него буферизованные сообщения, не отправленные ещё из Record Accumulator, будут потеряны. Используйте try/finally или контекстный менеджер для гарантированного закрытия.
# Паттерн с гарантированным закрытием
producer = KafkaProducer(bootstrap_servers=['broker:9092'])
try:
for order in orders:
producer.send('orders', value=order)
producer.flush()
finally:
producer.close()
Ключевые выводы
send()— асинхронный: помещает запись в буфер и возвращаетFuture. Поток приложения не блокируется.- Sender Thread — фоновый поток, отправляющий батчи брокерам. Работает параллельно с кодом приложения.
- Партиционирование: ключ определяет партицию детерминированно через murmur2-хеш. Без ключа — sticky partitioner.
flush()обязателен перед завершением — гарантирует отправку буферизованных сообщений.close()всегда вfinally— предотвращает потерю данных и утечку ресурсов.