Learning Platform
Глоссарий Troubleshooting
Урок 13.02 · 35 мин
Продвинутый
CQRSCommand Query Responsibility SegregationRead ModelWrite ModelEventual ConsistencyKTableKafka Streams

Реализация CQRS

CQRS — Command Query Responsibility Segregation. Принцип разделения: операции, которые изменяют состояние (команды), и операции, которые только читают (запросы), используют разные модели. Не разные таблицы в одной базе данных — разные модели, оптимизированные для своей задачи.

В традиционной системе одна модель данных обслуживает и запись, и чтение. CQRS говорит: это ограничение без оснований. Write-модель оптимизирована под консистентность и валидацию бизнес-правил. Read-модель оптимизирована под запросы — денормализована, индексирована, может быть в другой базе данных.

Kafka — идеальный event bus для связи между write и read side.


Принцип CQRS: почему разделение выгодно

Write side проблема. Бизнес-правила требуют нормализованной, консистентной модели данных. Нормализованная база данных — слабый кандидат для сложных аналитических запросов.

Read side проблема. Эффективные запросы требуют денормализации, предагрегации, специализированных индексов. Elasticsearch для полнотекстового поиска, Redis для кэша, ClickHouse для аналитики. Одна база данных не может быть одновременно лучшей для всего.

CQRS решение: Write side → Kafka → N read models. Каждый read model обслуживает свой сценарий запросов. Добавить новый read model — значит запустить новый consumer group, который читает те же события и строит новую проекцию. Zero изменений на write side.


CQRS: полная архитектурная диаграмма

CQRS: Разделение записи и чтения
Write Side
Write APIAPI для записи (POST/PUT/DELETE). Принимает команды: CreateOrder, CancelOrder, UpdatePayment. Возвращает 202 Accepted — команда принята, но ещё не отражена в read models (async). Команды валидируются и конвертируются в события. Клиент не должен сразу делать GET ожидая увидеть результат (eventual consistency).
command
Command HandlerОбработчик команды: валидирует бизнес-правила (есть ли товар, доступен ли пользователь), генерирует событие. Транзакция: запись в БД + публикация в Kafka через Outbox pattern (избегаем dual-write). Command Handler не знает о read models — это разделение ответственности CQRS.
produce
Kafka Events TopicЦентральный event log — единственный источник правды (source of truth) в CQRS. Все изменения состояния публикуются как события. Key = entity ID. log.retention.ms=-1 (infinite) или log.cleanup.policy=compact. Schema Registry обеспечивает совместимость схем (FULL_TRANSITIVE для safe evolution). Все read models строятся из этого топика.
async consume
Read Side
Event Consumer / StreamsKafka consumer, Kafka Streams, или Connect Sink. Читает события и обновляет read-модели. Каждая read-модель — отдельный consumer group subscribed to Events Topic. Может отставать (eventual consistency). При сбое — перечитывает с последнего committed offset. При полном восстановлении — читает с offset=0.
project
PostgreSQLРеляционная read-модель: PostgreSQL таблица, оптимизированная для SQL-запросов. Пример: orders_view с предрассчитанными JOINами и агрегациями. Connect JDBC Sink или кастомный consumer с UPSERT. Для сложных отчётов: оконные функции, GROUP BY, JOIN. Задержка обновления: секунды.
ElasticsearchПолнотекстовая read-модель: Elasticsearch индекс для поиска по товарам, заказам, клиентам. Connect Elasticsearch Sink. Маппинг полей события в Elasticsearch-документ. При десинхронизации: DELETE INDEX + replay from offset 0 (Kafka хранит все события). Задержка: обычно < 1 секунды.
RedisКеш read-модель: горячие данные с низкой задержкой. Redis hash per entity (HSET order:123 status CONFIRMED amount 99.99). TTL на ключах. Consumer обновляет Redis при каждом событии. Для мгновенного GET /orders/123/status: O(1) Redis lookup. При промахе кеша: fallback в PostgreSQL.
query
Read APIAPI для чтения (GET). Каждый endpoint обращается к оптимальной read-модели. Eventual consistency: клиент, записавший данные, может не увидеть их при немедленном GET (read-after-write gap). Решения: (1) Write API возвращает полную сущность в 202 response, (2) polling с version check, (3) WebSocket notification при обновлении.
Eventual Consistencyмс — секундыWrite Side и Read Side синхронизируются через Kafka с задержкой (обычно миллисекунды-секунды). Клиент, записавший данные, может не увидеть их при немедленном чтении (read-after-write). Это нормально для большинства сценариев. Не нормально для: финансовых операций, инвентаризации в реальном времени — там нужна синхронная write + read в одной транзакции.
Multiple Read Modelsодин поток — N проекцийКлючевое преимущество CQRS: один поток событий порождает несколько read-моделей, оптимизированных для разных паттернов чтения. Добавление новой read-модели = новый consumer group + replay from offset 0. Не требует изменений в write-side. Масштабирование read-side независимо от write-side.

