Learning Platform
Глоссарий Troubleshooting
Урок 12.01 · 30 мин
Продвинутый
MirrorMaker 2Multi-DCKafka ConnectReplicationDisaster Recovery

Архитектура MirrorMaker 2

Когда один кластер Kafka обслуживает единственный дата-центр, вы получаете надёжность в рамках одного физического местоположения. Но настоящая устойчивость начинается тогда, когда потеря целого дата-центра не останавливает ваш бизнес. Для этого нужна межкластерная репликация.

MirrorMaker 2 (MM2) — официальный инструмент Kafka для репликации данных между кластерами. В Kafka 4.0 это единственный поддерживаемый механизм межкластерной репликации: старый MirrorMaker 1 (простой мост consumer-producer из эпохи Kafka 1.x) полностью удалён. MM2 решает задачи, которые MirrorMaker 1 решал неэффективно: трансляцию offset’ов, мониторинг здоровья репликации и предотвращение циклической репликации в двунаправленных топологиях.

Ключевое архитектурное решение: MM2 не является отдельной системой. Он полностью построен на фреймворке Kafka Connect, что означает: все операционные практики, мониторинг и отказоустойчивость Connect применимы к MM2 без изменений.


Три коннектора MirrorMaker 2

MM2 состоит из трёх специализированных Kafka Connect коннекторов. Каждый выполняет строго определённую задачу. Понимание разделения ответственности между ними критично для правильной эксплуатации.

MM2: три коннектора и их функции
Три коннектора MM2 работают совместно. Каждый независим, но вместе они образуют полную систему репликации.

Source Cluster (DC-1)

Исходный кластер (Source Cluster). Продюсеры пишут данные сюда. MirrorSourceConnector читает топики через внутренний потребитель и воспроизводит записи в целевой кластер. Один MirrorSourceConnector обслуживает репликацию dc1->dc2.

orders / payments / users

Топик orders на source-кластере. Продюсеры пишут заказы. MirrorSourceConnector читает этот топик и реплицирует на target-кластер. Имя на target зависит от ReplicationPolicy: DefaultReplicationPolicy добавляет префикс 'dc1.' -> 'dc1.orders'.
MirrorSourceConnectorОсновной коннектор репликации данных. Читает записи из топиков source-кластера и воспроизводит их в target-кластер. Поддерживает фильтрацию топиков (topics pattern), параллелизм через tasks.max (один task на партицию), инъекцию заголовков для предотвращения циклической репликации. Конфиг: dc1->dc2.enabled=true, dc1->dc2.topics=.* (все топики).
реплицирует записи
MirrorCheckpointConnectorКоннектор трансляции offset'ов consumer groups. Периодически (emit.checkpoints.interval.seconds, default 60) читает committed offset'ы consumer groups на source-кластере и записывает маппинг (source-offset -> target-offset) в специальный топик {source}.checkpoints.internal на target-кластере. При sync.group.offsets.enabled=true автоматически commit'ит переведённые offset'ы в consumer groups на target-кластере.
синхронизирует offset'ы
MirrorHeartbeatConnectorКоннектор мониторинга здоровья репликации. Периодически записывает heartbeat-записи в топик heartbeats на source-кластере. MM2 реплицирует эти heartbeat'ы на target-кластер. Потребитель на target-кластере, читающий топик heartbeats, видит задержку репликации: разница между timestamp heartbeat и текущим временем = replication latency. Алерт: heartbeat_age > 30 секунд.
эмитирует heartbeat'ы

Target Cluster (DC-2)

Целевой кластер (Target Cluster / DR). Получает реплицированные данные. В active-passive: в нормальном режиме не обслуживает продюсеров. В active-active: обслуживает свой набор продюсеров и consumer'ов параллельно с source-кластером.

dc1.orders / dc1.payments / dc1.users

Реплицированные топики на target-кластере. С DefaultReplicationPolicy: dc1.orders, dc1.payments, dc1.users. С IdentityReplicationPolicy: orders, payments, users (те же имена). Внутренние топики MM2: heartbeats, dc1.checkpoints.internal, mm2-configs.dc1.internal, mm2-offsets.dc1.internal, mm2-status.dc1.internal.

MirrorSourceConnector: ядро репликации

MirrorSourceConnector — это Kafka Connect Source Connector. Он создаёт внутреннего потребителя (consumer), который читает записи из топиков source-кластера, и внутреннего продюсера, который воспроизводит эти записи в топики target-кластера.

