Learning Platform
Глоссарий Troubleshooting
Урок 03.06 · 10 мин
Средний
SummaryReview

Итоги модуля: Producers

Модуль охватил полный путь от базового send() до транзакционных продюсеров. Этот урок — структурированное повторение для закрепления и подготовки к экзамену.


Ключевые концепции

Producer API

KafkaProducer — асинхронный по своей природе. send() помещает запись в Record Accumulator и немедленно возвращает FutureRecordMetadata. Sender Thread работает в фоне, группируя записи в батчи и отправляя их broker leader’ам.

producer = KafkaProducer(bootstrap_servers=['broker:9092'])
future = producer.send('topic', value=b'data', key=b'key')
producer.flush()   # Дождаться отправки всех батчей
producer.close()   # Освободить ресурсы

Батчинг и сжатие

Record Accumulator буферизует записи по TopicPartition. Батч отправляется при выполнении любого условия:

  • Размер достиг batch.size (по умолчанию 16 KB)
  • Прошло linger.ms миллисекунд (по умолчанию 0)

Сжатие применяется на батч-уровне, хранится на брокере в сжатом виде, декомпрессируется на консьюмере.

Гарантии доставки (acks)

acksПодтверждениеРискПрименение
0НикакогоПотеря при любом сбоеМетрики, непринципиальные события
1LeaderПотеря при падении leader до репликацииБольшинство сценариев
allВсе ISRНет при min.isr=2 + RF=3Финансовые данные, критичные события

Комбинация acks=all + min.insync.replicas=2 + replication.factor=3 — стандарт production для данных без права на потерю.

Идемпотентность

enable.idempotence=True (по умолчанию в Kafka 3.0+) — broker отслеживает (PID, partition, sequence_number) и отбрасывает повторные отправки. Гарантирует exactly-once внутри одной партиции.

Транзакции

Атомарная запись в несколько партиций. Консьюмер с read_committed видит либо все записи транзакции, либо ни одной.


Дерево решений: как выбрать конфигурацию

Выбор конфигурации продюсера

Начало: требования сценария

Начните с определения требований вашего сценария. Три ключевых вопроса определяют конфигурацию.
Важна ли атомарность записи в несколько топиков/партиций?

ДА: Транзакции

ДА: нужны транзакции. Используйте transactional.id, init_transactions, begin_transaction/commit_transaction. Изоляция консьюмера: read_committed. Сценарии: финансовые переводы, Kafka Streams EOS, read-process-write паттерн.

НЕТ: продолжить

НЕТ: транзакции не нужны. Переходим к вопросу о надёжности.
Критична ли потеря данных?

ДА: acks=all + min.isr=2

ДА: acks=all, min.insync.replicas=2, replication.factor=3, enable.idempotence=True. Сценарии: заказы, платежи, финансовые события, audit log.

НЕТ: продолжить

НЕТ: продолжить к вопросу о latency.
Важна ли минимальная latency vs throughput?

LATENCY: acks=1, linger.ms=0

LATENCY: acks=1, linger.ms=0, compression_type=None. Сценарии: real-time уведомления, чаты, системы мониторинга с SLA менее 10 мс.

THROUGHPUT: linger.ms=50, zstd

THROUGHPUT: acks=1, linger.ms=50+, batch.size=65536+, compression_type=zstd. Сценарии: логи приложений, аналитические события, ETL-пайплайны.

Когда использовать каждую гарантию

acks=0

# Телеметрия IoT, высокочастотные метрики
metrics_producer = KafkaProducer(
    bootstrap_servers=['broker:9092'],
    acks=0,
    linger_ms=5,
    batch_size=131072,
    compression_type='lz4',
)

Потеря 0.01-0.1% событий — приемлема. Максимальный throughput и минимальная latency — критичны.

acks=1

# Логи приложений, события активности пользователей
log_producer = KafkaProducer(
    bootstrap_servers=['broker:9092'],
    acks='1',
    linger_ms=20,
    batch_size=65536,
    compression_type='zstd',
)

Потеря при падении leader сразу после записи — редкое, но допустимое событие.

acks=all + идемпотентность

# Заказы, платежи, критичные бизнес-события
orders_producer = KafkaProducer(
    bootstrap_servers=['broker:9092'],
    acks='all',
    enable_idempotence=True,  # По умолчанию в Kafka 3.0+
    linger_ms=5,
    compression_type='lz4',
)

acks=all + транзакции

# Атомарные операции между несколькими топиками
transfer_producer = KafkaProducer(
    bootstrap_servers=['broker:9092'],
    transactional_id='transfer-service-1',
)
transfer_producer.init_transactions()

Чек-лист для Production

  • acks=all + min.insync.replicas=2 для критичных топиков
  • enable.idempotence=True (включён по умолчанию с Kafka 3.0)
  • compression_type='zstd' для throughput-оптимизации
  • linger.ms=5-20 для большинства сценариев
  • buffer.memory мониторится через JMX buffer-available-bytes
  • delivery.timeout.ms настроен с учётом максимально допустимого времени ожидания
  • producer.close() всегда в finally для корректного завершения

Ключевые выводы модуля

  1. send() — асинхронный. Всегда вызывайте flush() перед завершением.
  2. Батчинг: linger.ms=0 → latency, linger.ms=50+ → throughput. batch.size — верхний лимит.
  3. acks=all без min.insync.replicas=2 — ложная безопасность.
  4. Идемпотентность (Kafka 3.0+): по умолчанию. Exactly-once внутри партиции.
  5. Транзакции: атомарность между партициями. Консьюмер нужен с read_committed.
Проверка знанийKnowledge check
Архитектор выбирает конфигурацию для payment-сервиса: каждый платёж должен атомарно обновить два топика (payments и account-balances), никакие данные не могут быть потеряны, и сервис должен корректно перезапускаться. Какая минимально необходимая конфигурация продюсера?
ОтветAnswer
Необходимы транзакции с transactional.id (для атомарности между двумя топиками и корректного рестарта), acks=all (устанавливается автоматически с transactional.id), min.insync.replicas=2 на обоих топиках с replication.factor=3 (для защиты от потери данных), enable.idempotence=True (включается автоматически с transactional.id). Жизненный цикл: init_transactions() при старте, begin_transaction()/send()/send()/commit_transaction() на каждый платёж. Консьюмер account-balances должен использовать isolation.level=read_committed. transactional.id должен быть уникальным для каждого экземпляра сервиса (например, включать pod-name или номер партиции).

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Payment-сервис требует: (1) атомарная запись в topics 'debits' и 'credits', (2) никакой потери данных при отказе брокера, (3) корректный перезапуск при сбое. Минимально необходимая конфигурация продюсера?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6