Write side: команды и 202 Accepted

Write API принимает команды — намерения изменить состояние:

POST /api/orders           -> CreateOrder command
PUT  /api/orders/{id}/pay  -> PayOrder command
DELETE /api/orders/{id}    -> CancelOrder command

Критически важно: Write API не возвращает 200 OK с данными заказа. Он возвращает 202 Accepted — “команда принята в обработку”. Почему?

Команда обрабатывается асинхронно. После producer.send() данные не сразу видны в read models — они появятся через миллисекунды-секунды, когда consumers обновят проекции. Возврат 200 с “текущим состоянием” создаёт иллюзию согласованности, которой нет.

HTTP/1.1 202 Accepted
Location: /api/orders/order-123
Content-Type: application/json

{
  "commandId": "cmd-abc-123",
  "status": "ACCEPTED",
  "orderId": "order-123",
  "message": "Команда CreateOrder принята. Заказ будет доступен через несколько секунд."
}

Command Handler выполняет валидацию и публикацию:

  1. Принять команду из HTTP запроса.
  2. Выполнить быструю синхронную валидацию (формат данных, аутентификация/авторизация).
  3. Загрузить текущее состояние агрегата (если нужна бизнес-валидация против текущего состояния).
  4. Применить бизнес-правила.
  5. Если всё корректно — опубликовать событие в Kafka.
  6. Вернуть 202 Accepted.

Exactly-once semantics на write side. Kafka предоставляет два уровня гарантий для продюсеров:

  • enable.idempotence=true — идемпотентный продюсер. Дублированные сетевые запросы не создают дублированных записей в партиции.
  • transactional.id + producer.beginTransaction() — транзакционный API. Позволяет атомарно опубликовать события в несколько топиков или совместить с чтением (EOS в Kafka Streams).
# Producer config для write side
enable.idempotence=true
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5

Kafka как source of truth

Kafka events topic — это канонический источник истины в CQRS. Не PostgreSQL, не Redis — именно Kafka topics.

Почему? Потому что любой read model может быть пересоздан из Kafka: запустить новый consumer с auto.offset.reset=earliest, дочитать все события, получить актуальное состояние. PostgreSQL таблицу, если удалить, не восстановить из самой PostgreSQL. Kafka topic восстанавливает любую базу.

orders.events        <- Source of Truth (Kafka)
    |
    +-> PostgreSQL (JDBC Sink)    <- Read Model для SQL запросов
    |
    +-> Elasticsearch (ES Sink)   <- Read Model для full-text search
    |
    +-> Redis (Custom consumer)   <- Read Model для hot-path кэша
    |
    +-> KTable in Kafka Streams   <- Read Model для streaming analytics

Read side: Kafka Streams KTable как in-memory read model

Kafka Streams KTable — материализованное представление последнего значения для каждого ключа. Идеален как lightweight read model:

StreamsBuilder builder = new StreamsBuilder();

// Читаем события из orders.events
KTable<String, OrderState> orderTable = builder
    .table("orders.events",
           Consumed.with(Serdes.String(), orderEventSerde),
           Materialized.<String, OrderState, KeyValueStore<Bytes, byte[]>>as("order-state-store")
               .withKeySerde(Serdes.String())
               .withValueSerde(orderStateSerde));

