Learning Platform
Глоссарий Troubleshooting
Урок 12.03 · 25 мин
Продвинутый
Offset TranslationMirrorCheckpointConnectorConsumer Group MigrationFailoverRemoteClusterUtils

Offset Translation и Consumer Group Sync

После того как вы настроили MM2 и данные реплицируются между кластерами, возникает неочевидная проблема. Consumer group orders-processor на DC-1 обработала все события до offset 50000 в топике orders, partition 0. DC-1 выходит из строя. Вы переключаетесь на DC-2, где реплицированный топик называется dc1.orders. Но с какого offset читать там?

Ответ не “50000”. Offset’ы на DC-2 не совпадают с offset’ами на DC-1.


Почему offset’ы не совпадают между кластерами

Kafka offset — это порядковый номер записи в конкретной партиции конкретного кластера. Он начинается с 0 и монотонно возрастает. Когда MirrorSourceConnector реплицирует данные, он создаёт новые записи на target-кластере начиная с offset 0.

При этом одна-к-одной соответствия не гарантируется по нескольким причинам:

Репликация начинается не с начала. Если MM2 начал реплицировать уже работающий топик с auto.offset.reset=earliest, он начнёт с начала лога. Но если retention уже удалил старые записи, часть истории DC-1 никогда не попадёт на DC-2.

Топик мог существовать до начала репликации. DC-1 накопил 1 000 000 записей прежде чем MM2 был настроен. MM2 начинает репликацию с offset 0 на DC-1, создавая записи начиная с offset 0 на DC-2. Здесь offset’ы совпадают. Но если MM2 начал с auto.offset.reset=latest, первая запись DC-1 offset 1000000 соответствует DC-2 offset 0.

Фильтрация и трансформации. Если MM2 настроен с трансформациями (SMT), часть записей может быть пропущена. Запись DC-1 offset 42 может не существовать на DC-2.

Компакция. Если топик на DC-1 скомпактирован, часть записей удалена. Соответствие offset’ов нарушается.

Итог: offset X на DC-1 и offset X на DC-2 — это разные записи. Для корректного возобновления чтения после failover необходим маппинг: offset на source -> соответствующий offset на target.


MirrorCheckpointConnector: формат и хранение

MirrorCheckpointConnector решает проблему трансляции. Каждые emit.checkpoints.interval.seconds (default: 60) он:

  1. Читает committed offset’ы всех consumer groups на source-кластере.
  2. Для каждой пары (consumer group, topic-partition) определяет соответствующий offset на target-кластере.
  3. Записывает checkpoint в топик {source-cluster}.checkpoints.internal на target-кластере.
MirrorCheckpointConnector: маппинг offset'ов
Каждые emit.checkpoints.interval.seconds (default 60s) коннектор записывает актуальный маппинг

DC-1 (Source)

DC-1 Source Cluster. Consumer group 'orders-processor' committed offset 50000 в orders-partition-0. MirrorCheckpointConnector читает этот committed offset и записывает маппинг на DC-2.
Consumer Group Offsets (DC-1)__consumer_offsets на DC-1. Consumer group 'orders-processor': orders-0 -> 50000, orders-1 -> 48500, orders-2 -> 51200. MirrorCheckpointConnector читает эти offset'ы через AdminClient.listConsumerGroupOffsets().
checkpoint запись

DC-2 (Target)

DC-2 Target Cluster. Получает checkpoint-записи в топик dc1.checkpoints.internal. Содержит маппинг: для consumer group 'orders-processor', для partition orders-0, offset 50000 на DC-1 соответствует offset 51423 в dc1.orders-0 на DC-2.
dc1.checkpoints.internalТопик на DC-2. Ключ записи: (consumer-group, source-topic, source-partition). Значение: (upstream-offset=50000, downstream-offset=51423, metadata). Записи обновляются каждые emit.checkpoints.interval.seconds. При failover RemoteClusterUtils.translateOffsets() читает этот топик.
emit.checkpoints.interval.seconds=60Default: 60 секунд. Это означает: checkpoint лаг может достигать 60 секунд. Если DC-1 выходит из строя прямо перед checkpoint, consumer group потеряет до 60 секунд прогресса (сообщения будут обработаны повторно). Для snизкого RPO снижайте до 10-15 секунд, но учитывайте дополнительную нагрузку на брокеры.
sync.group.offsets.enabled=trueЕсли включён: MirrorCheckpointConnector автоматически commit'ит переведённые offset'ы в consumer groups на target-кластере. Consumer group 'orders-processor' на DC-2 получает committed offset 51423 в dc1.orders-0. При failover consumer'ы подключаются к DC-2 и продолжают с 51423 без ручной трансляции.

Структура checkpoint-записи

