Flink-Kafka coordination — full pipeline
In the previous lessons we looked at the Two-Phase Commit protocol in Flink (TwoPhaseCommittingSink) and at the internals of the Kafka transactional producer. Now we put them together: how Flink coordinates checkpoint barriers with Kafka transaction commits to achieve end-to-end exactly-once delivery.
In this lesson we trace the path of a single record from the source, through Flink and KafkaSink, to the broker, and look at every coordination point between the Flink checkpoint protocol and the Kafka transaction protocol.
End-to-end exactly-once in practice
Big picture
End-to-end pipeline:
Kafka source
-> Flink processing (multiple operators)
-> KafkaSink
Coordination points:
1. The Flink checkpoint barrier propagates through the DAG (asynchronous barrier snapshotting, a variant of Chandy-Lamport)
2. The source records its offsets in the checkpoint
3. Operators record their state in the checkpoint
4. Sink: prepareCommit -> save transaction handles (transactional.id, producerId, epoch) to the checkpoint
5. Checkpoint complete notification
6. Sink: commit the Kafka transaction (records become visible)
If a failure happens anywhere in the pipeline:
- Restore from the last successful checkpoint
- Source replays from the saved offsets
- Pending Kafka transactions of the restored checkpoint are committed (again, if needed); transactions opened after it are aborted
- No duplicates, no lost data
Sequence diagram: complete commit cycle
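Time | Component    | Action
T0   | JobManager   | trigger checkpoint
T1   | JobManager   | -> trigger barrier injection at all sources
T2   | Source TM    | snapshot offsets
T3   | Source TM    | -> barrier downstream
T4   | Process TM   | snapshot state
T5   | Process TM   | -> barrier downstream
T6   | Sink TM      | prepareCommit()
T7   | Sink TM      | producer.flush(): send remaining batches to the brokers
T8   | Kafka Broker | records durable, not yet visible
T9   | Sink TM      | return CommT (transactional.id, producerId, epoch)
T10  | Sink TM      | save CommT to checkpoint state
T11  | JobManager   | ackCheckpoint received from all subtasks
T12  | JobManager   | checkpoint complete (metadata persisted)
T13  | JobManager   | -> notifyCheckpointComplete to all subtasks
T14  | Sink TM      | committer.commit(CommT)
T15  | Sink TM      | producer.commitTransaction()
T16  | Sink TM      | -> EndTxnRequest(commit=true)
T17  | Kafka Broker | write PREPARE_COMMIT to __transaction_state
T18  | Sink TM      | commit success returned (the commit decision is durable)
T19  | Sink TM      | release producer / TID to the pool
T20  | Kafka Broker | WriteTxnMarkers to each partition leader
T21  | Kafka Broker | records now visible, partition by partition
T22  | Kafka Broker | write COMPLETE_COMMIT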
This is the full timeline, and every step matters. Now let's zoom in.
Step by step in detail
Step 1: Checkpoint trigger
The JobManager's periodic timer (every execution.checkpointing.interval) triggers a checkpoint:
// JobManager logic (simplified)
class CheckpointCoordinator {
void triggerPeriodicCheckpoint() {
long checkpointId = nextCheckpointId();
pendingCheckpoints.put(checkpointId, new PendingCheckpoint(...));
// Send barrier triggers to all source subtasks
sources.forEach(s -> s.triggerCheckpoint(checkpointId, timestamp));
}
}
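Step 2: Barrier propagation from the source
// Source reader (KafkaSourceReader, simplified)
class KafkaSourceReader {
    // Called when the checkpoint is triggered at this source subtask
    List<KafkaPartitionSplit> snapshotState(long checkpointId) {
        // Each split carries the next Kafka offset to read for its partition;
        // the runtime stores the returned splits in the checkpoint
        return currentSplitsWithOffsets();
    }
    // The barrier itself is emitted downstream by the task runtime, not by the reader;
    // no records are emitted between the snapshot and the barrier
}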
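The barrier is a special control record that flows with the data. With aligned checkpoints, an operator with multiple inputs waits until the barrier has arrived on every input channel (buffering records from channels that already delivered it), then snapshots its state and forwards the barrier.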
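The alignment rule above can be sketched in plain Java. `BarrierAligner` is a hypothetical model, not Flink's barrier handler API: channels are integers, records are strings, and a snapshot is just a copy of the records processed so far. Records that arrive on a channel after its barrier are buffered, so they cannot leak into checkpoint N.

```java
import java.util.*;

// Hypothetical model of aligned checkpoint barriers for an operator with several inputs.
// Not Flink's API: channels are ints, records are strings, a snapshot is a list copy.
class BarrierAligner {
    private final int numChannels;
    private final Set<Integer> blocked = new HashSet<>();       // channels that already sent barrier N
    private final Deque<String> buffered = new ArrayDeque<>();  // records held back during alignment
    final List<String> processed = new ArrayList<>();           // records applied to operator state
    final List<List<String>> snapshots = new ArrayList<>();     // one entry per completed checkpoint

    BarrierAligner(int numChannels) {
        this.numChannels = numChannels;
    }

    void onRecord(int channel, String record) {
        if (blocked.contains(channel)) {
            buffered.add(record);   // after the barrier on this channel: must not enter snapshot N
        } else {
            processed.add(record);  // still before the barrier on this channel
        }
    }

    void onBarrier(int channel) {
        blocked.add(channel);
        if (blocked.size() == numChannels) {
            // All inputs aligned: snapshot state, (conceptually) forward the barrier,
            // then release the buffered records in arrival order
            snapshots.add(List.copyOf(processed));
            blocked.clear();
            while (!buffered.isEmpty()) {
                processed.add(buffered.poll());
            }
        }
    }
}
```

Without the buffering, `a2` in the test below would end up in snapshot 1 even though the source offsets saved for that checkpoint were taken before `a2`, so a restore would apply it twice.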
Step 3: Operator state snapshot
// Generic stateful operator
class StatefulOperator {
void snapshotState(long checkpointId) {
// Make state durable
stateBackend.checkpoint(checkpointId);
// Emit barrier downstream
emitBarrier(checkpointId);
}
}
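With ForSt (the disaggregated state backend) this is lightweight: the state files already live in remote storage, so the checkpoint mostly records references to them. With RocksDB, incremental checkpoints asynchronously upload the newly created SST files to checkpoint storage (e.g., S3).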
Step 4: Sink prepareCommit
When the barrier reaches the sink (just before the sink processes it):
// KafkaSinkWriter: simplified model of the connector's KafkaWriter, which wraps FlinkKafkaInternalProducer
class KafkaSinkWriter implements PrecommittingSinkWriter<...> {
private FlinkKafkaInternalProducer producer;
private String transactionalIdPrefix;
private int subtaskId;
private long currentTransactionId;
@Override
public Collection<KafkaCommittable> prepareCommit() {
// Flush pending sends to the Kafka brokers
producer.flush();
// Records are now durable on brokers, but in pre-commit state
// They won't be visible to read_committed consumers yet
// Build committable: TID + checkpoint metadata
String txnId = transactionalIdPrefix + "-" + subtaskId + "-" + currentTransactionId;
KafkaCommittable committable = new KafkaCommittable(
producer.getProducerId(), // PID assigned by coordinator
producer.getEpoch(), // current epoch
txnId, // transactional.id
...
);
// Important: create next producer for upcoming records
// The old producer is "frozen" with pending transaction
switchToNextTransaction();
return Collections.singletonList(committable);
}
private void switchToNextTransaction() {
currentTransactionId++;
// Get next producer from pool
producer = producerPool.acquire();
producer.initTransactions();
producer.beginTransaction();
}
}
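KafkaCommittable contains:
- transactional.id (identifies the transaction at the Kafka transaction coordinator)
- producerId + epoch (let a fresh producer resume the transaction after a restart and commit it without calling initTransactions(), which would abort it)
- optionally, a reference to the in-memory producer, so the committer can reuse it instead of creating a new one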
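A minimal sketch of the naming scheme and per-checkpoint rotation from step 4, assuming the `prefix-subtaskId-checkpointId` format used in the writer code above. `Committable` and `TxnWriterModel` are hypothetical stand-ins for `KafkaCommittable` and the connector's writer; producer IDs come from a fake counter and the epoch is fixed at 0, whereas a real broker assigns both in `initTransactions()`. It needs Java 16+ for records.

```java
import java.util.*;

// Hypothetical stand-in for KafkaCommittable: enough to find and resume the transaction later.
record Committable(long producerId, short epoch, String transactionalId) {}

// Hypothetical model of a transactional sink writer that opens one transaction per checkpoint.
class TxnWriterModel {
    private final String prefix;
    private final int subtaskId;
    private long checkpointId;           // checkpoint the currently open transaction belongs to
    private long nextProducerId = 1000;  // fake PID counter (a real broker assigns PIDs)
    private long currentProducerId;
    final List<String> openTxnRecords = new ArrayList<>();

    TxnWriterModel(String prefix, int subtaskId, long firstCheckpointId) {
        this.prefix = prefix;
        this.subtaskId = subtaskId;
        this.checkpointId = firstCheckpointId;
        this.currentProducerId = nextProducerId++;
    }

    static String transactionalId(String prefix, int subtaskId, long checkpointId) {
        return prefix + "-" + subtaskId + "-" + checkpointId;
    }

    void write(String record) {
        openTxnRecords.add(record);
    }

    // Called when the barrier for the current checkpoint reaches the sink.
    Committable prepareCommit() {
        Committable c = new Committable(currentProducerId, (short) 0,
                transactionalId(prefix, subtaskId, checkpointId));
        // Rotate: the pre-committed transaction is frozen; new records go to the next one
        checkpointId++;
        currentProducerId = nextProducerId++;
        openTxnRecords.clear();
        return c;
    }
}
```

Because the ID is derived only from the prefix, subtask index and checkpoint ID, a restarted subtask can recompute which transactional IDs it may have left open.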
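Step 5: Save committables to checkpoint state
The Flink runtime stores the committables (CommT) in checkpoint state. In the Sink V2 runtime the work is split between two operators:
// Flink runtime (simplified): SinkWriterOperator + CommitterOperator
// SinkWriterOperator: runs before the barrier is forwarded downstream
void prepareSnapshotPreBarrier(long checkpointId) {
    writer.flush(false);                                      // push buffered records to Kafka
    Collection<CommT> committables = writer.prepareCommit();  // one committable per pre-committed transaction
    emitDownstream(checkpointId, committables);               // sent to CommitterOperator as stream records
}
// CommitterOperator: keeps the received committables per checkpoint ID and snapshots them
void snapshotState(StateSnapshotContext context) {
    // Serialized with the sink's SimpleVersionedSerializer (KafkaCommittableSerializer for KafkaSink)
    committablesState.update(committablesByCheckpoint);
}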
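Committable state is ordinary operator state, so it is written to checkpoint storage together with the rest of the snapshot (for example S3 in a ForSt setup).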
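The committable state has to be readable after a restart and, ideally, after a job upgrade, which is why it goes through a versioned serializer. The sketch below is a JDK-only stand-in: `TxnHandle` and `CommittableSerde` are hypothetical names, not Flink's `SimpleVersionedSerializer` interface, and the field order (epoch, producer ID, transactional ID) is an assumption loosely modeled on `KafkaCommittableSerializer`. The version goes first so a reader can reject or migrate unknown formats.

```java
import java.io.*;

// Hypothetical stand-in for KafkaCommittable.
record TxnHandle(long producerId, short epoch, String transactionalId) {}

// JDK-only sketch of a versioned serializer: the version is written first so readers can branch on it.
class CommittableSerde {
    static final int VERSION = 1;

    static byte[] serialize(TxnHandle h) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(VERSION);
            out.writeShort(h.epoch());
            out.writeLong(h.producerId());
            out.writeUTF(h.transactionalId());  // 2-byte length + modified UTF-8
        }
        return bytes.toByteArray();
    }

    static TxnHandle deserialize(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != VERSION) {
                throw new IOException("Unsupported committable version " + version);
            }
            short epoch = in.readShort();
            long producerId = in.readLong();
            String transactionalId = in.readUTF();
            return new TxnHandle(producerId, epoch, transactionalId);
        }
    }
}
```

A later format would bump `VERSION` and keep a branch that can still read version 1, so committables from an old checkpoint survive an upgrade.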
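Step 6: ACK to the JobManager
Each subtask acknowledges its snapshot once it has finished, including the asynchronous upload part:
// Subtask -> JobManager (simplified; in Flink this goes through CheckpointResponder)
void acknowledgeSnapshot(long checkpointId) {
    checkpointResponder.acknowledgeCheckpoint(
        getJobId(),
        getExecutionAttemptId(),
        checkpointId,
        checkpointMetrics,
        new TaskStateSnapshot(...)
    );
}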
// JobManager collects acks
class CheckpointCoordinator {
void receiveAcknowledgeMessage(AcknowledgeCheckpoint msg) {
PendingCheckpoint pc = pendingCheckpoints.get(msg.getCheckpointId());
pc.acknowledgeTask(msg.getTaskExecutionId());
if (pc.isFullyAcknowledged()) {
// ALL subtasks acked - checkpoint complete!
completeCheckpoint(pc);
}
}
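void completeCheckpoint(PendingCheckpoint pc) {
// Finalize checkpoint metadata (PendingCheckpoint.finalizeCheckpoint(...) in Flink;
// a method named finalize() would clash with Object.finalize())
CompletedCheckpoint cp = pc.finalizeCheckpoint(...);
// Persist BEFORE notifying: the completed checkpoint goes to the CompletedCheckpointStore
// (backed by the HA services in an HA setup), so a new JobManager can find it
completedCheckpointStore.add(cp);
// Notify all operators of completion
broadcastNotifyCheckpointComplete(pc.getCheckpointId());
}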
}
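The bookkeeping of steps 1 and 6 (trigger, collect acknowledgements, complete, then notify) fits in a small in-memory model. `CoordinatorModel` is hypothetical; its `completed` list stands in for the completed-checkpoint store that is written before any notification goes out, and timeouts and aborted checkpoints are left out.

```java
import java.util.*;

// Hypothetical in-memory model of the JobManager's checkpoint bookkeeping.
class CoordinatorModel {
    private final Set<String> allTasks;                              // every subtask that must acknowledge
    private final Map<Long, Set<String>> pending = new HashMap<>();  // checkpointId -> tasks still missing
    private long nextId = 1;
    final List<Long> completed = new ArrayList<>();  // stands in for the durable checkpoint store
    final List<Long> notified = new ArrayList<>();   // notifyCheckpointComplete broadcasts

    CoordinatorModel(Set<String> allTasks) {
        this.allTasks = Set.copyOf(allTasks);
    }

    long trigger() {
        long id = nextId++;
        pending.put(id, new HashSet<>(allTasks));
        return id;
    }

    void acknowledge(long checkpointId, String task) {
        Set<String> missing = pending.get(checkpointId);
        if (missing == null) {
            return;  // unknown or already completed checkpoint: ignore late acks
        }
        missing.remove(task);
        if (missing.isEmpty()) {
            pending.remove(checkpointId);
            completed.add(checkpointId);  // 1) make completion durable
            notified.add(checkpointId);   // 2) only then tell the tasks
        }
    }
}
```

The ordering inside `acknowledge` is the point: if the JobManager crashed between the two lines, the checkpoint would still be found on recovery, which is what failure scenario 3 depends on.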
Step 7: NotifyCheckpointComplete broadcast
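The JobManager broadcasts the notification to all subtasks. Flink treats this notification as best effort: it can be lost (for example when a failure happens right after the checkpoint completes), so sinks must not depend on receiving every single one: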
// JobManager
void broadcastNotifyCheckpointComplete(long checkpointId) {
executionGraph.getAllVertices().forEach(v -> {
v.getCurrentExecution().notifyCheckpointComplete(checkpointId);
});
}
// Subtask receives
class SubtaskRunner {
void notifyCheckpointComplete(long checkpointId) {
// Pass it on to the operator
streamOperator.notifyCheckpointComplete(checkpointId);
}
}
Step 8: Sink committer commit
Now it is finally time to commit the Kafka transaction:
// CommitterOperator receives the notification (simplified)
class CommitterOperator {
    void notifyCheckpointComplete(long checkpointId) {
        // Read the committables of all checkpoints <= checkpointId from state
        // (this also covers an earlier notification that was lost)
        Collection<CommT> committables = readCommittablesUpTo(checkpointId);
        // Call the committer
        committer.commit(committables);
        // Drop successfully committed entries from state; retriable failures stay for the next attempt
        clearCommittedEntries(checkpointId);
    }
}
// KafkaCommitter (simplified from flink-connector-kafka)
class KafkaCommitter implements Committer<KafkaCommittable> {
    @Override
    public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
            throws IOException, InterruptedException {
        for (CommitRequest<KafkaCommittable> request : requests) {
            KafkaCommittable kc = request.getCommittable();
            // Reuse the in-memory producer if present; after a restart, create a new producer
            // for the same transactional.id and call resumeTransaction(producerId, epoch)
            FlinkKafkaInternalProducer<?, ?> producer = producerFor(kc);
            try {
                // Actual commit on the Kafka broker (EndTxnRequest, commit=true)
                producer.commitTransaction();
                // Records now visible to read_committed consumers; a repeated commit also succeeds
            } catch (RetriableException e) {
                // e.g., the coordinator moved: try again later
                request.retryLater();
            } catch (ProducerFencedException e) {
                // The epoch no longer matches: typically the broker aborted the transaction after
                // transaction.timeout.ms because recovery took too long. The data is most likely lost.
                request.signalFailedWithKnownReason(e);
            } catch (InvalidTxnStateException e) {
                // Raised for a commit after an abort (not for a double commit):
                // the transaction was aborted, so its records will never become visible
                request.signalFailedWithKnownReason(e);
            }
        }
    }
}
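Why the two exceptions in step 8 mean different things can be shown with a toy coordinator for a single `transactional.id`. `ToyTxnCoordinator` is a hypothetical simplification, not Kafka's implementation: there are no partitions or markers, and a `transaction.timeout.ms` expiry is triggered by calling `expire()`. It encodes three behaviors described above: a retried commit succeeds, a timeout abort bumps the epoch so the resumed handle is fenced, and a commit after an abort is rejected as an invalid state.

```java
// Toy model of a Kafka transaction coordinator for one transactional.id (hypothetical).
// Assumptions: no partitions/markers; a timeout abort is triggered explicitly via expire().
class ToyTxnCoordinator {
    enum State { ONGOING, COMPLETE_COMMIT, COMPLETE_ABORT }
    enum Outcome { COMMITTED, FENCED, INVALID_TXN_STATE }

    private State state = State.ONGOING;
    private short epoch;

    ToyTxnCoordinator(short epoch) {
        this.epoch = epoch;
    }

    // Broker-side timeout (transaction.timeout.ms expired): abort and bump the epoch.
    void expire() {
        if (state == State.ONGOING) {
            state = State.COMPLETE_ABORT;
            epoch++;
        }
    }

    // The producer aborted the transaction itself (epoch unchanged).
    void abort() {
        if (state == State.ONGOING) {
            state = State.COMPLETE_ABORT;
        }
    }

    // EndTxn(commit=true) from a (possibly resumed) producer that carries producerEpoch.
    Outcome commit(short producerEpoch) {
        if (producerEpoch != epoch) {
            return Outcome.FENCED;  // stale epoch: someone bumped it in the meantime
        }
        return switch (state) {
            case ONGOING, COMPLETE_COMMIT -> {
                state = State.COMPLETE_COMMIT;  // first commit or a retry: both succeed
                yield Outcome.COMMITTED;
            }
            case COMPLETE_ABORT -> Outcome.INVALID_TXN_STATE;  // commit after abort
        };
    }
}
```

In the committer, `COMMITTED` means the request is done, while `FENCED` and `INVALID_TXN_STATE` both mean the records of that transaction will never become visible, which is why they are reported as failures rather than silently ignored.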
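Step 9: Records become visible
When the Kafka transaction coordinator processes commitTransaction:
Inside the Kafka broker:
1. The coordinator receives EndTxnRequest(TID, commit=true, epoch=N)
2. It verifies the epoch: the producer's epoch must match the current one, otherwise the producer is fenced
3. It writes PREPARE_COMMIT to __transaction_state; from here on the commit decision is durable
4. It returns success to the producer, so commitTransaction() in Flink returns
5. It sends WriteTxnMarkersRequest to the leader of each partition in the transaction
6. Each partition leader appends a COMMIT control marker to its log, and the partition's last stable offset (LSO) moves past the transaction
7. read_committed consumers read up to the LSO, so they now see the records of that partition
8. Once all markers are written, the coordinator writes COMPLETE_COMMIT to __transaction_state
Records become visible from step 6 on, partition by partition rather than atomically across partitions. This is the core of exactly-once visibility: before the marker, read_committed consumers do not see the records at all.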
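The per-partition visibility of steps 6 and 7 can be illustrated with a toy partition log. `ToyPartitionLog` is hypothetical: it keeps the first offset of every open transaction, derives the last stable offset (LSO) from it, and lets a read_committed reader see only non-aborted records below the LSO. Unlike a real Kafka log, the markers themselves do not take up offsets here.

```java
import java.util.*;

// Toy model of one partition log with transactional records (hypothetical, not Kafka's code).
class ToyPartitionLog {
    private record Entry(long offset, String value, String txnId) {}

    private final List<Entry> log = new ArrayList<>();
    private final Map<String, Long> openTxnFirstOffset = new HashMap<>();  // txnId -> first offset
    private final Set<String> aborted = new HashSet<>();

    void appendTransactional(String txnId, String value) {
        long offset = log.size();
        openTxnFirstOffset.putIfAbsent(txnId, offset);
        log.add(new Entry(offset, value, txnId));
    }

    // Effect of WriteTxnMarkers on this partition: the transaction is no longer open here.
    void writeMarker(String txnId, boolean commit) {
        openTxnFirstOffset.remove(txnId);
        if (!commit) {
            aborted.add(txnId);
        }
    }

    // LSO: first offset of the earliest still-open transaction, else the log end.
    long lastStableOffset() {
        return openTxnFirstOffset.values().stream().min(Long::compare).orElse((long) log.size());
    }

    // What a read_committed consumer can read: below the LSO and not aborted.
    List<String> readCommitted() {
        long lso = lastStableOffset();
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.offset() < lso && !aborted.contains(e.txnId())) {
                out.add(e.value());
            }
        }
        return out;
    }
}
```

In the test, the same transaction is already readable in `p0` while `p1` still hides its half, which is exactly the window between steps 6 and 8.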
Failure scenarios
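Scenario 1: TaskManager dies after prepareCommit, before checkpoint N completes
- The JobManager declares checkpoint N failed and restarts the job from the last completed checkpoint (N-1)
- The committables restored from N-1 are committed again (a no-op if they were already committed)
- Transactions opened after N-1 are aborted: the restarted KafkaSink writers fence the lingering transactional.ids of their prefix, and anything missed is aborted by the broker after transaction.timeout.ms
- The source rewinds to the offsets saved in N-1 and replays
- The replayed records are written into new transactions, which commit with a later checkpoint
- Result: exactly-once. The aborted pre-commit data is never visible to read_committed consumers; the replay produces it again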
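Scenario 2: TaskManager dies AFTER checkpoint N completes, before the commit finishes
- The checkpoint is complete, so the job restarts from checkpoint N, which contains N's committables
- Whether the notification never arrived or the commit was interrupted does not matter
- On startup, the CommitterOperator restores those committables and commits them right away
- The committer resumes each transaction with a new producer for the same transactional.id (resumeTransaction(producerId, epoch)) and calls commitTransaction()
- If the first attempt had already committed, the second commit is a harmless no-op
- Pitfall: this only works while the transaction is still open. If failover takes longer than transaction.timeout.ms, the broker aborts it and the commit fails (ProducerFencedException), which means data loss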
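Scenario 3: JobManager dies between completing the checkpoint and broadcasting the notification
- Checkpoint completion is persisted (CompletedCheckpointStore, HA metadata) before any notification is sent
- A new JobManager is elected, recovers the latest completed checkpoint, and restarts the job from it
- The committers restore the committables and commit them, as in scenario 2; a duplicate commit is idempotent
- If the JobManager died before the checkpoint was persisted, checkpoint N does not count and scenario 1 applies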
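Scenario 4: Kafka transaction coordinator dies during the commit
- producer.commitTransaction() hits a retriable error (e.g., NOT_COORDINATOR); the Kafka client looks up the new coordinator and retries
- The replacement coordinator (the new leader of the __transaction_state partition) reads the transaction state
- If the state is PREPARE_COMMIT: it completes the commit
- If the state is PREPARE_ABORT: it completes the abort
- The retried commitTransaction() sees the current state; repeating a commit is idempotent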
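Scenarios 1 and 2 rely on the restarted sink aborting transactions that were opened after the restored checkpoint. The sketch below is a simplified, hypothetical model of that probing for one subtask: it walks the checkpoint IDs after the restored one and stops at the first transactional ID that was never used. `knownTids` stands in for the broker's `__transaction_state`; in a real client, calling `initTransactions()` with a given `transactional.id` is what fences the old producer and aborts its open transaction. The stop rule is an assumption of this sketch, not a description of the connector's exact algorithm.

```java
import java.util.*;

// Hypothetical model of aborting lingering transactions after restoring checkpoint N.
class LingeringTxnAborter {
    // Returns the transactional ids that would be fenced/aborted for one subtask.
    static List<String> toAbort(String prefix, int subtaskId, long restoredCheckpointId,
                                Set<String> knownTids) {
        List<String> aborted = new ArrayList<>();
        for (long cp = restoredCheckpointId + 1; ; cp++) {
            String tid = prefix + "-" + subtaskId + "-" + cp;
            if (!knownTids.contains(tid)) {
                break;  // sketch assumption: an unused id means no later transaction exists
            }
            aborted.add(tid);  // real client: initTransactions() with this id aborts any open txn
        }
        return aborted;
    }
}
```

`job-0-5` never appears in the result: it belongs to the restored checkpoint, so it is committed rather than aborted.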
Committable tracking in CommitterOperator
The Sink V2 runtime tracks pending commits in the CommitterOperator's own operator state (not through the older CheckpointCommitter class, which belongs to the write-ahead GenericWriteAheadSink):
Per subtask:
  pending: sorted map checkpoint_id -> List<CommT>   (part of the operator state)
On notifyCheckpointComplete(N):
  for every checkpoint_id <= N in pending:
    committer.commit(committables)
    remove the entries that committed; keep retriable failures for the next attempt
On restart:
  Load the pending committables from the restored checkpoint
  They come from a successful prepareCommit and may or may not have been committed yet
  Commit them immediately
  (Safe because committing an already committed Kafka transaction succeeds; an aborted one fails loudly)
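The bookkeeping above as a runnable model. `CommitterModel` is a hypothetical simplification of what the committer keeps in state: a sorted map from checkpoint ID to committables. The `commitFn` predicate stands in for `Committer.commit`, returning `true` when the transaction is committed (including an idempotent re-commit) and `false` for a retriable failure.

```java
import java.util.*;
import java.util.function.Predicate;

// Hypothetical model of committable bookkeeping in a Sink V2 committer.
class CommitterModel {
    // checkpointId -> committables still waiting for commit (this map is what gets checkpointed)
    final NavigableMap<Long, List<String>> pending = new TreeMap<>();
    private final Predicate<String> commitFn;  // true = committed, false = retry later

    CommitterModel(Predicate<String> commitFn) {
        this.commitFn = commitFn;
    }

    void addCommittables(long checkpointId, List<String> committables) {
        pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).addAll(committables);
    }

    // Commits everything up to and including checkpointId, so a lost notification
    // for an earlier checkpoint is covered by the next one.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pending.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            entry.getValue().removeIf(commitFn);  // commit each; drop the ones that succeeded
            if (entry.getValue().isEmpty()) {
                it.remove();                      // removes from the backing map as well
            }
        }
    }

    // On restore: committables of the restored checkpoint are committed right away.
    static CommitterModel restore(NavigableMap<Long, List<String>> state, long restoredCheckpointId,
                                  Predicate<String> commitFn) {
        CommitterModel m = new CommitterModel(commitFn);
        state.forEach((id, list) -> m.addCommittables(id, list));
        m.notifyCheckpointComplete(restoredCheckpointId);
        return m;
    }
}
```

The first test case is the lost-notification situation from step 7: the notification for checkpoint 1 never arrives, and the one for checkpoint 2 commits both.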
Timing characteristics
Illustrative timing breakdown (1-second checkpoint interval; real numbers depend on state size, storage and network):
T+0 Checkpoint triggered
T+50ms Barrier reaches sources
T+200ms Barrier reaches processing operators (with state snapshot)
T+500ms Barrier reaches sinks (full state snapshot done)
T+550ms Sink prepareCommit (flush to Kafka, ~50ms)
T+600ms All subtasks ACK
T+610ms JobManager broadcasts complete
T+650ms Sinks call commitTransaction
T+700ms Kafka coordinator writes PREPARE_COMMIT
T+750ms WriteTxnMarkers sent to partition leaders
T+800ms All markers written, records visible
T+810ms Coordinator writes COMPLETE_COMMIT
Total from checkpoint trigger to visibility: ~800 ms
If checkpoint interval = 60s:
Records become visible downstream with an average delay of about 30s (half the interval)
Plus the checkpoint and commit duration (~800 ms)
Recommendation:
- For low latency: checkpoint interval 1-5 seconds (with ForSt)
- For high throughput: checkpoint interval 30-60 seconds
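The latency rule of thumb above as a tiny helper. `VisibilityLatency.averageMillis` is a hypothetical name, and the formula is the approximation used in this lesson (half an interval of waiting, plus the checkpoint and commit time); it ignores backpressure and checkpoints that run longer than the interval.

```java
// Approximate average delay between producing a record and its visibility to read_committed
// consumers: on average a record waits half an interval for the next checkpoint, then the
// checkpoint and the Kafka commit must finish (rule of thumb, not an exact model).
class VisibilityLatency {
    static double averageMillis(long checkpointIntervalMs, long checkpointToVisibleMs) {
        return checkpointIntervalMs / 2.0 + checkpointToVisibleMs;
    }
}
```

With the numbers above, `averageMillis(60_000, 800)` gives 30800.0 ms and `averageMillis(5_000, 800)` gives 3300.0 ms.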
Common issues
1. Long checkpoint duration
Symptom: large lag between record processing and visibility
Causes: huge state, slow S3/network, large RocksDB uploads
Solutions: ForSt, async checkpoints, unaligned checkpoints
2. Commit timeouts
Symptom: commitTransaction throws
Causes: slow Kafka coordinator, network issues
Solutions: increase timeout, scale coordinator
3. Pool exhaustion
Symptom: cannot get a new TID for a transaction
Causes: too many concurrent in-flight checkpoints
Solutions: increase pool size, reduce max-concurrent-checkpoints
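4. Leaking (hanging) transactions
Symptom: many open Kafka transactions over time; read_committed consumers stall because the LSO cannot move past an open transaction
Causes: transactions from failed checkpoints that the restarted sink did not abort (for example because transactional.id.prefix changed between runs), so they stay open until transaction.timeout.ms
Solutions: keep the transactional id prefix stable across restarts, monitor open transactions (kafka-transactions.sh list / describe / find-hanging, Kafka 3.0+), abort hanging ones manually (kafka-transactions.sh abort)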
5. Fencing on restart
Symptom: ProducerFencedException
Causes: TID epoch incremented by replacement
Solutions: normal recovery, transaction aborts, source replays
Configuration cheat sheet
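# flink-conf.yaml
# Checkpointing
execution.checkpointing.interval: 30s
execution.checkpointing.timeout: 10min
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.unaligned: true   # for low latency under backpressure
# Kafka sink: these are NOT flink-conf.yaml keys. Configure them on KafkaSink.builder():
#   .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
#   .setTransactionalIdPrefix("${JOB_NAME}-${VERSION}")   # must be unique per job on the cluster
# (SQL connector: 'sink.delivery-guarantee', 'sink.transactional-id-prefix', 'properties.*')
# Kafka producer properties, passed via setKafkaProducerConfig(...) / setProperty(...):
transaction.timeout.ms=900000   # 15min; must be <= broker transaction.max.timeout.ms (default 15min)
acks=all
# Performance
batch.size=65536   # 64KB
linger.ms=5
compression.type=snappy
# Exactly-once producer settings (transactions already imply idempotence)
enable.idempotence=true
max.in.flight.requests.per.connection=5   # must be <= 5 with idempotence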
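Since a transaction that outlives `transaction.timeout.ms` is aborted by the broker (and its commit then fails as in step 8), it is worth checking the producer timeout against the checkpoint settings before deploying. `TxnTimeoutCheck` is a hypothetical helper; the worst-case lifetime formula (one interval, plus the checkpoint timeout, plus a recovery budget you choose) is a rule of thumb, and the broker cap is `transaction.max.timeout.ms`, 900000 ms by default.

```java
import java.util.*;

// Hypothetical pre-deployment check for exactly-once KafkaSink settings (all values in ms).
class TxnTimeoutCheck {
    static List<String> problems(long transactionTimeoutMs, long brokerMaxTimeoutMs,
                                 long checkpointIntervalMs, long checkpointTimeoutMs,
                                 long recoveryBudgetMs) {
        List<String> out = new ArrayList<>();
        if (transactionTimeoutMs > brokerMaxTimeoutMs) {
            out.add("transaction.timeout.ms exceeds broker transaction.max.timeout.ms: initTransactions() fails");
        }
        // Rough worst case for one transaction: opened at barrier N-1, waits one interval for barrier N,
        // up to the checkpoint timeout for checkpoint N to complete, then a failover before the re-commit.
        long worstCaseLifetime = checkpointIntervalMs + checkpointTimeoutMs + recoveryBudgetMs;
        if (transactionTimeoutMs < worstCaseLifetime) {
            out.add("transaction.timeout.ms below worst-case transaction lifetime (" + worstCaseLifetime + " ms)");
        }
        return out;
    }
}
```

With the values above (30 s interval, 10 min checkpoint timeout, 15 min transaction timeout) and a 1 min recovery budget, the check passes; the Kafka producer default of 60000 ms would not.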
End-to-end latency optimization
1. Reduce checkpoint interval
- 30s -> 5s reduces the average visibility latency from ~15s to ~2.5s (plus checkpoint duration)
- Feasible with ForSt (lightweight checkpoints)
- Trade-off: more checkpoint overhead, but a smaller visibility delay
2. Unaligned checkpoints
execution.checkpointing.unaligned: true
- Allows barriers to overtake in-flight records
- Reduces waiting on slow channels
- Slightly larger checkpoint state (in-flight data is stored in the checkpoint)
- Particularly useful for skewed or backpressured workloads
3. Pre-emptive commit
- Tune Kafka transaction timeout
- Tune commit retry strategy
4. Async commit
- Sink V2 (FLIP-372): allow async TwoPhaseCommittingSink writer
- Reduces commit blocking
Reading the source code
Flink source:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/
SinkWriterOperator.java -- calls flush() and prepareCommit() before forwarding the barrier
CommitterOperator.java -- stores committables, commits them on checkpoint completion
committables/CommittableCollector.java -- per-checkpoint committable bookkeeping
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/
CheckpointCoordinator.java -- JobManager checkpoint logic
PendingCheckpoint.java
CheckpointStatsTracker.java
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/
(in newer versions the Kafka connector lives in the separate apache/flink-connector-kafka repository)
KafkaSink.java
KafkaWriter.java
KafkaCommitter.java
FlinkKafkaInternalProducer.java
KafkaCommittableSerializer.java
FLIP documents:
FLIP-372: Allow TwoPhaseCommittingSink Writer to be Async
FLIP-191: TwoPhaseCommittingSink
Production blogs:
Apache Flink blog: "Exactly-once semantics with Kafka"
Confluent blog: "Kafka Transactions Explained"