End-to-end exactly-once: source replay + sink 2PC
В предыдущих уроках мы разобрали три компонента exactly-once:
- Source replay (Kafka offsets в checkpoint).
- Flink-internal exactly-once (Chandy-Lamport barriers).
- Sink transactional commit (KafkaSink с DeliveryGuarantee.EXACTLY_ONCE).
В этом уроке соберём всю картину: как эти три компонента работают вместе для end-to-end exactly-once, разберём двухфазный commit (2PC) с точки зрения Flink, и пройдёмся по конкретным sinks — Iceberg, JDBC с upsert, file sinks — которые поддерживают EOS.
Триада end-to-end exactly-once
[Source replay] -> [Flink internal EOS] -> [Sink 2PC]
| | |
v v v
Kafka offsets Chandy-Lamport Transactions
in checkpoint barriers or atomic commit
Все три должны работать вместе. Если хоть один компонент slabее — end-to-end EOS не достигается:
| Source | Flink | Sink | Результат |
|---|---|---|---|
| Replay | EOS | 2PC | End-to-end EOS |
| Replay | EOS | At-least-once | Дубликаты в sink |
| No replay | EOS | 2PC | Потери при crash |
| Replay | At-least-once | 2PC | Дубликаты обработки state |
Двухфазный commit (2PC) в Flink
2PC internals: как Flink координирует distributed commit2PC — классический алгоритм distributed transactions:
Phase 1 (Pre-commit):
- Каждый participant подготавливает transaction.
- Записывает изменения в durable storage в pending state.
- Возвращает «yes/no» координатору.
Phase 2 (Commit):
- Если все participants ответили «yes» -> координатор посылает commit.
- Каждый participant делает changes visible.
- Если хоть один «no» -> координатор посылает abort.
В Flink:
- Координатор: JobManager.
- Participants: sink subtask instances.
- Phase 1 trigger: barrier приходит в sink.
- Phase 2 trigger: JobManager получает все ACK от operators, объявляет checkpoint complete.
Checkpoint N triggered
JobManager инициирует checkpoint N. Барьер пропагируется через всю DAG.Phase 1 pre commit
Phase 1: sink делает pre-commit. Все буферизованные события flush'ены в Kafka transaction (или в Iceberg manifest). Transaction в pending state.Data pending
External system: данные записаны, но invisible. Kafka: transaction in ONGOING. Iceberg: manifest written, not yet referenced in snapshot.ACK to JobManager
Sink посылает ACK в JobManager. Все sink instances подтвердили pre-commit.Phase 2 commit
Checkpoint N complete. JobManager посылает commit во все sink instances.Data visible
External system: данные visible. Kafka: TX committed, read_committed consumers видят. Iceberg: snapshot updated, queries видят новые rows.Что если crash в момент 2PC
Главная сложность 2PC — что делать при crash между Phase 1 и Phase 2.
Сценарий А: Crash до Phase 2
- Sink сделал pre-commit (transaction в pending).
- JobManager не успел послать commit.
- Crash.
Recovery:
- Flink restore из last successful checkpoint N-1.
- Pending transaction в external system auto-aborted (по timeout или fencing).
- Source replay’ит данные с offset N-1.
- Все события переобрабатываются.
- Новая transaction открывается, новый commit.
Гарантия: нет дубликатов (aborted TX не visible), нет потерь (replay покрыл missing данные).
Сценарий B: Crash после Phase 2 но до notification всех sinks
- JobManager объявил checkpoint complete.
- Часть sink instances получили commit и применили.
- Другая часть не получила.
- Crash.
Recovery:
- JobManager знает, что checkpoint N complete (записано в metadata до crash).
- При restart: для sink instances, которые УЖЕ commit’или — ничего делать не надо.
- Для тех, что не commit’или — JobManager шлёт notifyCheckpointComplete снова. Sink повторяет commit.
Идемпотентность commit’а обязательна для этого сценария. Kafka commit идемпотентен (повторный commit одной transaction — no-op). Iceberg commit идемпотентен (атомарное обновление snapshot).
2PC требует, чтобы commit operation в external system была идемпотентной или атомарной. Без этого crash recovery приведёт к двойному commit, дубликатам или ошибкам.
Iceberg sink: atomic commits
Apache Iceberg — open table format, поддерживает atomic commits через snapshot mechanism. FlinkIcebergSink использует это для 2PC.
FlinkSink.forRow(dataStream, schema)
.table(table)
.tableLoader(tableLoader)
.writeParallelism(8)
.upsert(true) // или append
.append(); // создаёт actual sink
Как 2PC работает в Iceberg:
- Pre-commit (Phase 1): каждый sink subtask пишет data files в storage (S3, HDFS) и формирует manifest file со списком файлов.
- Commit (Phase 2): один subtask (committer) делает atomic operation на Iceberg metadata — добавляет manifest в новый snapshot.
- Если committer успел — snapshot updated, файлы видны через query engine (Trino, Spark, Flink).
- Если committer crash до atomic op — manifest есть, но не в snapshot. Игнорируется. При restore Flink перезаписывает.
Iceberg commit атомарен по природе (один POST в catalog API), поэтому 2PC легко.
JDBC sink: upsert pattern
JDBC не поддерживает 2PC напрямую (или XA transactions, которые редко настроены в production). Поэтому JdbcSink с EOS использует upsert pattern:
JdbcSink.exactlyOnceSink(
"INSERT INTO orders (id, user, amount) VALUES (?, ?, ?) " +
"ON CONFLICT (id) DO UPDATE SET user = EXCLUDED.user, amount = EXCLUDED.amount",
(statement, order) -> {
statement.setLong(1, order.getId());
statement.setString(2, order.getUser());
statement.setBigDecimal(3, order.getAmount());
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:postgresql://db:5432/orders")
.withDriverName("org.postgresql.Driver")
.build()
);
Гарантия: at-least-once delivery + idempotent UPSERT = effectively exactly-once. Если запись применится дважды — второй раз ON CONFLICT сделает update, не INSERT. Семантика та же.
Альтернатива: XA transactions:
JdbcSink.exactlyOnceSink(
insertStmt,
statementBuilder,
JdbcExecutionOptions.builder().build(),
JdbcExactlyOnceOptions.builder()
.withTransactionPerConnection(true)
.build(),
() -> {
XADataSource xaDS = new PGXADataSource();
// ... configure ...
return xaDS;
}
);
XA — true 2PC через JDBC. Сложно настроить (нужен XA-aware driver, distributed transaction manager), редко используется в production. Upsert-as-idempotent проще и работает в 95% случаев.
File sink: atomic rename
FileSink (S3, HDFS, локальная FS) использует atomic rename для 2PC:
FileSink<String> sink = FileSink
.forRowFormat(new Path("s3://bucket/output"), new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(15))
.withInactivityInterval(Duration.ofMinutes(5))
.withMaxPartSize(MemorySize.ofMebiBytes(128))
.build()
)
.build();
Как:
- Pre-commit: данные пишутся во временные файлы (
.in-progressили.pending). - Commit: при checkpoint complete
.pending-> final filename (atomic rename in S3 since 2020). - Reader, который смотрит на final path, видит только commit’нутые файлы.
HDFS поддерживает atomic rename нативно. S3 после 2020 — тоже (через Strong Consistency).
Sinks с поддержкой exactly-once
| Sink | EOS support | Механизм |
|---|---|---|
| KafkaSink | Да | Kafka transactions (2PC) |
| FlinkIcebergSink | Да | Iceberg atomic snapshot |
| JdbcSink | Да (через upsert или XA) | Idempotent UPSERT или XA |
| FileSink (S3/HDFS) | Да | Atomic rename |
| KinesisSink | Да | Kinesis Producer Library (KPL) |
| ElasticsearchSink | Да (через ID-based upsert) | doc-level upsert |
| RedisSink | Условно | Нужно идемпотентность на уровне команд |
| HTTP REST sink | Нет | Нет 2PC |
| StdOutSink | Нет | Нет 2PC |
Sources с поддержкой exactly-once
| Source | EOS support | Replay механизм |
|---|---|---|
| KafkaSource | Да | Offset stored in checkpoint |
| KinesisSource | Да | Sequence number |
| PulsarSource | Да | Message ID |
| FileSource | Да (для batch и periodic monitoring) | File position |
| HDFSSource | Да | File position |
| JdbcSource | Условно (для CDC) | Offset в CDC log |
| SocketSource | Нет | Не replayable |
| HTTP push source | Нет | Не replayable |
Полная картина: KafkaSource -> Flink -> IcebergSink
// 1. Kafka source (exactly-once read)
KafkaSource<Event> kafkaSource = KafkaSource.<Event>builder()
.setBootstrapServers("kafka:9092")
.setTopics("input-events")
.setGroupId("flink-eos-consumer")
.setStartingOffsets(OffsetsInitializer.committedOffsets(
OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new EventDeserializer())
.build();
DataStream<Event> events = env.fromSource(
kafkaSource,
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10)),
"kafka-source"
).uid("source");
// 2. Flink processing (internal EOS via barriers)
DataStream<AggregatedEvent> aggregated = events
.keyBy(Event::getUserId)
.window(TumblingEventTimeWindows.of(Duration.ofMinutes(1)))
.aggregate(new EventAggregator())
.uid("aggregator");
// 3. Iceberg sink (2PC via atomic snapshot)
FlinkSink.forRow(aggregated.map(new RowMapper()), rowSchema)
.tableLoader(tableLoader)
.upsert(true)
.uidPrefix("iceberg-sink")
.append();
// 4. Checkpointing in EXACTLY_ONCE
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
env.execute("Events Aggregation Pipeline");
End-to-end exactly-once:
- Crash в момент обработки -> Flink restore с last checkpoint, Kafka replay с last offset, Iceberg aborted manifest игнорируется. Никаких дубликатов в Iceberg, никаких потерь.
Когда EOS не нужен (даже если возможен)
- Real-time dashboards, метрики: at-least-once + UPSERT в OLAP проще и быстрее.
- Logs: at-least-once с dedup на стороне log analytics достаточно.
- Notifications (email, SMS): дублирование менее критично, чем задержка от EOS.
- Стрим-to-стрим аналитика: если downstream сам делает агрегацию по уникальным ID, дубликаты схлопываются.
EOS добавляет complexity, latency и operational overhead. Используйте, когда действительно нужно — биллинг, финансы, audit.
Попробуй сам
- Подними локально Kafka + Iceberg (через docker-compose).
- Запусти пример pipeline выше: Kafka -> Flink -> Iceberg.
- Сгенерируй 100K событий.
- В середине обработки сделай
kill -9TaskManager. - Wait для restart, проверь в Iceberg через Trino:
SELECT COUNT(*) FROM aggregated. Должно быть ровно 100K результатов (после агрегации, не больше). - Сравни с тем же сценарием на AT_LEAST_ONCE — увидь, что результаты не равны 100K.
Ключевые выводы
- End-to-end EOS = source replay + Flink-internal EOS + sink 2PC. Все три компонента нужны.
- 2PC: Phase 1 (pre-commit при barrier) + Phase 2 (commit при checkpoint complete от JobManager).
- Sources с replay: Kafka, Kinesis, файлы, Pulsar.
- Sinks с 2PC/идемпотентностью: Kafka, Iceberg, JDBC (через upsert), File (через atomic rename).
- Sinks без EOS: HTTP REST, stdout, RabbitMQ без manual ACK.
- JDBC через UPSERT: at-least-once + idempotent UPSERT = effectively-exactly-once. Проще XA в большинстве случаев.
- EOS добавляет latency (~ checkpoint_interval/2) и не для real-time low-latency use cases.