Learning Platform
Глоссарий Troubleshooting
Урок 13.01 · 26 мин
Продвинутый
Two-Phase CommitTwoPhaseCommittingSinkFLIP-191Exactly-OnceDistributed TransactionsState Machine

Two-Phase Commit — protocol для exactly-once

Exactly-once semantics в distributed системе — одна из самых сложных гарантий. Это не “событие обрабатывается ровно один раз” (что часто невозможно из-за retries), а “side effects (writes в sink) видны как будто событие обработано ровно один раз”. Flink achieves this через Two-Phase Commit (2PC) protocol, formalized в TwoPhaseCommittingSink interface (FLIP-191).

В этом уроке разбираем 2PC protocol detail by detail: какие фазы, как они работают, какие гарантии дают, и как Flink реализует через interface methods. Это foundation для понимания KafkaSink, IcebergSink и других transactional sinks.

End-to-end exactly-once: source replay + sink 2PC Kafka транзакции со стороны producer
Гарантии delivery semantics:

At-most-once:
  - Сообщения могут быть lost
  - Никогда не дублируются
  - Fastest, but data loss possible

At-least-once:
  - Сообщения гарантированно delivered
  - Могут быть дублированы (replay после failure)
  - Most streaming systems по умолчанию

Exactly-once:
  - Сообщения delivered exactly один раз
  - Без дублирования или потери
  - Hardest to achieve, expensive

Critical distinction:
  Exactly-once processing — message processed на operator один раз
  Exactly-once delivery — writes в sink видны один раз
  
  Flink дает оба, но они different.

Exactly-once delivery требует координации между source (для reset reading), processing (для replay deterministic), и sink (для atomic writes).

Зачем нужен 2PC

Простой commit модель не работает для distributed sink:

Naïve sink commit:
  for each record:
    1. write к sink (Kafka, DB, ...)
    2. emit record downstream

Problem: между write и notify checkpoint failure
  - Record already written, но не recorded в state
  - On replay: same record written again (duplicate)

Fix #1: Idempotent writes
  - Каждый record имеет unique ID
  - Sink dedupes
  - Works для некоторых sinks (Kafka keyed, KV stores)
  - НЕ works для append-only (file systems, append-only logs)

Fix #2: Transactional writes
  - Begin transaction
  - Write records
  - Commit transaction (или abort)
  - Только commits видны readers

Flink uses Fix #2 — transactional writes coordinated через checkpoints.

Two-Phase Commit basics

Classic 2PC из distributed systems literature:

Participants:
  - Coordinator (JobManager в Flink)
  - Participants (TaskManager sinks в Flink)

Phase 1: Prepare
  - Coordinator asks: "Can you commit?"
  - Each participant prepares (write to durable log, lock resources)
  - Replies: PREPARED или ABORT
  - Если все PREPARED -> proceed to Phase 2
  - Если хоть один ABORT -> coordinator abort

Phase 2: Commit (или Abort)
  - Coordinator broadcasts: COMMIT (или ABORT)
  - Each participant commits (или aborts)
  - Replies: ACK

Critical properties:
  - Если coordinator crashes между phases — все participants stuck
  - Each participant должен know что делать при recovery
  - Coordinator выживает между restarts (durable log)

В Flink JobManager = coordinator, TaskManager sinks = participants. Checkpoint barriers = phase 1 trigger.

Flink integration с checkpoints:

Checkpoint barrier propagates через DAG
  -> reaches sink TaskManager
  -> sink begins prepareCommit
  -> sink writes pre-committed data (or marks transaction)
  -> sink returns commit metadata (transaction ID или handle)
  -> commit metadata included в checkpoint state

Checkpoint complete notification (after all subtasks ack)
  -> sink calls commit() с stored metadata
  -> records become visible к downstream consumers
  -> transaction "made durable"

Failure scenarios:
  - Failure before checkpoint complete: pre-commit data discarded
  - Failure between prepare and commit: pending transactions
    -> on recovery, sink looks at stored metadata
    -> retries commit (idempotent через transaction ID)
  - Failure after commit: data already durable, OK

TwoPhaseCommittingSink interface (FLIP-191)

public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
    
    // Phase 1: write pre-committed data
    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context);
    
    // Phase 2: commit pre-committed data
    Committer<CommT> createCommitter();
    
    // For recovery (handling pending transactions)
    SimpleVersionedSerializer<CommT> getCommittableSerializer();
}

public interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
    
    // Called per checkpoint
    // Pre-commit: durable записи готовы к commit
    // Возвращает committables (что нужно committer'у для финального commit)
    Collection<CommT> prepareCommit();
}

public interface Committer<CommT> extends Closeable {
    
    // Called после checkpoint complete
    // Принимает committables, делает actual commit
    void commit(Collection<CommittableWithLineage<CommT>> committables);
}

Three roles:

  1. SinkWriter — writes data (per element)
  2. prepareCommit — phase 1, prepare for checkpoint
  3. Committer — phase 2, actually commit

