Learning Platform
Глоссарий Troubleshooting
Урок 21.03 · 25 мин
Продвинутый
KafkaMirrorMaker 2Cluster LinkingCross-regionOffset translation

Kafka mirroring для DR — MirrorMaker 2 и Cluster Linking

Savepoint-based DR из предыдущего урока решает проблему state и job graph, но оставляет открытой главную проблему: где Flink job в DR-регионе возьмёт входные события? Если Kafka работает только в primary регионе и primary упал — DR-кластер просто не имеет данных. Поэтому Kafka mirroring — обязательный компонент полноценной DR-стратегии.

В этом уроке разберём два production-подхода: MirrorMaker 2 (open-source) и Confluent Cluster Linking (commercial). Главное внимание уделим offset translation — нетривиальной задаче переноса offsets между независимыми Kafka кластерами без потерь и дубликатов.

MirrorMaker 2: архитектура cross-region репликации Offset translation в cross-region репликации Kafka RPO/RTO и процедуры failover в Kafka

Почему mirroring нужен и что он не решает

В streaming pipeline Kafka — это persistence layer для входных событий. Если primary Kafka упал и события за час остались на дисках упавших broker’ов, то даже идеально работающая Flink-DR-инфраструктура не сможет ничего сделать без данных.

Mirroring решает: события из primary Kafka непрерывно копируются в DR Kafka. Когда происходит failover Flink, новый job читает из DR Kafka — данные уже там.

Mirroring не решает:

  • Exactly-once across regions. Между двумя Kafka-кластерами невозможен 2PC — это два разных коммерческих транзакционных systems. Реплицированные сообщения могут оказаться дубликатами после failover.
  • Lag = potential data loss. Если MirrorMaker лагает на 30 секунд и в этот момент случается disaster, эти 30 секунд событий потеряны навсегда (если они не успели реплицироваться).
  • Offset preservation. Offsets в primary и DR Kafka — независимы. Сообщение с offset 12345 в primary может быть offset 67890 в DR. Это создаёт проблему: где Flink job в DR-регионе должен начать чтение после restore из savepoint?
Mirroring topology: active-passive
Primary Kafka (A)Primary Kafka cluster в region A. Producers пишут сюда события. Topic events партиционирован по userId (32 партиции).
MirrorMaker 2 / Cluster Linking
Mirror Kafka (B)Mirror Kafka cluster в region B. Топик events создан с тем же количеством партиций. MirrorMaker копирует сообщения, в идеале с лагом меньше 1s.
Primary Flink (A)Primary Flink job в region A. Читает из Primary Kafka, обрабатывает с stateful operators, пишет в downstream.
savepoint в S3 CRR
Standby Flink (B)Standby Flink в region B. После failover читает из Mirror Kafka с offsets, восстановленными из savepoint. Здесь и начинается проблема offset translation.

MirrorMaker 2: архитектура и конфигурация

MirrorMaker 2 (MM2) — это набор Kafka Connect connectors:

  • MirrorSourceConnector — копирует topic data с source на target.
  • MirrorHeartbeatConnector — пишет heartbeat в специальный топик heartbeats для мониторинга лага.
  • MirrorCheckpointConnector — копирует consumer group offsets с source на target и поддерживает offset translation.

Конфигурация в mm2.properties:

# Cluster aliases
clusters = primary, dr
primary.bootstrap.servers = primary-broker:9092
dr.bootstrap.servers = dr-broker:9092

# Replication flow: primary -> dr
primary->dr.enabled = true
primary->dr.topics = events.*,orders.*,users.*

# DR -> primary flow disabled (active-passive)
dr->primary.enabled = false

# Replication settings
replication.factor = 3
checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
offset-syncs.topic.replication.factor = 3

# Sync internal kafka data
refresh.topics.enabled = true
refresh.topics.interval.seconds = 60
sync.topic.configs.enabled = true
sync.topic.acls.enabled = false

# Performance tuning
tasks.max = 16
producer.batch.size = 65536
producer.linger.ms = 100
producer.compression.type = lz4

MM2 по умолчанию переименовывает топики с префиксом source cluster alias. То есть топик events из primary появляется в DR как primary.events. Это полезно для multi-source топологий (когда несколько кластеров пишут в один DR), но усложняет конфигурацию Flink — приходится указывать другое имя топика.

Альтернатива — replication.policy.class=IdentityReplicationPolicy:

replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy

С этой policy топики не переименовываются: events остаётся events в DR кластере. Это нативный паттерн для DR-сценариев.

WARNING

IdentityReplicationPolicy опасна для multi-source топологий: если у вас два primary кластера пишут в один DR, и оба имеют топик events, MM2 будет писать оба в один и тот же DR-топик events. Это разрушит данные. Используйте Identity только в чистом one-to-one mirroring.


