Idempotent Producer
Идемпотентный продюсер решает одну из самых сложных проблем распределённых систем: как гарантировать, что повторная отправка не создаёт дублирующихся записей. Это достигается через механизм PID (Producer ID) и порядковых номеров, который позволяет брокеру отличить повторную отправку от новой записи.
Проблема: дубликаты при retry
Рассмотрим стандартный сценарий без идемпотентности:
1. Продюсер отправляет запись (sequence N) → сеть
2. Брокер записывает запись в log → offset 42
3. Брокер отправляет ACK продюсеру → сеть
4. ACK потерян в сети → продюсер получает timeout
5. Продюсер делает retry → отправляет запись снова
6. Брокер записывает ПОВТОРНО → offset 43 (дубликат!)
7. Брокер отправляет ACK → продюсер считает отправку успешной
Итог: одна запись присутствует на брокере дважды (offset 42 и 43). Продюсер не знает об этом. Консьюмер прочитает оба дубликата.
Producer
Продюсер отправляет ProduceRequest. Запись записана на брокере. Брокер посылает ACK.Broker: записал offset=42
Брокер записывает запись в partition log. Offset 42 занят. Запись реплицирована. ACK готов к отправке.Producer: timeout → RETRY
Продюсер получает TimeoutException. Не знает, дошла ли запись. Выполняет retry согласно настройкам retries и delivery.timeout.ms.Broker: записал offset=43 (ДУБЛИКАТ!)
Брокер получает повторный ProduceRequest. Без идемпотентности — не может отличить от новой записи. Записывает снова в offset 43. Дубликат создан.Решение: enable.idempotence=True
producer = KafkaProducer(
bootstrap_servers=['broker:9092'],
enable_idempotence=True,
# Следующие параметры устанавливаются автоматически:
# acks='all'
# retries=2147483647
# max_in_flight_requests_per_connection=5
)
При включении идемпотентности:
- Брокер назначает продюсеру PID (Producer ID) — уникальный числовой идентификатор
- Продюсер присваивает каждому батчу Sequence Number — монотонно возрастающий счётчик, индивидуальный для каждой TopicPartition
- Брокер отслеживает последний полученный sequence number для каждой пары
(PID, TopicPartition)
Механизм PID и Sequence Number
PID: 42
TopicPartition: orders/0
Продюсер отправляет:
Batch 1 → sequence=0
Batch 2 → sequence=1
Batch 3 → sequence=2
Брокер хранит:
(PID=42, orders/0): last_seq=2
Логика обнаружения дубликатов:
# Псевдокод на стороне брокера
def handle_produce_request(pid, sequence_num, partition, records):
last_seq = get_last_sequence(pid, partition)
if sequence_num == last_seq + 1:
# Ожидаемый следующий батч — записать
append_to_log(partition, records)
update_last_sequence(pid, partition, sequence_num)
return ACK
elif sequence_num <= last_seq:
# Повторная отправка (retry) — дубликат!
# НЕ записывать, но вернуть ACK (как если бы записали)
return ACK # Продюсер доволен, дубликат отброшен
else:
# sequence_num > last_seq + 1 — пропущен батч!
# Что-то пошло не так с порядком
return OUT_OF_ORDER_SEQUENCE_EXCEPTION
Producer (PID=42): seq=0
Продюсер отправляет первый батч с PID=42, seq=0. Брокер записывает offset=42, сохраняет last_seq(42, orders/0)=0. ACK потерян.Broker: offset=42, last_seq=0
Брокер записал. last_seq(PID=42, orders/0) = 0. ACK отправлен, но не дошёл до продюсера.Producer: timeout → retry seq=0
Продюсер не получил ACK (таймаут). Делает retry — снова отправляет тот же батч с тем же seq=0.Broker: seq=0 == last_seq → ДУБЛИКАТ, ACK без записи
Брокер видит seq=0, last_seq=0. seq <= last_seq — это дубликат! Брокер НЕ записывает, но возвращает ACK. Дубликат отброшен без ведома продюсера.Producer: ACK получен. Успех.
Продюсер получил ACK — считает операцию успешной. На брокере только одна запись (offset=42). Exactly-once на уровне партиции достигнуто.Broker: только offset=42
Partition log содержит только одну запись с offset=42. Дубликат предотвращён.Epoch Fencing — защита от «зомби-продюсеров»
PID присваивается продюсеру при старте. Что происходит, если один и тот же логический продюсер перезапускается (например, из-за сбоя)?
Проблема: два экземпляра одного продюсера пишут в одну и ту же партицию — «зомби-проблема».
Решение: transactional.id + эпоха.
# Продюсер с transactional_id
producer = KafkaProducer(
bootstrap_servers=['broker:9092'],
transactional_id='orders-producer-1', # Стабильный ID
enable_idempotence=True, # Включается автоматически при transactional_id
)
Механизм epoch fencing:
- Продюсер v1 стартует с
transactional_id='orders-producer-1'→ брокер присваиваетPID=42, epoch=0 - Продюсер v1 зависает (GC pause, сетевые проблемы)
- Продюсер v2 (новый экземпляр) стартует с тем же
transactional_id→ брокер присваиваетPID=42, epoch=1 - Продюсер v1 «оживает» и пытается писать с
epoch=0 - Брокер отклоняет запросы v1 (
ProducerFencedException— epoch устарела)
transactional.id = 'orders-producer-1'
Продюсер v1: PID=42, epoch=0 → пишет...
[сбой, перезапуск]
Продюсер v2: PID=42, epoch=1 → пишет...
Продюсер v1 оживает → запросы с epoch=0 отклоняются!
ProducerFencedException — v1 должен завершиться
transactional.id хранится в брокере постоянно. Новый экземпляр продюсера с тем же transactional.id получает следующую эпоху и «фехтует» (fences) предыдущего. Это ключевой механизм для реализации транзакций (следующий урок).
Идемпотентность в Kafka 3.0+
Начиная с Kafka 3.0, enable.idempotence=True является значением по умолчанию. Это означает:
# Kafka 3.0+: эти настройки применяются по умолчанию
# enable.idempotence = true
# acks = all (переопределяется принудительно)
# retries = 2147483647
# max.in.flight.requests.per.connection = 5
producer = KafkaProducer(bootstrap_servers=['broker:9092'])
# Продюсер автоматически идемпотентен!
Чтобы явно отключить идемпотентность (не рекомендуется без веской причины):
producer = KafkaProducer(
bootstrap_servers=['broker:9092'],
enable_idempotence=False,
acks='1', # Нужно явно указать, иначе конфликт с acks='all'
)
Ограничения идемпотентности
Идемпотентность гарантирует exactly-once в рамках одной партиции. Это важно понимать:
| Сценарий | Идемпотентность покрывает? |
|---|---|
| Одна запись, одна партиция, retry | Да — дубликат предотвращён |
| Одна запись в партиции 0 + одна в партиции 1 | Нет — атомарности нет |
| Запись в Kafka + запись в БД | Нет — нужны транзакции |
| Смерть и рестарт продюсера (разные PID) | Нет — нужен transactional.id |
Идемпотентный продюсер предотвращает дубликаты при retry, но не решает задачу атомарной записи в несколько партиций или внешних систем. Для этого используются транзакции (следующий урок).
Ключевые выводы
- Проблема: retry при потере ACK → дублирующиеся записи на брокере.
- Решение:
enable.idempotence=True— брокер отслеживает PID + sequence number для каждой(PID, TopicPartition)и отбрасывает повторы. - Epoch fencing через
transactional.id: новый экземпляр продюсера инкрементирует эпоху, «зомби»-продюсер получаетProducerFencedException. - Kafka 3.0+: идемпотентность включена по умолчанию.
- Ограничение: exactly-once только внутри одной партиции. Кросс-партиционные гарантии — задача транзакций.