// Применяем event fold для построения текущего состояния
KTable<String, OrderSummary> orderSummaries = orderTable
    .mapValues(OrderSummary::fromState);

KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.start();

// Interactive Queries - отвечаем на HTTP запросы напрямую из state store
ReadOnlyKeyValueStore<String, OrderSummary> store =
    streams.store(
        StoreQueryParameters.fromNameAndType(
            "order-state-store",
            QueryableStoreTypes.keyValueStore()
        )
    );

// GET /api/orders/order-123 -> store.get("order-123")
OrderSummary summary = store.get("order-123");

KTable хранит состояние на диске (RocksDB) и в памяти (кэш). Время ответа на запрос: 1-5 мс. Это быстрее, чем SQL запрос в PostgreSQL для простых lookups по ключу.

NOTE

Отсылка к Модулю 07 (Kafka Streams): полный разбор KTable, KStream, GlobalKTable, windowed aggregations, interactive queries и RocksDB state management — там. Здесь мы рассматриваем KTable только как read model в контексте CQRS.


Read side: Kafka Connect Sinks как projectors

Kafka Connect автоматизирует создание read models из событий:

JDBC Sink → PostgreSQL. Проецирует события в реляционные таблицы. Подходит для сложных SQL запросов, reporting, joins:

{
  "name": "orders-postgres-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/read_models",
    "topics": "orders.events",
    "table.name.format": "order_projections",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "order_id",
    "auto.create": "true",
    "auto.evolve": "true"
  }
}

Elasticsearch Sink → full-text search. Индексирует события для поиска. Используйте для GET /api/orders?q=iPhone+128GB+customer:John:

{
  "name": "orders-es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "orders.events",
    "type.name": "order",
    "key.ignore": "false",
    "schema.ignore": "true"
  }
}
NOTE

Отсылка к Модулю 05 (Kafka Connect): архитектура Connect, Source/Sink connectors, SMT (Single Message Transform), connector lifecycle и мониторинг разобраны там.

CQRS: несколько read models из одного event stream
orders.events — один топик. Четыре разных read model — разные consumer groups. Write side ничего не знает о количестве read models.

orders.events

orders.events Kafka topic. Один топик, один источник истины. Продюсер пишет сюда события. Ни один из read model consumers не влияет на этот топик.

KTable (KV store)

Consumer group: cg-streams-kv. Kafka Streams KTable. In-memory read model для быстрых lookups по order ID. Latency: 1-5ms. Rebuilds automatically on crash (RocksDB changelog topic).

PostgreSQL

Consumer group: cg-jdbc-sink. Kafka Connect JDBC Sink. PostgreSQL materialized view. SQL queries, joins, reporting. Upsert mode: INSERT ... ON CONFLICT UPDATE.

Elasticsearch

Consumer group: cg-es-sink. Kafka Connect Elasticsearch Sink. Full-text search index. Покрывает queries типа 'найди заказы клиента John с iPhone'.

Redis Cache

Consumer group: cg-redis-cache. Custom consumer. Пишет в Redis Hash с TTL=300s. Покрывает hot-path: /api/orders/{id} читает из Redis первым, cache miss -> KTable.

Eventual consistency: управление ожиданиями

Eventual consistency — не баг, а свойство системы. Read model отстаёт от write model на время обработки события: produce latency + consumer poll latency + projection update. Обычно: 10-500 мс. Это приемлемо для большинства сценариев.

Но есть read-after-write проблема: пользователь создаёт заказ (POST /api/orders) и немедленно читает его (GET /api/orders/order-123). Если read model ещё не обновился — клиент получает 404.

Три стратегии решения:

1. Возврат данных в 202 Accepted. Write API знает, что было записано (команда содержит данные). Он может вернуть “оптимистичный” ответ с данными из команды, без ожидания проекции:

{
  "status": "ACCEPTED",
  "orderId": "order-123",
  "order": {
    "id": "order-123",
    "customerId": "cust-789",
    "status": "NEW",
    "items": [...],
    "totalAmount": 99.98
  }
}

Клиент получает данные немедленно, без обращения к read model.

