Итоги модуля: Producers
Модуль охватил полный путь от базового send() до транзакционных продюсеров. Этот урок — структурированное повторение для закрепления и подготовки к экзамену.
Ключевые концепции
Producer API
KafkaProducer — асинхронный по своей природе. send() помещает запись в Record Accumulator и немедленно возвращает FutureRecordMetadata. Sender Thread работает в фоне, группируя записи в батчи и отправляя их broker leader’ам.
producer = KafkaProducer(bootstrap_servers=['broker:9092'])
future = producer.send('topic', value=b'data', key=b'key')
producer.flush() # Дождаться отправки всех батчей
producer.close() # Освободить ресурсы
Батчинг и сжатие
Record Accumulator буферизует записи по TopicPartition. Батч отправляется при выполнении любого условия:
- Размер достиг
batch.size(по умолчанию 16 KB) - Прошло
linger.msмиллисекунд (по умолчанию 0)
Сжатие применяется на батч-уровне, хранится на брокере в сжатом виде, декомпрессируется на консьюмере.
Гарантии доставки (acks)
| acks | Подтверждение | Риск | Применение |
|---|---|---|---|
| 0 | Никакого | Потеря при любом сбое | Метрики, непринципиальные события |
| 1 | Leader | Потеря при падении leader до репликации | Большинство сценариев |
| all | Все ISR | Нет при min.isr=2 + RF=3 | Финансовые данные, критичные события |
Комбинация acks=all + min.insync.replicas=2 + replication.factor=3 — стандарт production для данных без права на потерю.
Идемпотентность
enable.idempotence=True (по умолчанию в Kafka 3.0+) — broker отслеживает (PID, partition, sequence_number) и отбрасывает повторные отправки. Гарантирует exactly-once внутри одной партиции.
Транзакции
Атомарная запись в несколько партиций. Консьюмер с read_committed видит либо все записи транзакции, либо ни одной.
Дерево решений: как выбрать конфигурацию
Начало: требования сценария
Начните с определения требований вашего сценария. Три ключевых вопроса определяют конфигурацию.ДА: Транзакции
ДА: нужны транзакции. Используйте transactional.id, init_transactions, begin_transaction/commit_transaction. Изоляция консьюмера: read_committed. Сценарии: финансовые переводы, Kafka Streams EOS, read-process-write паттерн.НЕТ: продолжить
НЕТ: транзакции не нужны. Переходим к вопросу о надёжности.ДА: acks=all + min.isr=2
ДА: acks=all, min.insync.replicas=2, replication.factor=3, enable.idempotence=True. Сценарии: заказы, платежи, финансовые события, audit log.НЕТ: продолжить
НЕТ: продолжить к вопросу о latency.LATENCY: acks=1, linger.ms=0
LATENCY: acks=1, linger.ms=0, compression_type=None. Сценарии: real-time уведомления, чаты, системы мониторинга с SLA менее 10 мс.THROUGHPUT: linger.ms=50, zstd
THROUGHPUT: acks=1, linger.ms=50+, batch.size=65536+, compression_type=zstd. Сценарии: логи приложений, аналитические события, ETL-пайплайны.Когда использовать каждую гарантию
acks=0
# Телеметрия IoT, высокочастотные метрики
metrics_producer = KafkaProducer(
bootstrap_servers=['broker:9092'],
acks=0,
linger_ms=5,
batch_size=131072,
compression_type='lz4',
)
Потеря 0.01-0.1% событий — приемлема. Максимальный throughput и минимальная latency — критичны.
acks=1
# Логи приложений, события активности пользователей
log_producer = KafkaProducer(
bootstrap_servers=['broker:9092'],
acks='1',
linger_ms=20,
batch_size=65536,
compression_type='zstd',
)
Потеря при падении leader сразу после записи — редкое, но допустимое событие.
acks=all + идемпотентность
# Заказы, платежи, критичные бизнес-события
orders_producer = KafkaProducer(
bootstrap_servers=['broker:9092'],
acks='all',
enable_idempotence=True, # По умолчанию в Kafka 3.0+
linger_ms=5,
compression_type='lz4',
)
acks=all + транзакции
# Атомарные операции между несколькими топиками
transfer_producer = KafkaProducer(
bootstrap_servers=['broker:9092'],
transactional_id='transfer-service-1',
)
transfer_producer.init_transactions()
Чек-лист для Production
acks=all+min.insync.replicas=2для критичных топиковenable.idempotence=True(включён по умолчанию с Kafka 3.0)compression_type='zstd'для throughput-оптимизацииlinger.ms=5-20для большинства сценариевbuffer.memoryмониторится через JMXbuffer-available-bytesdelivery.timeout.msнастроен с учётом максимально допустимого времени ожиданияproducer.close()всегда вfinallyдля корректного завершения
Ключевые выводы модуля
send()— асинхронный. Всегда вызывайтеflush()перед завершением.- Батчинг:
linger.ms=0→ latency,linger.ms=50+→ throughput.batch.size— верхний лимит. acks=allбезmin.insync.replicas=2— ложная безопасность.- Идемпотентность (Kafka 3.0+): по умолчанию. Exactly-once внутри партиции.
- Транзакции: атомарность между партициями. Консьюмер нужен с
read_committed.