Prerequisites:
- module-4/05-outbox-pattern-theory
Outbox Implementation with the Event Router SMT
You understand the theory behind the Outbox Pattern. Now the critical question: how do you make it work in production?
In this lesson we turn theory into practice: configure the Debezium Outbox Event Router SMT, create the outbox table, write application code for event emission, and implement an idempotent consumer.
By the end of this lesson you will have a complete, production-ready Outbox implementation.
Outbox Event Router SMT: Overview
The Outbox Event Router is a Single Message Transformation (SMT) that converts CDC events from the outbox table into clean domain events.
How a CDC event becomes a clean domain event (Router SMT):
- Clean events: consumers receive domain events without CDC metadata
- Partition affinity: aggregateid as the key guarantees ordering per aggregate
- Type-based filtering: consumers can filter on headers['type']
- Deduplication: headers['id'] lets consumers implement idempotency
What the SMT does:
- Reads the outbox CDC event
- Routes to a topic based on aggregatetype
- Extracts payload as the message value
- Uses aggregateid as the Kafka key (for partitioning)
- Adds id and type to the message headers
Result: downstream consumers receive clean domain events without CDC metadata.
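The routing steps above can be sketched in plain Python. This is an illustrative model of the transformation, not Debezium's actual implementation; the field names match the default SMT configuration.

```python
def route_outbox_event(row: dict) -> dict:
    """Model of what the Outbox Event Router SMT does to one outbox row."""
    return {
        # Topic is derived from aggregatetype via the route.topic.replacement template
        "topic": f"outbox.event.{row['aggregatetype']}",
        # aggregateid becomes the Kafka key, giving per-aggregate ordering
        "key": row["aggregateid"],
        # payload becomes the message value; CDC envelope fields are dropped
        "value": row["payload"],
        # id and type travel as headers, for deduplication and filtering
        "headers": {"id": row["id"], "type": row["type"]},
    }

event = route_outbox_event({
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "aggregatetype": "Order",
    "aggregateid": "order-123",
    "type": "OrderApproved",
    "payload": '{"orderId": "123", "amount": 299.99}',
})
print(event["topic"])  # outbox.event.Order
```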
Basic Outbox Event Router Configuration
A minimal configuration for the Outbox Event Router SMT:
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "inventory",
"database.server.name": "outbox-server",
"table.include.list": "public.outbox",
"publication.autocreate.mode": "filtered",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter"
}
}
The critical parameter: table.include.list = "public.outbox" — capture ONLY the outbox table.
Why table.include.list matters for Outbox
Without table.include.list, Debezium captures ALL tables in the database:
dbserver.public.orders → kafka topic
dbserver.public.customers → kafka topic
dbserver.public.outbox → kafka topic (CDC event)
With table.include.list = "public.outbox":
dbserver.public.outbox → (Outbox Event Router) → outbox.event.Order
                                               → outbox.event.Customer
The outbox connector must capture ONLY the outbox table. Business tables are captured by other connectors (if needed at all).
Knowledge check: Why is the parameter table.include.list = 'public.outbox' critical for the outbox connector? What happens without it?
Complete Outbox Event Router Configuration
A production-ready configuration with all the important parameters:
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "inventory",
"database.server.name": "outbox-server",
"table.include.list": "public.outbox",
"publication.autocreate.mode": "filtered",
"slot.name": "outbox_slot",
"plugin.name": "pgoutput",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregateid",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregatetype",
"transforms.outbox.route.topic.replacement": "outbox.event.${routedByValue}",
"transforms.outbox.table.expand.json.payload": "true"
}
}
SMT Parameter Breakdown
| Parameter | Value | Description |
|---|---|---|
| transforms.outbox.type | io.debezium.transforms.outbox.EventRouter | SMT class |
| table.field.event.id | id | Outbox column used as the event ID (→ Kafka header) |
| table.field.event.key | aggregateid | Column used as the Kafka key (partitioning) |
| table.field.event.payload | payload | Column used as the message value |
| route.by.field | aggregatetype | Column used for routing (Order → outbox.event.Order) |
| route.topic.replacement | outbox.event.${routedByValue} | Topic name template |
| table.expand.json.payload | true | Parse the JSON payload into a structured format |
Mapping: Outbox Table → Kafka Message
Outbox record:
id = '550e8400-e29b-41d4-a716-446655440000'
aggregatetype = 'Order'
aggregateid = 'order-123'
type = 'OrderApproved'
payload = '{"orderId": "123", "amount": 299.99}'
↓ Outbox Event Router SMT ↓
Kafka message:
Topic: outbox.event.Order
Key: "order-123"
Value: {"orderId": "123", "amount": 299.99}
Headers:
id: "550e8400-e29b-41d4-a716-446655440000"
type: "OrderApproved"
table.expand.json.payload: Structured vs Raw JSON
With table.expand.json.payload = true:
The Kafka message value is a structured object:
{
"orderId": "123",
"amount": 299.99,
"approvedAt": "2026-02-01T10:30:15Z"
}
With table.expand.json.payload = false (the default):
The Kafka message value is a JSON string:
"{\"orderId\": \"123\", \"amount\": 299.99}"
Recommendation: always use table.expand.json.payload = true for structured data. Consumer code becomes simpler and type-safe.
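The practical difference for a consumer can be shown with a short sketch. This assumes schemaless JSON serialization on the wire; with expand disabled, the value arrives as a JSON-encoded string, so the consumer has to decode twice. The helper name is my own.

```python
import json

def decode_payload(raw: bytes, expanded: bool) -> dict:
    """Decode a Kafka message value produced with or without table.expand.json.payload."""
    first = json.loads(raw)
    if expanded:
        # expand = true: one decode yields the structured object directly
        return first
    # expand = false: the first decode yields a string that itself contains JSON
    return json.loads(first)

# expand = true: the value on the wire is the object itself
structured = decode_payload(b'{"orderId": "123", "amount": 299.99}', expanded=True)
# expand = false: the value on the wire is a JSON-encoded string
from_string = decode_payload(b'"{\\"orderId\\": \\"123\\", \\"amount\\": 299.99}"', expanded=False)
print(structured == from_string)  # True, but the second path needed two decodes
```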
Outbox Table Creation
Create the outbox table in PostgreSQL with the correct schema.
-- labs/schemas/outbox-table.sql
-- Outbox table for reliable event publishing
-- Used with Debezium Outbox Event Router SMT
CREATE TABLE IF NOT EXISTS public.outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregatetype VARCHAR(255) NOT NULL,
aggregateid VARCHAR(255) NOT NULL,
type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT NOW() NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_outbox_created ON public.outbox(created_at);
COMMENT ON TABLE public.outbox IS 'Transactional outbox for reliable event publishing via Debezium';
COMMENT ON COLUMN public.outbox.aggregatetype IS 'Domain entity type for topic routing (Order, Customer, Payment)';
COMMENT ON COLUMN public.outbox.aggregateid IS 'Entity ID used as Kafka partition key';
COMMENT ON COLUMN public.outbox.type IS 'Event type (OrderCreated, OrderApproved, etc.)';
COMMENT ON COLUMN public.outbox.payload IS 'Event data as JSON';
Why an index on created_at?
The idx_outbox_created index optimizes cleanup queries:
-- External cleanup job (if you use Strategy 2)
DELETE FROM outbox
WHERE created_at < NOW() - INTERVAL '1 hour';
Without the index, the DELETE scans the whole table (slow on a large outbox).
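For very large backlogs, a single unbounded DELETE can hold locks and generate a lot of WAL in one shot; a batched cleanup loop is a common refinement. A sketch assuming a psycopg2-style cursor (the batching query relies on ctid, which is PostgreSQL-specific; names are my own):

```python
# Batch size bounds lock time and WAL volume per statement; ctid batching is
# used because the outbox has no other convenient key to page by.
CLEANUP_SQL = """
    DELETE FROM outbox
    WHERE ctid IN (
        SELECT ctid FROM outbox
        WHERE created_at < NOW() - INTERVAL '1 hour'
        LIMIT %s
    )
"""

def cleanup_outbox(cursor, batch_size: int = 1000) -> int:
    """Delete expired outbox rows in batches; returns the total rows deleted."""
    total = 0
    while True:
        cursor.execute(CLEANUP_SQL, (batch_size,))
        deleted = cursor.rowcount
        total += deleted
        cursor.connection.commit()  # commit each batch so locks stay short-lived
        if deleted < batch_size:
            return total
```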
Application Code: Event Emission Pattern
Now the application code that publishes events through the outbox.
Python Example: Order Approval
import psycopg2
import json
import uuid
from datetime import datetime

def approve_order(order_id: str, amount: float):
    """
    Approve order and emit OrderApproved event via outbox.
    Transactional guarantee: both UPDATE and event INSERT are atomic.
    """
    conn = psycopg2.connect(
        host="localhost",
        port=5433,
        database="inventory",
        user="postgres",
        password="postgres"
    )
    cursor = conn.cursor()
    try:
        # BEGIN TRANSACTION (implicit)
        # Step 1: Business logic - update order status
        cursor.execute("""
            UPDATE orders
            SET status = 'APPROVED',
                updated_at = NOW()
            WHERE id = %s
        """, (order_id,))

        # Verify update succeeded
        if cursor.rowcount == 0:
            raise ValueError(f"Order {order_id} not found")

        # Step 2: Emit event to outbox (same transaction)
        event_id = str(uuid.uuid4())
        cursor.execute("""
            INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload)
            VALUES (%s, %s, %s, %s, %s)
        """, (
            event_id,
            'Order',          # aggregatetype → topic routing
            order_id,         # aggregateid → Kafka key
            'OrderApproved',  # type → event header
            json.dumps({      # payload → message value
                'orderId': order_id,
                'amount': amount,
                'approvedAt': datetime.now().isoformat()
            })
        ))

        # COMMIT - both UPDATE and INSERT are atomic
        conn.commit()
        print(f"Order {order_id} approved, event {event_id} emitted")
    except Exception as e:
        conn.rollback()
        print(f"Failed to approve order: {e}")
        raise
    finally:
        cursor.close()
        conn.close()

# Usage
approve_order(order_id='order-123', amount=299.99)
Event Emission with Cleanup
Cleanup of the outbox row right after the INSERT (Strategy 1). Note that a single data-modifying CTE (WITH inserted AS (INSERT ...) DELETE ...) does not work here: in PostgreSQL, the sub-statements of one query share a snapshot, so the DELETE cannot see the row the CTE just inserted. Use two statements in the same transaction instead:
# Step 2: Emit event + cleanup in the same transaction
event_id = str(uuid.uuid4())
cursor.execute("""
    INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload)
    VALUES (%s, %s, %s, %s, %s)
""", (event_id, 'Order', order_id, 'OrderApproved', json.dumps({...})))
cursor.execute("DELETE FROM outbox WHERE id = %s", (event_id,))
conn.commit()
Debezium behavior: it processes the INSERT from the WAL and ignores the DELETE (the default for the Outbox Event Router).
Important: the DELETE is not delivered as an event — it is a pure cleanup mechanism.
Knowledge check: The table.expand.json.payload parameter of the Outbox Event Router can be true or false. What is the difference for the consumer, and which setting is recommended?
Consumer Idempotency Pattern
The Outbox Pattern guarantees at-least-once delivery → consumers must handle duplicate events.
Python Consumer with Deduplication
from confluent_kafka import Consumer
import json

# In-memory deduplication (production: Redis, database)
processed_events = set()

# Handlers must be defined before the consume loop starts
def handle_order_approved(event):
    """Process OrderApproved event (idempotent)"""
    order_id = event['orderId']
    amount = event['amount']
    print(f"✅ Order {order_id} approved for ${amount}")
    # Send notification email, update analytics, etc.
    # This code MUST be idempotent (safe to execute multiple times)

def handle_order_cancelled(event):
    """Process OrderCancelled event (idempotent)"""
    order_id = event['orderId']
    print(f"❌ Order {order_id} cancelled")

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['outbox.event.Order'])
print("Consuming events from outbox.event.Order...")

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue

        # Extract event ID and type from the message headers in one pass
        headers = dict(msg.headers() or [])
        event_id = headers['id'].decode('utf-8') if 'id' in headers else None
        event_type = headers['type'].decode('utf-8') if 'type' in headers else None

        # Idempotency check
        if event_id and event_id in processed_events:
            print(f"[DUPLICATE] Event {event_id} already processed, skipping")
            consumer.commit()
            continue

        # Parse event payload
        event_data = json.loads(msg.value().decode('utf-8'))

        # Process event based on type
        if event_type == 'OrderApproved':
            handle_order_approved(event_data)
        elif event_type == 'OrderCancelled':
            handle_order_cancelled(event_data)
        else:
            print(f"Unknown event type: {event_type}")

        # Mark as processed
        if event_id:
            processed_events.add(event_id)

        # Commit offset
        consumer.commit()
except KeyboardInterrupt:
    print("Shutting down consumer...")
finally:
    consumer.close()
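Header extraction is a good candidate for a small pure helper that can be unit-tested without a broker. A sketch (the helper name is my own; confluent-kafka exposes headers as a list of (key, bytes) tuples, or None when no headers are set):

```python
def extract_event_metadata(headers):
    """Return (event_id, event_type) from confluent-kafka style headers:
    a list of (key, bytes-value) tuples, or None when the message has none."""
    lookup = {k: v for k, v in (headers or [])}
    decode = lambda v: v.decode('utf-8') if isinstance(v, (bytes, bytearray)) else v
    return decode(lookup.get('id')), decode(lookup.get('type'))

print(extract_event_metadata([('id', b'evt-1'), ('type', b'OrderApproved')]))
# ('evt-1', 'OrderApproved')
print(extract_event_metadata(None))
# (None, None)
```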
Production Deduplication: Database
The in-memory processed_events set is lost on consumer restart. A production version:
import psycopg2

def is_already_processed(event_id: str) -> bool:
    """Check if event was already processed (database-backed)"""
    conn = psycopg2.connect(...)
    cursor = conn.cursor()
    cursor.execute("""
        SELECT EXISTS(
            SELECT 1 FROM processed_events WHERE event_id = %s
        )
    """, (event_id,))
    exists = cursor.fetchone()[0]
    cursor.close()
    conn.close()
    return exists

def mark_as_processed(event_id: str):
    """Mark event as processed (database-backed)"""
    conn = psycopg2.connect(...)
    cursor = conn.cursor()
    cursor.execute("""
        INSERT INTO processed_events (event_id, processed_at)
        VALUES (%s, NOW())
        ON CONFLICT (event_id) DO NOTHING
    """, (event_id,))
    conn.commit()
    cursor.close()
    conn.close()

# Usage in consumer
if is_already_processed(event_id):
    print(f"Event {event_id} already processed")
    continue
handle_order_approved(event_data)
mark_as_processed(event_id)
processed_events table schema:
CREATE TABLE processed_events (
event_id UUID PRIMARY KEY,
processed_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_processed_events_timestamp ON processed_events(processed_at);
Cleanup strategy: periodically delete old records (processed_at < NOW() - INTERVAL '30 days') to keep the table size under control.
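A subtle race remains in the check-then-mark sequence above: two consumer instances can both pass is_already_processed before either marks the event. A common refinement is to claim the event atomically by treating the INSERT itself as the check, using its affected-row count. A sketch, demonstrated with sqlite3 for portability (the same ON CONFLICT ... DO NOTHING works in PostgreSQL; the function name is my own):

```python
import sqlite3

def claim_event(conn, event_id: str) -> bool:
    """Atomically claim an event; True means we own it, False means a duplicate."""
    cur = conn.execute(
        "INSERT INTO processed_events (event_id) VALUES (?) "
        "ON CONFLICT (event_id) DO NOTHING",
        (event_id,),
    )
    # rowcount == 1 → we inserted the marker and should process the event;
    # rowcount == 0 → the event was already claimed, so skip processing.
    return cur.rowcount == 1

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")
print(claim_event(conn, "evt-1"))  # first delivery → claimed
print(claim_event(conn, "evt-1"))  # duplicate delivery → skipped
```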
Lab: Full Outbox Pattern Implementation
Now let's put it all together in a live demo.
Step 1: Create the outbox table
cd labs
# Create the outbox table in PostgreSQL
docker exec -i postgres psql -U postgres -d inventory < schemas/outbox-table.sql
Verify:
docker exec -it postgres psql -U postgres -d inventory -c "\d outbox"
Expected output:
Table "public.outbox"
Column | Type | Modifiers
---------------+--------------------------------+-----------
id | uuid | not null default gen_random_uuid()
aggregatetype | character varying(255) | not null
aggregateid | character varying(255) | not null
type | character varying(255) | not null
payload | jsonb | not null
created_at | timestamp without time zone | not null default now()
Step 2: Deploy the Outbox Connector
Create the connector configuration:
# connectors/outbox-connector.json
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "inventory",
"database.server.name": "outbox-server",
"table.include.list": "public.outbox",
"publication.autocreate.mode": "filtered",
"slot.name": "outbox_slot",
"plugin.name": "pgoutput",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregateid",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregatetype",
"transforms.outbox.route.topic.replacement": "outbox.event.${routedByValue}",
"transforms.outbox.table.expand.json.payload": "true"
}
}'
Check the status:
curl http://localhost:8083/connectors/outbox-connector/status | jq
Expected output:
{
"name": "outbox-connector",
"connector": {
"state": "RUNNING"
},
"tasks": [
{
"id": 0,
"state": "RUNNING"
}
]
}
Step 3: Emit a Test Event
Insert a test event into the outbox:
docker exec -it postgres psql -U postgres -d inventory -c "
INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload)
VALUES (
gen_random_uuid(),
'Order',
'order-123',
'OrderApproved',
'{\"orderId\": \"order-123\", \"amount\": 299.99, \"approvedAt\": \"2026-02-01T10:30:15Z\"}'::jsonb
);
"
Step 4: Verify the Topic Was Created
docker exec -it kafka kafka-topics --bootstrap-server kafka:9092 --list | grep outbox
Expected output:
outbox.event.Order
Step 5: Consume the Event
In one terminal:
docker exec -it kafka kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic outbox.event.Order \
--from-beginning \
--property print.headers=true \
--property print.key=true
Expected output:
id:550e8400-e29b-41d4-a716-446655440000,type:OrderApproved order-123 {"orderId":"order-123","amount":299.99,"approvedAt":"2026-02-01T10:30:15Z"}
Breakdown:
- Headers: id=..., type=OrderApproved
- Key: order-123 (aggregateid)
- Value: clean JSON payload
✅ Success! The Outbox Event Router transformed the CDC event into a domain event.
Step 6: Verify Cleanup
If you used DELETE in the same transaction:
docker exec -it postgres psql -U postgres -d inventory -c "SELECT COUNT(*) FROM outbox;"
Expected output:
count
-------
0
The outbox is empty — events were deleted after Debezium captured them.
Advanced: Multiple Aggregates
The Outbox Pattern supports different aggregate types in one table:
-- Order events → outbox.event.Order
INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload)
VALUES (gen_random_uuid(), 'Order', 'order-123', 'OrderCreated', '...'::jsonb);
-- Customer events → outbox.event.Customer
INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload)
VALUES (gen_random_uuid(), 'Customer', 'customer-456', 'CustomerRegistered', '...'::jsonb);
-- Payment events → outbox.event.Payment
INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload)
VALUES (gen_random_uuid(), 'Payment', 'payment-789', 'PaymentProcessed', '...'::jsonb);
Result:
outbox.event.Order topic:
- OrderCreated events
- OrderApproved events
- OrderCancelled events
outbox.event.Customer topic:
- CustomerRegistered events
- CustomerUpdated events
outbox.event.Payment topic:
- PaymentProcessed events
- PaymentFailed events
A single outbox table, multiple topics — routing by aggregatetype.
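The topic-name templating can be illustrated in a few lines of Python. This mirrors the route.topic.replacement = "outbox.event.${routedByValue}" setting from the connector config; the function is an illustration, not Debezium code.

```python
def resolve_topic(template: str, routed_by_value: str) -> str:
    """Expand the ${routedByValue} placeholder the way route.topic.replacement does."""
    return template.replace("${routedByValue}", routed_by_value)

template = "outbox.event.${routedByValue}"
for aggregate in ("Order", "Customer", "Payment"):
    print(resolve_topic(template, aggregate))
# outbox.event.Order
# outbox.event.Customer
# outbox.event.Payment
```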
Configuration Troubleshooting
Problem: Topic is not created
Symptom: events are inserted into the outbox, but the outbox.event.Order topic never appears.
Diagnostics:
- Check the connector status: curl http://localhost:8083/connectors/outbox-connector/status
- Check the logs: docker logs connect | grep outbox
- Check table.include.list: "table.include.list": "public.outbox" — if the outbox table lives in another schema, use the fully qualified name.
Problem: Message value is a JSON string, not an object
Symptom: the Kafka message value is
"{\"orderId\": \"123\"}"
instead of
{"orderId": "123"}
Solution: add table.expand.json.payload = true to the SMT config.
Problem: Duplicate events
Symptom: the consumer processes the same event several times.
Cause: at-least-once delivery — this is normal for the Outbox Pattern.
Solution: implement idempotency in the consumer (see the Consumer Idempotency Pattern section).
Production Checklist
Before deploying the Outbox Pattern to production:
- Outbox table created with an index on created_at
- Connector deployed with table.include.list = "public.outbox"
- Outbox Event Router SMT configured with table.expand.json.payload = true
- Cleanup strategy chosen (DELETE in the transaction, or an external job)
- Consumer idempotency implemented (database-backed deduplication)
- Monitoring configured (lag metrics for the outbox connector)
- Error handling in the application (rollback on failure)
- Event schema documented (aggregatetype, type, payload structure)
What's Next?
You have implemented the complete Outbox Pattern with the Debezium Outbox Event Router SMT. Your microservices can now reliably publish events with transactional guarantees.
The next advanced pattern is Schema Registry integration for schema evolution and structured serialization (Avro). It lets you control event compatibility between producer and consumer.
Key Takeaways
- The Outbox Event Router SMT transforms CDC events into domain events
- table.include.list = "public.outbox" — capture ONLY the outbox table
- table.expand.json.payload = true — structured JSON instead of a string
- aggregatetype → topic routing, aggregateid → Kafka key
- The event ID in a header enables consumer deduplication
- Cleanup strategy: DELETE in the transaction (simple) or an external job (scale)
- Consumer idempotency is mandatory — database-backed deduplication for production
- A single outbox table can serve multiple aggregate types
- Labs DDL: labs/schemas/outbox-table.sql for quick setup
- Production checklist — verify everything before deploying