Offset translation: главный камень преткновения

Каждое сообщение в Kafka имеет offset — уникальный 64-bit integer per partition. Offsets непрерывны и монотонны внутри партиции. Когда MM2 копирует сообщения, она пишет их в target кластер с новыми offsets, потому что target кластер ведёт свой счётчик независимо.

Это значит: если в primary кластере сообщение X имеет offset 12345 в partition 7, в DR кластере то же сообщение X будет иметь, например, offset 12340 (потому что в primary мог быть retention drop, или DR начал реплицировать позже). Mapping primary_offset -> dr_offset MM2 ведёт в специальном топике _offset-syncs.primary.internal:

# Сообщения в _offset-syncs.primary.internal:
{topicPartition: events-7, upstream: 12345, downstream: 12340}
{topicPartition: events-7, upstream: 12450, downstream: 12445}
{topicPartition: events-7, upstream: 12550, downstream: 12545}
...

При failover Flink в DR-регионе восстанавливается из savepoint. Savepoint содержит Kafka offsets для primary кластера: {events-7: 12500}. Но в DR кластере offset 12500 указывает на другое сообщение (потому что DR offsets — независимы). Если просто использовать тот же offset, Flink перечитает либо слишком много, либо слишком мало сообщений.

MirrorCheckpointConnector решает эту проблему: при копировании consumer group offsets он переводит upstream offsets в downstream через offset-syncs топик. Также экспонирует API RemoteClusterUtils.translateOffsets() для programmatic перевода.

Offset translation flow
Primary: msg M offset=12345Сообщение M записано в primary с offset=12345 в partition 7. Producer не контролирует offset — broker присваивает при append.
MM2 copies
DR: msg M offset=12340Сообщение M реплицировано в DR с offset=12340 (другой счётчик). MM2 записывает mapping (12345 -> 12340) в _offset-syncs топик.
Savepoint: events-7 -> 12345Savepoint содержит Kafka offsets из primary: events-7 -> 12345. Это offset в primary кластере, не в DR.
translate
DR: start from offset 12340RemoteClusterUtils.translateOffsets() читает _offset-syncs топик и переводит 12345 в DR-offset 12340. Flink в DR начинает чтение с 12340.

Сценарий failover с offset translation:

#!/bin/bash
# failover-with-offset-translation.sh

JOB_NAME="fraud-detection"
SAVEPOINT_URI="s3://flink-savepoints-dr/$JOB_NAME/savepoint-abc123"

# 1. Извлечь Kafka offsets из savepoint
# (это не тривиально, см. State Processor API)
java -jar savepoint-reader.jar \
  --savepoint $SAVEPOINT_URI \
  --output /tmp/primary-offsets.json
# /tmp/primary-offsets.json:
# {"events-0": 1234567, "events-1": 1234890, ...}

# 2. Translate primary offsets в DR offsets через MM2
java -cp kafka-clients.jar:connect-mirror.jar \
  org.apache.kafka.connect.mirror.RemoteClusterUtils \
  --consumer-config dr-consumer.properties \
  --primary-offsets /tmp/primary-offsets.json \
  --output /tmp/dr-offsets.json

# 3. Создать новый savepoint с translated offsets через State Processor API
java -jar savepoint-rewriter.jar \
  --input $SAVEPOINT_URI \
  --offsets /tmp/dr-offsets.json \
  --output s3://flink-savepoints-dr/$JOB_NAME/translated-savepoint

# 4. Restore Flink в DR из translated savepoint
kubectl apply -f flink-dr-deployment.yaml \
  --set spec.job.initialSavepointPath=s3://flink-savepoints-dr/$JOB_NAME/translated-savepoint

Это сложно. В production обычно делают по-другому: вместо translation в savepoint используют семантически правильную позицию по timestamp.


Альтернатива translation: seek by timestamp

Kafka умеет искать offset по timestamp (offsetsForTimes). Если у вас watermark-based pipeline (event time близок к ingestion time), можно вместо точного offset translation использовать timestamp-based seek:

// При старте Flink job в DR-регионе:
// Использовать KafkaSource с OffsetsInitializer.timestamp()
KafkaSource<Event> source = KafkaSource.<Event>builder()
    .setBootstrapServers("dr-broker:9092")
    .setTopics("events")
    .setStartingOffsets(OffsetsInitializer.timestamp(savepointTimestamp))
    .setValueOnlyDeserializer(new EventDeserializer())
    .build();

savepointTimestamp — это timestamp последнего обработанного события из savepoint (можно сохранить через CheckpointListener при snapshot, или взять из event-time watermark). Flink в DR начнёт читать с того момента в DR Kafka, где timestamp = savepointTimestamp.

