Learning Platform
Глоссарий Troubleshooting
Урок 12.04 · 20 мин
Средний
End to EndExactly OnceTwo Phase CommitIcebergJDBC

End-to-end exactly-once: source replay + sink 2PC

В предыдущих уроках мы разобрали три компонента exactly-once:

  • Source replay (Kafka offsets в checkpoint).
  • Flink-internal exactly-once (Chandy-Lamport barriers).
  • Sink transactional commit (KafkaSink с DeliveryGuarantee.EXACTLY_ONCE).

В этом уроке соберём всю картину: как эти три компонента работают вместе для end-to-end exactly-once, разберём двухфазный commit (2PC) с точки зрения Flink, и пройдёмся по конкретным sinks — Iceberg, JDBC с upsert, file sinks — которые поддерживают EOS.


Триада end-to-end exactly-once

[Source replay] -> [Flink internal EOS] -> [Sink 2PC]
        |                 |                    |
        v                 v                    v
   Kafka offsets    Chandy-Lamport       Transactions
   in checkpoint    barriers             or atomic commit

Все три должны работать вместе. Если хоть один компонент slabее — end-to-end EOS не достигается:

SourceFlinkSinkРезультат
ReplayEOS2PCEnd-to-end EOS
ReplayEOSAt-least-onceДубликаты в sink
No replayEOS2PCПотери при crash
ReplayAt-least-once2PCДубликаты обработки state

2PC internals: как Flink координирует distributed commit

2PC — классический алгоритм distributed transactions:

Phase 1 (Pre-commit):

  • Каждый participant подготавливает transaction.
  • Записывает изменения в durable storage в pending state.
  • Возвращает «yes/no» координатору.

Phase 2 (Commit):

  • Если все participants ответили «yes» -> координатор посылает commit.
  • Каждый participant делает changes visible.
  • Если хоть один «no» -> координатор посылает abort.

В Flink:

  • Координатор: JobManager.
  • Participants: sink subtask instances.
  • Phase 1 trigger: barrier приходит в sink.
  • Phase 2 trigger: JobManager получает все ACK от operators, объявляет checkpoint complete.
2PC между Flink и external system

Checkpoint N triggered

JobManager инициирует checkpoint N. Барьер пропагируется через всю DAG.
барьер достигает sink

Phase 1 pre commit

Phase 1: sink делает pre-commit. Все буферизованные события flush'ены в Kafka transaction (или в Iceberg manifest). Transaction в pending state.

Data pending

External system: данные записаны, но invisible. Kafka: transaction in ONGOING. Iceberg: manifest written, not yet referenced in snapshot.

ACK to JobManager

Sink посылает ACK в JobManager. Все sink instances подтвердили pre-commit.
JM получил все ACK

Phase 2 commit

Checkpoint N complete. JobManager посылает commit во все sink instances.

Data visible

External system: данные visible. Kafka: TX committed, read_committed consumers видят. Iceberg: snapshot updated, queries видят новые rows.

Что если crash в момент 2PC

Главная сложность 2PC — что делать при crash между Phase 1 и Phase 2.

Сценарий А: Crash до Phase 2

  • Sink сделал pre-commit (transaction в pending).
  • JobManager не успел послать commit.
  • Crash.

Recovery:

  1. Flink restore из last successful checkpoint N-1.
  2. Pending transaction в external system auto-aborted (по timeout или fencing).
  3. Source replay’ит данные с offset N-1.
  4. Все события переобрабатываются.
  5. Новая transaction открывается, новый commit.

Гарантия: нет дубликатов (aborted TX не visible), нет потерь (replay покрыл missing данные).

Сценарий B: Crash после Phase 2 но до notification всех sinks

  • JobManager объявил checkpoint complete.
  • Часть sink instances получили commit и применили.
  • Другая часть не получила.
  • Crash.

Recovery:

  1. JobManager знает, что checkpoint N complete (записано в metadata до crash).
  2. При restart: для sink instances, которые УЖЕ commit’или — ничего делать не надо.
  3. Для тех, что не commit’или — JobManager шлёт notifyCheckpointComplete снова. Sink повторяет commit.

Идемпотентность commit’а обязательна для этого сценария. Kafka commit идемпотентен (повторный commit одной transaction — no-op). Iceberg commit идемпотентен (атомарное обновление snapshot).

WARNING

2PC требует, чтобы commit operation в external system была идемпотентной или атомарной. Без этого crash recovery приведёт к двойному commit, дубликатам или ошибкам.


Iceberg sink: atomic commits

