RPO/RTO и процедуры Failover
Понимание MM2 архитектуры и offset трансляции — необходимые, но недостаточные знания для эксплуатации Multi-DC Kafka. Вам нужно уметь отвечать на вопросы: “Сколько данных мы потеряем при сбое?” и “Сколько времени займёт восстановление?”. Это RPO и RTO. И вам нужен работающий runbook — последовательность конкретных шагов, которую можно выполнить под давлением инцидента в 3 часа ночи.
RPO: Recovery Point Objective
RPO — максимальный допустимый объём потерянных данных, выражённый во времени. Если RPO = 30 секунд, это значит: при сбое первичного кластера допустима потеря не более 30 секунд данных.
В контексте MM2, RPO определяется задержкой репликации в момент сбоя:
RPO = replication_lag_at_time_of_failure
Если MM2 в момент сбоя DC-1 имел задержку репликации 5 секунд, то 5 секунд данных, уже записанных на DC-1, ещё не попали на DC-2. Эти данные будут потеряны (или обработаны повторно, если producer использовал идемпотентность и данные можно восстановить).
Ключевая JMX-метрика для RPO
kafka.mirror:type=MirrorSourceConnector,target=dc2
- replication-latency-ms-avg: средняя задержка репликации за последние N секунд
- replication-latency-ms-max: максимальная задержка за окно
Формула максимального RPO за окно наблюдения:
RPO_max = max(replication-latency-ms-max) / 1000 (в секундах)
Факторы, влияющие на RPO
Сетевая задержка между DC. Физическое расстояние между дата-центрами определяет нижний предел задержки репликации. Два DC в одном городе: 1-5ms. DC в разных странах: 50-100ms. DC на разных континентах: 100-300ms.
tasks.max MirrorSourceConnector. Каждый task реплицирует несколько партиций. Увеличение tasks.max увеличивает параллелизм репликации и снижает lag при высокой нагрузке.
emit.checkpoints.interval.seconds. Непосредственно влияет на RPO в смысле повторной обработки (подробно в уроке 03). Это checkpoint RPO, не data RPO.
Пропускная способность сети. Если суммарный объём данных больше bandwidth канала между DC, репликация не успевает за записью. Контролируйте byte-rate метрику.
Типичные значения RPO
| Конфигурация | Типичный RPO |
|---|---|
| DC в одном здании (cross-site), 10Gbps | 100ms - 1 секунда |
| DC в одном городе, 1Gbps | 1-5 секунд |
| DC в разных странах | 10-60 секунд |
| DC на разных континентах | 1-5 минут |
RTO: Recovery Time Objective
RTO — максимальное допустимое время от момента обнаружения сбоя до полного восстановления сервиса.
RTO складывается из нескольких последовательных фаз:
Снижение RTO
Низкий DNS TTL. Установите TTL = 30-60 секунд для DNS-записей Kafka bootstrap servers. Это сокращает фазу DNS propagation с 5 минут до 1 минуты. Риск: незначительное увеличение DNS query rate.
sync.group.offsets.enabled=true. Устраняет фазу ручной трансляции offset’ов. Экономия: 1-2 минуты.
Pre-warm consumers на DR. Держите consumer group на DC-2 в активном состоянии с минимальным poll rate, не обрабатывая реально. Offset’ы уже committed, приложение запустится мгновенно. Потребление ресурсов DC-2: минимальное.
Автоматизированные failover скрипты. Runbook в виде кода (Ansible, Bash, Python). При ручном выполнении операции занимают вдвое больше времени из-за стресса.
Пошаговая процедура Failover
Это runbook для Active-Passive топологии. Каждый шаг должен быть задокументирован в вашем внутреннем wiki и протестирован на staging.
Шаг 1: Обнаружение сбоя (автоматически)
# Prometheus alert (срабатывает автоматически):
# kafka_mirror_heartbeat_age_seconds{source="dc1"} > 30
# Ручная верификация:
kafka-consumer-groups.sh \
--bootstrap-server dc2-broker1:9092 \
--describe \
--group mm2-heartbeat-monitor
# Проверить доступность DC-1:
kafka-broker-api-versions.sh \
--bootstrap-server dc1-broker1:9092 \
--command-config client-ssl.properties
# Если таймаут -- DC-1 недоступен
Шаг 2: Остановка продюсеров на DC-1
Это предотвращает запись данных в DC-1, которые не успеют реплицироваться:
# В Kubernetes: уменьшить replicas продюсер-сервисов до 0
kubectl scale deployment order-service --replicas=0 -n production
kubectl scale deployment payment-service --replicas=0 -n production
# Подождать завершения in-flight records:
# producer.flush() вызывается автоматически при graceful shutdown
sleep 10
Шаг 3: Ожидание дренирования MM2
Если DC-1 ещё частично доступен (сеть живая, но брокеры перегружены), ждём завершения репликации:
# Проверить replication lag
kafka-consumer-groups.sh \
--bootstrap-server dc2-broker1:9092 \
--describe \
--group mm2-source-dc1-to-dc2
# Ждём пока LAG = 0
# Таймаут: 5 минут. Если за 5 минут не 0 -- принимаем текущий lag как RPO
Шаг 4: Трансляция offset’ов consumer groups
# Вариант A: sync.group.offsets.enabled=true (автоматически)
# Ничего делать не нужно. Offset'ы уже synchronized на DC-2.
# Вариант B: Ручная трансляция скриптом
java -jar failover-tools.jar translate-offsets \
--source-cluster dc1 \
--target-bootstrap dc2-broker1:9092 \
--groups orders-processor,payments-consumer,analytics-reader
# Верификация: проверить committed offset'ы на DC-2
kafka-consumer-groups.sh \
--bootstrap-server dc2-broker1:9092 \
--describe \
--group orders-processor
Шаг 5: Переключение DNS/Load Balancer
# AWS Route53 (Terraform):
resource "aws_route53_record" "kafka_bootstrap" {
name = "kafka.company.com"
type = "CNAME"
records = ["dc2-broker1.internal"]
ttl = 60 # Заранее установлен низкий TTL
}
# Или через AWS CLI:
aws route53 change-resource-record-sets \
--hosted-zone-id Z1234 \
--change-batch '{"Changes":[{"Action":"UPSERT","ResourceRecordSet":{"Name":"kafka.company.com","Type":"CNAME","TTL":60,"ResourceRecords":[{"Value":"dc2-broker1.internal"}]}}]}'
Шаг 6-7: Запуск consumer’ов и продюсеров на DC-2
# В Kubernetes: обновить ConfigMap с новым bootstrap.servers
kubectl patch configmap kafka-config \
--patch '{"data":{"bootstrap.servers":"dc2-broker1:9092,dc2-broker2:9092"}}'
# Запустить приложения
kubectl scale deployment order-service --replicas=3 -n production
kubectl scale deployment payment-service --replicas=3 -n production
kubectl scale deployment orders-processor --replicas=6 -n production
Шаг 8: Верификация
# Проверить consumer lag (должен снижаться):
watch -n 5 kafka-consumer-groups.sh \
--bootstrap-server dc2-broker1:9092 \
--describe \
--group orders-processor
# Проверить producer throughput:
# kafka.producer:type=producer-metrics,client-id=order-service,name=record-send-rate
# Должен соответствовать pre-failover значению
# UnderReplicatedPartitions на DC-2:
kafka-topics.sh \
--bootstrap-server dc2-broker1:9092 \
--describe \
--under-replicated-partitions
# Должно быть пусто
Failover в стандартном Kafka — это ручное (или скриптованное) решение. Kafka не имеет встроенного автоматического failover на уровне Multi-DC. Это принципиальное отличие от решений вроде Oracle Data Guard или AWS RDS Multi-AZ. Ваши automation scripts ДОЛЖНЫ быть протестированы регулярно (game day, chaos engineering) — script, запущенный впервые во время реального инцидента, почти наверняка не сработает корректно.
Процедура Failback
После восстановления DC-1 нужно вернуть трафик обратно. Это ещё более деликатная операция: DC-2 накопил новые данные, которые нужно реплицировать обратно в DC-1.
Failback = Reverse MM2 direction + replicate DC-2->DC-1 + translate offsets + redirect traffic
Шаг 1: Добавить обратное направление MM2 (dc2->dc1).
# Добавить в mm2.properties:
dc2->dc1.enabled = true
dc2->dc1.topics = .*
dc2->dc1.topics.exclude = .*\.internal, heartbeats
dc2->dc1.emit.checkpoints.enabled = true
dc2->dc1.sync.group.offsets.enabled = true
# Применить изменения (rolling update MM2 Connect cluster):
# или через REST API:
curl -X PUT http://mm2-connect:8083/connectors/mm2-source-dc2-to-dc1/config \
-H "Content-Type: application/json" \
-d '{"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", ...}'
Шаг 2: Дождаться завершения репликации DC-2->DC-1.
Проверить что LAG mm2-source-dc2-to-dc1 = 0 перед переключением трафика.
Шаг 3: Транслировать offset’ы из DC-2 в DC-1 и переключить трафик.
Те же шаги что и при failover, но в обратном направлении.
Мониторинг MM2: JMX и Prometheus
JMX MBeans для MM2
# MirrorSourceConnector метрики
kafka.mirror:type=MirrorSourceConnector,target=dc2
- replication-latency-ms-avg # Средняя задержка репликации (RPO индикатор)
- replication-latency-ms-max # Максимальная задержка
- byte-rate # Bytes/s реплицируется
- record-count # Всего записей реплицировано
# Connect task метрики (per-task)
kafka.connect:type=connector-task-metrics,connector=MirrorSourceConnector,task=0
- batch-size-avg # Среднее число записей в poll-batch
- offset-commit-avg-time-ms # Задержка записи checkpoint
# Connector status (RUNNING | PAUSED | FAILED)
kafka.connect:type=connector-metrics,connector=MirrorSourceConnector
- status
Prometheus JMX Exporter конфигурация для MM2
# Добавить в jmx-exporter config для MM2 Connect worker:
rules:
- pattern: 'kafka\.mirror<type=MirrorSourceConnector, target=(.+)><>replication-latency-ms-avg'
name: kafka_mirror_replication_latency_ms_avg
labels:
target: "$1"
type: GAUGE
- pattern: 'kafka\.mirror<type=MirrorSourceConnector, target=(.+)><>byte-rate'
name: kafka_mirror_byte_rate_bytes_per_second
labels:
target: "$1"
type: GAUGE
- pattern: 'kafka\.mirror<type=MirrorSourceConnector, target=(.+)><>record-count'
name: kafka_mirror_record_count_total
labels:
target: "$1"
type: COUNTER
- pattern: 'kafka\.connect<type=connector-task-metrics,connector=(.+),task=(.+)><>offset-commit-avg-time-ms'
name: kafka_connect_connector_task_offset_commit_avg_time_ms
labels:
connector: "$1"
task: "$2"
type: GAUGE
Grafana Prometheus Alerts
groups:
- name: kafka_mm2_alerts
rules:
# RPO превышен
- alert: KafkaMM2HighReplicationLatency
expr: kafka_mirror_replication_latency_ms_avg > 5000
for: 2m
labels:
severity: warning
annotations:
summary: "MM2 replication latency > 5 seconds. RPO SLA may be breached."
# Репликация остановлена
- alert: KafkaMM2ReplicationStalled
expr: kafka_mirror_replication_latency_ms_avg > 30000
for: 1m
labels:
severity: critical
annotations:
summary: "MM2 replication stalled (latency > 30s). Failover risk."
# Heartbeat недоступен
- alert: KafkaMM2HeartbeatMissing
expr: time() - kafka_mirror_heartbeat_timestamp > 60
for: 30s
labels:
severity: critical
annotations:
summary: "MM2 heartbeat missing. Source cluster may be unavailable."
# Connector упал
- alert: KafkaMM2ConnectorFailed
expr: kafka_connect_connector_status{connector=~"MirrorSourceConnector|MirrorCheckpointConnector"} == 0
for: 0s
labels:
severity: critical
annotations:
summary: "MM2 connector {{ $labels.connector }} is not RUNNING."
Ключевые выводы
- RPO = replication lag в момент сбоя. Контролируется через
replication-latency-ms-avgJMX метрику. Снижается через: увеличениеtasks.max, оптимизацию сети, выделенный Connect кластер для MM2. - RTO = сумма фаз восстановления. Типично 5-25 минут. Снижается через:
sync.group.offsets.enabled=true, низкий DNS TTL, автоматизированные скрипты, pre-warm consumers на DR. - Failover — это ручное решение в стандартной Kafka. Automation scripts должны регулярно тестироваться в рамках game day процедур.
- Failback сложнее failover: DC-2 накопил новые данные, которые нужно реплицировать обратно. Добавьте обратное направление MM2 (dc2->dc1) перед переключением.
- Мониторинг:
replication-latency-ms-avg,heartbeat age,connector status— минимальный набор метрик для MM2.