Learning Platform
Глоссарий Troubleshooting
Урок 13.03 · 24 мин
Продвинутый
Checkpoint CoordinationCheckpointCommitterNotifyCheckpointCompleteFlink-Kafka IntegrationEnd-to-End Exactly Once

Flink-Kafka coordination — full pipeline

В предыдущих уроках мы видели Two-Phase Commit protocol в Flink (TwoPhaseCommittingSink) и Kafka transactional producer internals. Теперь соединим их вместе: как Flink coordinates checkpoint barriers с Kafka transaction commits для end-to-end exactly-once delivery.

В этом уроке мы trace путь одного record от source через Flink через KafkaSink на broker, видя все coordination points между Flink checkpoint protocol и Kafka transaction protocol.

End-to-end exactly-once: практика

Big picture

End-to-end pipeline:

Kafka source
  -> Flink processing (multiple operators)
    -> KafkaSink

Coordinations:
  1. Flink checkpoint barrier propagates через DAG (Chandy-Lamport ABS)
  2. Source records offset в checkpoint
  3. Operators record state в checkpoint
  4. Sink: prepareCommit -> save TIDs + offsets к checkpoint
  5. Checkpoint complete notification
  6. Sink: commit Kafka transaction (records become visible)

Если failure anywhere в pipeline:
  - Restore from last successful checkpoint
  - Source replays from saved offsets
  - Pending Kafka transactions either committed (если notify happened) or aborted
  - No duplicates, no lost data

Sequence diagram: complete commit cycle

Time | JobManager | Source TM | Process TM | Sink TM | Kafka Broker
T0   | trigger checkpoint                          |
T1   | -> barrier  |                               |
T2   |             | snapshot offsets              |
T3   |             | -> barrier  |                 |
T4   |             |             | snapshot state  |
T5   |             |             | -> barrier      |
T6   |             |             |                 | prepareCommit() |
T7   |             |             |                 |                 | producer.flush()
T8   |             |             |                 |                 | (records durable, not visible)
T9   |             |             |                 | return CommT (TID, offsets)
T10  |             |             |                 | save к checkpoint state
T11  | ackCheckpoint from all subtasks               |
T12  | checkpoint complete       |                 |
T13  | -> notifyCheckpointComplete to all          |
T14  |             |             |                 | committer.commit(CommT) |
T15  |             |             |                 |                 | commitTransaction
T16  |             |             |                 |                 | EndTxnRequest(commit=true)
T17  |             |             |                 |                 | WriteTxnMarkers
T18  |             |             |                 |                 | (records now visible!)
T19  |             |             |                 | commit success returned
T20  |             |             |                 | release TID к pool

Это full timeline. Each step matters. Now let’s зум в.

Step-by-step detailed

Step 1: Checkpoint trigger

JobManager periodic timer (every execution.checkpointing.interval) triggers checkpoint:

// JobManager logic (упрощённо)
class CheckpointCoordinator {
    void triggerPeriodicCheckpoint() {
        long checkpointId = nextCheckpointId();
        pendingCheckpoints.put(checkpointId, new PendingCheckpoint(...));
        
        // Send barrier triggers to all source subtasks
        sources.forEach(s -> s.triggerCheckpoint(checkpointId, timestamp));
    }
}

Step 2: Barrier propagation от source

// Source operator (KafkaSource)
class KafkaSourceReader {
    void snapshotState(long checkpointId) {
        // Save current Kafka offsets к state
        Map<TopicPartition, Long> offsets = currentOffsets();
        offsetsState.update(offsets);
        
        // Emit barrier downstream
        emitBarrier(checkpointId);
    }
}

Barrier — special record. Operators downstream wait для barriers from all upstreams (если есть multiple inputs), then snapshot themselves.

Step 3: Operator state snapshot

// Generic stateful operator
class StatefulOperator {
    void snapshotState(long checkpointId) {
        // Make state durable
        stateBackend.checkpoint(checkpointId);
        
        // Emit barrier downstream
        emitBarrier(checkpointId);
    }
}

С ForstDB это lightweight (snapshot manifest only). С RocksDB — async upload SST файлов в S3.

Step 4: Sink prepareCommit

Когда barrier достигает sink:

// KafkaSinkWriter (от FlinkKafkaInternalProducer perspective)
class KafkaSinkWriter implements PrecommittingSinkWriter<...> {
    private FlinkKafkaInternalProducer producer;
    private String transactionalIdPrefix;
    private int subtaskId;
    private long currentTransactionId;
    
