Реализация CQRS
CQRS — Command Query Responsibility Segregation. Принцип разделения: операции, которые изменяют состояние (команды), и операции, которые только читают (запросы), используют разные модели. Не разные таблицы в одной базе данных — разные модели, оптимизированные для своей задачи.
В традиционной системе одна модель данных обслуживает и запись, и чтение. CQRS говорит: это ограничение без оснований. Write-модель оптимизирована под консистентность и валидацию бизнес-правил. Read-модель оптимизирована под запросы — денормализована, индексирована, может быть в другой базе данных.
Kafka — идеальный event bus для связи между write и read side.
Принцип CQRS: почему разделение выгодно
Write side проблема. Бизнес-правила требуют нормализованной, консистентной модели данных. Нормализованная база данных — слабый кандидат для сложных аналитических запросов.
Read side проблема. Эффективные запросы требуют денормализации, предагрегации, специализированных индексов. Elasticsearch для полнотекстового поиска, Redis для кэша, ClickHouse для аналитики. Одна база данных не может быть одновременно лучшей для всего.
CQRS решение: Write side → Kafka → N read models. Каждый read model обслуживает свой сценарий запросов. Добавить новый read model — значит запустить новый consumer group, который читает те же события и строит новую проекцию. Zero изменений на write side.
CQRS: полная архитектурная диаграмма
Write side: команды и 202 Accepted
Write API принимает команды — намерения изменить состояние:
POST /api/orders -> CreateOrder command
PUT /api/orders/{id}/pay -> PayOrder command
DELETE /api/orders/{id} -> CancelOrder command
Критически важно: Write API не возвращает 200 OK с данными заказа. Он возвращает 202 Accepted — “команда принята в обработку”. Почему?
Команда обрабатывается асинхронно. После producer.send() данные не сразу видны в read models — они появятся через миллисекунды-секунды, когда consumers обновят проекции. Возврат 200 с “текущим состоянием” создаёт иллюзию согласованности, которой нет.
HTTP/1.1 202 Accepted
Location: /api/orders/order-123
Content-Type: application/json
{
"commandId": "cmd-abc-123",
"status": "ACCEPTED",
"orderId": "order-123",
"message": "Команда CreateOrder принята. Заказ будет доступен через несколько секунд."
}
Command Handler выполняет валидацию и публикацию:
- Принять команду из HTTP запроса.
- Выполнить быструю синхронную валидацию (формат данных, аутентификация/авторизация).
- Загрузить текущее состояние агрегата (если нужна бизнес-валидация против текущего состояния).
- Применить бизнес-правила.
- Если всё корректно — опубликовать событие в Kafka.
- Вернуть 202 Accepted.
Exactly-once semantics на write side. Kafka предоставляет два уровня гарантий для продюсеров:
enable.idempotence=true— идемпотентный продюсер. Дублированные сетевые запросы не создают дублированных записей в партиции.transactional.id+producer.beginTransaction()— транзакционный API. Позволяет атомарно опубликовать события в несколько топиков или совместить с чтением (EOS в Kafka Streams).
# Producer config для write side
enable.idempotence=true
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
Kafka как source of truth
Kafka events topic — это канонический источник истины в CQRS. Не PostgreSQL, не Redis — именно Kafka topics.
Почему? Потому что любой read model может быть пересоздан из Kafka: запустить новый consumer с auto.offset.reset=earliest, дочитать все события, получить актуальное состояние. PostgreSQL таблицу, если удалить, не восстановить из самой PostgreSQL. Kafka topic восстанавливает любую базу.
orders.events <- Source of Truth (Kafka)
|
+-> PostgreSQL (JDBC Sink) <- Read Model для SQL запросов
|
+-> Elasticsearch (ES Sink) <- Read Model для full-text search
|
+-> Redis (Custom consumer) <- Read Model для hot-path кэша
|
+-> KTable in Kafka Streams <- Read Model для streaming analytics
Read side: Kafka Streams KTable как in-memory read model
Kafka Streams KTable — материализованное представление последнего значения для каждого ключа. Идеален как lightweight read model:
StreamsBuilder builder = new StreamsBuilder();
// Читаем события из orders.events
KTable<String, OrderState> orderTable = builder
.table("orders.events",
Consumed.with(Serdes.String(), orderEventSerde),
Materialized.<String, OrderState, KeyValueStore<Bytes, byte[]>>as("order-state-store")
.withKeySerde(Serdes.String())
.withValueSerde(orderStateSerde));
// Применяем event fold для построения текущего состояния
KTable<String, OrderSummary> orderSummaries = orderTable
.mapValues(OrderSummary::fromState);
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.start();
// Interactive Queries - отвечаем на HTTP запросы напрямую из state store
ReadOnlyKeyValueStore<String, OrderSummary> store =
streams.store(
StoreQueryParameters.fromNameAndType(
"order-state-store",
QueryableStoreTypes.keyValueStore()
)
);
// GET /api/orders/order-123 -> store.get("order-123")
OrderSummary summary = store.get("order-123");
KTable хранит состояние на диске (RocksDB) и в памяти (кэш). Время ответа на запрос: 1-5 мс. Это быстрее, чем SQL запрос в PostgreSQL для простых lookups по ключу.
Отсылка к Модулю 07 (Kafka Streams): полный разбор KTable, KStream, GlobalKTable, windowed aggregations, interactive queries и RocksDB state management — там. Здесь мы рассматриваем KTable только как read model в контексте CQRS.
Read side: Kafka Connect Sinks как projectors
Kafka Connect автоматизирует создание read models из событий:
JDBC Sink → PostgreSQL. Проецирует события в реляционные таблицы. Подходит для сложных SQL запросов, reporting, joins:
{
"name": "orders-postgres-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://postgres:5432/read_models",
"topics": "orders.events",
"table.name.format": "order_projections",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "order_id",
"auto.create": "true",
"auto.evolve": "true"
}
}
Elasticsearch Sink → full-text search. Индексирует события для поиска. Используйте для GET /api/orders?q=iPhone+128GB+customer:John:
{
"name": "orders-es-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch:9200",
"topics": "orders.events",
"type.name": "order",
"key.ignore": "false",
"schema.ignore": "true"
}
}
Отсылка к Модулю 05 (Kafka Connect): архитектура Connect, Source/Sink connectors, SMT (Single Message Transform), connector lifecycle и мониторинг разобраны там.
orders.events
orders.events Kafka topic. Один топик, один источник истины. Продюсер пишет сюда события. Ни один из read model consumers не влияет на этот топик.KTable (KV store)
Consumer group: cg-streams-kv. Kafka Streams KTable. In-memory read model для быстрых lookups по order ID. Latency: 1-5ms. Rebuilds automatically on crash (RocksDB changelog topic).PostgreSQL
Consumer group: cg-jdbc-sink. Kafka Connect JDBC Sink. PostgreSQL materialized view. SQL queries, joins, reporting. Upsert mode: INSERT ... ON CONFLICT UPDATE.Elasticsearch
Consumer group: cg-es-sink. Kafka Connect Elasticsearch Sink. Full-text search index. Покрывает queries типа 'найди заказы клиента John с iPhone'.Redis Cache
Consumer group: cg-redis-cache. Custom consumer. Пишет в Redis Hash с TTL=300s. Покрывает hot-path: /api/orders/{id} читает из Redis первым, cache miss -> KTable.Eventual consistency: управление ожиданиями
Eventual consistency — не баг, а свойство системы. Read model отстаёт от write model на время обработки события: produce latency + consumer poll latency + projection update. Обычно: 10-500 мс. Это приемлемо для большинства сценариев.
Но есть read-after-write проблема: пользователь создаёт заказ (POST /api/orders) и немедленно читает его (GET /api/orders/order-123). Если read model ещё не обновился — клиент получает 404.
Три стратегии решения:
1. Возврат данных в 202 Accepted. Write API знает, что было записано (команда содержит данные). Он может вернуть “оптимистичный” ответ с данными из команды, без ожидания проекции:
{
"status": "ACCEPTED",
"orderId": "order-123",
"order": {
"id": "order-123",
"customerId": "cust-789",
"status": "NEW",
"items": [...],
"totalAmount": 99.98
}
}
Клиент получает данные немедленно, без обращения к read model.
2. Version-based polling. Каждое событие имеет version. Write API возвращает version. Клиент опрашивает Read API с параметром ?minVersion=5. Read API ждёт, пока projection не обработает event version >= 5:
GET /api/orders/order-123?minVersion=5
// Read API: poll until store.version("order-123") >= 5
// Timeout: 3000ms, затем 202 (still processing)
3. WebSocket / Server-Sent Events уведомление. Write side публикует события не только в Kafka, но и через WebSocket нотификационный канал. Клиент подписан на обновления для своего order ID. Как только проекция обновлена — push нотификация:
const ws = new WebSocket('/ws/orders/order-123');
ws.onmessage = (event) => {
const update = JSON.parse(event.data);
if (update.type === 'ORDER_READY') {
fetchOrder(); // теперь read model актуален
}
};
CQRS с Event Sourcing и без него
CQRS без Event Sourcing. Write side использует традиционную CRUD-базу (PostgreSQL). CDC (Debezium) читает WAL и публикует изменения в Kafka. Consumers строят read models.
- Преимущества: меньше сложности, без необходимости в event fold, привычная CRUD разработка.
- Недостатки: нет полного event log, нет replay прошлых событий, аудит только через CDC историю.
CQRS с Event Sourcing. Commands → Kafka events (source of truth). Projections читают события и строят read models. Полный event log, replay в любой момент.
- Преимущества: полный аудит, replay, evolvable projections.
- Недостатки: более высокая сложность, upcasting, event schema governance.
Когда применять CQRS: критерии решения
CQRS оправдан:
- Read/write нагрузки асимметричны (10,000 reads/s vs 100 writes/s).
- Несколько разных read models из одних данных (поиск + кэш + аналитика).
- Высокая сложность write side (event sourcing, сложные бизнес-правила).
- Микросервисная архитектура с независимым масштабированием read и write.
CQRS избыточен:
- Простое CRUD-приложение с одной базой данных.
- Симметричная нагрузка read/write.
- Команда без опыта распределённых систем.
- Жёсткие требования к strong consistency (финансовые транзакции без eventual consistency tolerance).
CQRS — это архитектурная сложность. Не применяйте его “для порядка”. Добавьте CQRS только тогда, когда конкретная задача (масштабируемость read side, multiple projections, event sourcing) требует его. Преждевременный CQRS = overhead без benefit.
Ключевые выводы
- CQRS разделяет write model (консистентность, валидация) и read model (денормализованные проекции, оптимизированы под запросы).
- Kafka — event bus между write и read side. Kafka events topic = source of truth.
- Write API возвращает 202 Accepted — команда принята асинхронно. Не 200 с данными.
- Read models: KTable (in-memory), JDBC Sink (PostgreSQL), ES Sink (search), custom consumer (Redis).
- Read-after-write проблема: три решения — ответ с данными в 202, version polling, WebSocket push.
- Добавить новый read model = новый consumer group с
auto.offset.reset=earliest. Нулевые изменения write side.