Learning Platform
Глоссарий Troubleshooting
Урок 12.03 · 22 мин
Средний
KafkaSinkTransactionsDelivery GuaranteeIsolation Level

KafkaSink exactly-once: transactions и isolation_level

KafkaSink — самый частый sink в Flink production. Записать результаты обработки в Kafka topic, который потом читают downstream сервисы. Если нужен end-to-end exactly-once — KafkaSink поддерживает его через Kafka transactions, но настройка не тривиальна и есть тонкие моменты, которые ломают гарантии.

В этом уроке разберём, как настроить KafkaSink с DeliveryGuarantee.EXACTLY_ONCE, что такое transactional.id prefix, как Flink использует Kafka transactions внутри, как настроить consumer на стороне даунстрима (isolation.level=read_committed), и какие grабли — transaction.timeout.ms, lingering transactions при crash, и т.д.


Три уровня delivery в KafkaSink

KafkaSink поддерживает три семантики:

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("output")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)  // <-- здесь
    .setTransactionalIdPrefix("my-flink-job")  // обязательно для EOS
    .build();
DeliveryGuaranteeЧто значитКогда использовать
NONEFire-and-forget. Возможны потери.Метрики, не критичные данные
AT_LEAST_ONCEProducer ждёт ACK. Возможны дубликаты.Большинство случаев
EXACTLY_ONCEТранзакции. Нет дубликатов, нет потерь.Биллинг, финансовые данные

EXACTLY_ONCE — наш фокус.


При DeliveryGuarantee.EXACTLY_ONCE KafkaSink использует Kafka transactions (доступны с Kafka 0.11). Workflow:

Kafka транзакции: idempotent producer и 2PC Transactional Kafka producer — broker protocol (internals)
  1. Открытие transaction: KafkaSink вызывает beginTransaction() после последнего checkpoint.
  2. Запись событий: все события между checkpoint N и N+1 пишутся в transaction.
  3. Pre-commit: при barrier N+1 приходящем в sink — KafkaSink вызывает flush() для отправки всех записей. Transaction остаётся в PREPARING state.
  4. Checkpoint complete: когда Flink получает confirmation что checkpoint N+1 успешно записан в state backend -> JobManager вызывает commitTransaction() в sink.
  5. Commit: Kafka transaction коммитится. Сообщения становятся видимыми для consumer’ов с isolation.level=read_committed.
KafkaSink exactly-once flow

Begin TX events 1 2 3

После checkpoint N KafkaSink открывает новую Kafka transaction. Записывает события 1, 2, 3.

Kafka topic pending

Сообщения попадают в Kafka topic, но в pending state — invisible для read_committed consumers.

Barrier N+1 flush

Чекпоинт-барьер N+1 пришёл в sink. KafkaSink делает flush — все события в Kafka. Transaction в PREPARING state.

TX preparing

Все события записаны в Kafka, но transaction ещё не committed.

Checkpoint N+1 complete

JobManager получил ACK от всех операторов — checkpoint N+1 complete. JM вызывает commitTransaction в всех sink instances.

TX committed visible

commitTransaction делает события 1, 2, 3 visible для read_committed consumers.

transactional.id prefix: критичный параметр

KafkaSink требует обязательного transactionalIdPrefix:

.setTransactionalIdPrefix("my-flink-job")

Это префикс для transactional.id каждого Kafka producer instance, который создаётся внутри KafkaSink. Flink формирует полный ID как <prefix>-<subtask_index>-<checkpoint_id>:

  • my-flink-job-0-100
  • my-flink-job-1-100
  • my-flink-job-0-101

Почему это важно:

transactional.id — глобально уникальный идентификатор producer’а в Kafka cluster. Если две разных Flink job используют один и тот же prefix — они конфликтуют. Kafka выполнит fencing (см. ниже).

Правила:

  • Каждая Flink job должна иметь уникальный transactionalIdPrefix.
  • Если job renamed/redeployed — менять prefix нельзя (иначе предыдущие transactions «зависнут»).
  • При двух репликах job (например, blue-green) — у них разные prefixes.
WARNING

