Learning Platform
Глоссарий Troubleshooting
Урок 03.01 · 20 мин
Средний
KafkaProducersend()ProducerRecordFutureCallback

Producer API

Продюсер — это первое звено в цепочке Kafka. Любое приложение, которое записывает данные в Kafka, использует Producer API. Понимание внутренней механики продюсера критически важно: неправильная конфигурация ведёт к потере сообщений, дублированию или неожиданным задержкам.

Этот урок охватывает весь путь сообщения — от вызова send() в коде до записи на диск брокера.


KafkaProducer — конструктор и конфигурация

KafkaProducer — основной класс, инкапсулирующий всю логику продюсера: сериализацию, партиционирование, батчинг и сетевой ввод-вывод.

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['broker1:9092', 'broker2:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    acks='all',
    retries=3,
    linger_ms=10,
    batch_size=16384,
)

Ключевые параметры конструктора:

bootstrap_servers — список адресов брокеров для первоначального подключения. Продюсер получает от них метаданные о кластере (все брокеры, топики, партиции, leader-партиции) и после этого подключается напрямую к нужным брокерам. Достаточно указать 2-3 брокера для отказоустойчивости — полный список не нужен.

value_serializer — функция, преобразующая объект Python в bytes. Kafka хранит только байты — нет встроенного понятия “строка” или “JSON”. Здесь: json.dumps(v).encode('utf-8') сериализует словарь в UTF-8 JSON.

key_serializer — функция для сериализации ключа. Ключ используется для партиционирования: все записи с одинаковым ключом попадают в одну и ту же партицию, что гарантирует порядок внутри ключа.

NOTE

bootstrap_servers используются только для получения метаданных при первом подключении. После этого продюсер общается напрямую с leader-брокерами каждой партиции. Если один из bootstrap-брокеров недоступен, продюсер попробует следующий из списка.


Метод send() — отправка сообщений

send() — основной метод для публикации сообщения в топик. Вызов не блокирует поток — сообщение помещается во внутренний буфер (Record Accumulator) и возвращается Future-объект.

# Минимальный вызов — только топик и значение
future = producer.send('orders', value={'order_id': 'ord-123', 'amount': 500})

# С ключом — гарантирует попадание в одну и ту же партицию
future = producer.send('orders',
    key='customer-42',
    value={'order_id': 'ord-456', 'amount': 750})

# С явным указанием партиции
future = producer.send('orders',
    value={'order_id': 'ord-789'},
    partition=2)

Параметры send():

ПараметрТипОписание
topicstrИмя топика. Обязательный параметр.
valuebytesЗначение записи. None допустим (tombstone-запись для log compaction).
keybytesКлюч для партиционирования. None = round-robin по партициям.
partitionintЯвный номер партиции. Переопределяет ключевое партиционирование.

Путь сообщения от send() до брокера

Путь сообщения через Producer

Вызов send()

Вызов producer.send(topic, value, key) — асинхронный. Метод помещает запись в внутренний буфер и немедленно возвращает FutureRecordMetadata. Поток приложения не блокируется.

Serializer

Сериализатор вызывает value_serializer(value) и key_serializer(key). Результат — байты. Если сериализатор выбрасывает исключение, send() бросает SerializationError синхронно, до помещения в буфер.

Partitioner

Partitioner определяет номер партиции: если ключ задан — murmur2-хеш от ключа, взятый по модулю числа партиций. Если ключ None — sticky partitioner (Kafka 2.4+): заполняет один batch до лимита, затем переключается. До Kafka 2.4 — round-robin.

Record Accumulator

Record Accumulator — основной буфер продюсера. Содержит deque батчей для каждой TopicPartition. Новые записи добавляются в последний (tail) батч. Когда батч заполнен (batch.size байт) или истёк linger.ms — батч помечается как готовый к отправке.

Sender Thread

Sender Thread (фоновый I/O-поток) — работает в фоне всё время жизни продюсера. Забирает готовые батчи из Accumulator, группирует по broker (каждый broker получает один ProduceRequest с записями для всех партиций, которые он ведёт как leader), отправляет по сети.

Broker Leader

Broker получает ProduceRequest, записывает в partition log (append-only на диске), отвечает ProduceResponse с offset каждой записи. Продюсер разрешает Future с RecordMetadata или отклоняет с ошибкой.

Синхронная vs асинхронная отправка

Асинхронная (рекомендуется для высокого throughput)

futures = []
for order in orders:
    future = producer.send('orders', value=order)
    futures.append(future)

# Дождаться отправки всех батчей
producer.flush()

# Проверить результаты
for future in futures:
    record_metadata = future.get(timeout=10)
    print(f"Отправлено в partition={record_metadata.partition}, "
          f"offset={record_metadata.offset}")

