Kafka mirroring для DR — MirrorMaker 2 и Cluster Linking
Savepoint-based DR из предыдущего урока решает проблему state и job graph, но оставляет открытой главную проблему: где Flink job в DR-регионе возьмёт входные события? Если Kafka работает только в primary регионе и primary упал — DR-кластер просто не имеет данных. Поэтому Kafka mirroring — обязательный компонент полноценной DR-стратегии.
В этом уроке разберём два production-подхода: MirrorMaker 2 (open-source) и Confluent Cluster Linking (commercial). Главное внимание уделим offset translation — нетривиальной задаче переноса offsets между независимыми Kafka кластерами без потерь и дубликатов.
MirrorMaker 2: архитектура cross-region репликации Offset translation в cross-region репликации Kafka RPO/RTO и процедуры failover в KafkaПочему mirroring нужен и что он не решает
В streaming pipeline Kafka — это persistence layer для входных событий. Если primary Kafka упал и события за час остались на дисках упавших broker’ов, то даже идеально работающая Flink-DR-инфраструктура не сможет ничего сделать без данных.
Mirroring решает: события из primary Kafka непрерывно копируются в DR Kafka. Когда происходит failover Flink, новый job читает из DR Kafka — данные уже там.
Mirroring не решает:
- Exactly-once across regions. Между двумя Kafka-кластерами невозможен 2PC — это два разных коммерческих транзакционных systems. Реплицированные сообщения могут оказаться дубликатами после failover.
- Lag = potential data loss. Если MirrorMaker лагает на 30 секунд и в этот момент случается disaster, эти 30 секунд событий потеряны навсегда (если они не успели реплицироваться).
- Offset preservation. Offsets в primary и DR Kafka — независимы. Сообщение с offset 12345 в primary может быть offset 67890 в DR. Это создаёт проблему: где Flink job в DR-регионе должен начать чтение после restore из savepoint?
MirrorMaker 2: архитектура и конфигурация
MirrorMaker 2 (MM2) — это набор Kafka Connect connectors:
- MirrorSourceConnector — копирует topic data с source на target.
- MirrorHeartbeatConnector — пишет heartbeat в специальный топик heartbeats для мониторинга лага.
- MirrorCheckpointConnector — копирует consumer group offsets с source на target и поддерживает offset translation.
Конфигурация в mm2.properties:
# Cluster aliases
clusters = primary, dr
primary.bootstrap.servers = primary-broker:9092
dr.bootstrap.servers = dr-broker:9092
# Replication flow: primary -> dr
primary->dr.enabled = true
primary->dr.topics = events.*,orders.*,users.*
# DR -> primary flow disabled (active-passive)
dr->primary.enabled = false
# Replication settings
replication.factor = 3
checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
offset-syncs.topic.replication.factor = 3
# Sync internal kafka data
refresh.topics.enabled = true
refresh.topics.interval.seconds = 60
sync.topic.configs.enabled = true
sync.topic.acls.enabled = false
# Performance tuning
tasks.max = 16
producer.batch.size = 65536
producer.linger.ms = 100
producer.compression.type = lz4
MM2 по умолчанию переименовывает топики с префиксом source cluster alias. То есть топик events из primary появляется в DR как primary.events. Это полезно для multi-source топологий (когда несколько кластеров пишут в один DR), но усложняет конфигурацию Flink — приходится указывать другое имя топика.
Альтернатива — replication.policy.class=IdentityReplicationPolicy:
replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy
С этой policy топики не переименовываются: events остаётся events в DR кластере. Это нативный паттерн для DR-сценариев.
IdentityReplicationPolicy опасна для multi-source топологий: если у вас два primary кластера пишут в один DR, и оба имеют топик events, MM2 будет писать оба в один и тот же DR-топик events. Это разрушит данные. Используйте Identity только в чистом one-to-one mirroring.
Offset translation: главный камень преткновения
Каждое сообщение в Kafka имеет offset — уникальный 64-bit integer per partition. Offsets непрерывны и монотонны внутри партиции. Когда MM2 копирует сообщения, она пишет их в target кластер с новыми offsets, потому что target кластер ведёт свой счётчик независимо.
Это значит: если в primary кластере сообщение X имеет offset 12345 в partition 7, в DR кластере то же сообщение X будет иметь, например, offset 12340 (потому что в primary мог быть retention drop, или DR начал реплицировать позже). Mapping primary_offset -> dr_offset MM2 ведёт в специальном топике _offset-syncs.primary.internal:
# Сообщения в _offset-syncs.primary.internal:
{topicPartition: events-7, upstream: 12345, downstream: 12340}
{topicPartition: events-7, upstream: 12450, downstream: 12445}
{topicPartition: events-7, upstream: 12550, downstream: 12545}
...
При failover Flink в DR-регионе восстанавливается из savepoint. Savepoint содержит Kafka offsets для primary кластера: {events-7: 12500}. Но в DR кластере offset 12500 указывает на другое сообщение (потому что DR offsets — независимы). Если просто использовать тот же offset, Flink перечитает либо слишком много, либо слишком мало сообщений.
MirrorCheckpointConnector решает эту проблему: при копировании consumer group offsets он переводит upstream offsets в downstream через offset-syncs топик. Также экспонирует API RemoteClusterUtils.translateOffsets() для programmatic перевода.
Сценарий failover с offset translation:
#!/bin/bash
# failover-with-offset-translation.sh
JOB_NAME="fraud-detection"
SAVEPOINT_URI="s3://flink-savepoints-dr/$JOB_NAME/savepoint-abc123"
# 1. Извлечь Kafka offsets из savepoint
# (это не тривиально, см. State Processor API)
java -jar savepoint-reader.jar \
--savepoint $SAVEPOINT_URI \
--output /tmp/primary-offsets.json
# /tmp/primary-offsets.json:
# {"events-0": 1234567, "events-1": 1234890, ...}
# 2. Translate primary offsets в DR offsets через MM2
java -cp kafka-clients.jar:connect-mirror.jar \
org.apache.kafka.connect.mirror.RemoteClusterUtils \
--consumer-config dr-consumer.properties \
--primary-offsets /tmp/primary-offsets.json \
--output /tmp/dr-offsets.json
# 3. Создать новый savepoint с translated offsets через State Processor API
java -jar savepoint-rewriter.jar \
--input $SAVEPOINT_URI \
--offsets /tmp/dr-offsets.json \
--output s3://flink-savepoints-dr/$JOB_NAME/translated-savepoint
# 4. Restore Flink в DR из translated savepoint
kubectl apply -f flink-dr-deployment.yaml \
--set spec.job.initialSavepointPath=s3://flink-savepoints-dr/$JOB_NAME/translated-savepoint
Это сложно. В production обычно делают по-другому: вместо translation в savepoint используют семантически правильную позицию по timestamp.
Альтернатива translation: seek by timestamp
Kafka умеет искать offset по timestamp (offsetsForTimes). Если у вас watermark-based pipeline (event time близок к ingestion time), можно вместо точного offset translation использовать timestamp-based seek:
// При старте Flink job в DR-регионе:
// Использовать KafkaSource с OffsetsInitializer.timestamp()
KafkaSource<Event> source = KafkaSource.<Event>builder()
.setBootstrapServers("dr-broker:9092")
.setTopics("events")
.setStartingOffsets(OffsetsInitializer.timestamp(savepointTimestamp))
.setValueOnlyDeserializer(new EventDeserializer())
.build();
savepointTimestamp — это timestamp последнего обработанного события из savepoint (можно сохранить через CheckpointListener при snapshot, или взять из event-time watermark). Flink в DR начнёт читать с того момента в DR Kafka, где timestamp = savepointTimestamp.
Этот подход проще, но даёт at-least-once, а не exactly-once: возможны дубликаты из-за нечётной границы (несколько событий с одним timestamp), и из-за того, что timestamp в Kafka сообщении это producer timestamp, а не event timestamp.
Для большинства production pipeline (особенно non-financial) at-least-once с дедупликацией на consumer-стороне приемлем. Exactly-once across regions — это академический идеал, который редко достижим в практике без огромных compromise в complexity.
Confluent Cluster Linking: упрощённая альтернатива
Confluent Cluster Linking (commercial, в Confluent Platform 6.0+) решает многие проблемы MM2:
Преимущества Cluster Linking:
- Offset preservation: реплицированные сообщения имеют те же offsets, что и в source. Никакой translation не нужен.
- Read-only mirror topics: producers не могут писать в mirror topics в DR — protected by broker. Это предотвращает split-brain.
- Native broker feature: не требует отдельного MM2 worker pool. Replication работает на уровне brokers.
- Lower latency: broker-to-broker replication быстрее, чем consumer-producer цикл в MM2.
Настройка:
# На DR cluster:
confluent kafka link create dr-link \
--source-cluster primary-cluster-id \
--source-bootstrap-server primary-broker:9092 \
--config replication.factor=3,linger.ms=1000
# Создать mirror topic на DR cluster:
kafka-mirrors --create \
--source-cluster dr-link \
--source-topic events \
--mirror-topic events
С Cluster Linking failover упрощается:
# 1. Promote mirror topic (один раз, при failover):
kafka-mirrors --promote --topic events
# 2. Flink в DR использует те же offsets, что и primary
# Просто restart из savepoint без translation
Недостатки:
- Commercial-only (Confluent Platform / Confluent Cloud).
- Vendor lock-in.
- Стоимость: для high-throughput топиков Cluster Linking может стоить thousands of dollars в месяц на cross-region traffic.
Мониторинг replication lag
Без мониторинга вы не знаете фактического RPO. Минимум — replication lag в Prometheus:
# kafka-exporter с MirrorMaker metrics:
- name: kafka_mirror_lag_messages
query: |
avg by (topic, partition) (
kafka_consumergroup_lag{consumergroup=~"mm2-.*"}
)
- name: kafka_mirror_lag_ms
query: |
histogram_quantile(0.99,
kafka_consumer_records_lag_max_seconds_bucket{consumergroup=~"mm2-.*"}
) * 1000
Алерты:
groups:
- name: kafka_mirror
rules:
- alert: KafkaMirrorLagHigh
expr: kafka_mirror_lag_ms > 30000
for: 5m
labels:
severity: warning
annotations:
summary: "MirrorMaker lag > 30s for {{ $labels.topic }}"
description: "Current lag: {{ $value }}ms"
- alert: KafkaMirrorLagCritical
expr: kafka_mirror_lag_ms > 120000
for: 2m
labels:
severity: critical
annotations:
summary: "MirrorMaker lag > 2min for {{ $labels.topic }} — RPO at risk"
Дополнительно — health check через heartbeats топик: MM2 пишет heartbeat каждую секунду, если он не появляется в DR в течение 30 секунд — replication broken.
Итоги
Kafka mirroring — обязательный компонент для streaming DR. MirrorMaker 2 решает basic replication, но требует ручной работы с offset translation. Confluent Cluster Linking упрощает (offset preservation), но это commercial. Для большинства pipeline acceptable trade-off — timestamp-based seek в DR с дедупликацией на consumer-стороне.
В следующем уроке разберём active-active архитектуру — более сложный паттерн, но с RTO ≈ 0.