Learning Platform
Глоссарий Troubleshooting
Урок 03.05 · 25 мин
Продвинутый
Transactionstransactional.idread_committedAtomic WritesExactly-once Semantics

Транзакции

Kafka-транзакции решают задачу, которую идемпотентность не может: атомарная запись в несколько партиций и/или топиков. Либо все записи транзакции видимы консьюмерам, либо ни одна — промежуточных состояний нет.

Это основа для реализации exactly-once семантики в потоковой обработке (Kafka Streams, Apache Flink с Kafka-коннекторами).


Зачем нужны транзакции

Рассмотрим сценарий: transfer-сервис перемещает деньги между счетами. Нужно атомарно записать списание со счёта A и зачисление на счёт B.

# Без транзакций — опасный код!
producer.send('account-A', value={'delta': -100, 'tx_id': 'tx-001'})
producer.send('account-B', value={'delta': +100, 'tx_id': 'tx-001'})
producer.flush()
# Что если crash после первого send() но до второго?
# account-A списан, account-B не зачислен — деньги исчезли

Без транзакций: два send() — это два независимых события. Консьюмер может увидеть первую запись, не увидев вторую.

С транзакциями: обе записи — единое атомарное действие.


transactional.id — стабильный идентификатор

transactional.id — строковый идентификатор, который связывает транзакционный продюсер с его историей на брокере. При рестарте продюсера с тем же transactional.id:

  1. Брокер нашёл незавершённую транзакцию от предыдущего экземпляра → автоматически откатывает её
  2. Брокер присваивает новую эпоху → старый экземпляр «фехтуется»
producer = KafkaProducer(
    bootstrap_servers=['broker:9092'],
    transactional_id='transfer-service-1',  # Уникальный ID в кластере
    # enable_idempotence=True включается автоматически
    # acks='all' устанавливается автоматически
)
WARNING

transactional.id должен быть уникальным среди всех продюсеров в кластере. Если два продюсера используют один transactional.id — они будут фехтовать друг друга. В микросервисной архитектуре используют шаблон {service-name}-{partition-id} или {service-name}-{pod-name}.


Жизненный цикл транзакции

producer = KafkaProducer(
    bootstrap_servers=['broker:9092'],
    transactional_id='transfer-service-1',
)

# 1. Инициализация (один раз при старте продюсера)
producer.init_transactions()

# 2. Начало транзакции
producer.begin_transaction()

try:
    # 3. Отправка записей в рамках транзакции
    producer.send('account-debit',
                  value={'account': 'A', 'delta': -100, 'tx_id': 'tx-001'})
    producer.send('account-credit',
                  value={'account': 'B', 'delta': +100, 'tx_id': 'tx-001'})

    # 4. Фиксация транзакции
    producer.commit_transaction()
    print("Транзакция зафиксирована")

except Exception as e:
    # 5. Откат при ошибке
    producer.abort_transaction()
    print(f"Транзакция откатана: {e}")

Диаграмма последовательности транзакции

Жизненный цикл транзакции Kafka

Producer

Транзакционный продюсер. transactional.id='transfer-service-1'. Идемпотентность включена автоматически. acks=all установлен автоматически.
InitProducerIdRequest (transactional.id)

Transaction Coordinator

Transaction Coordinator — один из брокеров кластера, ответственный за управление транзакциями. Выбирается по хешу transactional.id. Хранит состояние транзакций в служебном топике __transaction_state.
Producer
PID=42, epoch=3

Coordinator: epoch++, abort old txns

Координатор завершает незавершённые транзакции прошлого экземпляра (abort). Инкрементирует эпоху. Возвращает новый PID и epoch.

begin_transaction()

begin_transaction() — только локальная операция. Меняет внутренний флаг продюсера. НЕ отправляет сообщение брокеру.
AddPartitionsToTxnRequest

Coordinator: регистрирует partitions

При первом send() после begin_transaction() продюсер уведомляет координатора о партициях, участвующих в транзакции. Координатор записывает их в __transaction_state.
Producer
ProduceRequest (PID=42, epoch=3)

Partition Leaders: append (not visible to read_committed)

Partition Leader получает запись, помечает её как 'незафиксированная' (in-transaction). Запись физически существует в log, но не видна консьюмерам с isolation.level=read_committed. Консьюмеры с read_uncommitted видят её сразу.

commit_transaction()

commit_transaction() — продюсер просит координатора зафиксировать транзакцию.
EndTxnRequest (COMMIT)

Coordinator: двухфазный commit

Координатор записывает PREPARE_COMMIT в __transaction_state (двухфазный commit). Затем рассылает WriteTxnMarkersRequest каждому partition leader — просит записать COMMIT-маркер.
Coordinator
WriteTxnMarkersRequest

Partitions: COMMIT marker → записи видимы

Partition Leader записывает COMMIT-маркер в log после всех транзакционных записей. После этого записи становятся видимы консьюмерам с read_committed. Атомарность обеспечивается: или все COMMIT-маркеры записаны, или ни один.

Изоляция консьюмера: read_committed vs read_uncommitted

Транзакции полезны только если консьюмер правильно их поддерживает.