    @Override
    public Collection<KafkaCommittable> prepareCommit() {
        // Flush pending sends к Kafka brokers
        producer.flush();
        
        // Records are now durable on brokers, but in pre-commit state
        // They won't be visible to read_committed consumers yet
        
        // Build committable: TID + checkpoint metadata
        String txnId = transactionalIdPrefix + "-" + subtaskId + "-" + currentTransactionId;
        KafkaCommittable committable = new KafkaCommittable(
            producer.getProducerId(),       // PID assigned by coordinator
            producer.getEpoch(),             // current epoch
            txnId,                           // transactional.id
            ...
        );
        
        // Important: create next producer for upcoming records
        // The old producer is "frozen" with pending transaction
        switchToNextTransaction();
        
        return Collections.singletonList(committable);
    }
    
    private void switchToNextTransaction() {
        currentTransactionId++;
        // Get next producer from pool
        producer = producerPool.acquire();
        producer.initTransactions();
        producer.beginTransaction();
    }
}

KafkaCommittable содержит:

  • transactional.id (for coordinator lookup)
  • producerId + epoch (for fencing checks)
  • Topic partitions involved (for verification)

Step 5: Save committables к checkpoint state

Flink runtime сохраняет CommT (committables) в checkpoint state:

// Flink runtime (CommittingSinkOperator)
void snapshotState(StateSnapshotContext context) {
    // Get committables from writer
    Collection<CommT> committables = writer.prepareCommit();
    
    // Serialize through SimpleVersionedSerializer
    byte[] serialized = serializer.serialize(committables);
    
    // Save к committables state (per subtask)
    committablesState.add(serialized);
}

State storage с другим snapshot — на ForstDB будет в S3.

Step 6: ACK к JobManager

Каждая subtask ACK завершение snapshot:

// Subtask -> JobManager
void completeCheckpoint(long checkpointId) {
    coordinator.notifyCheckpointComplete(
        getJobId(),
        getExecutionAttemptId(),
        checkpointId,
        new TaskStateSnapshot(...)
    );
}

// JobManager collects acks
class CheckpointCoordinator {
    void receiveAcknowledgeMessage(AcknowledgeCheckpoint msg) {
        PendingCheckpoint pc = pendingCheckpoints.get(msg.getCheckpointId());
        pc.acknowledgeTask(msg.getTaskExecutionId());
        
        if (pc.isFullyAcknowledged()) {
            // ALL subtasks acked - checkpoint complete!
            completeCheckpoint(pc);
        }
    }
    
    void completeCheckpoint(PendingCheckpoint pc) {
        // Finalize checkpoint metadata
        CompletedCheckpoint cp = pc.finalize();
        completedCheckpoints.add(cp);
        
        // Notify all operators of completion
        broadcastNotifyCheckpointComplete(pc.getCheckpointId());
    }
}

Step 7: NotifyCheckpointComplete broadcast

JobManager broadcasts notification всем subtasks:

// JobManager
void broadcastNotifyCheckpointComplete(long checkpointId) {
    executionGraph.getAllVertices().forEach(v -> {
        v.getCurrentExecution().notifyCheckpointComplete(checkpointId);
    });
}

// Subtask receives
class SubtaskRunner {
    void notifyCheckpointComplete(long checkpointId) {
        // Pass к operator
        streamOperator.notifyCheckpointComplete(checkpointId);
    }
}

Step 8: Sink committer commit

Now это finally время commit Kafka transaction:

// CommittingSinkOperator received notify
class CommittingSinkOperator {
    void notifyCheckpointComplete(long checkpointId) {
        // Read committables from state
        Collection<CommT> committables = readCommittablesFromState(checkpointId);
        
        // Call committer
        committer.commit(committables);
        
        // Clear committables from state (now committed durably)
        clearCommittablesState(checkpointId);
    }
}

