Learning Platform
Глоссарий Troubleshooting
Урок 13.01 · 35 мин
Продвинутый
Event SourcingEvent StoreAggregateSnapshotSchema EvolutionLog Compaction

Event Sourcing с Kafka

Традиционные CRUD-системы хранят только текущее состояние: таблица orders содержит строку для каждого заказа, и при обновлении строка перезаписывается. История изменений, как правило, теряется. Event Sourcing переворачивает эту модель: вместо текущего состояния система хранит последовательность событий, из которых состояние вычисляется.

Kafka — не просто брокер сообщений для event sourcing. Kafka — это нативный event store. Её append-only лог, неизменяемые записи, именованные партиции и гарантированный порядок внутри партиции — это именно те свойства, которые нужны для event sourcing.


Почему Event Sourcing: задачи, которые он решает

Event Sourcing появляется там, где CRUD-модель начинает ломаться:

Аудит и compliance. CRUD хранит последнее состояние. Event sourcing хранит каждое изменение. Финансовые системы, медицинские записи, юридические системы требуют полной истории: кто, когда, что изменил. С event sourcing ответ всегда в логе.

Временные запросы (temporal queries). “Какой был остаток на счёте три недели назад в 14:35?” В CRUD-системе без специальной таблицы истории — ответа нет. В event sourcing — нужно просто воспроизвести события до заданного timestamp.

Отладка и воспроизведение ошибок. Если система ведёт себя неправильно, воспроизведите события заново в изолированной среде. Точная реплика состояния в любой момент времени — бесценный инструмент для диагностики.

Развитие бизнес-логики. Добавляете новую projection — запускаете новый consumer с auto.offset.reset=earliest, он читает все события с самого начала и строит новый read model. Нет необходимости в batch-миграциях данных.

Decoupled read models. Несколько разных read models из одного потока событий — разные агрегации, разные базы данных, разные форматы — без изменений write side.


Kafka как нативный event store

Почему Kafka, а не специализированные event store системы (EventStore, Axon Server)?

Kafka — это append-only лог. Запись добавляется только в конец партиции, никогда не изменяется. Это буквально и есть event store. Каждое событие — запись в логе с уникальным offset внутри партиции.

Kafka как Event Store: агрегат Order
Топик orders.events, partition 0. Ключ каждой записи = aggregate ID (order-123). Offset = позиция события в истории агрегата.
Offset 0OrderCreated. Aggregate: order-123. Timestamp: 2026-01-01T10:00:00Z. Payload: {customerId, items[], totalAmount}. Первое событие жизненного цикла заказа.
Offset 1ItemAdded. Aggregate: order-123. Payload: {itemId: 'SKU-456', quantity: 2, price: 29.99}. Событие добавления товара в уже созданный заказ.
Offset 2PaymentConfirmed. Aggregate: order-123. Payload: {paymentId, method: 'card', amount: 59.98}. Событие подтверждения оплаты.
Offset 3OrderShipped. Aggregate: order-123. Payload: {trackingId, carrier, estimatedDelivery}. Конечное событие в happy path жизненного цикла заказа.
Состояние заказа = fold(events[0..3], initialState, applyEvent). Никаких UPDATE. Только чтение лога.

Топик per aggregate type. Каждый тип агрегата получает свой топик:

  • orders.events — события для агрегата Order
  • payments.events — события для агрегата Payment
  • inventory.events — события для агрегата InventoryItem

Ключ = aggregate ID. Это критично: все события одного агрегата должны попадать в одну партицию. Ключ order-123 всегда хешируется в одну и ту же партицию (при фиксированном числе партиций). Это гарантирует:

  1. Порядок событий в пределах одного агрегата — Kafka гарантирует порядок внутри партиции.
  2. Возможность параллельного чтения: разные агрегаты обрабатываются разными consumer-потоками.

log.cleanup.policy=delete с log.retention.ms=-1. Для полного event log нужна бесконечная retention. Compaction удалит события и оставит только последнее по ключу — для event log это неприемлемо (теряется история). Snapshot топик — отдельно с compact.


Анатомия события: envelope + payload

Событие в event sourced системе — не просто произвольный JSON. Оно имеет стандартизированную структуру:

{
  "eventId": "e7c3a1b2-4f5d-4e6f-8a9b-0c1d2e3f4a5b",
  "aggregateId": "order-123",
  "aggregateType": "Order",
  "eventType": "OrderCreated",
  "version": 1,
  "timestamp": "2026-04-16T10:00:00.000Z",
  "correlationId": "req-abc-xyz",
  "causationId": "cmd-create-order-456",
  "payload": {
    "customerId": "cust-789",
    "items": [
      {"sku": "SKU-001", "qty": 2, "price": 49.99}
    ],
    "totalAmount": 99.98,
    "currency": "RUB"
  }
}
  • eventId — уникальный идентификатор события. Используется для дедупликации при at-least-once доставке.
  • aggregateId + aggregateType — идентифицируют, к какому агрегату относится событие.
  • version — порядковый номер события в жизненном цикле агрегата (1, 2, 3…). Отличается от Kafka offset: offset глобален в партиции, version — только для данного агрегата.
  • correlationId — связывает событие с исходным запросом (для трассировки).
  • causationId — ID команды, которая породила это событие.

EventSourcingDiagram: полный поток

Event Sourcing: Поток данных
Write Path — Команда → Событие
API / CommandТочка входа: API принимает команду (CreateOrder, UpdateInventory). Команда — это намерение изменить состояние системы. Команда валидируется бизнес-логикой перед генерацией события. Если валидация не проходит — событие не создаётся, состояние не меняется.
validate
Command HandlerОбработчик команды: загружает текущее состояние агрегата (из снапшота + последних событий), применяет бизнес-правила (достаточно ли товара на складе?), и если валидация успешна — генерирует событие. Паттерн: load(aggregateId) -> validate(command, state) -> emit(event). Агрегат = единица транзакционной целостности.
emit
Kafka Event TopicТопик Kafka как event store. Key = aggregate ID (например, order-123). Value = событие в Avro (OrderCreated, OrderUpdated). log.retention.ms=-1 для бесконечного хранения всей истории. Партиционирование по ключу гарантирует порядок событий одного агрегата. Schema Registry обеспечивает совместимость схем.
Event Store — История событий агрегата
Event #1: OrderCreatedoffset=0, key=order-123Первое событие агрегата order-123. Содержит: orderId, customerId, items[], totalAmount, createdAt. Offset 0 в партиции (key=order-123 всегда в одной партиции). Схема зарегистрирована в Schema Registry (subject: orders-value, Avro). Состояние агрегата после: PENDING.
Event #2: ItemAddedoffset=1, key=order-123Добавлен товар в заказ. Содержит: orderId, itemId, quantity, price. Offset 1. Состояние агрегата восстанавливается последовательным применением событий: state = fold(events, initialState). Проекции читают этот топик и обновляют read-модели.
Event #3: OrderConfirmedoffset=2, key=order-123Заказ подтверждён. Offset 2. После этого события агрегат переходит в состояние CONFIRMED. Идемпотентность проекций: если OrderConfirmed применён дважды (duplicate event), состояние не меняется повторно. Consumer group offset committed после успешной обработки.
Snapshot (compact)log.cleanup.policy=compactLog compaction (log.cleanup.policy=compact) сохраняет последнее значение для каждого ключа — это и есть snapshot текущего состояния агрегата. Альтернатива: отдельный snapshot-топик для ускорения восстановления — загружаем snapshot, затем replay только событий после snapshot.lastEventOffset. min.compaction.lag.ms предотвращает преждевременную компакцию свежих событий.
Read Path — Проекция → Read Model
Event TopicТот же топик событий — consumer группы проекций читают отсюда. Каждая проекция (read model) имеет свою consumer group. auto.offset.reset=earliest: при первом старте проекция читает с offset=0 и перестраивает полное состояние. При повреждении read model: удалить и запустить заново — events никуда не делись.
consume
Projection BuilderProjection Builder: Kafka consumer (или Kafka Streams/Connect Sink), читает события и строит read-модель. Может быть: Kafka Streams KTable (in-memory или RocksDB — горячий путь), Connect JDBC Sink в PostgreSQL (отчёты, сложные SQL-запросы), Connect Elasticsearch Sink (полнотекстовый поиск). Каждая read-модель — независимый consumer group.
upsert
Read Model (DB)Материализованное представление: PostgreSQL таблица, Elasticsearch индекс, Redis hash. Оптимизировано для конкретного паттерна чтения. Примеры: orders_by_customer (индекс по customerId), order_search (Elasticsearch для поиска по товарам), order_status (Redis для мгновенного получения статуса по orderId). При повреждении — полностью перестраивается из event topic.

Паттерн Command → Validate → Event

В event sourcing поток обработки запроса выглядит следующим образом:

1. Команда (Command). Выражает намерение изменить состояние. Команды именуются в повелительном наклонении:

  • CreateOrder(customerId, items)
  • AddItem(orderId, itemId, quantity)
  • ConfirmOrder(orderId, paymentId)

Команда — это запрос, а не факт. Она может быть отклонена.

2. Загрузка текущего состояния. Command handler загружает текущее состояние агрегата из event store. Для небольших агрегатов — полное воспроизведение всех событий. Для крупных — снапшот + инкрементальное воспроизведение.

