Offset Translation и Consumer Group Sync
После того как вы настроили MM2 и данные реплицируются между кластерами, возникает неочевидная проблема. Consumer group orders-processor на DC-1 обработала все события до offset 50000 в топике orders, partition 0. DC-1 выходит из строя. Вы переключаетесь на DC-2, где реплицированный топик называется dc1.orders. Но с какого offset читать там?
Ответ не “50000”. Offset’ы на DC-2 не совпадают с offset’ами на DC-1.
Почему offset’ы не совпадают между кластерами
Kafka offset — это порядковый номер записи в конкретной партиции конкретного кластера. Он начинается с 0 и монотонно возрастает. Когда MirrorSourceConnector реплицирует данные, он создаёт новые записи на target-кластере начиная с offset 0.
При этом одна-к-одной соответствия не гарантируется по нескольким причинам:
Репликация начинается не с начала. Если MM2 начал реплицировать уже работающий топик с auto.offset.reset=earliest, он начнёт с начала лога. Но если retention уже удалил старые записи, часть истории DC-1 никогда не попадёт на DC-2.
Топик мог существовать до начала репликации. DC-1 накопил 1 000 000 записей прежде чем MM2 был настроен. MM2 начинает репликацию с offset 0 на DC-1, создавая записи начиная с offset 0 на DC-2. Здесь offset’ы совпадают. Но если MM2 начал с auto.offset.reset=latest, первая запись DC-1 offset 1000000 соответствует DC-2 offset 0.
Фильтрация и трансформации. Если MM2 настроен с трансформациями (SMT), часть записей может быть пропущена. Запись DC-1 offset 42 может не существовать на DC-2.
Компакция. Если топик на DC-1 скомпактирован, часть записей удалена. Соответствие offset’ов нарушается.
Итог: offset X на DC-1 и offset X на DC-2 — это разные записи. Для корректного возобновления чтения после failover необходим маппинг: offset на source -> соответствующий offset на target.
MirrorCheckpointConnector: формат и хранение
MirrorCheckpointConnector решает проблему трансляции. Каждые emit.checkpoints.interval.seconds (default: 60) он:
- Читает committed offset’ы всех consumer groups на source-кластере.
- Для каждой пары (consumer group, topic-partition) определяет соответствующий offset на target-кластере.
- Записывает checkpoint в топик
{source-cluster}.checkpoints.internalна target-кластере.
DC-1 (Source)
DC-1 Source Cluster. Consumer group 'orders-processor' committed offset 50000 в orders-partition-0. MirrorCheckpointConnector читает этот committed offset и записывает маппинг на DC-2.DC-2 (Target)
DC-2 Target Cluster. Получает checkpoint-записи в топик dc1.checkpoints.internal. Содержит маппинг: для consumer group 'orders-processor', для partition orders-0, offset 50000 на DC-1 соответствует offset 51423 в dc1.orders-0 на DC-2.Структура checkpoint-записи
Логическая структура каждой записи в топике {source}.checkpoints.internal:
Key: (consumer-group-id, source-topic, source-partition)
Value: (upstream-offset, downstream-offset, metadata)
Пример:
Key: ("orders-processor", "orders", 0)
Value: (50000, 51423, "")
Интерпретация:
Consumer group "orders-processor" committed offset 50000
в партиции 0 топика "orders" на DC-1.
Соответствующий offset в dc1.orders партиция 0 на DC-2 = 51423.
RemoteClusterUtils API: ручная трансляция
Для programmatic failover Kafka предоставляет утилитный класс RemoteClusterUtils (kafka-clients 3.1+, доступен в Kafka 4.0):
import org.apache.kafka.connect.mirror.RemoteClusterUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
// Шаг 1: Создать AdminClient для target-кластера (DC-2)
Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", "dc2-broker1:9092,dc2-broker2:9092");
AdminClient targetAdminClient = AdminClient.create(adminProps);
// Шаг 2: Получить текущие committed offset'ы consumer group на source (DC-1)
// В сценарии failover DC-1 недоступен -- используем последний checkpoint
// Если DC-1 ещё доступен:
Properties sourceAdminProps = new Properties();
sourceAdminProps.put("bootstrap.servers", "dc1-broker1:9092");
AdminClient sourceAdminClient = AdminClient.create(sourceAdminProps);
Map<TopicPartition, OffsetAndMetadata> sourceOffsets =
sourceAdminClient.listConsumerGroupOffsets("orders-processor")
.partitionsToOffsetAndMetadata()
.get();
// Шаг 3: Транслировать offset'ы через checkpoint топик на DC-2
Map<TopicPartition, OffsetAndMetadata> targetOffsets =
RemoteClusterUtils.translateOffsets(
targetAdminClient, // AdminClient target-кластера
"dc1", // псевдоним source-кластера (совпадает с clusters= в mm2.properties)
"orders-processor", // ID consumer group
sourceOffsets, // offset'ы на source
Duration.ofSeconds(10) // таймаут поиска в checkpoint топике
);
// Шаг 4: Применить переведённые offset'ы на target-кластере
targetAdminClient.alterConsumerGroupOffsets(
"orders-processor",
targetOffsets
).all().get();
System.out.println("Offset'ы транслированы. orders-processor готова к запуску на DC-2.");
Checkpoint лаг означает: часть событий будет обработана повторно после failover. Если consumer group committed offset 50000 на DC-1, но последний checkpoint был записан для offset 49800, consumer группа на DC-2 получит offset 49800 и повторно обработает 200 записей. Это at-least-once семантика. Приложения ДОЛЖНЫ быть идемпотентными: повторная обработка одних и тех же событий не должна вызывать дублирование бизнес-результатов. Проверяйте idempotency key перед записью в БД.
Процедура миграции consumer group при failover
Полная процедура с кодом, охватывающая оба сценария: с sync.group.offsets.enabled=true и без него.
Сценарий 1: sync.group.offsets.enabled=true (рекомендуемый)
При включённом параметре MirrorCheckpointConnector автоматически commit’ит переведённые offset’ы. При failover consumer’ы просто подключаются к DC-2 и продолжают с правильной позиции:
// Consumer конфигурация -- ОДИН ДЛЯ ОБОИХ DC
Properties consumerProps = new Properties();
// В нормальном режиме: dc1-broker1:9092
// При failover: dc2-broker1:9092 (меняется только bootstrap.servers)
consumerProps.put("bootstrap.servers", System.getenv("KAFKA_BOOTSTRAP_SERVERS"));
consumerProps.put("group.id", "orders-processor");
consumerProps.put("auto.offset.reset", "none"); // Не сбрасывать на earliest/latest
KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(consumerProps);
// При failover consumer подключается к DC-2 и находит committed offset 51423
// auto.offset.reset=none: если committed offset не найден, выброс NoOffsetForPartitionException
consumer.subscribe(List.of("dc1.orders")); // На DC-2 топик называется dc1.orders
Сценарий 2: Ручная трансляция (sync.group.offsets.enabled=false)
Если автоматическая синхронизация отключена, нужен failover-скрипт:
public class FailoverScript {
public static void main(String[] args) throws Exception {
AdminClient sourceAdmin = createAdminClient("dc1-broker1:9092");
AdminClient targetAdmin = createAdminClient("dc2-broker1:9092");
List<String> groupsToMigrate = List.of("orders-processor", "payments-consumer");
for (String group : groupsToMigrate) {
Map<TopicPartition, OffsetAndMetadata> sourceOffsets =
sourceAdmin.listConsumerGroupOffsets(group)
.partitionsToOffsetAndMetadata()
.get();
if (sourceOffsets.isEmpty()) {
System.err.println("Нет committed offset'ов для группы: " + group);
continue;
}
Map<TopicPartition, OffsetAndMetadata> targetOffsets =
RemoteClusterUtils.translateOffsets(
targetAdmin, "dc1", group, sourceOffsets, Duration.ofSeconds(30)
);
targetAdmin.alterConsumerGroupOffsets(group, targetOffsets).all().get();
System.out.printf("Группа %s: %d партиций мигрировано%n",
group, targetOffsets.size());
}
}
}
Граничные случаи и ограничения
Что происходит при разном числе партиций
Если топик orders имеет 12 партиций на DC-1, но по какой-то причине dc1.orders на DC-2 имеет только 6 партиций (например, создан вручную с неверной конфигурацией), трансляция offset’ов работает некорректно: данные партиций 6-11 не существуют на DC-2. RemoteClusterUtils.translateOffsets() выбросит исключение или вернёт неполный маппинг.
Решение: sync.topic.configs.enabled=true в MM2 конфигурации синхронизирует число партиций автоматически. Либо создавайте топики на DR-кластере вручную с теми же параметрами.
Компакция checkpoint-топика
Топик {source}.checkpoints.internal по умолчанию настроен с cleanup.policy=compact. Это означает: для каждой пары (consumer-group, partition) хранится только последний checkpoint. Старые маппинги удаляются компактором.
Это нормально для failover: вам нужен только последний маппинг. Но если consumer group давно не работает и её последний committed offset был давно, компакция может удалить старые маппинги, оставив только актуальные.
Checkpoint lag и RPO
emit.checkpoints.interval.seconds непосредственно влияет на RPO:
RPO_worst_case = emit.checkpoints.interval.seconds + network_latency + failover_detection_time
При default значении 60 секунд: до 60 секунд событий может быть обработано повторно после failover (не потеряно — они будут обработаны дважды).
Компромисс при снижении интервала:
| emit.checkpoints.interval.seconds | RPO (повторная обработка) | Дополнительная нагрузка |
|---|---|---|
| 60 (default) | до 60 сек | Минимальная |
| 30 | до 30 сек | Умеренная |
| 10 | до 10 сек | Заметная |
| 5 | до 5 сек | Высокая (рекомендуется только при строгих SLA) |
Ключевые выводы
- Offset’ы не совпадают между кластерами. Offset 50000 на DC-1 соответствует другому offset’у на DC-2. MirrorCheckpointConnector хранит маппинг в
{source}.checkpoints.internal. - sync.group.offsets.enabled=true автоматически commit’ит переведённые offset’ы на target-кластере. Рекомендуется для упрощения failover.
- RemoteClusterUtils.translateOffsets() — API для programmatic трансляции при ручном failover.
- Checkpoint lag = RPO. Снижайте
emit.checkpoints.interval.secondsдо 10-15 секунд для строгих SLA. Default 60 секунд приводит к повторной обработке до 60 секунд событий. - At-least-once семантика после failover. Приложения должны быть идемпотентными.