Этот подход проще, но даёт at-least-once, а не exactly-once: возможны дубликаты из-за нечётной границы (несколько событий с одним timestamp), и из-за того, что timestamp в Kafka сообщении это producer timestamp, а не event timestamp.

TIP

Для большинства production pipeline (особенно non-financial) at-least-once с дедупликацией на consumer-стороне приемлем. Exactly-once across regions — это академический идеал, который редко достижим в практике без огромных compromise в complexity.


Confluent Cluster Linking: упрощённая альтернатива

Confluent Cluster Linking (commercial, в Confluent Platform 6.0+) решает многие проблемы MM2:

Преимущества Cluster Linking:

  • Offset preservation: реплицированные сообщения имеют те же offsets, что и в source. Никакой translation не нужен.
  • Read-only mirror topics: producers не могут писать в mirror topics в DR — protected by broker. Это предотвращает split-brain.
  • Native broker feature: не требует отдельного MM2 worker pool. Replication работает на уровне brokers.
  • Lower latency: broker-to-broker replication быстрее, чем consumer-producer цикл в MM2.

Настройка:

# На DR cluster:
confluent kafka link create dr-link \
  --source-cluster primary-cluster-id \
  --source-bootstrap-server primary-broker:9092 \
  --config replication.factor=3,linger.ms=1000

# Создать mirror topic на DR cluster:
kafka-mirrors --create \
  --source-cluster dr-link \
  --source-topic events \
  --mirror-topic events

С Cluster Linking failover упрощается:

# 1. Promote mirror topic (один раз, при failover):
kafka-mirrors --promote --topic events

# 2. Flink в DR использует те же offsets, что и primary
# Просто restart из savepoint без translation

Недостатки:

  • Commercial-only (Confluent Platform / Confluent Cloud).
  • Vendor lock-in.
  • Стоимость: для high-throughput топиков Cluster Linking может стоить thousands of dollars в месяц на cross-region traffic.

Мониторинг replication lag

Без мониторинга вы не знаете фактического RPO. Минимум — replication lag в Prometheus:

# kafka-exporter с MirrorMaker metrics:
- name: kafka_mirror_lag_messages
  query: |
    avg by (topic, partition) (
      kafka_consumergroup_lag{consumergroup=~"mm2-.*"}
    )

- name: kafka_mirror_lag_ms
  query: |
    histogram_quantile(0.99,
      kafka_consumer_records_lag_max_seconds_bucket{consumergroup=~"mm2-.*"}
    ) * 1000

Алерты:

groups:
- name: kafka_mirror
  rules:
  - alert: KafkaMirrorLagHigh
    expr: kafka_mirror_lag_ms > 30000
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "MirrorMaker lag > 30s for {{ $labels.topic }}"
      description: "Current lag: {{ $value }}ms"

  - alert: KafkaMirrorLagCritical
    expr: kafka_mirror_lag_ms > 120000
    for: 2m
    labels:
      severity: critical
    annotations:
      summary: "MirrorMaker lag > 2min for {{ $labels.topic }} — RPO at risk"

Дополнительно — health check через heartbeats топик: MM2 пишет heartbeat каждую секунду, если он не появляется в DR в течение 30 секунд — replication broken.


Итоги

Kafka mirroring — обязательный компонент для streaming DR. MirrorMaker 2 решает basic replication, но требует ручной работы с offset translation. Confluent Cluster Linking упрощает (offset preservation), но это commercial. Для большинства pipeline acceptable trade-off — timestamp-based seek в DR с дедупликацией на consumer-стороне.

В следующем уроке разберём active-active архитектуру — более сложный паттерн, но с RTO ≈ 0.

Проверка знанийKnowledge check
Job в primary читает Kafka topic events (32 partitions). MM2 mirrors в DR. После failover Flink в DR стартует из того же savepoint без offset translation, используя primary offsets напрямую. Что произойдёт и почему?
ОтветAnswer
MirrorMaker 2 (без специальной настройки) копирует сообщения с новыми offsets в target кластере, потому что Kafka offsets — это локальный счётчик каждого broker, и mirror cluster ведёт свой независимый счётчик. Mapping primary к dr хранится в _offset-syncs.primary.internal топике. Без translation Flink использует primary offsets в DR — это семантически бессмысленно. Конкретный сценарий: если в primary partition 7 offset 12345 — это event X, в DR offset 12345 может быть event Y (другое сообщение или вообще пусто). Решения: (1) RemoteClusterUtils.translateOffsets, (2) timestamp-based seek через OffsetsInitializer.timestamp(), (3) Confluent Cluster Linking, который preserves offsets.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. MirrorMaker 2 копирует topic events с primary в DR. Что происходит с offsets каждого message и как Flink восстанавливается в DR после failover?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4