3. Валидация бизнес-правил. Применяем бизнес-правила к текущему состоянию:

  • Можно ли добавить товар? Заказ не в статусе SHIPPED?
  • Достаточно ли позиций на складе?
  • Не превышен ли лимит суммы заказа?

Если правила нарушены — возвращаем ошибку. Событие не генерируется.

4. Генерация события. Если валидация прошла — генерируем одно или несколько событий:

  • OrderCreated{...}
  • ItemAdded{...}

5. Запись в Kafka. Событие публикуется в топик. Ключ = aggregateId. Value = сериализованное событие (Avro/JSON). После успешного produce — состояние изменено (source of truth).

6. Projection. Consumer-ы читают события и обновляют read models в PostgreSQL, Elasticsearch, Redis — для запросов.


Воспроизведение состояния: fold over events

Текущее состояние агрегата вычисляется как свёртка (fold) всех его событий:

state = events.foldLeft(initialState)(applyEvent)

Пример на псевдокоде:

interface OrderState {
  id: string;
  status: "NEW" | "PAID" | "SHIPPED" | "CANCELLED";
  items: Item[];
  totalAmount: number;
}

function applyEvent(state: OrderState, event: DomainEvent): OrderState {
  switch (event.eventType) {
    case "OrderCreated":
      return {
        ...state,
        id: event.payload.orderId,
        status: "NEW",
        items: event.payload.items,
        totalAmount: event.payload.totalAmount,
      };
    case "ItemAdded":
      return {
        ...state,
        items: [...state.items, event.payload.item],
        totalAmount: state.totalAmount + event.payload.item.price,
      };
    case "PaymentConfirmed":
      return { ...state, status: "PAID" };
    case "OrderShipped":
      return { ...state, status: "SHIPPED" };
    default:
      return state;
  }
}

// Воспроизведение: читаем все события из Kafka для order-123
const events = kafkaConsumer.readAll("orders.events", key: "order-123");
const currentState = events.reduce(applyEvent, initialOrderState);

Это детерминированный процесс: одинаковая последовательность событий всегда даёт одинаковое состояние. Никаких сюрпризов, никаких side effects.

WARNING

Kafka не является базой данных с произвольным доступом. Она не поддерживает запрос “дай мне все события для aggregate-id order-123 начиная с offset 1500”. Для event sourcing с Kafka требуется либо (1) один агрегат = одна партиция (нереалистично при большом числе агрегатов), либо (2) проекция event store в PostgreSQL/DynamoDB для быстрой загрузки событий конкретного агрегата. Kafka — source of truth, но загрузка событий агрегата чаще выполняется из проекции, а не из Kafka напрямую.


Снапшоты: оптимизация воспроизведения

Проблема: агрегат с 50,000 событиями требует 50,000 применений applyEvent при каждой загрузке. Это занимает секунды.

Решение — снапшот: периодическая материализация текущего состояния агрегата. При загрузке: найти последний снапшот, прочитать только события после снапшота.

Snapshot топик в Kafka:

Топик: orders.snapshots
cleanup.policy: compact
Ключ: aggregateId (order-123)
Value: { state: {...}, lastEventVersion: 1000, snapshotTimestamp: ... }

С log.cleanup.policy=compact Kafka хранит только последний снапшот для каждого aggregateId. Старые снапшоты автоматически удаляются при log compaction.

Когда создавать снапшот?

  • Каждые N событий (например, каждые 100 событий).
  • При достижении определённого состояния агрегата (OrderFulfilled — финальное состояние).
  • По расписанию (раз в час для активных агрегатов).

Конфигурация snapshot топика:

log.cleanup.policy=compact
min.compaction.lag.ms=3600000
# Минимум 1 час до compaction — свежий снапшот не сразу удаляется
min.cleanable.dirty.ratio=0.1

min.compaction.lag.ms — ключевой параметр: определяет, как долго событие должно быть “старым” прежде чем лог-компактор может его удалить. Для snapshot топика: достаточно большое значение, чтобы не удалять актуальные снапшоты.

Снапшот-стратегия: снапшот + инкрементальные события
Загрузка агрегата order-123: снапшот при version=1000, затем события 1001-1047.

Snapshot v=1000

orders.snapshots topic. log.cleanup.policy=compact. Хранит только последний снапшот на ключ (aggregateId). Снапшот включает: state объект + lastEventVersion=1000.
load

State v=1000

Command Handler загружает снапшот (state при version=1000) из orders.snapshots. Затем читает события из orders.events начиная с version 1001.
+47 events

State v=1047

Применяем applyEvent к 47 событиям (v1001..v1047). Вместо 1047 событий — только 47. Ускорение в 22 раза для данного агрегата.
ready

