Transactional Kafka — broker-level internals
KafkaSink in Flink implements exactly-once delivery through a transactional Kafka producer. This is not just producer.send() + producer.flush(): it is a coordinated protocol between Flink (as the client) and a Kafka broker (as the transaction coordinator).
In this lesson we look at how Kafka transactions work at the broker level: what transactional.id is, how fencing works, what isolation.level=read_committed means, which transaction states exist, and why a per-subtask transactional.id prefix is critical for Flink.
Evolution of the Kafka producer: from at-least-once to exactly-once
Kafka 0.11 (2017):
- Idempotent producer (enable.idempotence=true)
  * Producer assigns a sequence number to each record
  * Broker drops duplicates within a producer session
  * Solves: duplicates caused by retries after network errors
- Transactional producer (transactional.id)
  * A single producer commits a group of writes atomically
  * Cross-partition atomic writes
  * Required for exactly-once delivery
Kafka 3.0+:
- enable.idempotence=true by default for producers
- acks=all by default
Apache Kafka 4.0 (2025):
- KIP-848: new consumer group protocol (KRaft only)
- Transactional improvements
- Better fencing
Idempotent producer foundations
Before transactions, Kafka added the idempotent producer, which gives an “exactly-once per session” guarantee:
Idempotent producer mechanics:
1. Producer connects to the broker
2. Broker assigns producer ID (PID) + epoch
3. Each record: (PID, sequence_number)
4. Broker tracks sequence numbers
5. If broker receives duplicate (same PID+seq), drops silently
6. If broker receives out-of-order, returns error (OutOfOrderSequenceException)
Limitations:
- Only within a session (one producer instance)
- If the producer restarts, it gets a new PID, so there is no dedup across restarts
- Does not handle cross-partition atomic writes
An idempotent producer alone is not enough for Flink: Flink jobs restart, and the guarantee has to survive restarts. This is where transactions come in.
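The dedup rule and its restart limitation can be sketched in a few lines. This is a toy model, not broker code: PartitionLog and OutOfOrderSequence are hypothetical names, and the real broker tracks sequences per batch rather than per record.

```python
from dataclasses import dataclass, field


class OutOfOrderSequence(Exception):
    """Stand-in for Kafka's OutOfOrderSequenceException."""


@dataclass
class PartitionLog:
    """Toy partition leader that remembers the last sequence per producer ID."""
    records: list = field(default_factory=list)
    last_seq: dict = field(default_factory=dict)  # PID -> last appended sequence

    def append(self, pid: int, seq: int, value: str) -> bool:
        expected = self.last_seq.get(pid, -1) + 1
        if seq < expected:
            return False  # duplicate from a retry: dropped silently
        if seq > expected:
            raise OutOfOrderSequence(f"expected {expected}, got {seq}")
        self.records.append(value)
        self.last_seq[pid] = seq
        return True


log = PartitionLog()
log.append(pid=42, seq=0, value="a")
log.append(pid=42, seq=1, value="b")
log.append(pid=42, seq=1, value="b")  # retry of the same record: ignored
log.append(pid=43, seq=0, value="b")  # restarted producer has a new PID: duplicate gets in
```

The last call shows why a new PID after a restart defeats dedup: the broker has no way to link PID 43 to the records PID 42 already wrote.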
Transactional producer
Transactional producer adds:
1. transactional.id (TID)
- User-supplied identifier
- Sticky across producer restarts
- Coordinator (broker) tracks active transactions per TID
- Allows fencing zombie producers
2. Transaction coordinator
- The broker that leads a __transaction_state partition coordinates the TIDs mapped to that partition
- Manages transaction lifecycle
- Logs transaction state changes to __transaction_state
3. Transactional state machine
States: Empty, Ongoing, PrepareCommit, PrepareAbort, CompleteCommit, CompleteAbort, Dead
4. Atomic commit across partitions
- Records spread across multiple topics/partitions
- Either ALL committed or ALL aborted
5. Consumer isolation
- read_committed: only committed transactions visible
- read_uncommitted (the default): also sees records from open and aborted transactions
Producer-broker protocol
Transaction lifecycle:
1. Producer initializes:
   producer.initTransactions()
   -> InitProducerIdRequest (TID)
   -> Coordinator responds: (PID, epoch)
   -> If the TID already exists, the coordinator bumps the epoch, which fences the older producer instance
2. Begin transaction:
   producer.beginTransaction()
   -> Local producer state: IN_TRANSACTION
   -> No broker call yet
3. Send records:
   producer.send(record)
   -> The first record for a new partition triggers AddPartitionsToTxnRequest to the coordinator
   -> Producer stamps each batch with (PID, epoch, base sequence) and a transactional flag
   -> Batch goes to the leader of the target topic-partition
   -> Leader appends it to the partition log as part of the open transaction
4. Commit:
   producer.commitTransaction()
   -> EndTxnRequest(commit=true) to the coordinator
   -> Coordinator writes PREPARE_COMMIT to __transaction_state
   -> Coordinator sends WriteTxnMarkerRequest to the partition leaders
   -> Partition leaders write commit markers to their logs
   -> Records become visible to read_committed consumers
   -> Coordinator writes COMPLETE_COMMIT
5. Abort (on error or explicit call):
   producer.abortTransaction()
   -> EndTxnRequest(commit=false)
   -> Coordinator writes PREPARE_ABORT
   -> Partition leaders write abort markers
   -> read_committed consumers skip the records
   -> Coordinator writes COMPLETE_ABORT
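The commit path can be traced with a small in-memory model. This is a sketch under simplifying assumptions: ToyCoordinator, send and the dict of partition logs are hypothetical stand-ins, one TID only, no failures, no network.

```python
class ToyCoordinator:
    """Toy transaction coordinator for a single transactional.id."""

    def __init__(self):
        self.state_log = []      # stand-in for the __transaction_state topic
        self.partitions = set()  # partitions registered in the open transaction

    def add_partition(self, tp):
        # Models AddPartitionsToTxnRequest: register the partition before writing to it.
        if not self.partitions:
            self.state_log.append("ONGOING")
        self.partitions.add(tp)

    def end_txn(self, logs, pid, commit):
        # Models EndTxnRequest: persist the decision first, then write the markers.
        self.state_log.append("PREPARE_COMMIT" if commit else "PREPARE_ABORT")
        marker = "COMMIT" if commit else "ABORT"
        for tp in sorted(self.partitions):
            logs[tp].append(("marker", pid, marker))  # models WriteTxnMarkerRequest
        self.state_log.append("COMPLETE_COMMIT" if commit else "COMPLETE_ABORT")
        self.partitions.clear()


def send(coordinator, logs, pid, tp, value):
    """Models producer.send(): register a new partition, then append the record."""
    if tp not in coordinator.partitions:
        coordinator.add_partition(tp)
    logs[tp].append(("data", pid, value))


logs = {"orders-0": [], "payments-1": []}
coordinator = ToyCoordinator()
send(coordinator, logs, 42, "orders-0", "order-1")
send(coordinator, logs, 42, "payments-1", "payment-1")
send(coordinator, logs, 42, "orders-0", "order-2")
coordinator.end_txn(logs, pid=42, commit=True)
```

After end_txn, every partition touched by the transaction ends with the same marker. That shared decision, persisted before any marker is written, is what makes the write atomic across partitions.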
Transaction state machine на broker
Coordinator tracks per-TID state:
EMPTY:
- No active transaction
- InitProducerIdRequest assigns PID/epoch
ONGOING:
- Entered when the coordinator receives the first AddPartitionsToTxnRequest (producer.beginTransaction() itself is local)
- Records being written
- Partitions added to the transaction via AddPartitionsToTxnRequest
PREPARE_COMMIT:
- producer.commitTransaction() initiated
- Coordinator transitioning to commit
- If the coordinator crashes here: on recovery, the new coordinator completes the commit
PREPARE_ABORT:
- producer.abortTransaction() initiated
- Coordinator transitioning to abort
- If the coordinator crashes here: on recovery, the new coordinator completes the abort
COMPLETE_COMMIT:
- All partition markers written
- Transaction complete
- The next transaction on this TID moves it back to ONGOING
COMPLETE_ABORT:
- Same as COMPLETE_COMMIT, but the transaction was aborted
DEAD:
- TID unused for longer than transactional.id.expiration.ms
- Transactional.id metadata pruned
- Future use creates new transaction state
Coordinator durability: state is persisted in the __transaction_state topic (replicated, fault tolerant). On coordinator failover (the broker dies), the new coordinator reads the state from the log and continues the protocol.
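The states of a single transaction and the recovery rule can be written down as a transition table. This is a sketch only: TRANSITIONS, advance and recover are hypothetical names, and the table covers one transaction from EMPTY to completion, not TID expiry or reuse.

```python
# Legal transitions inside one transaction, as described above.
TRANSITIONS = {
    "EMPTY": {"ONGOING"},
    "ONGOING": {"PREPARE_COMMIT", "PREPARE_ABORT"},
    "PREPARE_COMMIT": {"COMPLETE_COMMIT"},
    "PREPARE_ABORT": {"COMPLETE_ABORT"},
}


def advance(state: str, target: str) -> str:
    """Move to target, rejecting transitions the table does not allow."""
    if target not in TRANSITIONS.get(state, set()):
        raise ValueError(f"illegal transition {state} -> {target}")
    return target


def recover(last_persisted_state: str) -> str:
    """State a new coordinator drives the transaction to after failover."""
    if last_persisted_state == "PREPARE_COMMIT":
        return "COMPLETE_COMMIT"  # decision was durable: finish the commit
    if last_persisted_state == "PREPARE_ABORT":
        return "COMPLETE_ABORT"   # decision was durable: finish the abort
    return last_persisted_state   # nothing in flight on the coordinator side


state = "EMPTY"
for target in ("ONGOING", "PREPARE_COMMIT", "COMPLETE_COMMIT"):
    state = advance(state, target)
```

The key property is that PREPARE_* is a point of no return: once it is in the log, a coordinator crash cannot change the outcome, only delay it.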
Fencing — zombie producer prevention
Critical question: what if an old Flink TaskManager thinks it is still running and tries to commit?
Scenario:
t=0: TaskManager A starts a producer with TID="flink-job-1-subtask-3"
     Coordinator assigns PID=42, epoch=1
     Producer sends some records
t=5: Network partition. The JobManager thinks A is dead.
     The JobManager schedules a new TaskManager B on another node.
t=6: TaskManager B starts a producer with the same TID="flink-job-1-subtask-3"
     InitProducerIdRequest to the coordinator
     Coordinator: "TID already exists, PID=42 epoch=1"
     Coordinator response: bump the epoch to 2, fence epoch 1, abort A's open transaction
     New PID/epoch for B: PID=42, epoch=2
     (the PID may stay the same; only the epoch changes for fencing)
t=7: TaskManager A regains network, tries producer.commitTransaction()
     EndTxnRequest(PID=42, epoch=1) to the coordinator
     Coordinator: "Epoch 1 fenced, current is 2"
     -> ProducerFencedException returned to A
     -> A's producer is disabled and throws to the application
     -> Flink runtime handles it; A's transaction stays aborted
Result: only B's transaction can commit. No duplicates.
Fencing is a critical safety mechanism. Without it, concurrent producers could both commit, creating duplicates.
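The epoch check from the scenario fits in a short sketch. FencingCoordinator and ProducerFenced are hypothetical names; the starting values PID=42 and epoch=1 simply follow the scenario above.

```python
class ProducerFenced(Exception):
    """Stand-in for Kafka's ProducerFencedException."""


class FencingCoordinator:
    """Toy coordinator that keeps the current (PID, epoch) per transactional.id."""

    def __init__(self):
        self.current = {}   # TID -> (pid, epoch)
        self.next_pid = 42  # matches the PID used in the scenario

    def init_producer_id(self, tid):
        if tid in self.current:
            pid, epoch = self.current[tid]
            self.current[tid] = (pid, epoch + 1)  # same PID, bumped epoch
        else:
            self.current[tid] = (self.next_pid, 1)
            self.next_pid += 1
        return self.current[tid]

    def end_txn(self, tid, pid, epoch):
        if (pid, epoch) != self.current[tid]:
            raise ProducerFenced(f"epoch {epoch} fenced, current is {self.current[tid][1]}")
        return "committed"


coordinator = FencingCoordinator()
tid = "flink-job-1-subtask-3"
producer_a = coordinator.init_producer_id(tid)  # TaskManager A at t=0
producer_b = coordinator.init_producer_id(tid)  # TaskManager B at t=6
```

A commit attempt with producer_a's (PID, epoch) now raises ProducerFenced, while the same call with producer_b's values succeeds.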
transactional.id design в Flink
Flink uses a sticky TID prefix per subtask:
Naming convention (Flink KafkaSink):
TID = "{prefix}-{subtask_index}-{transaction_attempt}"
Example (prefix "flink-checkpoint-job-1", attempt 12):
"flink-checkpoint-job-1-0-12"
"flink-checkpoint-job-1-1-12"
...
"flink-checkpoint-job-1-N-12"
Properties:
1. Sticky per subtask:
- Same subtask reuses same TID prefix
- Survives Flink restarts (config + savepoint preservation)
2. Unique across job:
- Different subtasks have different TIDs
- No collision
3. Includes attempt number:
- Each Flink checkpoint = new transaction attempt
- Allows multiple in-flight transactions per subtask
- Up to max-in-flight checkpoints (configurable)
Configuration:
# Flink Kafka SQL/Table connector options
sink.delivery-guarantee: exactly-once
sink.transactional-id-prefix: my-job # subtask and attempt suffix are generated by Flink
# Kafka producer property passed through the connector
properties.transaction.timeout.ms: 3600000 # 60 min; must not exceed the broker's transaction.max.timeout.ms
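A minimal sketch of the naming convention, assuming the "{prefix}-{subtask_index}-{transaction_attempt}" format stated above. build_transactional_id is a hypothetical helper, not a Flink API.

```python
def build_transactional_id(prefix: str, subtask_index: int, attempt: int) -> str:
    """Compose a TID from the job prefix, the subtask index and the attempt number."""
    return f"{prefix}-{subtask_index}-{attempt}"


# Three subtasks of one job, all at attempt 12: every TID is distinct.
job_a = {build_transactional_id("my-job", subtask, 12) for subtask in range(3)}

# A second job that reuses the same prefix produces exactly the same TIDs.
job_b = {build_transactional_id("my-job", subtask, 12) for subtask in range(3)}
colliding = job_a & job_b

# A unique prefix per job removes the overlap.
job_c = {build_transactional_id("my-job-v2", subtask, 12) for subtask in range(3)}
```

The prefix is the only part of the TID that separates one job from another, which is why it has to be unique per job.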
Pool of transactional.ids
Flink keeps a pool of TIDs per subtask. This is the model of the legacy FlinkKafkaProducer; KafkaSink derives a TID from the checkpoint ID instead, but the reasoning below is the same:
Per subtask: pool of N TIDs (default 5):
tx-0, tx-1, tx-2, tx-3, tx-4
Lifecycle:
- At a checkpoint, the subtask takes one tx-N from the pool
- Pre-commits with that TID
- Commits once the checkpoint completes
- Returns the TID to the pool for reuse
Why a pool? Commits are asynchronous:
- Checkpoint N+1 may start before the commit for checkpoint N completes
- Both in-flight transactions need different TIDs
- The pool prevents blocking
Configuration:
sink.transactional-id-prefix (Table/SQL connector option)
The number of transactions open at once is bounded by execution.checkpointing.max-concurrent-checkpoints
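The pool lifecycle can be modeled as follows. TransactionalIdPool is a hypothetical class written for this lesson, not Flink code; it only shows why overlapping checkpoints need distinct TIDs and what exhaustion looks like.

```python
from collections import deque


class TransactionalIdPool:
    """Toy per-subtask pool of transactional.ids."""

    def __init__(self, prefix: str, subtask: int, size: int = 5):
        self.free = deque(f"{prefix}-{subtask}-tx-{i}" for i in range(size))
        self.in_flight = {}  # checkpoint id -> TID held until commit

    def pre_commit(self, checkpoint_id: int) -> str:
        if not self.free:
            raise RuntimeError("TID pool exhausted: too many checkpoints in flight")
        tid = self.free.popleft()
        self.in_flight[checkpoint_id] = tid
        return tid

    def commit(self, checkpoint_id: int) -> None:
        # Checkpoint completed: the TID goes back to the pool for reuse.
        self.free.append(self.in_flight.pop(checkpoint_id))


pool = TransactionalIdPool("my-job", subtask=0, size=2)
first = pool.pre_commit(checkpoint_id=1)
second = pool.pre_commit(checkpoint_id=2)  # checkpoint 2 starts before 1 commits
```

With size=2, a third pre_commit before any commit raises; after pool.commit(1) the first TID becomes available again.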
read_committed vs read_uncommitted
The consumer must explicitly opt in to exactly-once semantics:
# Consumer config (choose one)
isolation.level: read_committed # see only committed transactions
# isolation.level: read_uncommitted # see all records (default)
read_committed semantics:
Consumer fetches records:
- Skips records inside aborted transactions
- Skips in-progress transactions
- Returns only committed records
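Mechanism:
- Each batch carries the PID and a transactional flag; commit and abort markers are control records in the same partition log
- The broker serves read_committed fetches only up to the last stable offset (LSO), the offset of the earliest still-open transaction
- The fetch response lists aborted transactions, and the consumer skips their records
- Records past the LSO stay invisible until the open transaction commits or aborts
Consequences:
- End-to-end latency increases
  Records wait for the transaction commit before they can be consumed
  Added delay is up to roughly checkpoint interval + commit duration
- Consistent reads are possible
  Once the markers are written, either the entire transaction is visible or nothing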
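The visibility rule can be sketched on a toy partition log. The assumption here is that the broker only serves records below the offset of the earliest still-open transaction (Kafka calls this the last stable offset). The tuple layout and read_committed function are hypothetical simplifications.

```python
def read_committed(log):
    """Return the values a read_committed consumer would see.

    Each log entry is (pid, kind, value); kind is "data" or "marker",
    and a marker's value is "COMMIT" or "ABORT".
    """
    open_start = {}  # pid -> offset of the first record of its open transaction
    pending = {}     # pid -> data offsets still waiting for a marker
    outcome = {}     # data offset -> "COMMIT" or "ABORT"
    for offset, (pid, kind, value) in enumerate(log):
        if kind == "data":
            open_start.setdefault(pid, offset)
            pending.setdefault(pid, []).append(offset)
        else:
            for data_offset in pending.pop(pid, []):
                outcome[data_offset] = value
            open_start.pop(pid, None)
    # Last stable offset: start of the earliest open transaction, or the log end.
    lso = min(open_start.values(), default=len(log))
    return [log[o][2] for o in sorted(outcome) if o < lso and outcome[o] == "COMMIT"]


log = [
    (1, "data", "a1"),
    (2, "data", "b1"),
    (1, "marker", "COMMIT"),
    (3, "data", "c1"),        # PID 3 is still open
    (2, "marker", "ABORT"),
    (1, "data", "a2"),
    (1, "marker", "COMMIT"),  # committed, but behind PID 3's open transaction
]
visible_now = read_committed(log)
visible_later = read_committed(log + [(3, "marker", "COMMIT")])
```

Note that "a2" is already committed in the first call but stays invisible because it sits past the open transaction of PID 3. One slow transaction delays everything behind it in the same partition.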
Timing implications
End-to-end latency with Kafka transactions:
Without transactions (at-least-once):
Source -> Flink processing -> Kafka producer.send -> durable
Latency: ~10 ms
With transactions (exactly-once):
Source -> Flink processing -> Kafka producer.send (durable in pre-commit)
Wait for Flink checkpoint barrier -> prepareCommit
Wait for all subtasks checkpoint ack
commitTransaction -> WriteTxnMarkers
Records visible to downstream read_committed consumers
Total latency: ~10 ms + checkpoint_interval (e.g., 1-60 sec) + commit_duration (~100 ms)
This is a fundamental trade-off:
- Exactly-once delivery requires checkpoint coordination
- Latency ≈ processing time + wait for the next checkpoint (up to one checkpoint_interval) + commit overhead
- Frequent checkpoints reduce latency but cost throughput
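A worked example of the latency arithmetic, using the lesson's ballpark figures (10 ms processing, 100 ms commit). end_to_end_latency_ms is a hypothetical helper, and it assumes checkpoint barriers fire at exact multiples of the interval, which real jobs only approximate.

```python
def end_to_end_latency_ms(arrival_ms, checkpoint_interval_ms,
                          processing_ms=10, commit_ms=100):
    """Time from record arrival until read_committed consumers can see it."""
    ready_ms = arrival_ms + processing_ms
    # Next checkpoint at or after the moment the record is written (ceiling division).
    next_checkpoint_ms = -(-ready_ms // checkpoint_interval_ms) * checkpoint_interval_ms
    visible_ms = next_checkpoint_ms + commit_ms
    return visible_ms - arrival_ms


interval = 10_000  # 10 s checkpoint interval
worst_case = end_to_end_latency_ms(0, interval)       # arrives right after a checkpoint
best_case = end_to_end_latency_ms(9_980, interval)    # arrives just before the next one
just_missed = end_to_end_latency_ms(9_995, interval)  # processed too late, waits a full interval
```

Under these assumptions the three cases come out to 10100 ms, 120 ms and 10105 ms: the checkpoint interval dominates, and where a record lands within the interval decides how long it waits.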
Common production issues
1. transactional.id collisions
Problem: two Flink jobs use the same TID prefix
Result: the jobs fence each other, and producers fail with ProducerFencedException
Solution: a unique prefix per job (include environment and version)
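2. Transaction timeouts
Defaults: the Kafka producer's transaction.timeout.ms is 1 min; the broker caps it at transaction.max.timeout.ms, 15 min by default
Problem: if a checkpoint takes longer than the timeout, the coordinator aborts the transaction before Flink can commit it
Solution: increase transaction.timeout.ms in the producer config, and transaction.max.timeout.ms on the broker if the new value exceeds the cap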
3. Pool exhaustion
Default pool size: 5 per subtask
Problem: with many concurrent checkpoints, a subtask fails to get a free TID
Solution: increase the pool size, and keep max-concurrent-checkpoints below it
4. Coordinator failover
Problem: a coordinator broker fails and transactions stall
Result: pending transactions may not commit or abort until a new coordinator takes over
Solution: transaction.state.log.replication.factor ≥ 3 for __transaction_state, plus monitoring
5. Zombie producers
Problem: Old TaskManager hanging onto transaction
Mitigated by fencing
But: Configuration must allow fast detection
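The timeout problem (issue 2) lends itself to a pre-deployment sanity check. check_transaction_timeouts is a hypothetical helper and the rule is a rough heuristic, not a Flink or Kafka validation: the transaction must be able to outlive one checkpoint interval plus the slowest expected checkpoint, and must fit under the broker cap.

```python
def check_transaction_timeouts(checkpoint_interval_ms, max_checkpoint_duration_ms,
                               transaction_timeout_ms, broker_max_timeout_ms):
    """Return a list of human-readable problems; an empty list means the values look sane."""
    problems = []
    if transaction_timeout_ms > broker_max_timeout_ms:
        problems.append("transaction.timeout.ms exceeds broker transaction.max.timeout.ms")
    if transaction_timeout_ms <= checkpoint_interval_ms + max_checkpoint_duration_ms:
        problems.append("transaction can expire before its checkpoint completes")
    return problems


# 1 min interval, checkpoints take up to 30 s, broker cap at the 15 min default.
too_short = check_transaction_timeouts(60_000, 30_000, 60_000, 900_000)
over_cap = check_transaction_timeouts(60_000, 30_000, 3_600_000, 900_000)
sane = check_transaction_timeouts(60_000, 30_000, 600_000, 900_000)
```

In practice the margin should also cover the time a job needs to restart and recover, since a transaction that expires during recovery can no longer be committed.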
Monitoring transactions
Kafka broker metrics:
- kafka.coordinator.transaction:type=TransactionMetadataManager,name=...
- Transaction commit/abort/timeout rates
- Active transactions count
- Average transaction duration
Flink metrics:
- sink.numTransactionsCommitted
- sink.numTransactionsAborted
- sink.commitDuration
Common alerts:
- Abort rate > 1% — investigate
- Average commit duration > 1 sec — slow brokers
- Active transactions > pool * subtasks — coordination issue
- Failed transactions — fencing or timeouts
Performance tuning
Producer tuning:
acks: all (for durability)
enable.idempotence: true (default in 3.0+)
transaction.timeout.ms: 3600000 (60 min; raise it for long checkpoint intervals)
batch.size: 64KB-1MB (larger batches reduce overhead)
linger.ms: 5-10 (allow batching)
compression.type: snappy or lz4
Broker tuning:
transactional.id.expiration.ms: 604800000 (7 days, the default is fine)
transaction.max.timeout.ms: ≥ producer transaction.timeout.ms
transaction.state.log.replication.factor: 3 (replication of __transaction_state; minimum for durability)
Coordinator load balancing:
Multiple coordinator partitions (50 default)
TIDs hash to partition determines coordinator
Even distribution important
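How a TID maps to a coordinator partition can be sketched as below. The assumption is that the broker takes the absolute value of the Java String.hashCode of the TID modulo the partition count of __transaction_state; the function names are hypothetical and the hash is reimplemented here for illustration.

```python
def java_string_hash(s: str) -> int:
    """Java's String.hashCode as a signed 32-bit int (exact for ASCII input)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


def coordinator_partition(transactional_id: str, num_partitions: int = 50) -> int:
    """Partition of __transaction_state whose leader coordinates this TID."""
    h = java_string_hash(transactional_id)
    non_negative = 0 if h == -(1 << 31) else abs(h)  # guard for Integer.MIN_VALUE
    return non_negative % num_partitions


# Distribution of one job's TIDs (100 subtasks) over the 50 default partitions.
counts = {}
for subtask in range(100):
    partition = coordinator_partition(f"my-job-{subtask}-12")
    counts[partition] = counts.get(partition, 0) + 1
```

Inspecting counts for a planned prefix and parallelism gives a quick view of whether the load spreads across coordinators or piles up on a few of them.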
Reading the source
Kafka source:
kafka/clients/src/main/java/org/apache/kafka/clients/producer/
KafkaProducer.java
internals/TransactionManager.java
(TxnRequestHandler is a nested class inside internals/TransactionManager.java)
kafka/core/src/main/scala/kafka/coordinator/transaction/
TransactionCoordinator.scala
TransactionMarkerChannelManager.scala
TransactionStateManager.scala
Flink source:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/
KafkaSink.java
KafkaCommitter.java
KafkaWriter.java
FlinkKafkaInternalProducer.java
KIP documents:
KIP-98: Exactly Once Delivery and Transactional Messaging
KIP-66: Single Message Transforms
KIP-845: Bound the number of concurrent transactions