Detailed flow

Time T0: Job running normally
  Records flowing
  SinkWriter buffers/writes pre-committed data
  Internal state: not yet committed

Time T1: Checkpoint barrier received
  Flink runtime calls writer.prepareCommit()
  Writer returns: List<CommT> committables
    Example для Kafka: List of (topic, partition, transactional.id, offset)
    Example для files: List of (file path, size, checksum)
  Committables saved в checkpoint state via getCommittableSerializer()
  Writer внутренне signals durability (commit log entry, fsync)

Time T2: All subtasks finish checkpoint
  JobManager collects acks
  Checkpoint marked complete
  JobManager notifies: notifyCheckpointComplete(checkpointId)
  
Time T3: Sink notified of complete
  Flink runtime calls committer.commit(committables)
  Committer выполняет actual commit:
    Kafka: producer.commitTransaction()
    Files: rename .tmp to .committed
    DB: COMMIT TRANSACTION
  Records become visible

Time T1.5 - failure case (между prepare и commit):
  TaskManager dies
  Job restarts от last successful checkpoint
  Loaded committables from checkpoint state
  Committer called с loaded committables
  Same commit operation performed (idempotent через transaction ID)
  No duplicates because previously commit didn't happen
  No lost data because pre-commit data preserved

State machine

SinkWriter state machine:

WRITING:
  - Accepts records via write(element, context)
  - Buffers internally
  - On prepareCommit() -> COMMITTING

COMMITTING:
  - prepareCommit() returns committables
  - State persisted в checkpoint
  - Wait для notifyCheckpointComplete
  - On notify -> DONE

DONE:
  - committer.commit() called
  - Pre-committed data made durable
  - Resources released
  - New transaction может begin -> back к WRITING

FAILURE:
  - Если фейл до notify: pre-commit data discarded on recovery
  - Если фейл после notify, перед commit: committer replay from checkpoint
  - Идемпотентность через transaction ID
Two-Phase Commit state machine
WRITINGInitial state. Sink accepts records, buffers или writes pre-committed данные (Kafka producer.send без commit, file .tmp).
Checkpoint barrier
prepareCommit()prepareCommit() called. Writer flushes buffered data в durable storage (Kafka pre-commit, file metadata). Returns List`<CommT>`.
COMMITTING (waiting)Committables (transaction IDs, file handles) saved в checkpoint state via SimpleVersionedSerializer. JobManager waits для все subtasks.
all subtasks acked
notifyCheckpointCompleteCheckpoint complete. JobManager broadcasts notifyCheckpointComplete. Sink committer activates.
commit()committer.commit(committables) executes actual commit. Kafka: commitTransaction(). Files: rename .tmp -> final. DB: COMMIT TRANSACTION.
success
DONERecords visible к downstream. Resources released. Ready для next transaction.
FAILURE recoveryFailure scenarios. Before notify: pre-commit data discarded. After notify до commit: replay committer с loaded committables (idempotent через transaction ID).
restore state
Restored to consistent stateSink restarts с last successful checkpoint. Pending committables replayed. No duplicates, no lost data — that is exactly-once.

Example: simple file sink

public class FileSinkExample implements TwoPhaseCommittingSink<String, FileCommittable> {
    
    @Override
    public PrecommittingSinkWriter<String, FileCommittable> createWriter(InitContext ctx) {
        return new FileWriter(...);
    }
    
    @Override
    public Committer<FileCommittable> createCommitter() {
        return new FileCommitter();
    }
    
    @Override
    public SimpleVersionedSerializer<FileCommittable> getCommittableSerializer() {
        return new FileCommittableSerializer();
    }
}

class FileWriter implements PrecommittingSinkWriter<String, FileCommittable> {
    private File tempFile;
    private BufferedWriter writer;
    
    @Override
    public void write(String element, Context context) {
        if (tempFile == null) {
            tempFile = createTempFile();
            writer = new BufferedWriter(new FileWriter(tempFile));
        }
        writer.write(element);
        writer.newLine();
    }
    
    @Override
    public Collection<FileCommittable> prepareCommit() {
        writer.flush();
        writer.close();
        // Pre-commit: file is fully written to disk
        // But still has .tmp suffix, not yet visible
        FileCommittable committable = new FileCommittable(
            tempFile.getPath(),
            calculateChecksum(tempFile)
        );
        tempFile = null;
        return Collections.singletonList(committable);
    }
}

class FileCommitter implements Committer<FileCommittable> {
    @Override
    public void commit(Collection<CommittableWithLineage<FileCommittable>> committables) {
        for (var c : committables) {
            File tempFile = new File(c.getCommittable().getPath());
            File finalFile = new File(tempFile.getPath().replace(".tmp", ".final"));
            // Atomic rename — file becomes visible
            tempFile.renameTo(finalFile);
        }
    }
}

