Разбор эталонного решения
Это одно из корректных решений архитектурного challenge. Ваше решение может отличаться — главное, обоснованы ли ваши решения. Используйте этот разбор как эталон, с которым можно сравнить свой подход и проверить ключевые технические решения.
Не существует единственно правильного решения для системного дизайна. Данный разбор отражает решения, принятые конкретным старшим инженером с конкретными допущениями. Если ваш выбор отличается — убедитесь, что у вас есть обоснование, и сравните trade-offs.
Раздел 1: Эталонный каталог топиков
Соглашение об именовании
Используем формат {domain}.{entity}.{event-type} из Модуля 12. Домен = команда-владелец. Entity = бизнес-сущность. Event-type = прошедшее время (что произошло).
Обоснование количества партиций
Исходные данные: 10 000 заказов/сек, средний размер события 500 байт.
Пропускная способность на партицию (конservативная оценка):
1 000 сообщений/сек при 500 байт = 500 KB/s на партицию
Kafka может обрабатывать до 10 MB/s per partition, но:
1 consumer thread = практически ~1 000 msg/s при реальной обработке
Минимальное число партиций = ceil(10 000 / 1 000) = 10
Округление до кратного числу брокеров (6 брокеров): 12 партиций
Раздел 2: Архитектура сервисов
Решение: Orchestration Saga
При 4 сервисах в цепочке выполнения заказа (Order → Payment → Inventory → Notification) выбор между хореографией и оркестрацией принципиален.
Решение: Orchestration (Оркестрация)
Обоснование: При 4 сервисах хореография становится трудноотслеживаемой — нет единого взгляда на прогресс выполнения заказа. При сбое на шаге 3 (резервирование инвентаря) нужно компенсировать шаг 2 (платёж — возврат средств). Логика компенсации разбросана по 4 сервисам без центрального координатора.
Оркестрация: вся логика в одном месте — Saga Orchestrator. Добавить новый шаг = изменить только оркестратор. Debug: состояние видно в saga.order-fulfillment.state.
Order Service
Order Service: принимает HTTP запрос на создание заказа. Записывает заказ в собственную БД И запись в outbox-таблицу — в ОДНОЙ транзакции. Debezium CDC или polling publisher читает outbox и публикует в Kafka.orders.order.created
orders.order.created: 12 партиций, ключ=orderId. Order Service продюсирует через Debezium Outbox Connector — гарантия at-least-once без dual write проблемы.Saga Orchestrator
Saga Orchestrator: stateful сервис, читает orders.order.created, запускает saga instance. Состояние сохраняет в saga.order-fulfillment.state (compact topic). Координирует Payment → Inventory → Notification.saga.commands.payments
saga.commands.payments: команды от оркестратора к Payment Service. ChargePayment, RefundPayment. Ключ=sagaId для упорядоченности команд.Payment Service
Payment Service: idempotent consumer — dedup по orderId (хранит processed_order_ids в Redis). Использует enable.idempotence=true + transactional.id для exactly-once produce.payments.payment.charged
payments.payment.charged: результат успешного платежа. Ключ=orderId. Оркестратор читает и переходит к следующему шагу — резервированию инвентаря.Inventory Service
Inventory Service: читает saga.commands.inventory. Резервирует товар. При успехе — публикует inventory.item.reserved. При нехватке — failure event, оркестратор компенсирует (RefundPayment).Notification Service
Notification Service: читает orders.order.confirmed от оркестратора. Отправляет email/SMS. Fire-and-forget — сбой уведомления не блокирует saga.Order Confirmed
Заказ подтверждён. Оркестратор публикует orders.order.confirmed. Состояние саги = COMPLETED. Analytics и Finance читают обогащённые данные.Transactional Outbox для Order Service
Order Service использует паттерн transactional outbox из Модуля 12. Без него — dual write: запись в БД и отправка в Kafka не атомарны.
-- В одной транзакции PostgreSQL:
BEGIN;
INSERT INTO orders (id, customer_id, status, amount)
VALUES ('order-123', 'cust-1', 'CREATED', 99.99);
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES (
'Order',
'order-123',
'OrderCreated',
'{"orderId":"order-123","customerId":"cust-1","amount":99.99}'
);
COMMIT;
-- Обе записи атомарны. Debezium читает WAL и публикует outbox запись в Kafka.
Exactly-once для Payment Service
Решение: Idempotent consumer + transactional.id на producer.
- Consumer: проверяет Redis cache по
orderIdперед обработкой. ЕслиorderIdуже обработан — skip (idempotent handling). - Producer:
transactional.id=payment-service-{partition},enable.idempotence=true. Атомарная публикацияpayments.payment.chargedи обновления статуса.
Почему не только Kafka transactions для consumer? Idempotent consumer на стороне приложения более надёжен — не зависит от isolation.level=read_committed в downstream.
Раздел 3: Стриминговый конвейер
Kafka Streams Topology
// Топология обогащения заказов
StreamsBuilder builder = new StreamsBuilder();
// Исходный поток заказов
KStream<String, OrderCreated> orders = builder
.stream("orders.order.created",
Consumed.with(Serdes.String(), orderCreatedSerde));
// KTable профилей клиентов (changelog из customer-service)
KTable<String, CustomerProfile> customers = builder
.table("customers.customer.profile",
Consumed.with(Serdes.String(), customerProfileSerde),
Materialized.as("customers-store"));
// Re-key заказов по customerId для JOIN
KStream<String, OrderCreated> ordersByCustomer = orders
.selectKey((orderId, order) -> order.getCustomerId());
// JOIN заказа с профилем клиента
KStream<String, OrderEnriched> enriched = ordersByCustomer.join(
customers,
(order, customer) -> OrderEnriched.builder()
.orderId(order.getOrderId())
.customerId(order.getCustomerId())
.customerName(customer.getFullName())
.customerEmail(customer.getEmail())
.amount(order.getAmount())
.createdAt(order.getCreatedAt())
.build(),
Joined.with(Serdes.String(), orderCreatedSerde, customerProfileSerde)
);
// Re-key обратно по orderId для output топика
enriched
.selectKey((customerId, enrichedOrder) -> enrichedOrder.getOrderId())
.to("orders.order.enriched",
Produced.with(Serdes.String(), orderEnrichedSerde));
Ключевой момент: Re-key по customerId перед JOIN необходим — Kafka Streams JOIN требует совпадения ключей. Затем повторный re-key обратно по orderId для downstream совместимости (JDBC Sink ожидает orderId как первичный ключ).
Connect Pipelines
JDBC Sink (PostgreSQL для Finance):
{
"name": "orders-jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://postgres:5432/analytics",
"topics": "orders.order.enriched",
"table.name.format": "order_analytics",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "order_id",
"auto.create": "false",
"batch.size": "3000",
"consumer.override.max.poll.records": "3000"
}
}
insert.mode=upsert: при compact топике возможны дубликаты при redelivery — upsert безопасен.
Elasticsearch Sink (Search):
{
"name": "products-es-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch:9200",
"topics": "products.product.updated",
"type.name": "_doc",
"key.ignore": "false",
"schema.ignore": "false",
"behavior.on.null.values": "delete",
"batch.size": "1000",
"linger.ms": "100"
}
}
behavior.on.null.values=delete: tombstone записи (null value) в compact топике интерпретируются как удаление документа из Elasticsearch.
Раздел 4: Дизайн безопасности
SASL/SCRAM principals
Каждый сервис — отдельный SCRAM principal. Компрометация одного credential не затрагивает остальные.
Principals:
order-service — Order Service application
payment-service — Payment Service application
inventory-service — Inventory Service application
notification-service — Notification Service application
saga-orchestrator — Saga Orchestrator application
streams-app — Kafka Streams enrichment app
analytics-connector — Connect JDBC Sink for analytics
search-connector — Connect Elasticsearch Sink
mirrormaker2 — MM2 replication process
admin — Operational super user (super.users)
ACL матрица
Раздел 5: Multi-DC и Disaster Recovery
Выбор топологии: Active-Passive
Обоснование: E-commerce платформа с единым основным DC и DR резервом. Активный-пассивный режим проще в управлении, не требует разрешения конфликтов при записи с обеих сторон.
Active-active подходит, когда:
- Требуется нулевое время failover (near-zero RTO)
- Два DC равнозначно обслуживают разные географические регионы
- Приложение рассчитано на partition affinity (A-M ключи в DC-1, N-Z в DC-2)
Для нашего сценария (один основной DC) — active-passive с MM2.
MM2 конфигурация
# MirrorMaker 2 — Active-Passive: DC-1 -> DC-2
# Полная конфигурация для production
clusters = dc1, dc2
dc1.bootstrap.servers = kafka-dc1-broker1:9092,kafka-dc1-broker2:9092,kafka-dc1-broker3:9092
dc2.bootstrap.servers = kafka-dc2-broker1:9092,kafka-dc2-broker2:9092,kafka-dc2-broker3:9092
# Только направление DC-1 -> DC-2
dc1->dc2.enabled = true
dc1->dc2.topics = orders\..*, payments\..*, inventory\..*, saga\..*
dc1->dc2.groups = order-processor, payment-processor, inventory-processor
# DC-2 -> DC-1 отключено (active-passive)
dc2->dc1.enabled = false
# Checkpoint connector: синхронизация consumer group offsets
emit.checkpoints.enabled = true
emit.checkpoints.interval.seconds = 10
sync.group.offsets.enabled = true
sync.group.offsets.interval.seconds = 10
# Heartbeat connector: мониторинг здоровья репликации
emit.heartbeats.enabled = true
emit.heartbeats.interval.seconds = 5
# DefaultReplicationPolicy: топики появляются как dc1.orders.order.created на DC-2
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy
# Параллелизм: по одной task per partition group
tasks.max = 8
# Аутентификация MM2 -> DC-1 и DC-2
dc1.security.protocol = SASL_SSL
dc1.sasl.mechanism = SCRAM-SHA-256
dc2.security.protocol = SASL_SSL
dc2.sasl.mechanism = SCRAM-SHA-256
Почему DefaultReplicationPolicy, а не IdentityReplicationPolicy?
DefaultReplicationPolicy: топикorders.order.createdна DC-1 появляется какdc1.orders.order.createdна DC-2. Consumers на DC-2 явно настроены на чтение с префиксомdc1.. Это предотвращает случайную запись в “оригинальный” топик на DR кластере.IdentityReplicationPolicy: те же имена на обоих кластерах. Требуется только при active-active, где приложения должны читать/писать независимо от DC. Для active-passive создаёт риск ambiguity.
RPO/RTO анализ
RPO (Recovery Point Objective = максимальная допустимая потеря данных):
MM2 replication latency при нормальной работе: 2–5 секунд
emit.checkpoints.interval.seconds = 10 секунд
Максимальный RPO = max(replication lag) + checkpoint interval
= 5 + 10 = ~15 секунд (значительно лучше требования 30 сек)
RTO (Recovery Time Objective = время восстановления):
| Этап | Время | Комментарий |
|---|---|---|
| Обнаружение сбоя (мониторинг) | ~2 мин | Prometheus alert + PagerDuty |
| Принятие решения о failover | ~1 мин | Ручное или автоматическое |
| Дренирование MM2 лага | ~1 мин | Или принять текущий RPO |
| DNS failover (TTL 60 сек) | ~1 мин | Обновление A-записи load balancer |
| Offset translation (auto) | ~0 мин | sync.group.offsets.enabled=true — автоматически |
| Перезапуск сервисов на DC-2 | ~2 мин | Kubernetes rolling restart |
| Итого RTO | ~7 мин | Менее требования 10 мин |
Runbook failover
- Prometheus алерт:
kafka_mirror_heartbeat_age_seconds > 60на DC-2 (репликация остановилась) - On-call инженер проверяет доступность DC-1:
kafka-topics.sh --bootstrap-server kafka-dc1:9092 --list - Если DC-1 недоступен: принять решение о failover (согласно incident severity playbook)
- Остановить все producers на DC-1 (circuit breaker / feature flag в приложении)
- Зафиксировать текущий MM2 lag:
kafka-consumer-groups.sh --bootstrap-server kafka-dc2:9092 --describe --group mirrormaker2-dc1-dc2 - Проверить наличие consumer group offsets на DC-2:
kafka-consumer-groups.sh --bootstrap-server kafka-dc2:9092 --describe --group order-processor - Обновить DNS: изменить A-запись load balancer с DC-1 endpoints на DC-2 endpoints
- Consumers на DC-2 стартуют и читают из
dc1.orders.order.created(DefaultReplicationPolicy prefix) - Проверить стабилизацию: consumer lag снижается, UnderReplicatedPartitions = 0, бизнес-метрики возобновились
Раздел 6: Мониторинг и ёмкость
Ключевые Prometheus алерты
groups:
- name: kafka-production-alerts
rules:
# Критический: под-реплицированные партиции
- alert: KafkaUnderReplicatedPartitions
expr: kafka_server_replicamanager_underreplicatedpartitions > 0
for: 5m
labels:
severity: critical
annotations:
summary: "UnderReplicatedPartitions > 0 на {{ $labels.instance }}"
description: "Возможен сбой брокера или сетевая проблема. Немедленная диагностика."
# Warning: высокий consumer lag
- alert: KafkaConsumerLagHigh
expr: kafka_consumer_group_lag > 5000
for: 10m
labels:
severity: warning
annotations:
summary: "Consumer lag {{ $value }} для группы {{ $labels.group }}"
# Critical: очень высокий consumer lag
- alert: KafkaConsumerLagCritical
expr: kafka_consumer_group_lag > 50000
for: 5m
labels:
severity: critical
# Warning: задержка produce
- alert: KafkaProduceLatencyHigh
expr: kafka_network_requestmetrics_totaltimems{request="Produce", quantile="0.99"} > 200
for: 5m
labels:
severity: warning
# Critical: задержка репликации MM2
- alert: MM2ReplicationLagHigh
expr: kafka_mirror_replication_latency_ms_avg > 10000
for: 2m
labels:
severity: critical
annotations:
summary: "MM2 replication lag > 10 секунд — RPO под угрозой"
Ёмкостный план (детальный расчёт)
Входные данные:
Пиковый throughput: 10 000 заказов/сек
Средний размер события: 500 байт
Replication Factor: 3
Retention: 30 дней (основные топики)
Число топиков: ~10 (из каталога)
Расчёт:
Суммарный produce rate: 10 000 × 500 байт = 5 МБ/с (на уровне приложения)
С RF=3: 5 МБ/с × 3 = 15 МБ/с суммарная disk write rate (по всем брокерам)
Суточный объём на диске: 15 МБ/с × 86 400 сек = ~1,3 ТБ/день
30-дневное хранилище: 1,3 × 30 = ~39 ТБ суммарно
Конфигурация кластера:
Минимум брокеров: 3 (для RF=3)
Рекомендуется: 6 (для headroom + reassignment без деградации)
На брокер: 39 / 6 = ~6,5 ТБ + 30% headroom = ~8,5 ТБ SSD
RAM на брокер:
JVM heap: 6 ГБ (фиксированно, не увеличивать — Модуль 10)
Page cache: 24 ГБ (15 МБ/с × 1 600 сек = ~24 ГБ "горячих" данных)
Итого: 32 ГБ RAM per broker
Сеть:
Inbound: 5 МБ/с producer
Replication: 5 × 2 = 10 МБ/с (2 follower копии)
Consumer: ~15 МБ/с (3 consumer groups × 5 МБ/с)
Итого per broker: ~30 МБ/с = 0,24 Гбит/с
При 1 Гбит/с NIC: 24% утилизация (значительно ниже 70% порога)
Итоговая спецификация на брокер:
CPU: 16 ядер
RAM: 32 ГБ
Диск: 9 ТБ NVMe SSD (с 30% запасом)
NIC: 1 Гбит/с
Брокеров: 6 (3 в DC-1, 3 в DC-2)
Обратите внимание на соотношение JVM heap / page cache. 6 ГБ heap — жёсткое правило (Модуль 10: больший heap = GC паузы). Весь дополнительный RAM уходит в page cache. Именно page cache обеспечивает низкую latency — OS держит горячие log сегменты в памяти, consumer читает без disk I/O.