Транзакции
Kafka-транзакции решают задачу, которую идемпотентность не может: атомарная запись в несколько партиций и/или топиков. Либо все записи транзакции видимы консьюмерам, либо ни одна — промежуточных состояний нет.
Это основа для реализации exactly-once семантики в потоковой обработке (Kafka Streams, Apache Flink с Kafka-коннекторами).
Зачем нужны транзакции
Рассмотрим сценарий: transfer-сервис перемещает деньги между счетами. Нужно атомарно записать списание со счёта A и зачисление на счёт B.
# Без транзакций — опасный код!
producer.send('account-A', value={'delta': -100, 'tx_id': 'tx-001'})
producer.send('account-B', value={'delta': +100, 'tx_id': 'tx-001'})
producer.flush()
# Что если crash после первого send() но до второго?
# account-A списан, account-B не зачислен — деньги исчезли
Без транзакций: два send() — это два независимых события. Консьюмер может увидеть первую запись, не увидев вторую.
С транзакциями: обе записи — единое атомарное действие.
transactional.id — стабильный идентификатор
transactional.id — строковый идентификатор, который связывает транзакционный продюсер с его историей на брокере. При рестарте продюсера с тем же transactional.id:
- Брокер нашёл незавершённую транзакцию от предыдущего экземпляра → автоматически откатывает её
- Брокер присваивает новую эпоху → старый экземпляр «фехтуется»
producer = KafkaProducer(
bootstrap_servers=['broker:9092'],
transactional_id='transfer-service-1', # Уникальный ID в кластере
# enable_idempotence=True включается автоматически
# acks='all' устанавливается автоматически
)
transactional.id должен быть уникальным среди всех продюсеров в кластере. Если два продюсера используют один transactional.id — они будут фехтовать друг друга. В микросервисной архитектуре используют шаблон {service-name}-{partition-id} или {service-name}-{pod-name}.
Жизненный цикл транзакции
producer = KafkaProducer(
bootstrap_servers=['broker:9092'],
transactional_id='transfer-service-1',
)
# 1. Инициализация (один раз при старте продюсера)
producer.init_transactions()
# 2. Начало транзакции
producer.begin_transaction()
try:
# 3. Отправка записей в рамках транзакции
producer.send('account-debit',
value={'account': 'A', 'delta': -100, 'tx_id': 'tx-001'})
producer.send('account-credit',
value={'account': 'B', 'delta': +100, 'tx_id': 'tx-001'})
# 4. Фиксация транзакции
producer.commit_transaction()
print("Транзакция зафиксирована")
except Exception as e:
# 5. Откат при ошибке
producer.abort_transaction()
print(f"Транзакция откатана: {e}")
Диаграмма последовательности транзакции
Producer
Транзакционный продюсер. transactional.id='transfer-service-1'. Идемпотентность включена автоматически. acks=all установлен автоматически.Transaction Coordinator
Transaction Coordinator — один из брокеров кластера, ответственный за управление транзакциями. Выбирается по хешу transactional.id. Хранит состояние транзакций в служебном топике __transaction_state.Coordinator: epoch++, abort old txns
Координатор завершает незавершённые транзакции прошлого экземпляра (abort). Инкрементирует эпоху. Возвращает новый PID и epoch.begin_transaction()
begin_transaction() — только локальная операция. Меняет внутренний флаг продюсера. НЕ отправляет сообщение брокеру.Coordinator: регистрирует partitions
При первом send() после begin_transaction() продюсер уведомляет координатора о партициях, участвующих в транзакции. Координатор записывает их в __transaction_state.Partition Leaders: append (not visible to read_committed)
Partition Leader получает запись, помечает её как 'незафиксированная' (in-transaction). Запись физически существует в log, но не видна консьюмерам с isolation.level=read_committed. Консьюмеры с read_uncommitted видят её сразу.commit_transaction()
commit_transaction() — продюсер просит координатора зафиксировать транзакцию.Coordinator: двухфазный commit
Координатор записывает PREPARE_COMMIT в __transaction_state (двухфазный commit). Затем рассылает WriteTxnMarkersRequest каждому partition leader — просит записать COMMIT-маркер.Partitions: COMMIT marker → записи видимы
Partition Leader записывает COMMIT-маркер в log после всех транзакционных записей. После этого записи становятся видимы консьюмерам с read_committed. Атомарность обеспечивается: или все COMMIT-маркеры записаны, или ни один.Изоляция консьюмера: read_committed vs read_uncommitted
Транзакции полезны только если консьюмер правильно их поддерживает.
from kafka import KafkaConsumer
# Только зафиксированные транзакционные записи
consumer = KafkaConsumer(
'account-debit', 'account-credit',
bootstrap_servers=['broker:9092'],
isolation_level='read_committed', # Ключевой параметр!
group_id='transfer-auditor',
)
# Все записи включая незафиксированные (in-flight transactions)
consumer_raw = KafkaConsumer(
'account-debit',
bootstrap_servers=['broker:9092'],
isolation_level='read_uncommitted', # Значение по умолчанию
group_id='debug-consumer',
)
| Параметр | Транзакционные записи | Нетранзакционные записи | Применение |
|---|---|---|---|
read_committed | Только после commit | Сразу | Production consumers |
read_uncommitted | Все (включая in-flight) | Сразу | Дебаггинг, мониторинг |
read_committed имеет побочный эффект: консьюмер буферизует записи до получения COMMIT или ABORT маркера. Для длинных транзакций это увеличивает latency консьюмера. Избегайте транзакций, которые длятся секунды — держите их короткими (миллисекунды).
Transaction Coordinator
Transaction Coordinator — специальная роль, которую выполняет один из брокеров кластера. Он определяется по хешу transactional.id:
coordinator_id = hash(transactional.id) % __transaction_state.num_partitions
Координатор хранит состояние транзакций в служебном топике __transaction_state с replication.factor=3. Состояния транзакции:
EMPTY → [begin_transaction()] → ONGOING → [commit_transaction()] →
PREPARE_COMMIT → [WriteTxnMarkers] → COMPLETE_COMMIT
EMPTY → [begin_transaction()] → ONGOING → [abort_transaction()] →
PREPARE_ABORT → [WriteTxnMarkers] → COMPLETE_ABORT
Производительность и overhead транзакций
Транзакции добавляют несколько источников overhead:
Сетевые round-trips:
InitProducerIdRequest— при старте продюсераAddPartitionsToTxnRequest— при первом send() после beginEndTxnRequest— при commit или abortWriteTxnMarkersRequest— на каждый partition leader
Дисковые записи:
- Транзакционные метаданные в
__transaction_state - COMMIT/ABORT маркер в каждой участвующей партиции
# Оптимизация: батчинг записей внутри транзакции
producer.begin_transaction()
for order in orders_batch: # 1000 записей
producer.send('orders', value=order)
# Один overhead транзакции на 1000 записей — эффективно
producer.commit_transaction()
Не делайте транзакцию на каждую запись отдельно — это антипаттерн. Группируйте записи в транзакцию.
Exactly-once в Kafka Streams
Транзакции — основа exactly-once semantics (EOS) в Kafka Streams:
[read from input topic] → [process] → [write to output topic]
Без EOS: consumer offset commit и producer write — два разных действия. Crash между ними → запись дублируется или offset устарел.
С EOS (Kafka Streams processing.guarantee=exactly_once_v2):
- Producer write + consumer offset commit атомарны через одну транзакцию
- Результат: каждая входная запись обрабатывается ровно один раз
# В Kafka Streams (Java/Scala) — концептуально:
# streams.config.put("processing.guarantee", "exactly_once_v2")
# Это включает транзакции под капотом. Один продюсер на task,
# transactional.id = "{application.id}-{task.id}"
Ключевые выводы
- Транзакции обеспечивают атомарную запись в несколько партиций: либо все — либо ничего.
transactional.id— стабильный ID, который связывает продюсера с историей транзакций на брокере. Обеспечивает epoch fencing.- Жизненный цикл:
init_transactions()→begin_transaction()→send()× N →commit_transaction()илиabort_transaction(). - Консьюмер с
isolation.level=read_committedвидит только зафиксированные записи. - Transaction Coordinator управляет двухфазным коммитом через
__transaction_state. - Overhead: транзакции добавляют сетевые round-trips. Батчируйте записи внутри одной транзакции.