// KafkaCommitter implementation
class KafkaCommitter implements Committer<KafkaCommittable> {
    @Override
    public void commit(Collection<CommittableWithLineage<KafkaCommittable>> committables) {
        for (var c : committables) {
            KafkaCommittable kc = c.getCommittable();
            
            // Resume producer (or new instance с same TID)
            FlinkKafkaInternalProducer producer = resumeProducer(
                kc.getTransactionalId(),
                kc.getProducerId(),
                kc.getEpoch()
            );
            
            try {
                // Actual commit к Kafka broker
                producer.commitTransaction();
                // Records now visible к read_committed consumers
            } catch (ProducerFencedException e) {
                // Fenced means another producer took over с newer epoch
                // This commit failed, no-op (already committed by other or aborted)
                LOG.warn("Producer fenced, transaction may have been committed by replacement");
            } catch (InvalidTxnStateException e) {
                // Transaction state invalid — could be already committed or aborted
                // Idempotency depends on Kafka coordinator state
                LOG.warn("Invalid transaction state — possibly already committed");
            }
        }
    }
}

Step 9: Records become visible

Когда Kafka coordinator processes commitTransaction:

Inside Kafka broker:
  1. Coordinator receives EndTxnRequest(TID, commit=true, epoch=N)
  2. Coordinator verifies epoch — current epoch must match
  3. Coordinator writes PREPARE_COMMIT к __transaction_state
  4. Coordinator sends WriteTxnMarkerRequest к each partition leader
  5. Each partition leader writes commit marker к its log
  6. read_committed consumers can now skip pending markers и see records
  7. Coordinator writes COMPLETE_COMMIT к __transaction_state
  8. Returns success к Flink

С шага 5 — records visible. Это main point exactly-once visibility.

Failure scenarios

Scenario 1: TaskManager dies между prepare and commit notification
  - Flink starts new TaskManager
  - Loads checkpoint (с committables)
  - Resumes от checkpoint
  - Committables NOT yet committed (notify не пришло до crash)
  - Sink committer не called
  - Pending transactions в Kafka abort после timeout
  - On restart: source from saved offsets, processes again
  - Output: new transaction commits
  - Result: exactly-once (lost pre-commit data was duplicates)

Scenario 2: TaskManager dies AFTER commit notification, перед commit completes
  - Flink starts new TaskManager
  - Loads checkpoint (с committables)
  - But notify already broadcasted — JobManager moved on
  - On restart, sink committer NOT called automatically
  - PROBLEM: pending Kafka transactions can stay open до timeout
  - Solution: Flink runtime tracks pending commits через CheckpointCommitter
  - On startup: scan pending committables, retry commits

Scenario 3: JobManager dies между notify decision and broadcast
  - JobManager replacement восстанавливает state from durable log
  - Re-sends notify
  - Sinks receive duplicate notify (idempotent)
  - Commits proceed

Scenario 4: Kafka coordinator dies during commit
  - Producer.commitTransaction() throws
  - Coordinator replacement читает __transaction_state
  - If state PREPARE_COMMIT: completes commit
  - If state PREPARE_ABORT: completes abort
  - Producer retries commitTransaction() — sees current state, идемпотентно

CheckpointCommitter tracking

Flink runtime tracks pending commits через CheckpointCommitter mechanism:

Per subtask:
  committables_pending: LinkedHashMap<checkpoint_id, List<CommT>>
  committables_committed: Set<checkpoint_id>

On notifyCheckpointComplete(N):
  committables = committables_pending.remove(N)
  committer.commit(committables)
  committables_committed.add(N)

On restart:
  Load committables_pending from checkpoint state
  These are committables from successful prepareCommit but ungommitted
  Replay committer.commit() for each
  (Идемпотентно if Kafka coordinator handles correctly)
End-to-end exactly-once sequence
JM: trigger checkpoint NJobManager triggers checkpoint N via timer. Sends barrier to all sources. Barrier propagation begins.
barrier
Source -> Process: snapshot stateSource saves Kafka offsets к state, emits barrier downstream. Processing operators snapshot state, emit barrier further.
Sink: prepareCommit (flush)KafkaSink reaches barrier. prepareCommit() called: producer.flush() — records durable on brokers в pre-commit state, not visible к consumers.
committables
Save committables + ACKCommittables (TID, PID, epoch) saved к checkpoint state. All subtasks ACK к JobManager.
JM: notify completeAll ACKs received. JobManager broadcasts notifyCheckpointComplete(N). Sinks now safe to commit Kafka transactions.
commit
Committer: Kafka commitKafkaCommitter.commit(): producer.commitTransaction(). Coordinator writes PREPARE_COMMIT, sends WriteTxnMarkers к partition leaders.
visible
Records visiblePartition leaders write commit markers. Records visible к read_committed consumers. Coordinator writes COMPLETE_COMMIT.

