Lesson 13.02 · 25 min
Advanced
Kafka Transactions · Transactional Producer · Idempotent Producer · Fencing · Isolation Level · Producer Protocol

Transactional Kafka — broker-level internals

KafkaSink in Flink implements exactly-once delivery on top of the transactional Kafka producer. This is more than producer.send() followed by producer.flush(): it is a coordinated protocol between Flink (the client) and a Kafka broker (the transaction coordinator).

This lesson walks through how Kafka transactions work at the broker level: what transactional.id is, how fencing works, what isolation.level=read_committed means, which transaction states exist, and why a per-subtask transactional.id prefix is critical for Flink.


Evolution of the Kafka producer: from at-least-once to exactly-once

Kafka 0.11 (2017):
  - Idempotent producer (enable.idempotence=true)
    * Producer assigns sequence number per record
    * Broker dedupes duplicates within session
    * Solves: duplicates caused by retries after network errors
  - Transactional producer (transactional.id)
    * A producer can commit a group of writes atomically
    * Cross-partition atomic writes
    * Required for exactly-once delivery

Kafka 3.0+:
  - enable.idempotence=true by default for producers
  - acks=all by default

Apache Kafka 4.0 (2025):
  - KRaft only (ZooKeeper mode removed)
  - KIP-848: new consumer group protocol
  - KIP-890: server-side defense for transactions (stricter epoch checks, better fencing)

Idempotent producer foundations

Before transactions, Kafka introduced the idempotent producer, which guarantees exactly-once writes within a single producer session:

Idempotent producer mechanics:

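1. Producer connects to the broker
2. Broker assigns a producer ID (PID) + epoch
3. Each record batch carries (PID, epoch, sequence_number); sequence numbers are tracked per partition
4. Broker tracks the latest sequence numbers per (PID, partition)
5. If the broker receives a duplicate (same PID + sequence), it acknowledges it without appending again
6. If the broker receives a gap in the sequence, it returns an error (OutOfOrderSequenceException)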

Limitations:
  - Works only within a session (one producer instance)
  - If the producer restarts -> new PID -> no dedup across restarts
  - Does not handle cross-partition atomic writes
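
The sequence-number check and its restart limitation can be illustrated with a toy model. This is a minimal sketch, not Kafka code: `PartitionLog` and its return strings are hypothetical names, and it tracks only the last sequence per PID, whereas a real broker keeps a small window of recent batches.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionLog:
    """Toy model of one partition leader's dedup state (not Kafka code)."""
    records: list = field(default_factory=list)
    last_seq: dict = field(default_factory=dict)  # PID -> last appended sequence

    def append(self, pid: int, seq: int, value: str) -> str:
        last = self.last_seq.get(pid, -1)
        if seq <= last:
            return "DUPLICATE"      # acknowledged, but not appended again
        if seq != last + 1:
            return "OUT_OF_ORDER"   # a gap: the real broker returns an error
        self.records.append(value)
        self.last_seq[pid] = seq
        return "APPENDED"


log = PartitionLog()
results = [
    log.append(pid=42, seq=0, value="a"),
    log.append(pid=42, seq=0, value="a"),  # retry after a lost ack
    log.append(pid=42, seq=2, value="c"),  # sequence 1 never arrived
    log.append(pid=43, seq=0, value="a"),  # restarted producer with a new PID
]
print(results)
print(log.records)
```

The last call shows the limitation: after a restart the producer has a new PID, so the same payload is appended a second time.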

The idempotent producer is not enough for Flink: Flink tasks restart, and the guarantees must survive those restarts. This is where transactions come in.

Transactional producer

Transactional producer adds:

1. transactional.id (TID)
   - User-supplied identifier
   - Sticky across producer restarts
   - Coordinator (broker) tracks active transactions per TID
   - Allows fencing zombie producers

2. Transaction coordinator
   - The leader broker of each __transaction_state partition acts as coordinator for the TIDs that hash to it
   - Manages transaction lifecycle
   - Logs transaction state changes to __transaction_state

3. Transactional state machine
   States: Empty, Ongoing, PrepareCommit, PrepareAbort, CompleteCommit, CompleteAbort, Dead

4. Atomic commit across partitions
   - Records spread across multiple topics/partitions
   - Either ALL committed or ALL aborted

5. Consumer isolation
   - read_committed: only committed transactions visible
   - read_uncommitted: also see in-progress and aborted records (for special use cases)

Producer-broker protocol

Transaction lifecycle:

1. Producer initializes:
   producer.initTransactions()
   -> InitProducerIdRequest (TID)
   -> Coordinator responds: (PID, epoch)
   -> If the TID already exists, the coordinator bumps the epoch, which fences the old producer instance

2. Begin transaction:
   producer.beginTransaction()
   -> Producer-side state: IN_TRANSACTION
   -> No broker call yet

3. Send records:
   producer.send(record)
   -> The first send to a new partition triggers AddPartitionsToTxnRequest to the coordinator
   -> Producer stamps each record batch with (PID, epoch, sequence) and a transactional flag
   -> Sends to the respective topic-partition leader
   -> The leader appends the batch to the partition log as part of the open transaction

4. Commit:
   producer.commitTransaction()
   -> EndTxnRequest(commit=true) to coordinator
   -> Coordinator writes to __transaction_state: PREPARE_COMMIT
   -> Coordinator sends WriteTxnMarkerRequest to partition leaders
   -> Partition leaders write commit markers to their logs
   -> Records become visible to read_committed consumers
   -> Coordinator writes: COMPLETE_COMMIT

5. Abort (on error or explicit):
   producer.abortTransaction()
   -> EndTxnRequest(commit=false)
   -> Coordinator writes PREPARE_ABORT
   -> Partition leaders write abort markers
   -> Records skipped by read_committed consumers
   -> Coordinator writes COMPLETE_ABORT
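
The order of requests in the lifecycle above can be traced with a small model. This is a sketch under stated assumptions: `ToyTransactionalProducer` is a hypothetical class that only records request names, it does not talk to a broker, and it lists the marker writes inside `commit_transaction()` even though the coordinator, not the producer, issues them.

```python
class ToyTransactionalProducer:
    """Records which requests a transaction would trigger (illustrative only)."""

    def __init__(self, transactional_id):
        self.transactional_id = transactional_id
        self.trace = []
        self.partitions_in_txn = set()
        self.in_transaction = False

    def init_transactions(self):
        self.trace.append("InitProducerId")

    def begin_transaction(self):
        # Purely local state change: nothing is sent to the broker.
        self.in_transaction = True
        self.partitions_in_txn.clear()

    def send(self, topic, partition):
        if not self.in_transaction:
            raise RuntimeError("send() outside a transaction")
        tp = (topic, partition)
        if tp not in self.partitions_in_txn:
            # Each partition is registered with the coordinator once per transaction.
            self.partitions_in_txn.add(tp)
            self.trace.append(f"AddPartitionsToTxn({topic}-{partition})")
        self.trace.append(f"Produce({topic}-{partition})")

    def commit_transaction(self):
        self.trace.append("EndTxn(commit=True)")
        # One marker per registered partition (written by the coordinator).
        for topic, partition in sorted(self.partitions_in_txn):
            self.trace.append(f"WriteTxnMarkers({topic}-{partition})")
        self.in_transaction = False


producer = ToyTransactionalProducer("job-1-subtask-3-tx-5")
producer.init_transactions()
producer.begin_transaction()
producer.send("orders", 0)
producer.send("orders", 0)
producer.send("orders", 1)
producer.commit_transaction()
print("\n".join(producer.trace))
```

Two sends to the same partition produce a single AddPartitionsToTxn entry, which is why the coordinator knows exactly which partition leaders need a marker at commit time.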

Transaction state machine on the broker

Coordinator tracks per-TID state:

EMPTY:
  - No active transaction
  - InitProducerIdRequest assigns PID/epoch

ONGOING:
  - Entered when the first AddPartitionsToTxnRequest arrives (beginTransaction() itself is client-side only)
  - Records being written
  - Partitions added to the transaction via AddPartitionsToTxnRequest

PREPARE_COMMIT:
  - producer.commitTransaction() initiated
  - Coordinator transitioning to commit
  - If coordinator crashes here: on recovery, complete the commit

PREPARE_ABORT:
  - producer.abortTransaction() initiated
  - Coordinator transitioning to abort
  - If coordinator crashes here: on recovery, complete the abort

COMPLETE_COMMIT:
  - All partition markers written
  - Transaction complete
  - The TID is ready for its next transaction

COMPLETE_ABORT:
  - Same as COMPLETE_COMMIT but aborted

DEAD:
  - TID unused for longer than transactional.id.expiration.ms
  - Its metadata is being removed from the coordinator
  - Future use creates new transaction state

Coordinator durability: state is persisted in the __transaction_state topic (replicated, fault tolerant). On coordinator failover (broker dies), the new coordinator reads the state from the log and continues the protocol.
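
The state machine and the recovery rule for the two PREPARE states can be written down as a transition table. This is a simplified sketch: the table is an assumption made for illustration and is not copied from Kafka's source, and `ToyTransactionState` and `recover` are hypothetical names.

```python
# Simplified transition table; an assumption for illustration, not Kafka's own.
VALID_TRANSITIONS = {
    "EMPTY": {"ONGOING", "DEAD"},
    "ONGOING": {"PREPARE_COMMIT", "PREPARE_ABORT"},
    "PREPARE_COMMIT": {"COMPLETE_COMMIT"},
    "PREPARE_ABORT": {"COMPLETE_ABORT"},
    "COMPLETE_COMMIT": {"ONGOING", "EMPTY", "DEAD"},
    "COMPLETE_ABORT": {"ONGOING", "EMPTY", "DEAD"},
    "DEAD": set(),
}


class ToyTransactionState:
    def __init__(self):
        self.state = "EMPTY"
        self.history = ["EMPTY"]  # stands in for the __transaction_state log

    def transition(self, new_state):
        if new_state not in VALID_TRANSITIONS[self.state]:
            raise ValueError(f"illegal transition {self.state} -> {new_state}")
        self.state = new_state
        self.history.append(new_state)


def recover(last_logged_state):
    """What a new coordinator does with the state it reads back from the log."""
    roll_forward = {
        "PREPARE_COMMIT": "COMPLETE_COMMIT",  # the decision was logged: finish it
        "PREPARE_ABORT": "COMPLETE_ABORT",
    }
    return roll_forward.get(last_logged_state, last_logged_state)


txn = ToyTransactionState()
for step in ("ONGOING", "PREPARE_COMMIT"):
    txn.transition(step)
# The coordinator crashes here; its replacement replays the log.
resumed = recover(txn.history[-1])
print(resumed)
```

Once PREPARE_COMMIT is in the log the outcome is decided, so recovery can only roll forward. That is what makes the commit atomic across partitions even when the coordinator fails halfway through writing markers.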

Fencing — zombie producer prevention

Critical question: what if an old Flink TaskManager believes it is still running and tries to commit?

Scenario:
  t=0: TaskManager A starts a producer with TID="flink-job-1-subtask-3"
       Coordinator assigns PID=42, epoch=1
       Producer sends some records

  t=5: Network partition. JobManager assumes A is dead.
       JobManager schedules a new TaskManager B on another node.

  t=6: TaskManager B starts a producer with the same TID="flink-job-1-subtask-3"
       InitProducerIdRequest to coordinator
       Coordinator: "TID already exists, PID=42 epoch=1"
       Coordinator response: increment epoch to 2, fence epoch 1
       New PID/epoch for B: PID=42, epoch=2
       (the PID can stay the same; only the epoch changes for fencing)

  t=7: TaskManager A regains network access, tries producer.commitTransaction()
       EndTxnRequest(PID=42, epoch=1) to coordinator
       Coordinator: "Epoch 1 fenced, current is 2"
       -> ProducerFencedException returned to A
       -> A's producer becomes unusable and throws to the application
       -> Flink runtime handles it; A's transaction is aborted

  Result: Only B's transaction can commit. No duplicates.

Fencing is a critical safety mechanism. Without it, concurrent producers could both commit and create duplicates.
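
The scenario can be replayed against a toy coordinator that tracks one (PID, epoch) pair per TID. This is a minimal sketch: `ToyCoordinator` and `FencedError` are hypothetical names, and the starting values PID=42 and epoch=1 follow the scenario above rather than Kafka's actual numbering.

```python
class FencedError(Exception):
    """Stands in for ProducerFencedException in this toy model."""


class ToyCoordinator:
    def __init__(self):
        self.next_pid = 42
        self.producers = {}  # TID -> (pid, current epoch)

    def init_producer_id(self, tid):
        if tid in self.producers:
            pid, epoch = self.producers[tid]
            # Same TID seen again: keep the PID, bump the epoch to fence the old instance.
            self.producers[tid] = (pid, epoch + 1)
        else:
            self.producers[tid] = (self.next_pid, 1)
            self.next_pid += 1
        return self.producers[tid]

    def end_txn(self, tid, pid, epoch, commit):
        current_pid, current_epoch = self.producers[tid]
        if pid != current_pid or epoch < current_epoch:
            raise FencedError(f"epoch {epoch} fenced, current is {current_epoch}")
        return "COMMITTED" if commit else "ABORTED"


coordinator = ToyCoordinator()
tid = "flink-job-1-subtask-3"

producer_a = coordinator.init_producer_id(tid)  # t=0: TaskManager A
producer_b = coordinator.init_producer_id(tid)  # t=6: TaskManager B, same TID

try:
    coordinator.end_txn(tid, *producer_a, commit=True)  # t=7: the zombie tries to commit
    zombie_fenced = False
except FencedError:
    zombie_fenced = True

result_b = coordinator.end_txn(tid, *producer_b, commit=True)
print(producer_a, producer_b, zombie_fenced, result_b)
```

The check is a plain epoch comparison on the coordinator, so it does not depend on A noticing that it was replaced.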

Flink uses sticky TID per subtask:

Naming convention (Flink KafkaSink):
  TID = "{prefix}-{subtask_index}-{transaction_attempt}"
  
  Example:
    "flink-checkpoint-job-1-subtask-0-tx-12"
    "flink-checkpoint-job-1-subtask-1-tx-12"
    ...
    "flink-checkpoint-job-1-subtask-N-tx-12"

Properties:
  1. Sticky per subtask:
     - Same subtask reuses same TID prefix
     - Survives Flink restarts (config + savepoint preservation)
  
  2. Unique across job:
     - Different subtasks have different TIDs
     - No collision
  
  3. Includes attempt number:
     - Each Flink checkpoint = new transaction attempt
     - Allows multiple in-flight transactions per subtask
     - Up to max-in-flight checkpoints (configurable)
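
A minimal sketch of the naming convention, assuming the three components are joined with hyphens exactly as in the pattern above. `build_transactional_id` and `ids_for_checkpoint` are hypothetical helpers, not Flink API.

```python
def build_transactional_id(prefix: str, subtask_index: int, attempt: int) -> str:
    """Join the three components of the convention with hyphens."""
    return f"{prefix}-{subtask_index}-{attempt}"


def ids_for_checkpoint(prefix: str, parallelism: int, attempt: int) -> list:
    """One TID per subtask for a given transaction attempt."""
    return [build_transactional_id(prefix, i, attempt) for i in range(parallelism)]


checkpoint_12 = ids_for_checkpoint("my-job", parallelism=3, attempt=12)
checkpoint_13 = ids_for_checkpoint("my-job", parallelism=3, attempt=13)
print(checkpoint_12)

# Two jobs that share a prefix generate overlapping TIDs and would fence each other.
other_job = ids_for_checkpoint("my-job", parallelism=2, attempt=12)
colliding = sorted(set(checkpoint_12) & set(other_job))
print(colliding)
```

The collision at the end is the reason the prefix has to be unique per job: the subtask index and attempt number alone repeat across jobs.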

Configuration:

# Flink Kafka SQL connector options
sink.delivery-guarantee: exactly-once
sink.transactional-id-prefix: my-job   # the per-subtask part is appended automatically

# Transaction timeout, passed through to the Kafka producer (milliseconds)
properties.transaction.timeout.ms: 3600000   # 60 min

Pool of transactional.ids

Flink keeps a pool of TIDs per subtask (the fixed pool of 5 comes from the legacy FlinkKafkaProducer; how KafkaSink allocates TIDs depends on the connector version):

Per subtask: pool of N TIDs (default 5):
  tx-0, tx-1, tx-2, tx-3, tx-4

Lifecycle:
  - At each checkpoint, the subtask takes one tx-N from the pool
  - Pre-commits with that TID
  - On checkpoint complete, commits
  - TID returned to the pool for reuse

Why a pool? Async commits:
  - Checkpoint N+1 may start before the commit for N completes
  - Both in-flight transactions need different TIDs
  - The pool prevents blocking

Configuration:
  sink.transactional-id-prefix (Table API) or setTransactionalIdPrefix() (DataStream API)
  The number of TIDs in flight is bounded by execution.checkpointing.max-concurrent-checkpoints

read_committed vs read_uncommitted

Consumers must explicitly opt in to get exactly-once semantics:

# Consumer config (choose one)
isolation.level: read_committed    # see only committed transactions
isolation.level: read_uncommitted  # see all records (default)

read_committed semantics:

Consumer fetches records:
  - Skips records inside aborted transactions
  - Does not see records of in-progress transactions
  - Returns only committed records

Mechanism:
  - Transactional batches carry the PID and a transactional flag; commit/abort markers are control records in the partition log
  - The broker serves read_committed fetches only up to the last stable offset (LSO), the start of the earliest open transaction
  - The fetch response lists aborted transactions so the consumer can drop their records
  - Records past the LSO stay hidden until the open transaction completes (commit or abort)

Consequences:
  - End-to-end latency increases
    The consumer waits for the transaction commit before reading
    Up to checkpoint interval + commit duration
  - Consistent reads possible
    Either the entire transaction is visible or nothing
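
The visibility rules can be modelled on a single partition log that mixes data records with commit/abort markers. This is a toy sketch, not the consumer's actual implementation: the tuple layout and `read_committed` function are hypothetical, every record is assumed to be transactional, and the "stable offset" here is simply the first offset of the earliest transaction that has no marker yet.

```python
def read_committed(log):
    """Return payloads visible to a read_committed reader of this toy log.

    Each entry is (kind, pid, payload): kind is "data" or "marker",
    and a marker's payload is "COMMIT" or "ABORT".
    """
    outcome = {}  # offset of a data record -> how its transaction ended
    pending = {}  # pid -> offsets of data records in its open transaction
    for offset, (kind, pid, payload) in enumerate(log):
        if kind == "data":
            pending.setdefault(pid, []).append(offset)
        else:
            for data_offset in pending.pop(pid, []):
                outcome[data_offset] = payload
    # Stable offset: start of the earliest transaction that is still open.
    stable_offset = min((offs[0] for offs in pending.values()), default=len(log))
    return [log[o][2] for o in range(stable_offset) if outcome.get(o) == "COMMIT"]


log = [
    ("data", 1, "a1"),
    ("data", 2, "b1"),
    ("marker", 1, "COMMIT"),
    ("data", 3, "c1"),        # PID 3 opens a transaction and leaves it open
    ("marker", 2, "ABORT"),
    ("data", 1, "a2"),
    ("marker", 1, "COMMIT"),  # committed, but located after the open transaction
]
visible_before = read_committed(log)

log.append(("marker", 3, "COMMIT"))
visible_after = read_committed(log)
print(visible_before, visible_after)
```

Before PID 3 commits, the already committed record a2 stays hidden because it sits behind an open transaction. This is the stalling effect behind the latency consequence listed above.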

Transactional Kafka flow

1. TaskManager subtask: Flink TaskManager subtask. KafkaSink instantiates a KafkaProducer with transactional.id 'job-1-subtask-3-tx-5'.
2. initTransactions -> Coordinator (assigns PID, epoch): Kafka coordinator broker. Assigns PID=42 epoch=1. If the TID exists, fences the old epoch. Stored in __transaction_state.
3. send records (ongoing): producer.beginTransaction(), state ONGOING. producer.send() writes records to topic partitions. AddPartitionsToTxnRequest informs the coordinator about partition usage.
4. checkpoint barrier -> prepareCommit (durable, hidden): prepareCommit() in Flink. Producer flushes pending sends. Records are durable but in pre-commit state, not yet visible to read_committed consumers.
5. commitTransaction: Checkpoint complete. Committer calls producer.commitTransaction(). EndTxnRequest(commit=true) to coordinator. State PREPARE_COMMIT.
6. WriteTxnMarker -> Partition leaders write markers: Coordinator sends WriteTxnMarkerRequest to each partition leader. Each partition leader writes a commit marker to its log. Records become visible.
7. visible -> Visible to read_committed: Downstream consumers with isolation.level=read_committed see the records. Coordinator state COMPLETE_COMMIT.

Timing implications

End-to-end latency with Kafka transactions:

Without transactions (at-least-once):
  Source -> Flink processing -> Kafka producer.send -> durable
  Latency: ~10 ms

With transactions (exactly-once):
  Source -> Flink processing -> Kafka producer.send (durable in pre-commit)
  Wait for Flink checkpoint barrier -> prepareCommit
  Wait for all subtasks checkpoint ack
  commitTransaction -> WriteTxnMarkers
  Records visible to downstream

  Total latency: ~10 ms + wait for the next checkpoint (up to checkpoint_interval, e.g., 1-60 sec) + commit_duration (~100 ms)

This is a fundamental trade-off:
  - Exactly-once delivery requires checkpoint coordination
  - Worst-case latency ≈ processing + checkpoint_interval + commit overhead
  - Frequent checkpoints reduce latency but cost throughput
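
A worked example of the latency estimate, using the figures quoted above with a 10-second checkpoint interval chosen for illustration. The function and its `wait_fraction` parameter are hypothetical, and the model ignores the time a checkpoint itself takes to complete.

```python
def visibility_latency_ms(processing_ms, checkpoint_interval_ms, commit_ms, wait_fraction):
    """Time until a record becomes visible to read_committed consumers.

    wait_fraction says where in the checkpoint interval the record arrived:
    1.0 = right after a barrier (waits a full interval), 0.0 = right before the next.
    """
    return processing_ms + wait_fraction * checkpoint_interval_ms + commit_ms


worst_case = visibility_latency_ms(10, 10_000, 100, wait_fraction=1.0)
average = visibility_latency_ms(10, 10_000, 100, wait_fraction=0.5)
at_least_once = 10  # no checkpoint wait and no commit round trip

print(worst_case, average, at_least_once)
```

With these numbers the checkpoint interval dominates everything else, which is why the interval is the main tuning knob for exactly-once latency.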

Common production issues

1. transactional.id collisions
   Problem: Two Flink jobs use the same TID prefix
   Result: The jobs fence each other's producers and both fail
   Solution: Unique prefix per job (including environment, version)

2. Transaction timeouts
   Defaults: producer transaction.timeout.ms is 1 min; broker cap transaction.max.timeout.ms is 15 min
   Problem: Checkpoint interval + duration > timeout -> transactions expire and are aborted
   Solution: Increase transaction.timeout.ms in the producer config, and raise transaction.max.timeout.ms on the brokers if needed

3. Pool exhaustion
   Default pool size: 5 per subtask
   Problem: With many concurrent checkpoints, a subtask fails to get a TID
   Solution: Size the pool to cover max-concurrent-checkpoints

4. Coordinator failover
   Problem: Coordinator brokers fail, transactions stuck
   Result: Pending transactions may neither commit nor abort until a new coordinator takes over
   Solution: __transaction_state replication factor ≥ 3, monitoring

5. Zombie producers
   Problem: Old TaskManager hanging onto transaction
   Mitigated by fencing
   But: Configuration must allow fast detection
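
The timeout problem from item 2 comes down to two inequalities, which a pre-deployment check could verify. This is a sketch with hypothetical function and parameter names; it leaves out recovery downtime, which in practice should also fit inside the transaction timeout.

```python
def check_transaction_timeouts(checkpoint_interval_ms, max_checkpoint_duration_ms,
                               transaction_timeout_ms, broker_max_timeout_ms):
    """Return a list of problems with the given timeout settings (empty if none)."""
    problems = []
    if transaction_timeout_ms > broker_max_timeout_ms:
        problems.append("transaction.timeout.ms exceeds transaction.max.timeout.ms")
    if transaction_timeout_ms <= checkpoint_interval_ms + max_checkpoint_duration_ms:
        problems.append("transaction may expire before the checkpoint completes")
    return problems


# 1 min interval, checkpoints that take up to 30 s, 15 min broker cap.
too_short = check_transaction_timeouts(60_000, 30_000, 60_000, 900_000)
too_long = check_transaction_timeouts(60_000, 30_000, 3_600_000, 900_000)
healthy = check_transaction_timeouts(60_000, 30_000, 900_000, 900_000)
print(too_short, too_long, healthy)
```

The first case is the dangerous one: a transaction that expires after its data was pre-committed is aborted by the broker, so the records of that checkpoint never become visible.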

Monitoring transactions

Kafka broker metrics:
  - kafka.coordinator.transaction:type=TransactionMetadataManager,name=...
  - Transaction commit/abort/timeout rates
  - Active transactions count
  - Average transaction duration

Flink metrics:
  - sink.numTransactionsCommitted
  - sink.numTransactionsAborted
  - sink.commitDuration

Common alerts:
  - Abort rate > 1% — investigate
  - Average commit duration > 1 sec — slow brokers
  - Active transactions > pool * subtasks — coordination issue
  - Failed transactions — fencing or timeouts

Performance tuning

Producer tuning:
  acks: all (for durability)
  enable.idempotence: true (default in 3.0+)
  transaction.timeout.ms: 3600000 (60 min; increase for long checkpoint intervals, must not exceed the broker's transaction.max.timeout.ms)
  batch.size: 64KB-1MB (larger batches reduce overhead)
  linger.ms: 5-10 (allow batching)
  compression.type: snappy or lz4

Broker tuning:
  transactional.id.expiration.ms: 604800000 (7 days, the default is fine)
  transaction.max.timeout.ms: ≥ producer timeout
  transaction.state.log.replication.factor: 3 (minimum for durability)

Coordinator load balancing:
  Multiple coordinator partitions (50 default)
  TIDs hash to partition determines coordinator
  Even distribution important
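
How a TID maps to a coordinator partition can be sketched as follows. This assumes the coordinator partition is the absolute value of the Java String hash of the TID modulo the partition count, which matches my reading of the broker code but should be treated as an assumption. The helper names are hypothetical, and the hash shown is only correct for ASCII TIDs.

```python
from collections import Counter


def java_string_hash(s: str) -> int:
    """Java's String.hashCode() as a signed 32-bit integer (ASCII input assumed)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


def coordinator_partition(tid: str, num_partitions: int = 50) -> int:
    """Assumed mapping: abs(hash) % partitions, with Integer.MIN_VALUE mapped to 0."""
    h = java_string_hash(tid)
    non_negative = 0 if h == -(1 << 31) else abs(h)
    return non_negative % num_partitions


# Spread of 200 TIDs (20 subtasks x 10 attempts) over the 50 default partitions.
tids = [f"my-job-{subtask}-{attempt}" for subtask in range(20) for attempt in range(10)]
load = Counter(coordinator_partition(tid) for tid in tids)
print(len(load), "partitions used, busiest holds", max(load.values()), "TIDs")
```

Because the partition is a pure function of the TID string, a naming scheme with little variation can concentrate load on a few coordinators. Running a check like this against a job's real TIDs shows the spread before deployment.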

Reading the source

Kafka source:
  kafka/clients/src/main/java/org/apache/kafka/clients/producer/
    KafkaProducer.java
    internals/TransactionManager.java (includes the nested TxnRequestHandler classes)

  kafka/core/src/main/scala/kafka/coordinator/transaction/
    TransactionCoordinator.scala
    TransactionMarkerChannelManager.scala
    TransactionStateManager.scala

Flink source:
  flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/
    KafkaSink.java
    KafkaCommitter.java
    KafkaWriter.java
    FlinkKafkaInternalProducer.java

KIP documents:
  KIP-98: Exactly Once Delivery and Transactional Messaging
  KIP-360: Improve reliability of idempotent/transactional producer
  KIP-447: Producer scalability for exactly once semantics