Validate Command

Агрегат готов к обработке следующей команды. Текущее состояние актуально на version 1047.

Версионирование событий и Schema Registry

Бизнес-логика меняется. Структура события OrderCreated v1 может отличаться от v2. Как обеспечить совместимость?

Schema Registry с Avro. Каждое событие публикуется через Schema Registry. Схема orders.events-value версионируется автоматически. При чтении старых событий Avro-десериализатор автоматически применяет schema evolution rules (добавление optional полей — backward compatible).

Upcasting событий. Если изменение ломает обратную совместимость (переименование поля, смена типа), применяется upcasting: при чтении старого события специальный transformer преобразует его к текущей схеме:

function upcastEvent(event: RawEvent): DomainEvent {
  if (event.eventType === "OrderCreated" && event.version === 1) {
    // v1: поле 'amount' -> v2: поле 'totalAmount'
    return {
      ...event,
      version: 2,
      payload: {
        ...event.payload,
        totalAmount: event.payload.amount,
        currency: "RUB", // default для старых событий
      },
    };
  }
  return event;
}

Upcasting применяется при чтении, не изменяя хранимые события. Это ключевой принцип: события в Kafka никогда не изменяются, upcasting — слой абстракции в application коде.

Рекомендации по Schema Registry для event store:

  • Стратегия именования субъектов: TopicNameStrategy (один топик = одна схема). Если топик содержит несколько типов событий — RecordNameStrategy (OrderCreated-value, ItemAdded-value).
  • Режим совместимости: BACKWARD_TRANSITIVE — каждая новая схема должна быть совместима со всеми предыдущими версиями.
  • Перед добавлением нового поля — всегда с default значением.
NOTE

Отсылка к Модулю 06 (Schema Registry): все режимы совместимости (BACKWARD, FORWARD, FULL, TRANSITIVE варианты), SubjectNameStrategy и продвинутые техники schema evolution подробно разобраны там. Здесь мы рассматриваем только применение к event sourcing.


Preимущества и недостатки: взвешенная оценка

Event Sourcing: pro и contra
ПреимуществаПолный audit trail (каждое изменение зафиксировано), temporal queries (состояние на любой момент времени), debugging через event replay, decoupled projections (новый read model = новый consumer без миграции данных), естественная интеграция с event-driven архитектурой.
НедостаткиСложность реализации (command handling, upcasting, snapshot management). Eventually consistent read models. Нет simple queries по текущему состоянию без projection. Управление schema evolution. Operational complexity: мониторинг Kafka + projection lag.

Когда Event Sourcing оправдан:

  • Требования к аудиту (финансы, медицина, e-commerce с возвратами).
  • Сложная бизнес-логика с множеством переходов состояний.
  • Множество разнородных read models из одних данных.
  • Необходимость replay при изменении бизнес-правил.

Когда Event Sourcing избыточен:

  • Простые CRUD-системы без требований к аудиту.
  • Малые объёмы данных, единственный read model.
  • Команда без опыта распределённых систем.
  • Строгие требования к транзакционной согласованности между агрегатами.

Ключевые выводы

  1. Event Sourcing хранит последовательность событий, а не текущее состояние. Состояние = fold(events, initialState, applyEvent).
  2. Kafka — нативный event store: append-only лог, иммутабельность, ключ = aggregateId для partition locality.
  3. Снапшоты — оптимизация replay: compact топик для последнего снапшота + инкрементальные события.
  4. Schema Registry с Avro + upcasting обеспечивают эволюцию схем без изменения хранимых событий.
  5. Kafka не поддерживает произвольный доступ по aggregateId — нужна projection-база для загрузки событий агрегата.
Проверка знанийKnowledge check
Агрегат 'Account' имеет 200,000 событий. Event replay занимает 45 секунд при каждом запросе. Какова правильная архитектурная реакция на эту проблему, и какие два топика Kafka будут задействованы в решении?
ОтветAnswer
Правильная реакция — введение снапшотов. Решение: (1) Топик events (cleanup.policy=delete, log.retention.ms=-1) — полный event log. (2) Топик snapshots (cleanup.policy=compact) — хранит только последний снапшот для каждого aggregateId. При загрузке агрегата: читаем снапшот (state при version=N) из snapshots, затем читаем только события после version N из events. Вместо 200,000 событий — только события после последнего снапшота. Снапшот создаётся каждые 500-1000 событий. Итог: replay занимает миллисекунды, а не секунды.
System Design: Kafka в архитектуре data-platform Lambda/Kappa/Medallion patterns

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В чём принципиальное отличие Event Sourcing от традиционного CRUD подхода к хранению данных?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7