Timing characteristics

Typical timing breakdown (1-second checkpoint interval):

T+0   Checkpoint triggered
T+50ms Barrier reaches sources
T+200ms Barrier reaches processing operators (with state snapshot)
T+500ms Barrier reaches sinks (full state snapshot done)
T+550ms Sink prepareCommit (flush к Kafka, ~50ms)
T+600ms All subtasks ACK
T+610ms JobManager broadcasts complete
T+650ms Sinks call commitTransaction
T+700ms Kafka coordinator writes PREPARE_COMMIT
T+750ms WriteTxnMarkers sent к partition leaders
T+800ms All markers written, records visible
T+810ms Coordinator writes COMPLETE_COMMIT

Total commit-to-visibility: ~800 ms

If checkpoint interval = 60s:
  Records visible to downstream с average 30s delay (half interval)
  Plus commit duration ~800ms
  
Recommendation:
  - For low latency: checkpoint interval 1-5 seconds (с ForstDB)
  - For high throughput: checkpoint interval 30-60 seconds

Common issues

1. Long checkpoint duration
   Symptom: lag between record processing и visibility large
   Causes: huge state, slow S3/network, RocksDB backup
   Solutions: ForstDB, async checkpoint, unaligned checkpoints

2. Commit timeouts
   Symptom: commitTransaction throws
   Causes: slow Kafka coordinator, network issues
   Solutions: increase timeout, scale coordinator

3. Pool exhaustion
   Symptom: cannot get new TID для transaction
   Causes: too many concurrent in-flight checkpoints
   Solutions: increase pool size, reduce max-concurrent-checkpoints

4. Pending commits leaking
   Symptom: many uncommitted Kafka transactions over time
   Causes: TaskManager crashes after prepare, commits don't replay
   Solutions: monitor через CheckpointCommitter, manual recovery

5. Fencing on restart
   Symptom: ProducerFencedException
   Causes: TID epoch incremented by replacement
   Solutions: normal recovery, transaction aborts, source replays

Configuration cheat sheet

# flink-conf.yaml

# Checkpointing
execution.checkpointing.interval: 30s
execution.checkpointing.timeout: 10min
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.unaligned: true  # для low latency

# Kafka sink
sink.kafka.transactional.id.prefix: ${JOB_NAME}-${VERSION}
sink.kafka.transaction.timeout.ms: 900000  # 15min
sink.kafka.acks: all

# Performance
sink.kafka.batch.size: 65536  # 64KB
sink.kafka.linger.ms: 5
sink.kafka.compression.type: snappy

# Producer config to ensure exactly-once
sink.kafka.enable.idempotence: true
sink.kafka.max.in.flight.requests.per.connection: 5

End-to-end latency optimization

1. Reduce checkpoint interval
   - 30s -> 5s reduces visibility latency from ~15s to ~2.5s
   - С ForstDB feasible (lightweight checkpoints)
   - Trade-off: more overhead, but visible delay smaller

2. Unaligned checkpoints
   execution.checkpointing.unaligned: true
   - Allows barriers to overtake records
   - Reduces wait time для slow channels
   - Slightly higher state size
   - Critical для skewed workloads

3. Pre-emptive commit
   - Tune Kafka transaction timeout
   - Tune commit retry strategy

4. Async commit
   - Sink V2 (FLIP-372): allow async TwoPhaseCommittingSink writer
   - Reduces commit blocking

Чтение source

Flink source:
  flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/
    CommittingSinkOperator.java        -- main coordinator
    SinkWriterOperator.java
    CommittablesSnapshotState.java     -- state serialization

  flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/
    CheckpointCoordinator.java          -- JobManager checkpoint logic
    PendingCheckpoint.java
    CheckpointStatsTracker.java

  flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/
    KafkaSink.java
    KafkaCommitter.java
    FlinkKafkaInternalProducer.java
    KafkaCommittableSerializer.java

FLIP документы:
  FLIP-372: Allow TwoPhaseCommittingSink Writer to be Async
  FLIP-191: TwoPhaseCommittingSink

Production blogs:
  Apache Flink blog: "Exactly-once semantics with Kafka"
  Confluent blog: "Kafka Transactions Explained"
Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. В какой момент записи через KafkaSink становятся visible к read_committed consumers?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4