Три уровня доставки: at-most, at-least, exactly-once
«Exactly-once» — одна из самых неправильно понимаемых концепций в стриминге. Маркетинговые материалы вендоров обещают «гарантированную доставку ровно один раз», но реальность гораздо более тонкая. В этом уроке разберём три уровня доставки сообщений в распределённых системах, поймём, где Flink гарантирует exactly-once, а где нет, и научимся отличать внутренний exactly-once от end-to-end.
Это фундамент для следующих трёх уроков, где разберём механизм Chandy-Lamport барьеров и реальную имплементацию с Kafka, Iceberg и JDBC sinks.
Три семантики доставки
At-most-once
Каждое сообщение обрабатывается не более одного раза. Если сообщение потеряно в сети или crash случился в момент обработки — оно потеряно навсегда.
Где встречается:
- UDP-based телеметрия.
- Логи без подтверждений (fire-and-forget).
- Метрики (Prometheus pull model).
Когда подходит: допустимы потери, главное — низкая latency и простота.
At-least-once
Каждое сообщение обрабатывается хотя бы один раз, но возможны дубликаты. Это самая распространённая семантика в production.
Механизм: retry до получения acknowledgement. Если ACK потерян — отправитель повторяет, получатель видит ту же запись дважды.
Где встречается:
- Kafka producer без idempotence.
- HTTP с retry.
- Большинство message queues по умолчанию.
Когда подходит: если downstream идемпотентен (UPSERT, set semantics, deduplication).
Exactly-once
Каждое сообщение обрабатывается ровно один раз. Никаких потерь, никаких дубликатов.
Это сложно. В распределённой системе невозможно достичь exactly-once без координации — нужны транзакции, idempotency keys, или Chandy-Lamport snapshots.
Где встречается:
- Финансовые транзакции.
- Биллинг.
- Stateful streaming aggregations (Flink).
Где у Flink exactly-once
Flink даёт exactly-once внутри Flink job (Flink-internal):
- State updates exactly-once.
- Window aggregations exactly-once.
- Operator output exactly-once.
Но это только внутри. Между Flink и внешними системами (source и sink) гарантии другие:
Source Kafka
Source — внешняя система. Kafka, Kinesis, файлы. Здесь гарантия зависит от source.Flink internal exactly once
Flink runtime гарантирует, что внутри job каждое событие обрабатывается ровно один раз. State, агрегации, фильтры — всё exactly-once.State and aggregations
Flink-internal exactly-once включает: state updates, window aggregations, keyed processing, broadcast updates. Всё внутри job защищено Chandy-Lamport snapshots.Sink PostgreSQL
Sink — внешняя система. PostgreSQL, Kafka, S3. Здесь гарантия зависит от поддержки 2PC sink'ом.End to end exactly once
End-to-end exactly-once = source replay + Flink-internal + sink 2PC. Без этой триады возможны дубликаты или потери на границах.Source: replay capability
Чтобы source мог обеспечить exactly-once, ему нужна replay capability — возможность перечитать события с определённой точки.
Управление offset в KafkaSource с replay (поддерживают exactly-once):
- Kafka — offset stored in checkpoint, replay from offset.
- Kinesis — sequence number stored in checkpoint.
- File source — file position stored in checkpoint.
- Pulsar — message ID stored in checkpoint.
Source без replay (max at-least-once, обычно at-most-once):
- TCP socket — события приходят раз, нельзя перечитать.
- HTTP push — события push’ятся, нет API replay.
- RabbitMQ (с auto-ack) — после ACK сообщение удалено.
При crash Flink восстанавливает state из checkpoint и запрашивает у source replay с offset’а на момент checkpoint. Если source не поддерживает replay — данные между last checkpoint и crash потеряны.
Sink: transactional commits
Чтобы sink не создавал дубликатов при restore, ему нужен transactional commit. Flink использует двухфазный commit (2PC):
- Pre-commit: данные между checkpoint’ами буферизуются (или записываются в transaction, не видимую для consumers).
- Commit: после успешного checkpoint Flink посылает commit во все sink’и. Данные становятся видимыми атомарно.
При crash до commit:
- Сторонняя система видит, что transaction не закоммичена (aborted).
- При restore Flink начнёт новую transaction с того же offset’а.
- Дубликатов нет, потерь нет.
Sinks с transactional support:
- Kafka —
DeliveryGuarantee.EXACTLY_ONCEчерез transactions. - Iceberg — atomic table commits.
- File sink — atomic rename (в HDFS, S3).
- JDBC — XA transactions или upsert-as-idempotent.
Sinks без transactional support (max at-least-once):
- HTTP API без idempotency-key.
- Print/stdout.
- Plain JDBC INSERT (без UPSERT и без XA).
Внутренние гарантии: Chandy-Lamport
Flink-internal exactly-once основан на алгоритме Chandy-Lamport (1985 год). Идея:
- Координатор (JobManager) посылает «барьеры» в source.
- Барьеры пропагируются через DAG операторов.
- Каждый оператор, получив барьер из всех входных каналов, делает snapshot своего state.
- Когда все операторы завершили snapshot — checkpoint complete.
Это даёт consistent snapshot всего распределённого state в один логический момент времени — без остановки обработки.
Подробно об этом — в уроке 2.
Latency vs exactly-once
Exactly-once дороже at-least-once:
| Метрика | At-least-once | Exactly-once |
|---|---|---|
| Latency (source to sink) | Минимальная | +1 checkpoint interval |
| Throughput | Максимальный | -5-20% (alignment overhead) |
| Sink visibility | Сразу | После commit (по checkpoint interval) |
| Complexity | Простая | Сложная (transactions, idempotency) |
Почему +1 checkpoint interval latency: sink сompit’ит данные только после успешного checkpoint. Если interval = 60 секунд, downstream увидит событие через 30 секунд в среднем после Flink обработки.
Это критично для real-time use cases. Если latency требование меньше 1 секунды, exactly-once с Kafka 2PC не подходит — checkpoint interval должен быть меньше секунды, что нагрузит state backend.
Когда нужен exactly-once
| Use case | Гарантия |
|---|---|
| Биллинг — расчёт суммы за период | Exactly-once (дубликаты = двойные списания) |
| Финансовые транзакции | Exactly-once |
| Real-time dashboard — счётчики | At-least-once (UPSERT в OLAP) |
| Fraud detection — alert | At-least-once (дубликаты не критичны) |
| User activity tracking | At-least-once (UPSERT по event_id) |
| ETL в DWH — батч processing | Exactly-once или idempotent UPSERT |
| Real-time metrics, dashboards | At-least-once |
| Stream-to-stream join | Exactly-once (state must be consistent) |
Универсального ответа нет. Часто at-least-once + идемпотентный downstream проще и быстрее, чем exactly-once.
Идемпотентность как альтернатива
Часто вместо exactly-once проще сделать downstream идемпотентным:
- INSERT INTO orders -> UPSERT с PK (
ON CONFLICT DO UPDATE). - HTTP POST /api/payments ->
Idempotency-Keyheader. - Send email -> не делать exactly-once, использовать
deduptable. - Atomic counter increment -> UPSERT с conditional increment.
Идемпотентность работает с at-least-once и даёт effectively-exactly-once без сложности 2PC.
Спросите себя: «Будет ли проблема, если событие обработается дважды?» Если ответ «нет — UPSERT покроет» — используйте at-least-once. Если «да — деньги двойные» — нужен exactly-once с 2PC.
Quiz термина
Exactly-once может означать разное:
- Exactly-once processing — внутри Flink, state обновляется ровно один раз даже при failure.
- Exactly-once delivery — каждое сообщение доставлено в sink ровно один раз.
- Effectively exactly-once — at-least-once + idempotent downstream = тот же эффект.
- End-to-end exactly-once — source replay + processing + sink 2PC.
Когда читаете маркетинг — спрашивайте, какой именно exactly-once имеется в виду.
Попробуй сам
Подумай, какая семантика подходит для:
- Stream-to-Iceberg ETL — счётчики продаж per region per hour.
- Real-time fraud detection alerts -> Slack webhook.
- Аудит-лог входов в систему -> ClickHouse.
- Side-effect: при определённом событии отправить SMS пользователю.
Для каждого случая — обоснуй: at-most, at-least, или exactly-once. Что страшнее: потеря или дубликат?
Ключевые выводы
- Три семантики: at-most (возможны потери), at-least (возможны дубликаты), exactly-once (ни того, ни другого).
- Flink internal exactly-once — гарантия внутри job (state, агрегации). Не покрывает source и sink автоматически.
- End-to-end exactly-once = source replay + processing + sink 2PC. Требует поддержки от всех трёх компонентов.
- Source replay: Kafka, Kinesis — да. TCP socket, HTTP push — нет.
- Sink 2PC: Kafka transactions, Iceberg, JDBC XA — да. Plain INSERT, HTTP — нет.
- Idempotent downstream часто проще exactly-once. UPSERT или idempotency keys = effectively-exactly-once.
- Latency cost: exactly-once добавляет ~ checkpoint interval к sink visibility. Не подходит для sub-second use cases.