Никогда не запускайте две Flink job с одним transactionalIdPrefix в один Kafka cluster. Они «зафехтуют» друг друга — одна job всё время будет получать ProducerFencedException и падать в retry loop.


transaction.timeout.ms: важная настройка

Kafka transactions имеют timeout. Если transaction открыта дольше transaction.timeout.ms, broker автоматически её прерывает (aborts).

.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000))  // 15 минут

Правило: transaction.timeout.ms >= 2x checkpoint_interval + buffer.

Почему:

  • Transaction открывается после checkpoint N, закрывается после checkpoint N+1.
  • Между ними ~ checkpoint interval.
  • Если checkpoint N+1 задерживается (backpressure, retry) — transaction может быть открыта дольше.

Default transaction.timeout.ms в Kafka: 60 секунд. Если checkpoint interval > 30 секунд — могут быть проблемы.

Default transaction.max.timeout.ms на broker’е: 15 минут (900 секунд). Это верхний предел для transaction.timeout.ms. Если хотите больше — нужно увеличить broker config (и убедиться, что админ Kafka не против).

Производственная настройка:

  • checkpoint_interval = 60s.
  • transaction.timeout.ms = 600000 (10 минут).
  • transaction.max.timeout.ms (на broker’е) = 900000 (15 минут).

Lingering transactions при crash

Что если Flink job crashed в момент когда transaction открыта, но не закоммичена?

Сценарий:

  1. Transaction X открыта в момент t.
  2. Flink job crashed в момент t+30s.
  3. Transaction X остаётся «висеть» в Kafka в состоянии ONGOING.

Без epoch fencing: при restore Flink стартует new producer с тем же transactional.id — и Kafka fences old producer instance, авто-аборт transaction X.

С epoch fencing (Kafka 2.5+):

  • Flink при restart передаёт тот же transactional.id.
  • Broker инкрементирует epoch для этого ID.
  • Любые запросы от старого producer (если он каким-то чудом восстановился) — отклоняются с ProducerFencedException.
  • Pending transactions автоматически aborted.

Это значит, что после crash transaction X гарантированно abort’ится при restore. Никаких ручных действий не нужно.


Consumer side: isolation.level=read_committed

На стороне consumer’а, который читает из output topic, нужно обязательно настроить isolation level:

consumer = KafkaConsumer(
    'output',
    bootstrap_servers=['kafka:9092'],
    isolation_level='read_committed',  # <- критично
)
props.put("isolation.level", "read_committed");

Что это даёт:

  • read_uncommitted (default): consumer видит все сообщения, включая те, что в открытых или aborted transactions.
  • read_committed: consumer видит только committed transaction messages. Aborted transactions — невидимы. Pending transactions блокируют consumer (он не двинется дальше pending offset, пока transaction не закоммичена).

Если consumer работает с read_uncommitted — он будет читать дубликаты при every aborted transaction. EXACTLY_ONCE producer сломан consumer’ом.

DANGER

Это самая частая ошибка с Kafka EOS: producer настроен на EXACTLY_ONCE, но consumer читает с read_uncommitted. Тогда dедуплицирование сломано — consumer видит aborted transactions как «дубликаты» successful ones. Всегда проверяйте isolation.level у downstream сервисов.


Latency cost: visibility delay

При EXACTLY_ONCE сообщения становятся visible для consumer’ов только после checkpoint. Это значит, что average latency растёт примерно на checkpoint_interval / 2.

Checkpoint intervalAvg visibility delay
10 секунд5 секунд
30 секунд15 секунд
60 секунд30 секунд
5 минут2.5 минуты

Если ваш use case требует sub-second latency — EXACTLY_ONCE не подходит, используйте AT_LEAST_ONCE с идемпотентным downstream.


Полный пример Java

KafkaSink<Event> sink = KafkaSink.<Event>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.<Event>builder()
        .setTopic("processed-events")
        .setValueSerializationSchema(new EventSerializer())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("event-processor-v1")
    .setProperty("transaction.timeout.ms", "600000")
    .build();

DataStream<Event> events = env
    .fromSource(kafkaSource, watermarkStrategy(), "kafka-source")
    .map(new EnrichmentMapper())
    .uid("enrich");

events.sinkTo(sink).uid("kafka-sink");