Логическая структура каждой записи в топике {source}.checkpoints.internal:

Key:   (consumer-group-id, source-topic, source-partition)
Value: (upstream-offset, downstream-offset, metadata)

Пример:
  Key:   ("orders-processor", "orders", 0)
  Value: (50000, 51423, "")

Интерпретация:
  Consumer group "orders-processor" committed offset 50000
  в партиции 0 топика "orders" на DC-1.
  Соответствующий offset в dc1.orders партиция 0 на DC-2 = 51423.

RemoteClusterUtils API: ручная трансляция

Для programmatic failover Kafka предоставляет утилитный класс RemoteClusterUtils (kafka-clients 3.1+, доступен в Kafka 4.0):

import org.apache.kafka.connect.mirror.RemoteClusterUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Шаг 1: Создать AdminClient для target-кластера (DC-2)
Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", "dc2-broker1:9092,dc2-broker2:9092");
AdminClient targetAdminClient = AdminClient.create(adminProps);

// Шаг 2: Получить текущие committed offset'ы consumer group на source (DC-1)
// В сценарии failover DC-1 недоступен -- используем последний checkpoint
// Если DC-1 ещё доступен:
Properties sourceAdminProps = new Properties();
sourceAdminProps.put("bootstrap.servers", "dc1-broker1:9092");
AdminClient sourceAdminClient = AdminClient.create(sourceAdminProps);

Map<TopicPartition, OffsetAndMetadata> sourceOffsets =
    sourceAdminClient.listConsumerGroupOffsets("orders-processor")
        .partitionsToOffsetAndMetadata()
        .get();

// Шаг 3: Транслировать offset'ы через checkpoint топик на DC-2
Map<TopicPartition, OffsetAndMetadata> targetOffsets =
    RemoteClusterUtils.translateOffsets(
        targetAdminClient,   // AdminClient target-кластера
        "dc1",               // псевдоним source-кластера (совпадает с clusters= в mm2.properties)
        "orders-processor",  // ID consumer group
        sourceOffsets,       // offset'ы на source
        Duration.ofSeconds(10)  // таймаут поиска в checkpoint топике
    );

// Шаг 4: Применить переведённые offset'ы на target-кластере
targetAdminClient.alterConsumerGroupOffsets(
    "orders-processor",
    targetOffsets
).all().get();

System.out.println("Offset'ы транслированы. orders-processor готова к запуску на DC-2.");
WARNING

Checkpoint лаг означает: часть событий будет обработана повторно после failover. Если consumer group committed offset 50000 на DC-1, но последний checkpoint был записан для offset 49800, consumer группа на DC-2 получит offset 49800 и повторно обработает 200 записей. Это at-least-once семантика. Приложения ДОЛЖНЫ быть идемпотентными: повторная обработка одних и тех же событий не должна вызывать дублирование бизнес-результатов. Проверяйте idempotency key перед записью в БД.


Процедура миграции consumer group при failover

Полная процедура с кодом, охватывающая оба сценария: с sync.group.offsets.enabled=true и без него.

Сценарий 1: sync.group.offsets.enabled=true (рекомендуемый)

При включённом параметре MirrorCheckpointConnector автоматически commit’ит переведённые offset’ы. При failover consumer’ы просто подключаются к DC-2 и продолжают с правильной позиции:

// Consumer конфигурация -- ОДИН ДЛЯ ОБОИХ DC
Properties consumerProps = new Properties();
// В нормальном режиме: dc1-broker1:9092
// При failover: dc2-broker1:9092 (меняется только bootstrap.servers)
consumerProps.put("bootstrap.servers", System.getenv("KAFKA_BOOTSTRAP_SERVERS"));
consumerProps.put("group.id", "orders-processor");
consumerProps.put("auto.offset.reset", "none"); // Не сбрасывать на earliest/latest

KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(consumerProps);
// При failover consumer подключается к DC-2 и находит committed offset 51423
// auto.offset.reset=none: если committed offset не найден, выброс NoOffsetForPartitionException
consumer.subscribe(List.of("dc1.orders")); // На DC-2 топик называется dc1.orders

Сценарий 2: Ручная трансляция (sync.group.offsets.enabled=false)

Если автоматическая синхронизация отключена, нужен failover-скрипт:

