KafkaSink exactly-once: transactions и isolation_level
KafkaSink — самый частый sink в Flink production. Записать результаты обработки в Kafka topic, который потом читают downstream сервисы. Если нужен end-to-end exactly-once — KafkaSink поддерживает его через Kafka transactions, но настройка не тривиальна и есть тонкие моменты, которые ломают гарантии.
В этом уроке разберём, как настроить KafkaSink с DeliveryGuarantee.EXACTLY_ONCE, что такое transactional.id prefix, как Flink использует Kafka transactions внутри, как настроить consumer на стороне даунстрима (isolation.level=read_committed), и какие grабли — transaction.timeout.ms, lingering transactions при crash, и т.д.
Три уровня delivery в KafkaSink
KafkaSink поддерживает три семантики:
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("output")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // <-- здесь
.setTransactionalIdPrefix("my-flink-job") // обязательно для EOS
.build();
| DeliveryGuarantee | Что значит | Когда использовать |
|---|---|---|
NONE | Fire-and-forget. Возможны потери. | Метрики, не критичные данные |
AT_LEAST_ONCE | Producer ждёт ACK. Возможны дубликаты. | Большинство случаев |
EXACTLY_ONCE | Транзакции. Нет дубликатов, нет потерь. | Биллинг, финансовые данные |
EXACTLY_ONCE — наш фокус.
Как Flink использует Kafka transactions
При DeliveryGuarantee.EXACTLY_ONCE KafkaSink использует Kafka transactions (доступны с Kafka 0.11). Workflow:
- Открытие transaction:
KafkaSinkвызываетbeginTransaction()после последнего checkpoint. - Запись событий: все события между checkpoint N и N+1 пишутся в transaction.
- Pre-commit: при barrier N+1 приходящем в sink —
KafkaSinkвызываетflush()для отправки всех записей. Transaction остаётся вPREPARINGstate. - Checkpoint complete: когда Flink получает confirmation что checkpoint N+1 успешно записан в state backend -> JobManager вызывает
commitTransaction()в sink. - Commit: Kafka transaction коммитится. Сообщения становятся видимыми для consumer’ов с
isolation.level=read_committed.
Begin TX events 1 2 3
После checkpoint N KafkaSink открывает новую Kafka transaction. Записывает события 1, 2, 3.Kafka topic pending
Сообщения попадают в Kafka topic, но в pending state — invisible для read_committed consumers.Barrier N+1 flush
Чекпоинт-барьер N+1 пришёл в sink. KafkaSink делает flush — все события в Kafka. Transaction в PREPARING state.TX preparing
Все события записаны в Kafka, но transaction ещё не committed.Checkpoint N+1 complete
JobManager получил ACK от всех операторов — checkpoint N+1 complete. JM вызывает commitTransaction в всех sink instances.TX committed visible
commitTransaction делает события 1, 2, 3 visible для read_committed consumers.transactional.id prefix: критичный параметр
KafkaSink требует обязательного transactionalIdPrefix:
.setTransactionalIdPrefix("my-flink-job")
Это префикс для transactional.id каждого Kafka producer instance, который создаётся внутри KafkaSink. Flink формирует полный ID как <prefix>-<subtask_index>-<checkpoint_id>:
my-flink-job-0-100my-flink-job-1-100my-flink-job-0-101
Почему это важно:
transactional.id — глобально уникальный идентификатор producer’а в Kafka cluster. Если две разных Flink job используют один и тот же prefix — они конфликтуют. Kafka выполнит fencing (см. ниже).
Правила:
- Каждая Flink job должна иметь уникальный
transactionalIdPrefix. - Если job renamed/redeployed — менять prefix нельзя (иначе предыдущие transactions «зависнут»).
- При двух репликах job (например, blue-green) — у них разные prefixes.
Никогда не запускайте две Flink job с одним transactionalIdPrefix в один Kafka cluster. Они «зафехтуют» друг друга — одна job всё время будет получать ProducerFencedException и падать в retry loop.
transaction.timeout.ms: важная настройка
Kafka transactions имеют timeout. Если transaction открыта дольше transaction.timeout.ms, broker автоматически её прерывает (aborts).
.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000)) // 15 минут
Правило: transaction.timeout.ms >= 2x checkpoint_interval + buffer.
Почему:
- Transaction открывается после checkpoint N, закрывается после checkpoint N+1.
- Между ними ~ checkpoint interval.
- Если checkpoint N+1 задерживается (backpressure, retry) — transaction может быть открыта дольше.
Default transaction.timeout.ms в Kafka: 60 секунд. Если checkpoint interval > 30 секунд — могут быть проблемы.
Default transaction.max.timeout.ms на broker’е: 15 минут (900 секунд). Это верхний предел для transaction.timeout.ms. Если хотите больше — нужно увеличить broker config (и убедиться, что админ Kafka не против).
Производственная настройка:
checkpoint_interval = 60s.transaction.timeout.ms = 600000(10 минут).transaction.max.timeout.ms(на broker’е) = 900000 (15 минут).
Lingering transactions при crash
Что если Flink job crashed в момент когда transaction открыта, но не закоммичена?
Сценарий:
- Transaction X открыта в момент t.
- Flink job crashed в момент t+30s.
- Transaction X остаётся «висеть» в Kafka в состоянии
ONGOING.
Без epoch fencing: при restore Flink стартует new producer с тем же transactional.id — и Kafka fences old producer instance, авто-аборт transaction X.
С epoch fencing (Kafka 2.5+):
- Flink при restart передаёт тот же transactional.id.
- Broker инкрементирует epoch для этого ID.
- Любые запросы от старого producer (если он каким-то чудом восстановился) — отклоняются с
ProducerFencedException. - Pending transactions автоматически aborted.
Это значит, что после crash transaction X гарантированно abort’ится при restore. Никаких ручных действий не нужно.
Consumer side: isolation.level=read_committed
На стороне consumer’а, который читает из output topic, нужно обязательно настроить isolation level:
consumer = KafkaConsumer(
'output',
bootstrap_servers=['kafka:9092'],
isolation_level='read_committed', # <- критично
)
props.put("isolation.level", "read_committed");
Что это даёт:
read_uncommitted(default): consumer видит все сообщения, включая те, что в открытых или aborted transactions.read_committed: consumer видит только committed transaction messages. Aborted transactions — невидимы. Pending transactions блокируют consumer (он не двинется дальше pending offset, пока transaction не закоммичена).
Если consumer работает с read_uncommitted — он будет читать дубликаты при every aborted transaction. EXACTLY_ONCE producer сломан consumer’ом.
Это самая частая ошибка с Kafka EOS: producer настроен на EXACTLY_ONCE, но consumer читает с read_uncommitted. Тогда dедуплицирование сломано — consumer видит aborted transactions как «дубликаты» successful ones. Всегда проверяйте isolation.level у downstream сервисов.
Latency cost: visibility delay
При EXACTLY_ONCE сообщения становятся visible для consumer’ов только после checkpoint. Это значит, что average latency растёт примерно на checkpoint_interval / 2.
| Checkpoint interval | Avg visibility delay |
|---|---|
| 10 секунд | 5 секунд |
| 30 секунд | 15 секунд |
| 60 секунд | 30 секунд |
| 5 минут | 2.5 минуты |
Если ваш use case требует sub-second latency — EXACTLY_ONCE не подходит, используйте AT_LEAST_ONCE с идемпотентным downstream.
Полный пример Java
KafkaSink<Event> sink = KafkaSink.<Event>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.<Event>builder()
.setTopic("processed-events")
.setValueSerializationSchema(new EventSerializer())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("event-processor-v1")
.setProperty("transaction.timeout.ms", "600000")
.build();
DataStream<Event> events = env
.fromSource(kafkaSource, watermarkStrategy(), "kafka-source")
.map(new EnrichmentMapper())
.uid("enrich");
events.sinkTo(sink).uid("kafka-sink");
env.execute();
# PyFlink 2.x
from pyflink.datastream.connectors.kafka import KafkaSink, KafkaRecordSerializationSchema
from pyflink.datastream.connectors.base import DeliveryGuarantee
sink = (KafkaSink
.builder()
.set_bootstrap_servers("kafka:9092")
.set_record_serializer(
KafkaRecordSerializationSchema
.builder()
.set_topic("processed-events")
.set_value_serialization_schema(JsonRowSerializationSchema.builder().build())
.build()
)
.set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE)
.set_transactional_id_prefix("event-processor-v1")
.set_property("transaction.timeout.ms", "600000")
.build()
)
events.sink_to(sink)
Чек-лист настройки EOS с Kafka
setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).- Уникальный
transactionalIdPrefix(не использовать в других job’ах). transaction.timeout.ms >= 2 * checkpoint_interval.- Broker
transaction.max.timeout.msдостаточный. - Consumer downstream с
isolation.level=read_committed. - Kafka cluster версия >= 2.5 (для epoch fencing).
- Flink checkpoint enabled в EXACTLY_ONCE mode.
- Один
transactionalIdPrefixна одну job на один Kafka cluster.
Попробуй сам
- Запусти Flink job с
KafkaSinkвEXACTLY_ONCEmode. - Запусти consumer с
isolation.level=read_committed. Посмотри latency между event и появлением в consumer. - Сделай искусственный crash Flink (kill TaskManager) в момент когда transaction открыта. Посмотри в Kafka logs: transaction должна быть auto-aborted.
- Запусти second Flink job с тем же
transactionalIdPrefix. Посмотри логи — должен получитьProducerFencedException. - Переключи consumer на
read_uncommitted, симулируй crash -> restore -> увидь дубликаты.
Ключевые выводы
DeliveryGuarantee.EXACTLY_ONCEвKafkaSinkиспользует Kafka transactions.transactionalIdPrefix— обязательный параметр. Уникальный per job per Kafka cluster.transaction.timeout.msдолжен быть >= 2x checkpoint interval, иначе abortion риск.- Crash auto-recovery: при restore epoch fencing автоматически abort’ит lingering transactions.
- Consumer должен использовать
isolation.level=read_committed— иначе EOS бесполезен, дубликаты от aborted transactions. - Latency cost: visibility delay ~ checkpoint_interval/2. Не для sub-second use cases.