env.execute();
# PyFlink 2.x
from pyflink.datastream.connectors.kafka import KafkaSink, KafkaRecordSerializationSchema
from pyflink.datastream.connectors.base import DeliveryGuarantee

sink = (KafkaSink
    .builder()
    .set_bootstrap_servers("kafka:9092")
    .set_record_serializer(
        KafkaRecordSerializationSchema
            .builder()
            .set_topic("processed-events")
            .set_value_serialization_schema(JsonRowSerializationSchema.builder().build())
            .build()
    )
    .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .set_transactional_id_prefix("event-processor-v1")
    .set_property("transaction.timeout.ms", "600000")
    .build()
)
events.sink_to(sink)

Чек-лист настройки EOS с Kafka

  1. setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).
  2. Уникальный transactionalIdPrefix (не использовать в других job’ах).
  3. transaction.timeout.ms >= 2 * checkpoint_interval.
  4. Broker transaction.max.timeout.ms достаточный.
  5. Consumer downstream с isolation.level=read_committed.
  6. Kafka cluster версия >= 2.5 (для epoch fencing).
  7. Flink checkpoint enabled в EXACTLY_ONCE mode.
  8. Один transactionalIdPrefix на одну job на один Kafka cluster.

Попробуй сам

  1. Запусти Flink job с KafkaSink в EXACTLY_ONCE mode.
  2. Запусти consumer с isolation.level=read_committed. Посмотри latency между event и появлением в consumer.
  3. Сделай искусственный crash Flink (kill TaskManager) в момент когда transaction открыта. Посмотри в Kafka logs: transaction должна быть auto-aborted.
  4. Запусти second Flink job с тем же transactionalIdPrefix. Посмотри логи — должен получить ProducerFencedException.
  5. Переключи consumer на read_uncommitted, симулируй crash -> restore -> увидь дубликаты.

Ключевые выводы

  1. DeliveryGuarantee.EXACTLY_ONCE в KafkaSink использует Kafka transactions.
  2. transactionalIdPrefix — обязательный параметр. Уникальный per job per Kafka cluster.
  3. transaction.timeout.ms должен быть >= 2x checkpoint interval, иначе abortion риск.
  4. Crash auto-recovery: при restore epoch fencing автоматически abort’ит lingering transactions.
  5. Consumer должен использовать isolation.level=read_committed — иначе EOS бесполезен, дубликаты от aborted transactions.
  6. Latency cost: visibility delay ~ checkpoint_interval/2. Не для sub-second use cases.
Проверка знанийKnowledge check
Вы настроили Flink job с KafkaSink EXACTLY_ONCE, transactionalIdPrefix='my-job', transaction.timeout.ms=60000 (60s), checkpoint interval=120s. Downstream consumer работает с isolation.level=read_committed. После запуска job через час Kafka broker логирует InvalidTxnStateException и Flink job постоянно рестартует. Что не так и как исправить?
ОтветAnswer
Проблема: transaction.timeout.ms=60s МЕНЬШЕ checkpoint_interval=120s. Между checkpoint N и N+1 проходит 120 секунд, но Kafka транзакция таймаутится через 60 секунд. Broker auto-aborts transaction до того, как Flink успевает её commit. KafkaSink при попытке commit получает InvalidTxnStateException — transaction уже aborted. Job падает, restart, повтор. Исправить: установить transaction.timeout.ms >= 2 x checkpoint_interval + buffer = 300000 (5 минут) минимум. Также проверить broker config transaction.max.timeout.ms — он должен быть >= нашего transaction.timeout.ms (default 900000 = 15 минут, должно хватить). После исправления — restart job. Дополнительная проверка: запустить тестовый scenario с искусственным backpressure (медленный downstream), убедиться что даже при slow checkpoints transaction.timeout достаточен. Также рассмотреть: возможно, checkpoint interval 120s избыточен — в EOS режиме это даёт visibility delay 60 секунд в среднем, что много для большинства use cases. Уменьшить до 30-60 секунд, плюс держать transaction.timeout 2x от этого.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что делает DeliveryGuarantee.EXACTLY_ONCE в KafkaSink под капотом?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4