public class FailoverScript {
    public static void main(String[] args) throws Exception {
        AdminClient sourceAdmin = createAdminClient("dc1-broker1:9092");
        AdminClient targetAdmin = createAdminClient("dc2-broker1:9092");

        List<String> groupsToMigrate = List.of("orders-processor", "payments-consumer");

        for (String group : groupsToMigrate) {
            Map<TopicPartition, OffsetAndMetadata> sourceOffsets =
                sourceAdmin.listConsumerGroupOffsets(group)
                    .partitionsToOffsetAndMetadata()
                    .get();

            if (sourceOffsets.isEmpty()) {
                System.err.println("Нет committed offset'ов для группы: " + group);
                continue;
            }

            Map<TopicPartition, OffsetAndMetadata> targetOffsets =
                RemoteClusterUtils.translateOffsets(
                    targetAdmin, "dc1", group, sourceOffsets, Duration.ofSeconds(30)
                );

            targetAdmin.alterConsumerGroupOffsets(group, targetOffsets).all().get();
            System.out.printf("Группа %s: %d партиций мигрировано%n",
                group, targetOffsets.size());
        }
    }
}

Граничные случаи и ограничения

Что происходит при разном числе партиций

Если топик orders имеет 12 партиций на DC-1, но по какой-то причине dc1.orders на DC-2 имеет только 6 партиций (например, создан вручную с неверной конфигурацией), трансляция offset’ов работает некорректно: данные партиций 6-11 не существуют на DC-2. RemoteClusterUtils.translateOffsets() выбросит исключение или вернёт неполный маппинг.

Решение: sync.topic.configs.enabled=true в MM2 конфигурации синхронизирует число партиций автоматически. Либо создавайте топики на DR-кластере вручную с теми же параметрами.

Компакция checkpoint-топика

Топик {source}.checkpoints.internal по умолчанию настроен с cleanup.policy=compact. Это означает: для каждой пары (consumer-group, partition) хранится только последний checkpoint. Старые маппинги удаляются компактором.

Это нормально для failover: вам нужен только последний маппинг. Но если consumer group давно не работает и её последний committed offset был давно, компакция может удалить старые маппинги, оставив только актуальные.

Checkpoint lag и RPO

emit.checkpoints.interval.seconds непосредственно влияет на RPO:

RPO_worst_case = emit.checkpoints.interval.seconds + network_latency + failover_detection_time

При default значении 60 секунд: до 60 секунд событий может быть обработано повторно после failover (не потеряно — они будут обработаны дважды).

Компромисс при снижении интервала:

emit.checkpoints.interval.secondsRPO (повторная обработка)Дополнительная нагрузка
60 (default)до 60 секМинимальная
30до 30 секУмеренная
10до 10 секЗаметная
5до 5 секВысокая (рекомендуется только при строгих SLA)

Ключевые выводы

  1. Offset’ы не совпадают между кластерами. Offset 50000 на DC-1 соответствует другому offset’у на DC-2. MirrorCheckpointConnector хранит маппинг в {source}.checkpoints.internal.
  2. sync.group.offsets.enabled=true автоматически commit’ит переведённые offset’ы на target-кластере. Рекомендуется для упрощения failover.
  3. RemoteClusterUtils.translateOffsets() — API для programmatic трансляции при ручном failover.
  4. Checkpoint lag = RPO. Снижайте emit.checkpoints.interval.seconds до 10-15 секунд для строгих SLA. Default 60 секунд приводит к повторной обработке до 60 секунд событий.
  5. At-least-once семантика после failover. Приложения должны быть идемпотентными.
Проверка знанийKnowledge check
MirrorCheckpointConnector настроен с emit.checkpoints.interval.seconds=60 и sync.group.offsets.enabled=false. DC-1 вышел из строя в 12:00:45. Последний checkpoint был записан в 12:00:00. Consumer group 'orders-processor' в момент сбоя имела committed offset 75000 на partition orders-0 на DC-1. Что конкретно нужно сделать для корректного возобновления на DC-2 и каков максимальный RPO в данном сценарии?
ОтветAnswer
Действия: (1) Запустить failover-скрипт с RemoteClusterUtils.translateOffsets(): читает checkpoint из dc1.checkpoints.internal на DC-2, находит маппинг для (orders-processor, orders, 0) -> offset на dc1.orders-0 на DC-2. Checkpoint записан в 12:00:00, значит маппинг актуален для offset'а который был committed в ~12:00:00. Если в 12:00:00 committed был, например, 74800 (не 75000), то транслируется именно 74800. (2) Применить полученный mapped offset через AdminClient.alterConsumerGroupOffsets(). (3) Запустить consumer с bootstrap.servers=dc2, group.id=orders-processor, auto.offset.reset=none. Максимальный RPO: время между последним checkpoint (12:00:00) и моментом сбоя (12:00:45) = 45 секунд. Consumer группа повторно обработает все записи между offset, зафиксированным в 12:00:00, и последним обработанным в 12:00:45. Это at-least-once семантика -- приложение должно быть идемпотентным.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 3. Что именно хранит MirrorCheckpointConnector в топике {source-cluster}.checkpoints.internal?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5