Saga Pattern: Хореография и Оркестрация
Распределённые транзакции — одна из самых сложных задач в микросервисной архитектуре. Как гарантировать, что изменения, затрагивающие несколько независимых сервисов, либо все завершатся успешно, либо все будут отменены?
Классический ответ: Two-Phase Commit (2PC). Теоретически красивый, практически неприменимый при Kafka: 2PC требует блокировок на время голосования всех участников, не масштабируется, и Kafka сам по себе не участвует в XA-транзакциях. При 5-10 сервисах в транзакции 2PC превращается в узкое горлышко.
Saga — альтернатива: последовательность локальных транзакций. Каждый шаг меняет состояние одного сервиса и публикует событие. Если шаг провалился — выполняются компенсирующие транзакции, откатывающие предыдущие шаги.
Что такое Saga: определение и свойства
Saga — это протокол для distributed coordination: цепочка локальных транзакций T1, T2, ..., Tn, где каждая Ti имеет компенсирующую транзакцию Ci. При сбое на шаге k выполняются Ck-1, ..., C1 в обратном порядке.
Ключевые свойства:
- Нет глобальных блокировок. Каждый сервис блокирует только своё хранилище на время своей локальной транзакции.
- Нет ACID изоляции. Промежуточные состояния видимы другим транзакциям (OrderCreated, но PaymentPending — это реальное состояние системы, видимое между шагами).
- Eventual consistency. Система приходит к согласованному состоянию через цепочку событий, не мгновенно.
- Компенсирующие транзакции — это бизнес-операции, а не database rollback.
PaymentRefunded— это реальный возврат денег, а не откат строки в БД.
Saga не обеспечивает изоляцию ACID. Пока saga выполняется (например, Payment в процессе), другой пользователь может увидеть заказ в статусе NEW. Проектируйте запросы с учётом частичной saga завершённости. Не полагайтесь на strong consistency между сервисами.
Визуализация: Хореография vs Оркестрация
Хореография: децентрализованная координация
Хореография — каждый сервис знает, на какие события реагировать и какие события публиковать. Центрального координатора нет. Сервисы связаны через события, не через прямые вызовы.
Структура топиков для хореографии заказа:
orders.events # OrderCreated, OrderConfirmed, OrderCancelled
payments.events # PaymentCharged, PaymentFailed, PaymentRefunded
inventory.events # InventoryReserved, InventoryFailed, InventoryReleased
notifications.events # NotificationSent
Happy path: создание заказа в e-commerce
1. Order Service:
- Создаёт заказ в локальной БД (status=PENDING)
- Публикует: orders.events / OrderCreated{orderId, items, totalAmount}
2. Payment Service:
- Потребляет: OrderCreated
- Списывает деньги с карты клиента (вызывает платёжный шлюз)
- При успехе публикует: payments.events / PaymentCharged{orderId, amount}
- При ошибке публикует: payments.events / PaymentFailed{orderId, reason}
3. Inventory Service:
- Потребляет: PaymentCharged
- Резервирует товары на складе
- При успехе публикует: inventory.events / InventoryReserved{orderId, items}
- При ошибке публикует: inventory.events / InventoryFailed{orderId, reason}
4. Notification Service:
- Потребляет: InventoryReserved
- Отправляет email-подтверждение клиенту
- Публикует: notifications.events / NotificationSent{orderId, channel: 'email'}
5. Order Service:
- Потребляет: InventoryReserved
- Обновляет статус заказа: PENDING -> CONFIRMED
- Публикует: orders.events / OrderConfirmed{orderId}
Order Service
Order Service: consumer group cg-order-inventory (потребляет InventoryReserved) и producer для orders.events. При получении InventoryReserved -> UPDATE orders SET status='CONFIRMED' -> publish OrderConfirmed.Payment Service
Payment Service: consumer group cg-payment-orders (потребляет orders.events). При OrderCreated -> charge payment -> publish PaymentCharged | PaymentFailed. Идемпотентен: если orderId уже processed -> skip.Inventory Service
Inventory Service: consumer group cg-inventory-payments (потребляет payments.events). При PaymentCharged -> reserve items -> publish InventoryReserved | InventoryFailed. Если товар недоступен -> InventoryFailed.Notification Service
Notification Service: consumer group cg-notify-inventory (потребляет inventory.events). При InventoryReserved -> send email -> publish NotificationSent. Fire-and-forget для клиента.Компенсирующий путь: InventoryFailed
Inventory Service: publish InventoryFailed{orderId, reason: "OUT_OF_STOCK"}
-> Payment Service: consume InventoryFailed
- Возвращает деньги клиенту (refund)
- Publishes: PaymentRefunded{orderId, amount}
-> Order Service: consume PaymentRefunded
- Обновляет статус: PENDING -> CANCELLED
- Publishes: OrderCancelled{orderId, reason: "OUT_OF_STOCK"}
-> Notification Service: consume OrderCancelled
- Отправляет уведомление об отмене
Реализация Payment Service на псевдокоде:
@KafkaListener(topics = "orders.events", groupId = "cg-payment-orders")
void onOrderEvent(ConsumerRecord<String, OrderEvent> record) {
OrderEvent event = record.value();
String orderId = event.getOrderId();
// Идемпотентность: проверяем, не обрабатывали ли уже
if (processedEvents.contains(event.getEventId())) {
return;
}
if ("OrderCreated".equals(event.getEventType())) {
try {
PaymentResult result = paymentGateway.charge(
event.getCustomerId(),
event.getTotalAmount()
);
producer.send("payments.events", orderId,
new PaymentChargedEvent(orderId, result.getTransactionId(), event.getTotalAmount()));
} catch (PaymentException e) {
producer.send("payments.events", orderId,
new PaymentFailedEvent(orderId, e.getReason()));
}
}
if ("InventoryFailed".equals(event.getEventType())) {
paymentGateway.refund(event.getTransactionId());
producer.send("payments.events", orderId,
new PaymentRefundedEvent(orderId));
}
processedEvents.add(event.getEventId());
}
Оркестрация: централизованная координация
Оркестрация — центральный Saga Orchestrator управляет всем процессом. Orchestrator знает порядок шагов, отправляет команды сервисам, получает ответы, обрабатывает сбои.
Структура топиков для оркестрации:
saga.commands.payments # ChargePayment, RefundPayment
saga.commands.inventory # ReserveInventory, ReleaseInventory
saga.commands.notifications # SendConfirmation, SendCancellation
saga.replies # PaymentCharged, PaymentFailed, InventoryReserved, InventoryFailed
saga.state # compact topic: sagaId -> текущее состояние саги
State machine Orchestrator:
States: STARTED
-> PAYMENT_PENDING (отправлена ChargePayment команда)
-> INVENTORY_PENDING (получен PaymentCharged, отправлена ReserveInventory)
-> NOTIFY_PENDING (получен InventoryReserved, отправлена SendConfirmation)
-> COMPLETED (получен NotificationSent)
Compensating path:
-> COMPENSATING_INVENTORY_FAILED (получен InventoryFailed)
-> REFUNDING_PAYMENT (отправлена RefundPayment)
-> ORDER_CANCELLING (получен PaymentRefunded, отправлена SendCancellation)
-> FAILED (получен NotificationSent для отмены)
Saga Orchestrator
Saga Orchestrator: consumer на saga.replies, producer на saga.commands.*. Хранит состояние саги в saga.state (compact topic). При рестарте восстанавливает состояние из saga.state topic replay.Payment Service
Payment Service: consumer на saga.commands.payments. Выполняет ChargePayment команду. Публикует PaymentCharged | PaymentFailed в saga.replies. Сервис не знает о других сервисах — только о своей команде и ответе.Inventory Service
Inventory Service: consumer на saga.commands.inventory. Выполняет ReserveInventory команду (только если Orchestrator дал команду, то есть PaymentCharged получен). Публикует InventoryReserved | InventoryFailed.Notification Service
Notification Service: consumer на saga.commands.notifications. Отправляет email по команде SendConfirmation. Публикует NotificationSent. Полностью изолирован от логики оплаты и склада.Orchestrator с Kafka transactions:
@KafkaListener(topics = "saga.replies", groupId = "cg-orchestrator")
void onReply(ConsumerRecord<String, SagaReply> record) {
String sagaId = record.key();
SagaReply reply = record.value();
OrderSaga saga = stateStore.load(sagaId);
SagaTransition transition = saga.processReply(reply);
// Атомарно: публикуем команду + обновляем состояние саги
producer.beginTransaction();
try {
if (transition.hasNextCommand()) {
producer.send(new ProducerRecord<>(
"saga.commands." + transition.getTargetService(),
sagaId,
transition.getCommand()
));
}
// Персистим состояние саги в compact topic
producer.send(new ProducerRecord<>(
"saga.state",
sagaId,
saga.withNewState(transition.getNewState())
));
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
throw e;
}
}
Kafka транзакция гарантирует: либо и команда, и обновление состояния опубликованы, либо ни то, ни другое. Нет состояния, где команда отправлена но состояние саги не обновлено.
Восстановление Orchestrator при рестарте:
1. Orchestrator стартует.
2. Читает saga.state topic с auto.offset.reset=earliest.
3. Все незавершённые саги восстанавливаются в памяти.
4. Для каждой саги в промежуточном состоянии:
- Проверяет: была ли уже отправлена команда?
- Если нет — отправляет команду повторно (idempotent).
- Если да — ждёт ответа.
5. Orchestrator готов к обработке replies.
Хореография vs Оркестрация: сравнительный анализ
| Критерий | Хореография | Оркестрация |
|---|---|---|
| Связность сервисов | Слабая (через события) | Умеренная (команды от Orchestrator) |
| Единая точка сбоя | Нет (каждый сервис независим) | Orchestrator (требует HA) |
| Сложность отладки | Высокая (нет единого view) | Низкая (состояние в saga.state) |
| Добавление нового шага | Сложно (обновить N сервисов) | Просто (изменить Orchestrator) |
| Мониторинг saga progress | Трудно (требует distributed tracing) | Легко (query saga.state) |
| Подходит для | 2-4 сервиса, линейный flow | 5+ сервисов, ветвление, сложная компенсация |
| Kafka-фича | Consumer groups на domain topics | Transactional producer, command/reply topics |
Правило выбора:
- Хореография: немного сервисов (2-4), простой линейный flow без сложной компенсации, команда предпочитает event-driven стиль.
- Оркестрация: много сервисов (5+), сложные ветвления (разные пути в зависимости от ответа), требования к мониторингу saga progress, сложная компенсация с ветвлением.
Idempotency: обязательное требование для всех участников
Kafka доставляет события at-least-once. Перезапуск consumer, rebalance, сетевой сбой — любой из этих событий может вызвать повторную доставку одного и того же события. Каждый участник саги должен обрабатывать дублирующиеся события без побочных эффектов.
Техники идемпотентности:
-
Уникальный eventId как идемпотентный ключ. Храните обработанные eventId в базе (Redis SET, PostgreSQL таблица). При получении события — проверяете наличие eventId. Если уже обработан — пропускаете.
-
Idempotent бизнес-операция. Зарезервировать товары для orderId X — idempotent: если уже зарезервировано, возвращаем тот же ответ без повторного резервирования.
-
Conditional UPDATE. Обновляем состояние только если текущее состояние ожидаемое:
UPDATE orders SET status = 'CONFIRMED'
WHERE id = 'order-123' AND status = 'PAYMENT_CONFIRMED';
-- Если status уже CONFIRMED (дубль события) - UPDATE затронет 0 строк
Ключевые выводы
- Saga — последовательность локальных транзакций с компенсирующими шагами. Нет глобальных блокировок, нет ACID изоляции.
- Хореография — сервисы координируются через domain events. Слабая связность, сложный мониторинг.
- Оркестрация — центральный Orchestrator управляет шагами через command/reply топики. Проще мониторить, легче добавлять шаги.
- Kafka transactions позволяют Orchestrator атомарно публиковать команду и обновлять состояние саги.
- Все участники саги обязаны быть идемпотентными — дублированные события неизбежны при Kafka.
- Промежуточные состояния видимы: проектируйте UI и запросы с учётом eventual consistency.