Apache Iceberg — open table format, поддерживает atomic commits через snapshot mechanism. FlinkIcebergSink использует это для 2PC.

FlinkSink.forRow(dataStream, schema)
    .table(table)
    .tableLoader(tableLoader)
    .writeParallelism(8)
    .upsert(true)  // или append
    .append();  // создаёт actual sink

Как 2PC работает в Iceberg:

  1. Pre-commit (Phase 1): каждый sink subtask пишет data files в storage (S3, HDFS) и формирует manifest file со списком файлов.
  2. Commit (Phase 2): один subtask (committer) делает atomic operation на Iceberg metadata — добавляет manifest в новый snapshot.
  3. Если committer успел — snapshot updated, файлы видны через query engine (Trino, Spark, Flink).
  4. Если committer crash до atomic op — manifest есть, но не в snapshot. Игнорируется. При restore Flink перезаписывает.

Iceberg commit атомарен по природе (один POST в catalog API), поэтому 2PC легко.


JDBC sink: upsert pattern

JDBC не поддерживает 2PC напрямую (или XA transactions, которые редко настроены в production). Поэтому JdbcSink с EOS использует upsert pattern:

JdbcSink.exactlyOnceSink(
    "INSERT INTO orders (id, user, amount) VALUES (?, ?, ?) " +
    "ON CONFLICT (id) DO UPDATE SET user = EXCLUDED.user, amount = EXCLUDED.amount",
    (statement, order) -> {
        statement.setLong(1, order.getId());
        statement.setString(2, order.getUser());
        statement.setBigDecimal(3, order.getAmount());
    },
    JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://db:5432/orders")
        .withDriverName("org.postgresql.Driver")
        .build()
);

Гарантия: at-least-once delivery + idempotent UPSERT = effectively exactly-once. Если запись применится дважды — второй раз ON CONFLICT сделает update, не INSERT. Семантика та же.

Альтернатива: XA transactions:

JdbcSink.exactlyOnceSink(
    insertStmt,
    statementBuilder,
    JdbcExecutionOptions.builder().build(),
    JdbcExactlyOnceOptions.builder()
        .withTransactionPerConnection(true)
        .build(),
    () -> {
        XADataSource xaDS = new PGXADataSource();
        // ... configure ...
        return xaDS;
    }
);

XA — true 2PC через JDBC. Сложно настроить (нужен XA-aware driver, distributed transaction manager), редко используется в production. Upsert-as-idempotent проще и работает в 95% случаев.


File sink: atomic rename

FileSink (S3, HDFS, локальная FS) использует atomic rename для 2PC:

FileSink<String> sink = FileSink
    .forRowFormat(new Path("s3://bucket/output"), new SimpleStringEncoder<>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(MemorySize.ofMebiBytes(128))
            .build()
    )
    .build();

Как:

  1. Pre-commit: данные пишутся во временные файлы (.in-progress или .pending).
  2. Commit: при checkpoint complete .pending -> final filename (atomic rename in S3 since 2020).
  3. Reader, который смотрит на final path, видит только commit’нутые файлы.

HDFS поддерживает atomic rename нативно. S3 после 2020 — тоже (через Strong Consistency).


Sinks с поддержкой exactly-once

SinkEOS supportМеханизм
KafkaSinkДаKafka transactions (2PC)
FlinkIcebergSinkДаIceberg atomic snapshot
JdbcSinkДа (через upsert или XA)Idempotent UPSERT или XA
FileSink (S3/HDFS)ДаAtomic rename
KinesisSinkДаKinesis Producer Library (KPL)
ElasticsearchSinkДа (через ID-based upsert)doc-level upsert
RedisSinkУсловноНужно идемпотентность на уровне команд
HTTP REST sinkНетНет 2PC
StdOutSinkНетНет 2PC

Sources с поддержкой exactly-once

SourceEOS supportReplay механизм
KafkaSourceДаOffset stored in checkpoint
KinesisSourceДаSequence number
PulsarSourceДаMessage ID
FileSourceДа (для batch и periodic monitoring)File position
HDFSSourceДаFile position
JdbcSourceУсловно (для CDC)Offset в CDC log
SocketSourceНетНе replayable
HTTP push sourceНетНе replayable

// 1. Kafka source (exactly-once read)
KafkaSource<Event> kafkaSource = KafkaSource.<Event>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("input-events")
    .setGroupId("flink-eos-consumer")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(
        OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new EventDeserializer())
    .build();

DataStream<Event> events = env.fromSource(
    kafkaSource,
    WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10)),
    "kafka-source"
).uid("source");