2. Version-based polling. Каждое событие имеет version. Write API возвращает version. Клиент опрашивает Read API с параметром ?minVersion=5. Read API ждёт, пока projection не обработает event version >= 5:

GET /api/orders/order-123?minVersion=5
// Read API: poll until store.version("order-123") >= 5
// Timeout: 3000ms, затем 202 (still processing)

3. WebSocket / Server-Sent Events уведомление. Write side публикует события не только в Kafka, но и через WebSocket нотификационный канал. Клиент подписан на обновления для своего order ID. Как только проекция обновлена — push нотификация:

const ws = new WebSocket('/ws/orders/order-123');
ws.onmessage = (event) => {
  const update = JSON.parse(event.data);
  if (update.type === 'ORDER_READY') {
    fetchOrder(); // теперь read model актуален
  }
};

CQRS с Event Sourcing и без него

CQRS без Event Sourcing. Write side использует традиционную CRUD-базу (PostgreSQL). CDC (Debezium) читает WAL и публикует изменения в Kafka. Consumers строят read models.

  • Преимущества: меньше сложности, без необходимости в event fold, привычная CRUD разработка.
  • Недостатки: нет полного event log, нет replay прошлых событий, аудит только через CDC историю.

CQRS с Event Sourcing. Commands → Kafka events (source of truth). Projections читают события и строят read models. Полный event log, replay в любой момент.

  • Преимущества: полный аудит, replay, evolvable projections.
  • Недостатки: более высокая сложность, upcasting, event schema governance.

Когда применять CQRS: критерии решения

CQRS оправдан:

  • Read/write нагрузки асимметричны (10,000 reads/s vs 100 writes/s).
  • Несколько разных read models из одних данных (поиск + кэш + аналитика).
  • Высокая сложность write side (event sourcing, сложные бизнес-правила).
  • Микросервисная архитектура с независимым масштабированием read и write.

CQRS избыточен:

  • Простое CRUD-приложение с одной базой данных.
  • Симметричная нагрузка read/write.
  • Команда без опыта распределённых систем.
  • Жёсткие требования к strong consistency (финансовые транзакции без eventual consistency tolerance).
WARNING

CQRS — это архитектурная сложность. Не применяйте его “для порядка”. Добавьте CQRS только тогда, когда конкретная задача (масштабируемость read side, multiple projections, event sourcing) требует его. Преждевременный CQRS = overhead без benefit.


Ключевые выводы

  1. CQRS разделяет write model (консистентность, валидация) и read model (денормализованные проекции, оптимизированы под запросы).
  2. Kafka — event bus между write и read side. Kafka events topic = source of truth.
  3. Write API возвращает 202 Accepted — команда принята асинхронно. Не 200 с данными.
  4. Read models: KTable (in-memory), JDBC Sink (PostgreSQL), ES Sink (search), custom consumer (Redis).
  5. Read-after-write проблема: три решения — ответ с данными в 202, version polling, WebSocket push.
  6. Добавить новый read model = новый consumer group с auto.offset.reset=earliest. Нулевые изменения write side.
Проверка знанийKnowledge check
Ваша CQRS-система использует Elasticsearch как read model для поиска заказов. Индекс повреждён и требует пересоздания. Каков правильный порядок действий, и почему Kafka делает эту операцию безопасной?
ОтветAnswer
Порядок действий: (1) Остановить Elasticsearch Sink connector. (2) Удалить повреждённый Elasticsearch индекс. (3) Создать новый индекс с нужным mapping. (4) Пересоздать connector с параметром auto.offset.reset=earliest. (5) Connector прочитает все события из orders.events с самого начала и перестроит индекс. Kafka делает эту операцию безопасной потому, что events.topic хранит полную историю событий (source of truth). Операция идемпотентна: результат перестройки индекса детерминирован. Write side не затронут вообще — он продолжает публиковать события в Kafka. Другие consumer groups (KTable, PostgreSQL) продолжают работать.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В аббревиатуре CQRS буква 'Q' означает Query, а 'C' — Command. В чём фундаментальная причина их разделения в системах с высокой нагрузкой?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7