Критически важные характеристики:

Параллелизм через tasks.max. Один task обслуживает одну или несколько партиций. Параметр tasks.max определяет максимальное число параллельных задач репликации. Рекомендация: устанавливайте tasks.max равным общему числу партиций реплицируемых топиков для максимальной параллельности.

Фильтрация топиков. Параметр dc1->dc2.topics принимает регулярное выражение. .* реплицирует все топики. orders|payments — только два конкретных топика. ^(?!__).+ — все топики, не начинающиеся с __ (исключает внутренние).

Инъекция заголовков. К каждой реплицированной записи MM2 добавляет заголовок __mm2.record.header.source.cluster.alias=dc1. Когда target-кластер в active-active режиме реплицирует обратно в source-кластер, MM2 видит этот заголовок и НЕ реплицирует запись повторно. Так предотвращается бесконечный цикл.

MirrorCheckpointConnector: трансляция offset’ов

Это наименее очевидный, но критически важный коннектор для disaster recovery. Проблема, которую он решает: offset 1000 в топике orders на DC-1 соответствует конкретной записи, но offset 1000 в реплицированном топике dc1.orders на DC-2 может соответствовать совершенно другой записи (или вовсе не существовать), потому что MM2 создаёт новые записи на target-кластере начиная с offset 0.

MirrorCheckpointConnector решает это через периодическое создание маппинга: для каждой consumer group, каждого топика и каждой партиции он записывает пару (source-offset, target-offset). Эти маппинги хранятся в топике {source-cluster}.checkpoints.internal.

MirrorHeartbeatConnector: мониторинг здоровья

Этот коннектор решает операционную задачу: как убедиться, что MM2 вообще работает, если business-топики неактивны? MirrorHeartbeatConnector записывает в топик heartbeats записи с текущим timestamp каждые heartbeats.interval.seconds (default: 1 секунда). MM2 реплицирует эти записи на target-кластер. Мониторинговый потребитель на target-кластере вычисляет задержку: current_time - heartbeat_timestamp = replication_latency.


Конфигурация mm2.properties

Рассмотрим минимальный, но полный конфигурационный файл для репликации DC-1 -> DC-2:

# Псевдонимы кластеров. Используются во всех параметрах вида alias.param и alias1->alias2.param
clusters = dc1, dc2

# Bootstrap servers для каждого кластера
dc1.bootstrap.servers = dc1-broker1:9092,dc1-broker2:9092,dc1-broker3:9092
dc2.bootstrap.servers = dc2-broker1:9092,dc2-broker2:9092,dc2-broker3:9092

# Включить репликацию dc1 -> dc2
dc1->dc2.enabled = true

# Реплицировать все топики
dc1->dc2.topics = .*

# Исключить внутренние топики из репликации (критично!)
dc1->dc2.topics.exclude = .*\.internal, heartbeats, .*\.replica

# Включить heartbeat коннектор
dc1->dc2.emit.heartbeats.enabled = true
dc1->dc2.heartbeats.interval.seconds = 1

# Включить checkpoint коннектор
dc1->dc2.emit.checkpoints.enabled = true
dc1->dc2.emit.checkpoints.interval.seconds = 60

# Автоматическая синхронизация consumer group offset'ов
dc1->dc2.sync.group.offsets.enabled = true
dc1->dc2.sync.group.offsets.interval.seconds = 60

# Политика именования топиков (DefaultReplicationPolicy добавляет префикс)
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy

# Replication factor для реплицированных топиков
dc1->dc2.replication.factor = 3

# Connect worker параметры (внутренние топики MM2)
offset.storage.replication.factor = 3
config.storage.replication.factor = 3
status.storage.replication.factor = 3

Dedicated Mode: автономный процесс MM2

В Kafka 4.0 рекомендуется запускать MM2 как отдельный процесс (dedicated Connect cluster), независимый от основного Connect-кластера приложения. Это даёт независимое масштабирование и изоляцию сбоев.

# Запуск MM2 как standalone-процесса
# Скрипт connect-mirror-maker.sh входит в стандартный дистрибутив Kafka 4.0
bin/connect-mirror-maker.sh mm2.properties

Для production рекомендуется запуск как Connect-кластера с регистрацией коннекторов через REST API:

