Batching и Compression
Производительность продюсера определяется тем, насколько эффективно он группирует сообщения в батчи и сжимает их перед отправкой. По умолчанию (linger.ms=0) Kafka отправляет каждую запись немедленно — это максимальная latency-эффективность, но минимальный throughput. Правильная настройка батчинга и компрессии может увеличить пропускную способность в 10-20 раз при минимальном росте latency.
Record Accumulator — буфер между приложением и сетью
Record Accumulator — внутренняя структура данных продюсера. Это словарь, где ключ — TopicPartition, значение — двусвязная очередь ProducerBatch объектов.
send(topic=‘orders’, partition=0, value=…)
Вызов send() добавляет ProducerRecord в хвост (tail) deque батчей для соответствующей TopicPartition. Если tail-батч заполнен (batch.size достигнут) — создаётся новый батч. Весь процесс синхронен с вызывающим потоком (кроме ожидания памяти при buffer.memory исчерпан).orders/P0: [batch1][batch2]
Deque батчей для orders/partition=0. Каждый ProducerBatch занимает до batch.size байт (по умолчанию 16 384 байт = 16 KB). Когда активный батч заполнен, он перемещается в 'ready' состояние и Sender Thread забирает его для отправки.orders/P1: [batch1]
Deque батчей для orders/partition=1. Партиции независимы — батч для P0 может быть готов к отправке, пока батч для P1 ещё заполняется.events/P0: [batch1]
Deque батчей для events/partition=0. Разные топики хранятся в одном Accumulator. Общая память ограничена buffer.memory (по умолчанию 32 MB). Если память исчерпана, send() блокируется на max.block.ms (60 секунд по умолчанию).Sender Thread (фоновый)
Sender Thread периодически проверяет Accumulator на наличие готовых батчей. Батч готов если: (1) batch.size достигнут, или (2) linger.ms истёк. Sender Thread группирует батчи по broker-leader и отправляет один ProduceRequest на broker, содержащий батчи для всех партиций этого брокера.linger.ms — задержка для накопления батча
linger.ms — время (в миллисекундах), которое Sender Thread ожидает перед отправкой неполного батча. По умолчанию — 0 (отправить немедленно, не дожидаться заполнения батча).
# Конфигурация: минимальная latency (по умолчанию)
producer = KafkaProducer(
bootstrap_servers=['broker:9092'],
linger_ms=0, # Отправлять немедленно
batch_size=16384, # 16 KB — максимальный размер батча
)
# Конфигурация: высокий throughput
producer = KafkaProducer(
bootstrap_servers=['broker:9092'],
linger_ms=100, # Ждать до 100 мс для заполнения батча
batch_size=65536, # 64 KB — большие батчи
)
Влияние linger.ms на производительность
| linger.ms | Поведение | Latency | Throughput | Сценарий |
|---|---|---|---|---|
| 0 | Отправка при наличии любой записи | Минимальная | Низкий | Финансовые транзакции, real-time события |
| 5-20 мс | Баланс latency/throughput | Небольшой рост | Значительный рост | Большинство production-сценариев |
| 50-100 мс | Агрессивный батчинг | Заметная задержка | Максимальный | Пакетные загрузки, ETL |
linger.ms задаёт верхнюю границу ожидания. Если батч заполнился до истечения linger.ms — он отправляется немедленно. Параметр работает как «клапан»: отправить, когда батч полон ИЛИ когда истёк таймаут.
batch.size — максимальный размер батча
batch.size — максимальное количество байт, которое один ProducerBatch может накопить для одной TopicPartition. По умолчанию — 16384 байт (16 KB).
# Расчёт оптимального batch.size
# Сценарий: 1000 записей/сек, каждая ~500 байт, linger.ms=20 мс
# За 20 мс приходит ~20 записей * 500 байт = 10 000 байт
# batch.size должен быть >= 10 000 байт
# Рекомендуется 64-256 KB для большинства production-нагрузок
producer = KafkaProducer(
bootstrap_servers=['broker:9092'],
linger_ms=20,
batch_size=65536, # 64 KB
buffer_memory=67108864, # 64 MB общий буфер
)
Что происходит, если batch.size меньше одной записи? Запись всё равно отправляется — ограничение не является жёстким минимумом. Kafka создаёт батч из одной записи, даже если она превышает batch.size.
Взаимодействие linger.ms и batch.size
Время ──────────────────────────────────────────────>
send() send() send() send() BATCH FULL → отправка (batch.size)
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
[r1] [r2] [r3] [r4] [r5] → Sender Thread
▲
send() linger.ms истёк → отправка ──────────┘
│
▼
[r1] ... (пауза 100 мс) ... → отправляется неполный батч
Правило: батч отправляется при выполнении любого из условий:
- Размер батча достиг
batch.size - Прошло
linger.msмиллисекунд с момента добавления первой записи в батч
Compression — сжатие данных
compression.type — алгоритм сжатия, применяемый к батчу перед отправкой. По умолчанию — none. Сжатие применяется на уровне батча (не отдельной записи), что даёт значительный выигрыш при больших батчах.
# gzip — максимальная степень сжатия
producer = KafkaProducer(compression_type='gzip')
# lz4 — максимальная скорость
producer = KafkaProducer(compression_type='lz4')
# snappy — баланс скорости и размера (от Google)
producer = KafkaProducer(compression_type='snappy')
# zstd — лучшее соотношение всех параметров (Kafka 2.1+)
producer = KafkaProducer(compression_type='zstd')
Сравнение алгоритмов сжатия
gzip — лучшее сжатие (медленно)
gzip: стандартный алгоритм DEFLATE. Лучшее сжатие среди всех вариантов (3-10x для JSON/текста). Но самый медленный — CPU-intensive компрессия и декомпрессия. Хорош для архивных топиков с длительным retention, где экономия дискового пространства важнее скорости.lz4 — максимальная скорость (хуже сжатие)
lz4: создан специально для скорости. Компрессия в 10-40x быстрее gzip. Сжатие хуже (2-4x для текста), но при lz4 CPU не является узким местом. Идеален для high-throughput топиков, где важна скорость обработки. Поддерживается нативно в Java Kafka клиенте.snappy — быстрая декомпрессия (Hadoop-традиция)
snappy: алгоритм от Google, оптимизированный для скорости декомпрессии. Умеренное сжатие (2-4x), быстрая декомпрессия важна для consumer-heavy нагрузок. Исторически популярен в экосистеме Hadoop. Требует нативных библиотек в некоторых клиентах.zstd — оптимальный баланс (рекомендуется)
zstd: Facebook Zstandard. Превосходит все остальные алгоритмы по соотношению скорость/степень сжатия. Сравнимая с lz4 скорость при сжатии близком к gzip. Появился в Kafka 2.1. Рекомендован для новых кластеров — если нет ограничений совместимости, используйте zstd.Где происходит компрессия и декомпрессия
- Компрессия: на продюсере перед отправкой батча
- Хранение: брокер хранит батч в сжатом виде (не декомпрессирует)
- Декомпрессия: на консьюмере после получения батча
Это означает, что брокер экономит CPU — он не участвует в компрессии/декомпрессии для стандартных топиков. Исключение: если продюсер и консьюмер используют разные алгоритмы — брокер должен перекомпрессировать (крайне нежелательно).
Не смешивайте алгоритмы компрессии в одном кластере без необходимости. Если топик настроен с compression.type=producer (по умолчанию), брокер хранит данные в формате продюсера. Если явно указать другой алгоритм на уровне топика, брокер перекомпрессирует — это высокая CPU-нагрузка без реальной пользы.
buffer.memory — общий буфер продюсера
buffer.memory — суммарный объём памяти для всех буферизованных записей во всех TopicPartition. По умолчанию — 33554432 байт (32 MB).
producer = KafkaProducer(
bootstrap_servers=['broker:9092'],
buffer_memory=67108864, # 64 MB
max_block_ms=60000, # ждать 60 сек при полном буфере (по умолчанию)
)
Что происходит при исчерпании buffer.memory:
- Вызов
send()блокируется (не выбрасывает исключение немедленно) - Продюсер ждёт до
max_block_msмиллисекунд, пока Sender Thread не освободит память - Если за
max_block_msместо не освободилось — выбрасываетсяBufferError
try:
for i in range(1_000_000):
producer.send('high-volume', value=b'x' * 1024)
except Exception as e:
print(f"Буфер переполнен: {e}")
# Решение: увеличить buffer_memory или замедлить продюсера
Для мониторинга: метрика JMX producer-metrics.buffer-available-bytes показывает свободную память буфера. Если она стабильно близка к 0 — увеличьте buffer.memory или проверьте, не медленнее ли broker, чем ожидалось.
Практическая конфигурация для разных сценариев
# Сценарий 1: Финансовые транзакции (низкая latency)
financial_producer = KafkaProducer(
bootstrap_servers=['broker:9092'],
linger_ms=0, # Немедленная отправка
batch_size=16384, # Стандартный размер
compression_type=None, # Без сжатия (latency важнее)
acks='all',
)
# Сценарий 2: Логи приложений (высокий throughput)
log_producer = KafkaProducer(
bootstrap_servers=['broker:9092'],
linger_ms=50, # 50 мс ожидания
batch_size=131072, # 128 KB батчи
compression_type='zstd', # Лучший алгоритм
buffer_memory=134217728, # 128 MB буфер
acks='1', # Только leader-подтверждение
)
# Сценарий 3: Event sourcing (баланс)
event_producer = KafkaProducer(
bootstrap_servers=['broker:9092'],
linger_ms=20,
batch_size=65536, # 64 KB
compression_type='lz4', # Скорость важна
acks='all',
enable_idempotence=True,
)
Ключевые выводы
- Батчинг повышает throughput за счёт отправки множества записей в одном ProduceRequest.
- linger.ms задаёт ожидание перед отправкой неполного батча.
linger.ms=0— минимальная latency.linger.ms=100— максимальный throughput. - batch.size ограничивает максимальный размер одного батча. Батч отправляется при достижении
batch.sizeИЛИ истеченииlinger.ms. - Компрессия применяется на батч-уровне.
zstd— оптимальный выбор для новых систем. - buffer.memory — общий лимит памяти продюсера. При переполнении
send()блокируется.