Синхронная — через future.get()

future.get() блокирует вызывающий поток до получения ответа от брокера. Это гарантирует подтверждение каждого сообщения, но драматически снижает пропускную способность:

for order in orders:
    future = producer.send('orders', value=order)
    record_metadata = future.get(timeout=10)  # блокирует поток!
    print(f"Offset: {record_metadata.offset}")

Это антипаттерн для production-нагрузок. Каждый get() ждёт сетевого round-trip (обычно 1-10 мс). При 1000 сообщений в секунду это 1-10 секунд ожидания вместо единиц миллисекунд при асинхронном батчинге.

Callback-подход — асинхронный с обработкой ошибок

def on_send_success(record_metadata):
    print(f"Topic: {record_metadata.topic}, "
          f"Partition: {record_metadata.partition}, "
          f"Offset: {record_metadata.offset}")

def on_send_error(exception):
    print(f"Ошибка отправки: {exception}")

producer.send('orders', value={'order_id': 'ord-001'}) \
    .add_callback(on_send_success) \
    .add_errback(on_send_error)
WARNING

Колбэки вызываются из Sender Thread, не из потока приложения. Нельзя вызывать блокирующие операции из колбэка (например, другой producer.send() с блокировкой) — это вызовет дедлок. Используйте колбэки только для логирования и метрик.


Логика выбора партиции

Kafka определяет партицию по следующим правилам (в порядке приоритета):

  1. Явная партиция (partition=N) — запись идёт строго в партицию N.
  2. Ключ задан — номер партиции = murmur2(key) % num_partitions. Детерминированно: один ключ всегда попадает в одну партицию.
  3. Ключ не задан — sticky partitioner (Kafka 2.4+): продюсер «прилипает» к одной партиции, заполняя батч, затем переключается на следующую.
# Демонстрация детерминированного партиционирования
producer.send('orders', key='customer-42', value={'amount': 100})
producer.send('orders', key='customer-42', value={'amount': 200})
# Оба сообщения попадут в одну и ту же партицию — порядок гарантирован
NOTE

Если топик имеет N партиций и ключ фиксирован, то при увеличении числа партиций (например, с 6 до 12) старые ключи попадут в другие партиции. Это нарушает порядок для исторических данных. Увеличивать число партиций в production надо с осторожностью.


Жизненный цикл продюсера

# 1. Создание
producer = KafkaProducer(bootstrap_servers=['broker:9092'])

# 2. Отправка сообщений (можно многократно)
for event in event_stream:
    producer.send('events', value=event)

# 3. flush() — гарантирует отправку всех буферизованных батчей
# Блокирует до получения ответов от брокеров или timeout
producer.flush(timeout=30)

# 4. close() — flush + освобождение ресурсов (сетевые соединения, потоки)
producer.close(timeout=30)
WARNING

Всегда вызывайте producer.close() при завершении работы. Без него буферизованные сообщения, не отправленные ещё из Record Accumulator, будут потеряны. Используйте try/finally или контекстный менеджер для гарантированного закрытия.

# Паттерн с гарантированным закрытием
producer = KafkaProducer(bootstrap_servers=['broker:9092'])
try:
    for order in orders:
        producer.send('orders', value=order)
    producer.flush()
finally:
    producer.close()

Ключевые выводы

  1. send()асинхронный: помещает запись в буфер и возвращает Future. Поток приложения не блокируется.
  2. Sender Thread — фоновый поток, отправляющий батчи брокерам. Работает параллельно с кодом приложения.
  3. Партиционирование: ключ определяет партицию детерминированно через murmur2-хеш. Без ключа — sticky partitioner.
  4. flush() обязателен перед завершением — гарантирует отправку буферизованных сообщений.
  5. close() всегда в finally — предотвращает потерю данных и утечку ресурсов.
Проверка знанийKnowledge check
Программист вызывает producer.send() в цикле для 10 000 записей, но НЕ вызывает flush() и close(). Что произойдёт с записями, которые остались в буфере Record Accumulator?
ОтветAnswer
Записи в буфере будут потеряны. send() помещает записи в внутренний буфер Record Accumulator — они не отправляются мгновенно. Sender Thread отправляет батчи асинхронно. Без flush() есть гарантия, что какое-то количество записей ещё не ушло на брокер в момент завершения программы. Без close() ресурсы не освобождаются и буфер не очищается. Правильный паттерн: flush() для ожидания отправки всех буферизованных сообщений, close() для освобождения ресурсов. В production всегда используют try/finally для вызова close().
Structured logging в Python

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Что возвращает метод producer.send(topic, value=data) в kafka-python?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6