# Регистрация MirrorSourceConnector через REST API
curl -X POST http://mm2-connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mm2-source-dc1-to-dc2",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "dc1",
      "target.cluster.alias": "dc2",
      "source.cluster.bootstrap.servers": "dc1-broker1:9092,dc1-broker2:9092",
      "target.cluster.bootstrap.servers": "dc2-broker1:9092,dc2-broker2:9092",
      "topics": ".*",
      "tasks.max": "24",
      "replication.factor": "3"
    }
  }'

Внутренние топики MM2

MM2 создаёт несколько внутренних служебных топиков. Их нельзя удалять вручную.

# Внутренние топики Connect-фреймворка для MM2
mm2-configs.dc1.internal     # Конфигурация коннекторов (Connect offset storage)
mm2-offsets.dc1.internal     # Внутренние offset'ы коннекторов (не consumer group offsets)
mm2-status.dc1.internal      # Статусы коннекторов и задач

# Топики MM2 для мониторинга и failover
dc1.checkpoints.internal     # Маппинги offset'ов consumer groups (MirrorCheckpointConnector)
heartbeats                   # Heartbeat-записи для измерения задержки репликации
WARNING

MM2 реплицирует данные топиков, но НЕ конфигурацию топиков (число партиций, retention, cleanup.policy). Если вы создаёте топик orders с 12 партициями на DC-1, реплицированный топик dc1.orders на DC-2 будет создан с настройками по умолчанию. Чтобы синхронизировать конфигурацию, включите параметр sync.topic.configs.enabled=true в конфигурацию MM2.


Политики именования топиков

Как именуются топики на target-кластере — один из ключевых архитектурных выборов при настройке MM2.

DefaultReplicationPolicy (по умолчанию): добавляет псевдоним source-кластера как префикс. Топик orders на DC-1 появляется как dc1.orders на DC-2. Это безопасно для active-passive топологий: имена не конфликтуют. Недостаток: consumer’ы на DR-кластере должны быть перенастроены на чтение dc1.{topic} после failover.

IdentityReplicationPolicy: топики сохраняют оригинальное имя. orders остаётся orders на обоих кластерах. Обязателен для active-active топологий (подробнее в уроке 02). Риск: без правильной настройки исключений может привести к циклической репликации.

# DefaultReplicationPolicy (default, active-passive)
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy

# IdentityReplicationPolicy (active-active)
replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy

Интерактивная топология MM2

Ниже — интерактивная диаграмма, которая показывает обе топологии. Переключитесь между Active-Passive и Active-Active для сравнения архитектурных различий.

MirrorMaker 2: Топологии репликации
DC-1 (Primary)Основной дата-центр. Все продюсеры пишут сюда. Consumers читают из этого кластера в нормальном режиме работы. При сбое DC-1 трафик переключается на DC-2 (failover). DefaultReplicationPolicy добавляет префикс 'dc1.' к именам топиков на DR-кластере.
ordersТопик orders на DC-1. Key = orderId для партиционирования. Продюсеры пишут напрямую. MirrorSourceConnector реплицирует на DC-2 как 'dc1.orders'. RF=3, min.insync.replicas=2 для надёжности.
paymentsТопик payments на DC-1. Содержит события об оплатах. Реплицируется на DC-2 как 'dc1.payments'. Retention: compact (хранит последнее состояние по payment-id для идемпотентной обработки).
usersТопик users на DC-1. Профили пользователей, обновления аккаунтов. Реплицируется на DC-2 как 'dc1.users'. log.cleanup.policy=compact: всегда доступен последний профиль каждого пользователя.
MM2 Source Connector
DC-2 (DR Standby)Резервный дата-центр (DR Standby). В нормальном режиме не обслуживает продюсеров. Топики имеют префикс 'dc1.' (DefaultReplicationPolicy). При failover: (1) перевести consumer offsets через MirrorCheckpointConnector, (2) перенаправить DNS, (3) запустить продюсеры на DC-2.
dc1.ordersРеплицированный топик dc1.orders на DR-кластере. Префикс 'dc1.' добавлен DefaultReplicationPolicy. При failover consumers переключаются с 'orders' (DC-1) на 'dc1.orders' (DC-2). sync.group.offsets.enabled=true автоматически переносит consumer group offsets.
dc1.paymentsРеплицированный топик dc1.payments на DR-кластере. Данные получены от MirrorSourceConnector. Репликационный lag = разница между LEO на DC-1 и текущим offset на DC-2. Мониторируется через JMX MBean kafka.mirror:type=MirrorSourceConnector.
dc1.usersРеплицированный топик dc1.users на DR-кластере. Consumers после failover используют RemoteClusterUtils.translateOffsets() для конвертации offsets из dc1.users (source) в dc1.users (target namespace) для корректного возобновления чтения.
RPOсекунды — минутыRecovery Point Objective — допустимая потеря данных. В Active-Passive зависит от lag MirrorSourceConnector. Типично: 1-30 секунд. Формула: RPO = avg(replication-latency-ms). Уменьшить RPO: увеличить tasks.max, уменьшить emit.checkpoints.interval.seconds (default 60s).
RTO5-20 минутRecovery Time Objective — время восстановления сервиса. Включает: обнаружение сбоя через мониторинг heartbeat (1-5 мин), переключение DNS/load balancer (TTL, 1-5 мин), перевод consumer offsets (MirrorCheckpointConnector, ~1 мин), перезапуск приложений (1-5 мин). При sync.group.offsets.enabled=true шаг перевода offsets автоматический.
MirrorCheckpointConnectordc1.checkpoints.internalХранит маппинг смещений: (consumer-group, source-topic, source-partition) -> (source-offset, target-offset). Топик: {source}.checkpoints.internal. Интервал: emit.checkpoints.interval.seconds=60 (по умолчанию). При failover: RemoteClusterUtils.translateOffsets() конвертирует consumer group offsets для корректного возобновления чтения на DR-кластере.