Этот sketch показывает основную идею: phase 1 пишет .tmp файл, phase 2 атомарно переименовывает в .final. Между фазами могут быть failures — но .tmp файлы могут быть discarded или re-renamed на recovery.

Critical guarantees

1. Atomicity:
   Либо все committables в checkpoint committed, либо ни один
   Не возможно: half committed, half not
   
   Mechanism: либо все subtasks ack, либо checkpoint fails

2. Durability:
   После commit() returns successfully, данные durable
   Survive failures (TaskManager, JobManager, network)
   
   Mechanism: Kafka commit log replication, file system fsync, DB WAL

3. Idempotency:
   Если commit() retried (recovery scenario), same result
   No duplicates, no errors
   
   Mechanism: transaction IDs unique per checkpoint+subtask

4. Recovery:
   Pending committables stored в checkpoint state
   On restart: loaded и replayed
   
   Mechanism: SimpleVersionedSerializer для durable серialize/deserialize

Common pitfalls

1. Committer должен быть idempotent
   Wrong:
     commit(committables) -> write file -> mark as committed
     If replay: file already exists, error
   
   Right:
     commit(committables) -> atomic rename (idempotent — same operation)

2. prepareCommit должен flush durable
   Wrong: prepareCommit() returns committables but data в memory only
     Recovery -> data lost
   
   Right: prepareCommit() flushes к durable storage (fsync, Kafka producer flush)
     Recovery -> committer can pick up

3. Не использовать committables для logic
   Wrong: committables содержат business logic data
     Tied к specific sink implementation
   
   Right: committables — только pointers/handles, not data

4. CommT должен быть serializable
   Wrong: CommT содержит non-serializable references
     Cannot save в checkpoint state
   
   Right: CommT — POJO с primitive fields, paths, IDs

Sink V2 — modernized API

FLIP-191 (Flink 1.17+) replaced legacy SinkFunction API с unified Sink V2:

Legacy:
  RichSinkFunction.invoke(record)
  No exactly-once standard
  Mixing logic concerns

Sink V2 (FLIP-191):
  Sink<InputT>           — top-level abstraction
  SinkWriter<InputT>     — writes records
  TwoPhaseCommittingSink — adds prepare/commit (для exactly-once)
  Committer<CommT>       — commits committables
  StatefulSink           — для sinks с state
  SinkV2 unified with TwoPhase

Benefits:
  - Standardized exactly-once
  - Clear separation: write vs prepare vs commit
  - Better testability
  - Async support (Sink V2 supports async via FLIP-440)

Implementations использующие 2PC

Built-in Flink:
  KafkaSink (FlinkKafkaProducer уже deprecated)
  FileSink (FileSystem sink)
  IcebergSink
  PaimonSink
  PulsarSink
  JdbcSink (с exactly-once)
  ElasticsearchSink (с exactly-once mode)

Не использующие 2PC (at-least-once или idempotent):
  KafkaSink с at-least-once
  PrintSinkFunction
  Не-transactional JDBC

Custom sinks:
  Implement TwoPhaseCommittingSink
  Define CommT type для your transaction handles
  Implement prepare/commit logic для your storage

End-to-end exactly-once требования

Sink 2PC — это только часть. Полная exactly-once требует:

1. Replayable source
   - Kafka offsets recovered
   - File source position
   - Pulsar message IDs

2. Deterministic processing
   - Same input -> same output
   - No external side effects in process functions
   - No random ordering issues

3. 2PC sink
   - Transactional writes
   - Coordinated с checkpoints

4. Coordinated checkpoint protocol
   - Chandy-Lamport ABS
   - Barrier alignment
   - Atomicity всех participants

All four critical. If one missing — only at-least-once.
WARNING

End-to-end exactly-once требует exactly-once-aware sinks. Если ваша downstream system (например, custom HTTP service) не supports transactions, вы fundamentally limited к at-least-once в downstream. Solutions: idempotency через message IDs, или add intermediate exactly-once sink (Kafka) с downstream deduplication.

Чтение source

Flink source:
  flink-core/src/main/java/org/apache/flink/api/connector/sink2/
    Sink.java
    SinkWriter.java
    TwoPhaseCommittingSink.java
    Committer.java
    PrecommittingSinkWriter.java

  flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/
    CommittingSinkOperator.java
    SinkWriterOperator.java
    CommittableSummary.java

FLIP документы:
  FLIP-143: Unified Sink API
  FLIP-191: Extend unified Sink API to support small files compaction
  FLIP-372: Allow TwoPhaseCommittingSink Writer to be Async

Production examples:
  flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/
    KafkaSink.java
    KafkaCommitter.java
    KafkaWriter.java

  flink-connectors/flink-connector-iceberg/src/main/java/org/apache/iceberg/flink/sink/
    IcebergSink.java
    SimpleVersionedSerializer
Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. В чём принципиальная разница между exactly-once processing и exactly-once delivery?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4