from kafka import KafkaConsumer

# Только зафиксированные транзакционные записи
consumer = KafkaConsumer(
    'account-debit', 'account-credit',
    bootstrap_servers=['broker:9092'],
    isolation_level='read_committed',  # Ключевой параметр!
    group_id='transfer-auditor',
)

# Все записи включая незафиксированные (in-flight transactions)
consumer_raw = KafkaConsumer(
    'account-debit',
    bootstrap_servers=['broker:9092'],
    isolation_level='read_uncommitted',  # Значение по умолчанию
    group_id='debug-consumer',
)
ПараметрТранзакционные записиНетранзакционные записиПрименение
read_committedТолько после commitСразуProduction consumers
read_uncommittedВсе (включая in-flight)СразуДебаггинг, мониторинг
NOTE

read_committed имеет побочный эффект: консьюмер буферизует записи до получения COMMIT или ABORT маркера. Для длинных транзакций это увеличивает latency консьюмера. Избегайте транзакций, которые длятся секунды — держите их короткими (миллисекунды).


Transaction Coordinator

Transaction Coordinator — специальная роль, которую выполняет один из брокеров кластера. Он определяется по хешу transactional.id:

coordinator_id = hash(transactional.id) % __transaction_state.num_partitions

Координатор хранит состояние транзакций в служебном топике __transaction_state с replication.factor=3. Состояния транзакции:

EMPTY → [begin_transaction()] → ONGOING → [commit_transaction()] →
PREPARE_COMMIT → [WriteTxnMarkers] → COMPLETE_COMMIT

EMPTY → [begin_transaction()] → ONGOING → [abort_transaction()] →
PREPARE_ABORT → [WriteTxnMarkers] → COMPLETE_ABORT

Производительность и overhead транзакций

Транзакции добавляют несколько источников overhead:

Сетевые round-trips:

  • InitProducerIdRequest — при старте продюсера
  • AddPartitionsToTxnRequest — при первом send() после begin
  • EndTxnRequest — при commit или abort
  • WriteTxnMarkersRequest — на каждый partition leader

Дисковые записи:

  • Транзакционные метаданные в __transaction_state
  • COMMIT/ABORT маркер в каждой участвующей партиции
# Оптимизация: батчинг записей внутри транзакции
producer.begin_transaction()
for order in orders_batch:  # 1000 записей
    producer.send('orders', value=order)
# Один overhead транзакции на 1000 записей — эффективно
producer.commit_transaction()

Не делайте транзакцию на каждую запись отдельно — это антипаттерн. Группируйте записи в транзакцию.


Exactly-once в Kafka Streams

Транзакции — основа exactly-once semantics (EOS) в Kafka Streams:

[read from input topic] → [process] → [write to output topic]

Без EOS: consumer offset commit и producer write — два разных действия. Crash между ними → запись дублируется или offset устарел.

С EOS (Kafka Streams processing.guarantee=exactly_once_v2):

  • Producer write + consumer offset commit атомарны через одну транзакцию
  • Результат: каждая входная запись обрабатывается ровно один раз
# В Kafka Streams (Java/Scala) — концептуально:
# streams.config.put("processing.guarantee", "exactly_once_v2")
# Это включает транзакции под капотом. Один продюсер на task,
# transactional.id = "{application.id}-{task.id}"

Ключевые выводы

  1. Транзакции обеспечивают атомарную запись в несколько партиций: либо все — либо ничего.
  2. transactional.id — стабильный ID, который связывает продюсера с историей транзакций на брокере. Обеспечивает epoch fencing.
  3. Жизненный цикл: init_transactions()begin_transaction()send() × N → commit_transaction() или abort_transaction().
  4. Консьюмер с isolation.level=read_committed видит только зафиксированные записи.
  5. Transaction Coordinator управляет двухфазным коммитом через __transaction_state.
  6. Overhead: транзакции добавляют сетевые round-trips. Батчируйте записи внутри одной транзакции.
Проверка знанийKnowledge check
Продюсер вызвал begin_transaction(), отправил 5 записей в 3 разных партиции, затем вызвал abort_transaction(). Консьюмер с isolation.level=read_committed читает эти партиции. Какие из 5 записей он увидит?
ОтветAnswer
Ни одной. При abort_transaction() Transaction Coordinator отправляет WriteTxnMarkersRequest с типом ABORT на все три partition leader'а. Каждый leader записывает ABORT-маркер после транзакционных записей. Консьюмер с read_committed буферизует транзакционные записи до получения маркера. Получив ABORT-маркер, консьюмер отбрасывает буферизованные записи — они никогда не передаются пользовательскому коду. Физически записи существуют в log (до cleanup/compaction), но read_committed-консьюмер их игнорирует. Консьюмер с read_uncommitted (по умолчанию) увидел бы все 5 записей плюс ABORT-маркер.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. Транзакционный продюсер вызвал begin_transaction(), отправил 10 записей в 3 партиции, затем вызвал commit_transaction(). Консьюмер с isolation.level=read_uncommitted читает эти партиции во время commit (до завершения двухфазного коммита). Что он видит?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6