// 2. Flink processing (internal EOS via barriers)
DataStream<AggregatedEvent> aggregated = events
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(1)))
    .aggregate(new EventAggregator())
    .uid("aggregator");

// 3. Iceberg sink (2PC via atomic snapshot)
FlinkSink.forRow(aggregated.map(new RowMapper()), rowSchema)
    .tableLoader(tableLoader)
    .upsert(true)
    .uidPrefix("iceberg-sink")
    .append();

// 4. Checkpointing in EXACTLY_ONCE
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
env.execute("Events Aggregation Pipeline");

End-to-end exactly-once:

  • Crash в момент обработки -> Flink restore с last checkpoint, Kafka replay с last offset, Iceberg aborted manifest игнорируется. Никаких дубликатов в Iceberg, никаких потерь.

Когда EOS не нужен (даже если возможен)

  • Real-time dashboards, метрики: at-least-once + UPSERT в OLAP проще и быстрее.
  • Logs: at-least-once с dedup на стороне log analytics достаточно.
  • Notifications (email, SMS): дублирование менее критично, чем задержка от EOS.
  • Стрим-to-стрим аналитика: если downstream сам делает агрегацию по уникальным ID, дубликаты схлопываются.

EOS добавляет complexity, latency и operational overhead. Используйте, когда действительно нужно — биллинг, финансы, audit.


Попробуй сам

  1. Подними локально Kafka + Iceberg (через docker-compose).
  2. Запусти пример pipeline выше: Kafka -> Flink -> Iceberg.
  3. Сгенерируй 100K событий.
  4. В середине обработки сделай kill -9 TaskManager.
  5. Wait для restart, проверь в Iceberg через Trino: SELECT COUNT(*) FROM aggregated. Должно быть ровно 100K результатов (после агрегации, не больше).
  6. Сравни с тем же сценарием на AT_LEAST_ONCE — увидь, что результаты не равны 100K.

Ключевые выводы

  1. End-to-end EOS = source replay + Flink-internal EOS + sink 2PC. Все три компонента нужны.
  2. 2PC: Phase 1 (pre-commit при barrier) + Phase 2 (commit при checkpoint complete от JobManager).
  3. Sources с replay: Kafka, Kinesis, файлы, Pulsar.
  4. Sinks с 2PC/идемпотентностью: Kafka, Iceberg, JDBC (через upsert), File (через atomic rename).
  5. Sinks без EOS: HTTP REST, stdout, RabbitMQ без manual ACK.
  6. JDBC через UPSERT: at-least-once + idempotent UPSERT = effectively-exactly-once. Проще XA в большинстве случаев.
  7. EOS добавляет latency (~ checkpoint_interval/2) и не для real-time low-latency use cases.
Проверка знанийKnowledge check
Архитектор предлагает pipeline: KafkaSource (replay support) -> Flink (state aggregation в EXACTLY_ONCE mode) -> ElasticsearchSink (записывает aggregated документы). Команда хочет end-to-end exactly-once. Какие конфигурации/проверки нужны на каждом этапе? Что произойдёт при crash Flink в момент когда буфер ElasticsearchSink частично flushed?
ОтветAnswer
Конфигурации: KafkaSource (1) setGroupId, (2) setStartingOffsets из committed (3) checkpoint enabled — offsets автоматически сохранятся. Flink (1) enableCheckpointing(60000, EXACTLY_ONCE), (2) RETAIN_ON_CANCELLATION, (3) explicit .uid() на каждом операторе со state, (4) maxParallelism задан с запасом. ElasticsearchSink (1) использовать ID-based upsert (документы с stable doc_id, при PUT новой версии — overwrite, не дубликат), (2) ElasticsearchSink в Flink имеет EXACTLY_ONCE через bulk request flush на checkpoint barrier. При crash в момент когда буфер sink частично flushed: Flink restore из last successful checkpoint. Все события между last checkpoint и crash replay'ятся из Kafka. Sink снова сформирует bulk request с теми же документами (тот же doc_id). Elasticsearch применит UPSERT — документы будут переписаны теми же значениями (что было до crash, всё равно). Никаких дубликатов, потому что doc_id stable. Никаких потерь, потому что Kafka replay покрывает. Effectively exactly-once. Дополнительно: убедиться, что aggregation в Flink детерминирована (один и тот же input даёт один и тот же output) — критично для idempotent upsert работы корректно.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какие компоненты нужны для end-to-end exactly-once в Flink pipeline?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4