Связь с Kafka Connect

MM2 — это полноценные Kafka Connect коннекторы. Все инструменты управления Connect работают с MM2:

# Список коннекторов MM2
curl http://mm2-connect:8083/connectors

# Статус MirrorSourceConnector
curl http://mm2-connect:8083/connectors/mm2-source-dc1-to-dc2/status

# Пауза репликации (например, для maintenance)
curl -X PUT http://mm2-connect:8083/connectors/mm2-source-dc1-to-dc2/pause

# Возобновление
curl -X PUT http://mm2-connect:8083/connectors/mm2-source-dc1-to-dc2/resume
NOTE

Если вы изучали Модуль 05 (Kafka Connect), вся операционная модель MM2 вам уже знакома: REST API управления, автоматическая перебалансировка tasks при падении worker’а, мониторинг через JMX. MM2 не добавляет новых концепций управления — он расширяет уже известный фреймворк.


Ключевые выводы

  1. MM2 = три коннектора на Kafka Connect: MirrorSourceConnector (данные), MirrorCheckpointConnector (offset’ы), MirrorHeartbeatConnector (мониторинг). Каждый независим и выполняет строго одну роль.
  2. DefaultReplicationPolicy добавляет префикс к именам топиков (dc1.orders). IdentityReplicationPolicy сохраняет оригинальное имя. Выбор определяет топологию: passive или active-active.
  3. Внутренние топики MM2 (mm2-configs.*.internal, checkpoints.internal, heartbeats) создаются автоматически. Не удалять.
  4. Конфигурация топиков не реплицируется автоматически. Включайте sync.topic.configs.enabled=true или создавайте топики на target-кластере вручную.
  5. В Kafka 4.0 MirrorMaker 1 полностью удалён. MM2 — единственный поддерживаемый механизм межкластерной репликации.
Проверка знанийKnowledge check
Команда настраивает MM2 для репликации dc1 -> dc2. Через неделю они замечают: на DC-2 существуют топики dc1.orders и dc1.payments, но конфигурация топика dc1.orders (partitions=12, retention.ms=86400000) не совпадает с оригиналом на DC-1 (partitions=12, retention.ms=604800000). Что произошло и как исправить?
ОтветAnswer
MM2 по умолчанию реплицирует данные топиков, но не их конфигурацию. Топик dc1.orders был создан на DC-2 с настройками по умолчанию (retention.ms=604800000 = 7 дней по умолчанию, а не значение с DC-1). Чтобы MM2 синхронизировал конфигурацию топиков, нужно добавить в mm2.properties параметр sync.topic.configs.enabled=true. После этого MirrorSourceConnector будет периодически (sync.topic.configs.interval.seconds) копировать конфигурацию топиков с source на target-кластер. Немедленное исправление: kafka-configs.sh --alter --entity-type topics --entity-name dc1.orders --add-config retention.ms=604800000 --bootstrap-server dc2-broker1:9092.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Из каких трёх коннекторов состоит MirrorMaker 2 и какова роль каждого?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5