Two-Phase Commit: the protocol behind exactly-once
Exactly-once semantics is one of the hardest guarantees to provide in a distributed system. It does not mean "each event is processed exactly once" (often impossible because of retries). It means "the side effects (writes to the sink) are visible as if each event had been processed exactly once". Flink achieves this with the Two-Phase Commit (2PC) protocol, formalized in the TwoPhaseCommittingSink interface (FLIP-191).
In this lesson we walk through the 2PC protocol step by step: what the phases are, how they work, what guarantees they give, and how Flink implements them through interface methods. This is the foundation for understanding KafkaSink, IcebergSink, and other transactional sinks.
[Figure: End-to-end exactly-once: source replay + sink 2PC]
[Figure: Kafka transactions on the producer side]
What exactly-once means in the Flink context
Delivery semantics guarantees:
At-most-once:
- Messages may be lost
- Never duplicated
- Fastest, but data loss is possible
At-least-once:
- Messages are guaranteed to be delivered
- May be duplicated (replay after a failure)
- The default in most streaming systems
Exactly-once:
- Every message is delivered exactly once
- No duplication and no loss
- Hardest to achieve and the most expensive
Critical distinction:
Exactly-once processing: each record affects operator state exactly once (records may still be physically re-processed after a failure)
Exactly-once delivery: writes to the sink become visible exactly once
Flink can provide both, but they are different guarantees.
Exactly-once delivery requires coordination between the source (to reset the read position), processing (for deterministic replay), and the sink (for atomic writes).
Why 2PC is needed
A simple commit model does not work for a distributed sink:
Naive sink commit:
for each record:
1. write to the sink (Kafka, DB, ...)
2. emit the record downstream
Problem: a failure between the write and the next completed checkpoint
- The record is already written, but that progress is not recorded in checkpointed state
- On replay: the same record is written again (duplicate)
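A minimal simulation of the duplicate, assuming a hypothetical in-memory sink that writes each record immediately and a source that rewinds to the last checkpointed offset on recovery. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation: a sink that writes every record immediately, with no transaction.
class NaiveSinkSimulation {
    // What readers of the external system (Kafka topic, DB table, ...) can see.
    final List<String> externalSystem = new ArrayList<>();
    // Source offset stored by the last completed checkpoint.
    int checkpointedOffset = 0;

    // Processes records[from, to) and writes each one straight to the external system.
    void process(String[] records, int from, int to) {
        for (int i = from; i < to; i++) {
            externalSystem.add(records[i]);
        }
    }

    static List<String> runWithFailure() {
        String[] records = {"a", "b", "c", "d"};
        NaiveSinkSimulation job = new NaiveSinkSimulation();
        job.process(records, 0, 2);  // writes a, b
        job.checkpointedOffset = 2;  // a checkpoint completes at source offset 2
        job.process(records, 2, 3);  // writes c, then the TaskManager crashes
        // Recovery: the source rewinds to the checkpointed offset and replays from there.
        job.process(records, job.checkpointedOffset, records.length); // writes c again, then d
        return job.externalSystem;
    }
}
```

`runWithFailure()` returns `[a, b, c, c, d]`: record `c` was written before the crash and again during replay.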
Fix #1: Idempotent writes
- Every record has a unique ID
- The sink deduplicates by that ID
- Works for some sinks (KV stores, keyed Kafka topics once compacted)
- Does NOT work for append-only sinks (file systems, append-only logs)
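A sketch contrasting the two kinds of sinks, assuming hypothetical in-memory stand-ins: an upsert sink keyed by record ID absorbs a replay, while an append-only sink records it twice.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sinks, used only to contrast idempotent and append-only writes.
class IdempotencyDemo {
    // Key-value sink: writing the same ID again overwrites the old value, so replays are harmless.
    static final class UpsertSink {
        final Map<String, String> store = new LinkedHashMap<>();
        void write(String id, String value) { store.put(id, value); }
    }

    // Append-only sink: every write adds an entry, so replays produce duplicates.
    static final class AppendOnlySink {
        final List<String> log = new ArrayList<>();
        void write(String id, String value) { log.add(id + "=" + value); }
    }

    // Writes every {id, value} pair, then replays the last two as after a failure.
    static void writeWithReplay(String[][] batch, UpsertSink kv, AppendOnlySink log) {
        for (String[] r : batch) {
            kv.write(r[0], r[1]);
            log.write(r[0], r[1]);
        }
        for (int i = Math.max(0, batch.length - 2); i < batch.length; i++) {
            kv.write(batch[i][0], batch[i][1]);
            log.write(batch[i][0], batch[i][1]);
        }
    }
}
```

With three records and a replay of the last two, the upsert sink still holds three entries while the append-only log holds five.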
Fix #2: Transactional writes
- Begin transaction
- Write records
- Commit transaction (или abort)
- Only committed data is visible to readers
Flink uses Fix #2: transactional writes coordinated with checkpoints.
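The begin/write/commit/abort cycle can be sketched with a hypothetical in-memory store in which uncommitted writes sit in a per-transaction buffer. This is not any real storage API; it only shows the visibility rule.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical transactional store: uncommitted writes are invisible to readers.
class TransactionalStore {
    private final List<String> committed = new ArrayList<>();
    private final Map<String, List<String>> openTransactions = new HashMap<>();

    void begin(String txId) { openTransactions.put(txId, new ArrayList<>()); }

    void write(String txId, String record) {
        List<String> buffer = openTransactions.get(txId);
        if (buffer == null) throw new IllegalStateException("unknown transaction " + txId);
        buffer.add(record);
    }

    // Makes all records of the transaction visible at once.
    // Committing an unknown or already committed ID does nothing.
    void commit(String txId) {
        List<String> buffer = openTransactions.remove(txId);
        if (buffer != null) committed.addAll(buffer);
    }

    // Discards the transaction's records; readers never saw them.
    void abort(String txId) { openTransactions.remove(txId); }

    // What a reader (for example a read_committed consumer) observes.
    List<String> readCommitted() { return List.copyOf(committed); }
}
```

Readers see nothing until `commit("tx-1")`, then both of its records at once; an aborted transaction never becomes visible, and repeating a commit changes nothing.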
Two-Phase Commit basics
Classic 2PC from the distributed systems literature:
Roles:
- Coordinator (the JobManager in Flink)
- Participants (the sink subtasks on the TaskManagers in Flink)
Phase 1: Prepare
- The coordinator asks: "Can you commit?"
- Each participant prepares (writes to a durable log, locks resources)
- Each replies PREPARED or ABORT
- If all reply PREPARED -> proceed to Phase 2
- If even one replies ABORT -> the coordinator aborts
Phase 2: Commit (or Abort)
- The coordinator broadcasts COMMIT (or ABORT)
- Each participant commits (or aborts)
- Each replies ACK
Critical properties:
- If the coordinator crashes between the phases, all participants are blocked holding prepared state
- Each participant must know what to do on recovery
- The coordinator's decision survives restarts (durable log)
In Flink, the JobManager is the coordinator and the sink subtasks on the TaskManagers are the participants. Checkpoint barriers trigger phase 1.
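The classic voting rule can be sketched as follows. `Participant`, `RecordingParticipant` and `TwoPhaseCoordinator` are hypothetical names for illustration, and the "durable log" is just an in-memory list here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical participant in classic 2PC.
interface Participant {
    boolean prepare(); // phase 1 vote: true = PREPARED, false = ABORT
    void commit();     // phase 2
    void abort();      // phase 2
}

// Test double that records which phase-2 message it received.
final class RecordingParticipant implements Participant {
    private final boolean vote;
    String state = "INIT";
    RecordingParticipant(boolean vote) { this.vote = vote; }
    public boolean prepare() { if (vote) state = "PREPARED"; return vote; }
    public void commit() { state = "COMMITTED"; }
    public void abort() { state = "ABORTED"; }
}

// Minimal coordinator: COMMIT only if every participant voted PREPARED.
final class TwoPhaseCoordinator {
    enum Decision { COMMIT, ABORT }

    // Stand-in for the durable decision log a real coordinator writes before phase 2.
    final List<Decision> decisionLog = new ArrayList<>();

    Decision run(List<Participant> participants) {
        boolean allPrepared = true;
        for (Participant p : participants) {
            if (!p.prepare()) { allPrepared = false; break; } // one ABORT vote is enough
        }
        Decision decision = allPrepared ? Decision.COMMIT : Decision.ABORT;
        decisionLog.add(decision);
        for (Participant p : participants) {
            if (decision == Decision.COMMIT) p.commit(); else p.abort();
        }
        return decision;
    }
}
```

If any participant votes ABORT, every participant receives ABORT, including those that already prepared.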
Mapping 2PC onto Flink
Integration with checkpoints:
The checkpoint barrier propagates through the DAG
-> reaches the sink subtask on the TaskManager
-> the sink runs prepareCommit
-> the sink flushes its pre-committed data (or marks the transaction as prepared)
-> the sink returns commit metadata (a transaction ID or handle)
-> the commit metadata is included in the checkpoint state
Checkpoint complete notification (after all subtasks ack)
-> the sink calls commit() with the stored metadata
-> the records become visible to downstream consumers
-> the transaction is finalized
Failure scenarios:
- Failure before the checkpoint completes: the pre-committed data is discarded (the job restarts from the previous checkpoint)
- Failure after the checkpoint completes but before commit: the transactions are pending
-> on recovery, the sink reads the stored metadata
-> and retries the commit (idempotent thanks to the transaction ID)
- Failure after commit: the data is already durable, nothing to do
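A hypothetical model of the bookkeeping behind this mapping (not Flink's operator code): pre-committed data is keyed by checkpoint ID, and that map is what would go into checkpoint state. Because Flink's checkpoint-complete notifications are best-effort and a later one can subsume an earlier one, the sketch commits everything up to and including the notified ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of how a 2PC sink maps onto checkpoints.
final class CheckpointedTwoPhaseSink {
    // Records written since the last barrier (the open transaction).
    private List<String> currentTransaction = new ArrayList<>();
    // Pre-committed data per checkpoint ID; this is what would be stored in checkpoint state.
    final NavigableMap<Long, List<String>> pending = new TreeMap<>();
    // What downstream consumers can read.
    final List<String> visible = new ArrayList<>();

    void write(String record) { currentTransaction.add(record); }

    // Barrier for checkpoint `id`: phase 1 (pre-commit), then start a new transaction.
    void onBarrier(long id) {
        pending.put(id, currentTransaction);
        currentTransaction = new ArrayList<>();
    }

    // Phase 2: commit every pending checkpoint up to and including `id`.
    void notifyCheckpointComplete(long id) {
        Map<Long, List<String>> ready = pending.headMap(id, true); // live view of `pending`
        for (List<String> committable : ready.values()) {
            visible.addAll(committable);
        }
        ready.clear(); // also removes the committed entries from `pending`
    }
}
```

If the job fails before a notification arrives, restoring from an earlier checkpoint simply drops the open transaction and any pending entry that was not part of the restored state.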
TwoPhaseCommittingSink interface (FLIP-191)
public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
    // Phase 1: the writer produces pre-committed data
    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;
    // Phase 2: commits the pre-committed data
    Committer<CommT> createCommitter() throws IOException;
    // For recovery: serializes committables (pending transactions) into checkpoint state
    SimpleVersionedSerializer<CommT> getCommittableSerializer();

    // In Flink this interface is nested: TwoPhaseCommittingSink.PrecommittingSinkWriter
    interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
        // Called on every checkpoint
        // Pre-commit: the written data is durable and ready to be committed
        // Returns the committables (what the Committer needs for the final commit)
        Collection<CommT> prepareCommit() throws IOException, InterruptedException;
    }
}
public interface Committer<CommT> extends AutoCloseable {
    // Called after the checkpoint completes
    // Receives the committables (wrapped in CommitRequest) and performs the actual commit
    void commit(Collection<CommitRequest<CommT>> committables) throws IOException, InterruptedException;
}
Three roles:
- SinkWriter: writes data (per element)
- prepareCommit: phase 1, prepares for the checkpoint
- Committer: phase 2, performs the actual commit
Detailed flow
Time T0: Job running normally
Records flowing
SinkWriter buffers/writes pre-committed data
Internal state: not yet committed
Time T1: Checkpoint barrier received
Flink runtime calls writer.prepareCommit()
The writer makes its pending data durable (Kafka producer flush, file fsync)
The writer returns a List<CommT> of committables
Example for Kafka: the transaction handle (transactional.id, producer ID, epoch)
Example for files: (file path, size, checksum)
The committables are saved in checkpoint state via getCommittableSerializer()
Time T2: All subtasks finish the checkpoint
The JobManager collects the acks
The checkpoint is marked complete
The JobManager notifies: notifyCheckpointComplete(checkpointId)
Time T3: The sink is notified that the checkpoint is complete
Flink runtime calls committer.commit(committables)
The Committer performs the actual commit:
Kafka: producer.commitTransaction()
Files: rename the .tmp file to its final name
DB: COMMIT TRANSACTION
The records become visible
Time T2.5, failure case (after the checkpoint completes, before commit):
The TaskManager dies
The job restarts from the last successful checkpoint (the one completed at T2)
The committables are loaded from checkpoint state
The Committer is called with the loaded committables
The same commit operation is performed (idempotent thanks to the transaction ID)
No duplicates: the commit had not happened before the failure, and repeating a commit of the same transaction is a no-op
No data loss: the pre-committed data was preserved
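The T2.5 recovery path can be sketched with a hypothetical external system whose commit is idempotent per transaction ID. Real systems do this differently (Kafka, for example, resumes the producer session for the stored transactional ID); this only models the "commit again, get the same result" property.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical external system whose commit is idempotent per transaction ID.
final class ExternalTxSystem {
    final Map<String, List<String>> prepared = new HashMap<>();
    final Set<String> committedIds = new LinkedHashSet<>();
    final List<String> visible = new ArrayList<>();

    // Phase 1: the data is stored durably but not visible.
    void prepare(String txId, List<String> records) { prepared.put(txId, new ArrayList<>(records)); }

    // Phase 2: committing an already committed ID is a no-op, so retries are safe.
    void commit(String txId) {
        if (committedIds.contains(txId)) return;
        List<String> records = prepared.remove(txId);
        if (records == null) throw new IllegalStateException("nothing prepared for " + txId);
        visible.addAll(records);
        committedIds.add(txId);
    }
}

final class RecoveryDemo {
    // Re-commits the committables restored from the last completed checkpoint.
    static void recover(ExternalTxSystem system, List<String> restoredTxIds) {
        for (String txId : restoredTxIds) system.commit(txId);
    }
}
```

Running `recover` once after the crash makes `[a, b]` visible; running it again (a second failure after the commit) leaves the output unchanged.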
State machine
SinkWriter state machine (per transaction):
WRITING:
- Accepts records via write(element, context)
- Buffers them internally
- On prepareCommit() -> COMMITTING
COMMITTING:
- prepareCommit() returns the committables
- They are persisted in the checkpoint
- Waits for notifyCheckpointComplete
- On notification -> DONE
DONE:
- committer.commit() is called
- The pre-committed data becomes visible
- Resources are released
- A new transaction can begin -> back to WRITING
FAILURE:
- Failure before the notification: pre-committed data is discarded on recovery
- Failure after the notification but before commit: the committer replays from the checkpoint
- Idempotency via the transaction ID
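The legal transitions above can be encoded as a small enum. `WriterPhase` and the event strings are hypothetical; the enum tracks one transaction, whereas a real writer (Kafka's, for instance) typically opens the next transaction right after prepareCommit, so several transactions can be in flight at once. FAILURE is left out because it is handled by recovery rather than by a transition.

```java
// Hypothetical encoding of one transaction's lifecycle (not a Flink class).
enum WriterPhase {
    WRITING, COMMITTING, DONE;

    // Returns the next phase, or throws if the event is not allowed in this phase.
    WriterPhase on(String event) {
        switch (this) {
            case WRITING:
                if (event.equals("write")) return WRITING;
                if (event.equals("prepareCommit")) return COMMITTING;
                break;
            case COMMITTING:
                if (event.equals("notifyCheckpointComplete")) return DONE;
                break;
            case DONE:
                if (event.equals("beginTransaction")) return WRITING;
                break;
        }
        throw new IllegalStateException(event + " is not allowed in " + this);
    }
}
```

For example, `WRITING.on("write").on("prepareCommit")` yields `COMMITTING`, while `COMMITTING.on("write")` is rejected.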
Example: simple file sink
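import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
// FileCommittable, FileCommittableSerializer, createTempFile() and calculateChecksum()
// are hypothetical helpers, omitted from this sketch
public class FileSinkExample implements TwoPhaseCommittingSink<String, FileCommittable> {
    @Override
    public PrecommittingSinkWriter<String, FileCommittable> createWriter(InitContext ctx) {
        return new TmpFileWriter(...);
    }
    @Override
    public Committer<FileCommittable> createCommitter() {
        return new FileCommitter();
    }
    @Override
    public SimpleVersionedSerializer<FileCommittable> getCommittableSerializer() {
        return new FileCommittableSerializer();
    }
}
// Named TmpFileWriter so that it does not shadow java.io.FileWriter, which it uses
class TmpFileWriter implements PrecommittingSinkWriter<String, FileCommittable> {
    private File tempFile;
    private BufferedWriter writer;
    @Override
    public void write(String element, Context context) throws IOException {
        if (tempFile == null) {
            tempFile = createTempFile(); // e.g. part-<subtask>-<n>.tmp
            writer = new BufferedWriter(new FileWriter(tempFile));
        }
        writer.write(element);
        writer.newLine();
    }
    @Override
    public void flush(boolean endOfInput) throws IOException {
        if (writer != null) {
            writer.flush();
        }
    }
    @Override
    public Collection<FileCommittable> prepareCommit() throws IOException {
        if (tempFile == null) {
            return Collections.emptyList(); // nothing written since the last checkpoint
        }
        writer.close(); // flushes and closes; a production sink would also fsync
        // Pre-commit: the file is fully written to disk,
        // but it still has the .tmp suffix and is not yet visible
        FileCommittable committable = new FileCommittable(
            tempFile.getPath(),
            calculateChecksum(tempFile)
        );
        tempFile = null;
        writer = null;
        return Collections.singletonList(committable);
    }
    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
        }
    }
}
class FileCommitter implements Committer<FileCommittable> {
    @Override
    public void commit(Collection<CommitRequest<FileCommittable>> requests) throws IOException {
        for (CommitRequest<FileCommittable> request : requests) {
            Path tmp = Paths.get(request.getCommittable().getPath());
            String name = tmp.getFileName().toString();
            Path finalFile = tmp.resolveSibling(name.substring(0, name.length() - ".tmp".length()) + ".final");
            if (Files.notExists(tmp) && Files.exists(finalFile)) {
                continue; // already committed before a failure: the retry is a no-op
            }
            // Atomic rename: the file becomes visible in one step
            Files.move(tmp, finalFile, StandardCopyOption.ATOMIC_MOVE);
        }
    }
    @Override
    public void close() {
    }
}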
This sketch shows the core idea: phase 1 writes a .tmp file, phase 2 atomically renames it to .final. Failures can occur between the phases, but on recovery the .tmp files are either discarded (their checkpoint never completed) or renamed again (their committable is in a completed checkpoint).
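The same protocol can be exercised without Flink using only `java.nio.file`. `TmpFileTwoPhase` is a hypothetical helper; the `.tmp`/`.final` naming follows the example above, and an atomic move is assumed to be supported because source and target sit in the same directory.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Flink-free sketch of tmp-file pre-commit plus atomic-rename commit.
final class TmpFileTwoPhase {
    static final String TMP = ".tmp";
    static final String FINAL = ".final";

    // Phase 1: write the data to a hidden .tmp file and return its path as the committable.
    // (A production sink would also fsync the file before returning.)
    static Path preCommit(Path dir, String baseName, List<String> lines) throws IOException {
        Path tmp = dir.resolve(baseName + TMP);
        Files.write(tmp, lines, StandardCharsets.UTF_8);
        return tmp;
    }

    // Phase 2: publish with an atomic rename. Safe to replay: if the rename already
    // happened before a crash, the .tmp file is gone and nothing is done.
    static Path commit(Path tmp) throws IOException {
        String name = tmp.getFileName().toString();
        if (!name.endsWith(TMP)) {
            throw new IllegalArgumentException("not a pending file: " + tmp);
        }
        Path target = tmp.resolveSibling(name.substring(0, name.length() - TMP.length()) + FINAL);
        if (Files.notExists(tmp) && Files.exists(target)) {
            return target;
        }
        return Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    // What a reader sees: only committed (.final) files.
    static List<Path> visibleFiles(Path dir) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            return files.filter(p -> p.getFileName().toString().endsWith(FINAL))
                        .sorted()
                        .collect(Collectors.toList());
        }
    }
}
```

Calling `commit` twice on the same committable leaves exactly one `part-0.final` file, which is the idempotency the Committer needs.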
Critical guarantees
1. Atomicity:
All committables of a checkpoint are committed, or none are
Not possible as a final state: half committed, half not (subtasks commit independently, but recovery completes any commit that is missing)
Mechanism: either all subtasks ack, or the checkpoint fails
2. Durability:
Once commit() returns successfully, the data is durable
It survives failures (TaskManager, JobManager, network)
Mechanism: Kafka commit log replication, file system fsync, DB WAL
3. Idempotency:
If commit() is retried (recovery scenario), the result is the same
No duplicates, no errors
Mechanism: transaction IDs unique per checkpoint and subtask
4. Recovery:
Pending committables are stored in checkpoint state
On restart: loaded and replayed
Mechanism: SimpleVersionedSerializer for versioned serialization and deserialization
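A sketch of what a committable serializer does, assuming a hypothetical committable that holds only a path and a size. The class mirrors the method shapes of Flink's `SimpleVersionedSerializer` (`getVersion`, `serialize`, `deserialize(version, bytes)`) but deliberately does not implement it, so it runs without Flink; in Flink, the version number is stored next to the bytes when committables go into checkpoint state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;

// Hypothetical committable: only a handle (path + size), never the data itself.
final class FileCommittableSketch {
    final String path;
    final long size;
    FileCommittableSketch(String path, long size) { this.path = path; this.size = size; }
    @Override public boolean equals(Object o) {
        if (!(o instanceof FileCommittableSketch)) return false;
        FileCommittableSketch c = (FileCommittableSketch) o;
        return path.equals(c.path) && size == c.size;
    }
    @Override public int hashCode() { return Objects.hash(path, size); }
}

// Mirrors the shape of Flink's SimpleVersionedSerializer without depending on Flink.
final class FileCommittableSerializerSketch {
    static final int VERSION = 1;

    int getVersion() { return VERSION; }

    byte[] serialize(FileCommittableSketch c) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(c.path);
            out.writeLong(c.size);
        }
        return bytes.toByteArray();
    }

    // The version comes from the checkpoint; unknown versions must fail loudly.
    FileCommittableSketch deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION) throw new IOException("unsupported version " + version);
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            return new FileCommittableSketch(in.readUTF(), in.readLong());
        }
    }
}
```

Bumping `VERSION` and keeping a branch for the old layout in `deserialize` is how a committable format can evolve without breaking restores from older checkpoints.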
Common pitfalls
1. The Committer must be idempotent
Wrong:
commit(committables) -> write file -> mark as committed
On replay: the file already exists, error
Right:
commit(committables) -> atomic rename, skipped if the .tmp file is gone and the target exists (a replay is a no-op)
2. prepareCommit must flush to durable storage
Wrong: prepareCommit() returns committables, but the data is only in memory
Recovery -> data lost
Right: prepareCommit() flushes to durable storage (fsync, Kafka producer flush)
Recovery -> the committer can pick it up
3. Do not use committables for business logic
Wrong: committables carry business data
Tied to a specific sink implementation
Right: committables are only pointers/handles, not data
4. CommT must be serializable
Wrong: CommT holds non-serializable references (open streams, clients)
Cannot be saved in checkpoint state
Right: CommT is a POJO with primitive fields, paths, IDs, written by its SimpleVersionedSerializer
Sink V2 — modernized API
FLIP-191 (Flink 1.15) introduced the unified Sink V2 API, which replaces the legacy SinkFunction API:
Legacy:
RichSinkFunction.invoke(record)
No standard exactly-once mechanism
Write, pre-commit and commit logic mixed together
Sink V2 (FLIP-191):
Sink<InputT>: top-level abstraction
SinkWriter<InputT>: writes records
TwoPhaseCommittingSink: adds prepare/commit (for exactly-once)
Committer<CommT>: commits committables
StatefulSink: for sinks whose writers have state
TwoPhaseCommittingSink is part of the same unified API, not a separate one
Benefits:
- Standardized exactly-once
- Clear separation: write vs prepare vs commit
- Better testability
- Async support (the Async Sink base, FLIP-171, is built on this API)
Implementations that use 2PC
Flink and ecosystem connectors:
KafkaSink (FlinkKafkaProducer is deprecated)
FileSink (file system sink)
IcebergSink (maintained in the Apache Iceberg project)
PaimonSink (maintained in the Apache Paimon project)
PulsarSink
JdbcSink (exactly-once mode, via XA transactions)
Not using 2PC (at-least-once or idempotent):
KafkaSink in at-least-once mode
ElasticsearchSink (at-least-once; effectively idempotent with deterministic document IDs)
PrintSinkFunction
Non-transactional JDBC
Custom sinks:
Implement TwoPhaseCommittingSink
Define a CommT type for your transaction handles
Implement the prepare/commit logic for your storage
End-to-end exactly-once requirements
The sink's 2PC is only one part. Full exactly-once requires:
1. Replayable source
- Kafka offsets recovered
- File source position
- Pulsar message IDs
2. Deterministic processing
- Same input -> same output
- No external side effects in process functions
- No nondeterministic ordering issues
3. 2PC sink
- Transactional writes
- Coordinated with checkpoints
4. Coordinated checkpoint protocol
- Asynchronous Barrier Snapshotting (ABS, a variant of Chandy-Lamport)
- Barrier alignment
- Atomicity across all participants
All four are critical. If any one is missing, you only get at-least-once.
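A toy model of all four pieces working together, assuming a hypothetical five-record source, an uppercase "operator", and a scripted crash that happens after checkpoint 2 completes but before its commit. None of these names are Flink APIs; the point is that restoring the source offset and the pending committable from the same snapshot yields each output exactly once.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the four requirements; everything here is hypothetical, not Flink runtime code.
final class EndToEndDemo {
    // Replayable source: any offset can be re-read after a restart.
    static final List<String> SOURCE = List.of("a", "b", "c", "d", "e");

    // A completed checkpoint: source offset plus the sink's pre-committed records.
    static final class Snapshot {
        final long checkpointId;
        final int sourceOffset;
        final List<String> preCommitted;

        Snapshot(long checkpointId, int sourceOffset, List<String> preCommitted) {
            this.checkpointId = checkpointId;
            this.sourceOffset = sourceOffset;
            this.preCommitted = preCommitted;
        }
    }

    final List<String> visible = new ArrayList<>(); // what downstream consumers see
    private long lastCommittedCheckpoint = 0;

    // Deterministic processing: the same input always yields the same output.
    static String process(String record) {
        return record.toUpperCase();
    }

    // Reads SOURCE[from, to) through the operator into a new, uncommitted transaction.
    static List<String> runSegment(int from, int to) {
        List<String> transaction = new ArrayList<>();
        for (int i = from; i < to; i++) {
            transaction.add(process(SOURCE.get(i)));
        }
        return transaction;
    }

    // Phase 2, idempotent: each checkpoint's transaction is applied at most once.
    void commit(Snapshot snapshot) {
        if (snapshot.checkpointId <= lastCommittedCheckpoint) {
            return;
        }
        visible.addAll(snapshot.preCommitted);
        lastCommittedCheckpoint = snapshot.checkpointId;
    }

    static List<String> runWithCrashAndRecovery() {
        EndToEndDemo sink = new EndToEndDemo();
        Snapshot cp1 = new Snapshot(1, 2, runSegment(0, 2));
        sink.commit(cp1);                                    // checkpoint 1 completes and is committed
        Snapshot cp2 = new Snapshot(2, 4, runSegment(2, 4)); // checkpoint 2 completes...
        runSegment(4, 5);                                    // ...then a crash before its commit; E's open transaction is lost
        // Recovery from cp2: finish its pending commit (twice, as a retry would), then replay the source.
        sink.commit(cp2);
        sink.commit(cp2);
        Snapshot cp3 = new Snapshot(3, SOURCE.size(), runSegment(cp2.sourceOffset, SOURCE.size()));
        sink.commit(cp3);
        return sink.visible;
    }
}
```

`runWithCrashAndRecovery()` ends with `[A, B, C, D, E]`: no record is lost despite the crash, and none is duplicated despite the replay and the retried commit.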
End-to-end exactly-once requires exactly-once-aware sinks. If your downstream system (for example, a custom HTTP service) does not support transactions, you are fundamentally limited to at-least-once toward it. Solutions: idempotency via message IDs, or an intermediate exactly-once sink (Kafka) with deduplication downstream.
Reading the source
Flink source code:
flink-core/src/main/java/org/apache/flink/api/connector/sink2/
Sink.java
SinkWriter.java
TwoPhaseCommittingSink.java
Committer.java
PrecommittingSinkWriter.java
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/
CommittingSinkOperator.java
SinkWriterOperator.java
CommittableSummary.java
FLIP documents:
FLIP-143: Unified Sink API
FLIP-191: Extend unified Sink API to support small files compaction
FLIP-372: Enhance and synchronize Sink API to match the Source API
Production examples:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/
KafkaSink.java
KafkaCommitter.java
KafkaWriter.java
flink-connectors/flink-connector-iceberg/src/main/java/org/apache/iceberg/flink/sink/
IcebergSink.java
SimpleVersionedSerializer