Event Sourcing с Kafka
Традиционные CRUD-системы хранят только текущее состояние: таблица orders содержит строку для каждого заказа, и при обновлении строка перезаписывается. История изменений, как правило, теряется. Event Sourcing переворачивает эту модель: вместо текущего состояния система хранит последовательность событий, из которых состояние вычисляется.
Kafka — не просто брокер сообщений для event sourcing. Kafka — это нативный event store. Её append-only лог, неизменяемые записи, именованные партиции и гарантированный порядок внутри партиции — это именно те свойства, которые нужны для event sourcing.
Почему Event Sourcing: задачи, которые он решает
Event Sourcing появляется там, где CRUD-модель начинает ломаться:
Аудит и compliance. CRUD хранит последнее состояние. Event sourcing хранит каждое изменение. Финансовые системы, медицинские записи, юридические системы требуют полной истории: кто, когда, что изменил. С event sourcing ответ всегда в логе.
Временные запросы (temporal queries). “Какой был остаток на счёте три недели назад в 14:35?” В CRUD-системе без специальной таблицы истории — ответа нет. В event sourcing — нужно просто воспроизвести события до заданного timestamp.
Отладка и воспроизведение ошибок. Если система ведёт себя неправильно, воспроизведите события заново в изолированной среде. Точная реплика состояния в любой момент времени — бесценный инструмент для диагностики.
Развитие бизнес-логики. Добавляете новую projection — запускаете новый consumer с auto.offset.reset=earliest, он читает все события с самого начала и строит новый read model. Нет необходимости в batch-миграциях данных.
Decoupled read models. Несколько разных read models из одного потока событий — разные агрегации, разные базы данных, разные форматы — без изменений write side.
Kafka как нативный event store
Почему Kafka, а не специализированные event store системы (EventStore, Axon Server)?
Kafka — это append-only лог. Запись добавляется только в конец партиции, никогда не изменяется. Это буквально и есть event store. Каждое событие — запись в логе с уникальным offset внутри партиции.
Топик per aggregate type. Каждый тип агрегата получает свой топик:
orders.events— события для агрегата Orderpayments.events— события для агрегата Paymentinventory.events— события для агрегата InventoryItem
Ключ = aggregate ID. Это критично: все события одного агрегата должны попадать в одну партицию. Ключ order-123 всегда хешируется в одну и ту же партицию (при фиксированном числе партиций). Это гарантирует:
- Порядок событий в пределах одного агрегата — Kafka гарантирует порядок внутри партиции.
- Возможность параллельного чтения: разные агрегаты обрабатываются разными consumer-потоками.
log.cleanup.policy=delete с log.retention.ms=-1. Для полного event log нужна бесконечная retention. Compaction удалит события и оставит только последнее по ключу — для event log это неприемлемо (теряется история). Snapshot топик — отдельно с compact.
Анатомия события: envelope + payload
Событие в event sourced системе — не просто произвольный JSON. Оно имеет стандартизированную структуру:
{
"eventId": "e7c3a1b2-4f5d-4e6f-8a9b-0c1d2e3f4a5b",
"aggregateId": "order-123",
"aggregateType": "Order",
"eventType": "OrderCreated",
"version": 1,
"timestamp": "2026-04-16T10:00:00.000Z",
"correlationId": "req-abc-xyz",
"causationId": "cmd-create-order-456",
"payload": {
"customerId": "cust-789",
"items": [
{"sku": "SKU-001", "qty": 2, "price": 49.99}
],
"totalAmount": 99.98,
"currency": "RUB"
}
}
eventId— уникальный идентификатор события. Используется для дедупликации при at-least-once доставке.aggregateId+aggregateType— идентифицируют, к какому агрегату относится событие.version— порядковый номер события в жизненном цикле агрегата (1, 2, 3…). Отличается от Kafka offset: offset глобален в партиции, version — только для данного агрегата.correlationId— связывает событие с исходным запросом (для трассировки).causationId— ID команды, которая породила это событие.
EventSourcingDiagram: полный поток
Паттерн Command → Validate → Event
В event sourcing поток обработки запроса выглядит следующим образом:
1. Команда (Command). Выражает намерение изменить состояние. Команды именуются в повелительном наклонении:
CreateOrder(customerId, items)AddItem(orderId, itemId, quantity)ConfirmOrder(orderId, paymentId)
Команда — это запрос, а не факт. Она может быть отклонена.
2. Загрузка текущего состояния. Command handler загружает текущее состояние агрегата из event store. Для небольших агрегатов — полное воспроизведение всех событий. Для крупных — снапшот + инкрементальное воспроизведение.
3. Валидация бизнес-правил. Применяем бизнес-правила к текущему состоянию:
- Можно ли добавить товар? Заказ не в статусе SHIPPED?
- Достаточно ли позиций на складе?
- Не превышен ли лимит суммы заказа?
Если правила нарушены — возвращаем ошибку. Событие не генерируется.
4. Генерация события. Если валидация прошла — генерируем одно или несколько событий:
OrderCreated{...}ItemAdded{...}
5. Запись в Kafka. Событие публикуется в топик. Ключ = aggregateId. Value = сериализованное событие (Avro/JSON). После успешного produce — состояние изменено (source of truth).
6. Projection. Consumer-ы читают события и обновляют read models в PostgreSQL, Elasticsearch, Redis — для запросов.
Воспроизведение состояния: fold over events
Текущее состояние агрегата вычисляется как свёртка (fold) всех его событий:
state = events.foldLeft(initialState)(applyEvent)
Пример на псевдокоде:
interface OrderState {
id: string;
status: "NEW" | "PAID" | "SHIPPED" | "CANCELLED";
items: Item[];
totalAmount: number;
}
function applyEvent(state: OrderState, event: DomainEvent): OrderState {
switch (event.eventType) {
case "OrderCreated":
return {
...state,
id: event.payload.orderId,
status: "NEW",
items: event.payload.items,
totalAmount: event.payload.totalAmount,
};
case "ItemAdded":
return {
...state,
items: [...state.items, event.payload.item],
totalAmount: state.totalAmount + event.payload.item.price,
};
case "PaymentConfirmed":
return { ...state, status: "PAID" };
case "OrderShipped":
return { ...state, status: "SHIPPED" };
default:
return state;
}
}
// Воспроизведение: читаем все события из Kafka для order-123
const events = kafkaConsumer.readAll("orders.events", key: "order-123");
const currentState = events.reduce(applyEvent, initialOrderState);
Это детерминированный процесс: одинаковая последовательность событий всегда даёт одинаковое состояние. Никаких сюрпризов, никаких side effects.
Kafka не является базой данных с произвольным доступом. Она не поддерживает запрос “дай мне все события для aggregate-id order-123 начиная с offset 1500”. Для event sourcing с Kafka требуется либо (1) один агрегат = одна партиция (нереалистично при большом числе агрегатов), либо (2) проекция event store в PostgreSQL/DynamoDB для быстрой загрузки событий конкретного агрегата. Kafka — source of truth, но загрузка событий агрегата чаще выполняется из проекции, а не из Kafka напрямую.
Снапшоты: оптимизация воспроизведения
Проблема: агрегат с 50,000 событиями требует 50,000 применений applyEvent при каждой загрузке. Это занимает секунды.
Решение — снапшот: периодическая материализация текущего состояния агрегата. При загрузке: найти последний снапшот, прочитать только события после снапшота.
Snapshot топик в Kafka:
Топик: orders.snapshots
cleanup.policy: compact
Ключ: aggregateId (order-123)
Value: { state: {...}, lastEventVersion: 1000, snapshotTimestamp: ... }
С log.cleanup.policy=compact Kafka хранит только последний снапшот для каждого aggregateId. Старые снапшоты автоматически удаляются при log compaction.
Когда создавать снапшот?
- Каждые N событий (например, каждые 100 событий).
- При достижении определённого состояния агрегата (OrderFulfilled — финальное состояние).
- По расписанию (раз в час для активных агрегатов).
Конфигурация snapshot топика:
log.cleanup.policy=compact
min.compaction.lag.ms=3600000
# Минимум 1 час до compaction — свежий снапшот не сразу удаляется
min.cleanable.dirty.ratio=0.1
min.compaction.lag.ms — ключевой параметр: определяет, как долго событие должно быть “старым” прежде чем лог-компактор может его удалить. Для snapshot топика: достаточно большое значение, чтобы не удалять актуальные снапшоты.
Snapshot v=1000
orders.snapshots topic. log.cleanup.policy=compact. Хранит только последний снапшот на ключ (aggregateId). Снапшот включает: state объект + lastEventVersion=1000.State v=1000
Command Handler загружает снапшот (state при version=1000) из orders.snapshots. Затем читает события из orders.events начиная с version 1001.State v=1047
Применяем applyEvent к 47 событиям (v1001..v1047). Вместо 1047 событий — только 47. Ускорение в 22 раза для данного агрегата.Validate Command
Агрегат готов к обработке следующей команды. Текущее состояние актуально на version 1047.Версионирование событий и Schema Registry
Бизнес-логика меняется. Структура события OrderCreated v1 может отличаться от v2. Как обеспечить совместимость?
Schema Registry с Avro. Каждое событие публикуется через Schema Registry. Схема orders.events-value версионируется автоматически. При чтении старых событий Avro-десериализатор автоматически применяет schema evolution rules (добавление optional полей — backward compatible).
Upcasting событий. Если изменение ломает обратную совместимость (переименование поля, смена типа), применяется upcasting: при чтении старого события специальный transformer преобразует его к текущей схеме:
function upcastEvent(event: RawEvent): DomainEvent {
if (event.eventType === "OrderCreated" && event.version === 1) {
// v1: поле 'amount' -> v2: поле 'totalAmount'
return {
...event,
version: 2,
payload: {
...event.payload,
totalAmount: event.payload.amount,
currency: "RUB", // default для старых событий
},
};
}
return event;
}
Upcasting применяется при чтении, не изменяя хранимые события. Это ключевой принцип: события в Kafka никогда не изменяются, upcasting — слой абстракции в application коде.
Рекомендации по Schema Registry для event store:
- Стратегия именования субъектов:
TopicNameStrategy(один топик = одна схема). Если топик содержит несколько типов событий —RecordNameStrategy(OrderCreated-value,ItemAdded-value). - Режим совместимости:
BACKWARD_TRANSITIVE— каждая новая схема должна быть совместима со всеми предыдущими версиями. - Перед добавлением нового поля — всегда с default значением.
Отсылка к Модулю 06 (Schema Registry): все режимы совместимости (BACKWARD, FORWARD, FULL, TRANSITIVE варианты), SubjectNameStrategy и продвинутые техники schema evolution подробно разобраны там. Здесь мы рассматриваем только применение к event sourcing.
Preимущества и недостатки: взвешенная оценка
Когда Event Sourcing оправдан:
- Требования к аудиту (финансы, медицина, e-commerce с возвратами).
- Сложная бизнес-логика с множеством переходов состояний.
- Множество разнородных read models из одних данных.
- Необходимость replay при изменении бизнес-правил.
Когда Event Sourcing избыточен:
- Простые CRUD-системы без требований к аудиту.
- Малые объёмы данных, единственный read model.
- Команда без опыта распределённых систем.
- Строгие требования к транзакционной согласованности между агрегатами.
Ключевые выводы
- Event Sourcing хранит последовательность событий, а не текущее состояние. Состояние =
fold(events, initialState, applyEvent). - Kafka — нативный event store: append-only лог, иммутабельность, ключ = aggregateId для partition locality.
- Снапшоты — оптимизация replay:
compactтопик для последнего снапшота + инкрементальные события. - Schema Registry с Avro + upcasting обеспечивают эволюцию схем без изменения хранимых событий.
- Kafka не поддерживает произвольный доступ по aggregateId — нужна projection-база для загрузки событий агрегата.