Skip to content
Learning Platform
Advanced
40 minutes
outbox smt implementation lab

Prerequisites:

  • module-4/05-outbox-pattern-theory

Outbox Implementation с Event Router SMT

Вы понимаете теорию Outbox Pattern. Теперь критический вопрос: как заставить это работать в production?

В этом уроке мы превратим теорию в практику — настроим Debezium Outbox Event Router SMT, создадим outbox-таблицу, напишем application code для event emission, и реализуем idempotent consumer.

После этого урока у вас будет complete, production-ready Outbox implementation.

Outbox Event Router SMT: Обзор

Outbox Event Router — это Single Message Transformation (SMT), которая преобразует CDC-события из outbox-таблицы в clean domain events.

Outbox Event Router SMT: Трансформация полей

Как CDC событие превращается в clean domain event

Input: Outbox CDC Record
id:'550e8400-...'
aggregatetype:'Order'
aggregateid:'order-123'
type:'OrderApproved'
payload:
{
orderId: '123',
amount: 299.99,
customer: 'Alice'
}
+ CDC metadata (before, after, op, source)
Outbox Event
Router SMT
Output: Kafka Domain Event
Topic:outbox.event.Order
Key:'order-123'
Value:
{
orderId: '123',
amount: 299.99,
customer: 'Alice'
}
Headers:
id: '550e8400-...'
type: 'OrderApproved'
Mapping Summary:
aggregatetype
→ Topic Name
aggregateid
→ Kafka Key
type
→ Message Header
payload
→ Message Value
Преимущества трансформации:
  • Clean events: Consumer получает domain события без CDC metadata
  • Partition affinity: aggregateid как key гарантирует ordering per aggregate
  • Type-based filtering: Consumer может фильтровать по headers['type']
  • Deduplication: headers['id'] позволяет consumer реализовать idempotency

Что делает SMT:

  1. Reads outbox CDC event
  2. Routes to topic based on aggregatetype
  3. Extracts payload as message value
  4. Uses aggregateid as Kafka key (for partitioning)
  5. Adds id and type to message headers

Результат: Downstream consumers получают clean domain events без CDC metadata.

Базовая конфигурация Outbox Event Router

Minimal configuration для Outbox Event Router SMT:

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",

    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "database.server.name": "outbox-server",

    "table.include.list": "public.outbox",
    "publication.autocreate.mode": "filtered",

    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter"
  }
}

Критический параметр: table.include.list = "public.outbox" — захватываем ТОЛЬКО outbox-таблицу.

Почему table.include.list важен для Outbox

Без table.include.list Debezium захватывает ВСЕ таблицы БД:

dbserver.public.orders → kafka topic
dbserver.public.customers → kafka topic
dbserver.public.outbox → kafka topic (CDC event)

С table.include.list = "public.outbox":

dbserver.public.outbox → (Outbox Event Router) → outbox.event.Order
                                                → outbox.event.Customer

Outbox connector должен захватывать ТОЛЬКО outbox-таблицу. Business tables захватываются другими коннекторами (если нужно).

Проверка знаний
Почему для outbox-коннектора критически важен параметр table.include.list = 'public.outbox'? Что произойдет без него?
Ответ
Без table.include.list Debezium захватывает ВСЕ таблицы базы данных, и бизнес-данные (orders, customers) попадут в CDC pipeline вместе с outbox-событиями. Outbox connector должен захватывать ТОЛЬКО outbox-таблицу. Бизнес-таблицы при необходимости захватываются отдельными коннекторами. Это обеспечивает разделение ответственности: outbox-коннектор публикует domain events, а не raw CDC.

Complete Outbox Event Router Configuration

Production-ready configuration со всеми важными параметрами:

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",

    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "database.server.name": "outbox-server",

    "table.include.list": "public.outbox",
    "publication.autocreate.mode": "filtered",

    "slot.name": "outbox_slot",
    "plugin.name": "pgoutput",

    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregateid",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregatetype",
    "transforms.outbox.route.topic.replacement": "outbox.event.${routedByValue}",
    "transforms.outbox.table.expand.json.payload": "true"
  }
}

Разбор параметров SMT

ПараметрЗначениеОписание
transforms.outbox.typeio.debezium.transforms.outbox.EventRouterКласс SMT
table.field.event.ididПоле outbox для event ID (→ Kafka header)
table.field.event.keyaggregateidПоле для Kafka key (partitioning)
table.field.event.payloadpayloadПоле для message value
route.by.fieldaggregatetypeПоле для routing (Order → outbox.event.Order)
route.topic.replacementoutbox.event.${routedByValue}Шаблон имени topic
table.expand.json.payloadtrueПарсить JSON payload в structured format

Mapping: Outbox Table → Kafka Message

Outbox record:
  id = '550e8400-e29b-41d4-a716-446655440000'
  aggregatetype = 'Order'
  aggregateid = 'order-123'
  type = 'OrderApproved'
  payload = '{"orderId": "123", "amount": 299.99}'

↓ Outbox Event Router SMT ↓

Kafka message:
  Topic: outbox.event.Order
  Key: "order-123"
  Value: {"orderId": "123", "amount": 299.99}
  Headers:
    id: "550e8400-e29b-41d4-a716-446655440000"
    type: "OrderApproved"

table.expand.json.payload: Structured vs Raw JSON

С table.expand.json.payload = true:

Kafka message value — structured object:

{
  "orderId": "123",
  "amount": 299.99,
  "approvedAt": "2026-02-01T10:30:15Z"
}

С table.expand.json.payload = false (default):

Kafka message value — JSON string:

"{\"orderId\": \"123\", \"amount\": 299.99}"

Recommendation: Всегда используйте table.expand.json.payload = true для structured data. Consumer code проще и type-safe.

Outbox Table Creation

Создайте outbox-таблицу в PostgreSQL с правильной schema.

-- labs/schemas/outbox-table.sql
-- Outbox table for reliable event publishing
-- Used with Debezium Outbox Event Router SMT

CREATE TABLE IF NOT EXISTS public.outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregatetype VARCHAR(255) NOT NULL,
    aggregateid VARCHAR(255) NOT NULL,
    type VARCHAR(255) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT NOW() NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_outbox_created ON public.outbox(created_at);

COMMENT ON TABLE public.outbox IS 'Transactional outbox for reliable event publishing via Debezium';
COMMENT ON COLUMN public.outbox.aggregatetype IS 'Domain entity type for topic routing (Order, Customer, Payment)';
COMMENT ON COLUMN public.outbox.aggregateid IS 'Entity ID used as Kafka partition key';
COMMENT ON COLUMN public.outbox.type IS 'Event type (OrderCreated, OrderApproved, etc.)';
COMMENT ON COLUMN public.outbox.payload IS 'Event data as JSON';

Почему индекс на created_at?

Индекс idx_outbox_created оптимизирует cleanup queries:

-- External cleanup job (если используете Strategy 2)
DELETE FROM outbox
WHERE created_at < NOW() - INTERVAL '1 hour';

Без индекса DELETE сканирует всю таблицу (slow на больших outbox).

Application Code: Event Emission Pattern

Теперь application code для публикации событий через outbox.

Python Example: Order Approval

import psycopg2
import json
import uuid
from datetime import datetime

def approve_order(order_id: str, amount: float):
    """
    Approve order and emit OrderApproved event via outbox.
    Transactional guarantee: both UPDATE and event INSERT are atomic.
    """
    conn = psycopg2.connect(
        host="localhost",
        port=5433,
        database="inventory",
        user="postgres",
        password="postgres"
    )
    cursor = conn.cursor()

    try:
        # BEGIN TRANSACTION (implicit)

        # Step 1: Business logic - update order status
        cursor.execute("""
            UPDATE orders
            SET status = 'APPROVED',
                updated_at = NOW()
            WHERE id = %s
        """, (order_id,))

        # Verify update succeeded
        if cursor.rowcount == 0:
            raise ValueError(f"Order {order_id} not found")

        # Step 2: Emit event to outbox (same transaction)
        event_id = str(uuid.uuid4())
        cursor.execute("""
            INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload)
            VALUES (%s, %s, %s, %s, %s)
        """, (
            event_id,
            'Order',                    # aggregatetype → topic routing
            order_id,                   # aggregateid → Kafka key
            'OrderApproved',            # type → event header
            json.dumps({                # payload → message value
                'orderId': order_id,
                'amount': amount,
                'approvedAt': datetime.now().isoformat()
            })
        ))

        # COMMIT - both UPDATE and INSERT are atomic
        conn.commit()

        print(f"Order {order_id} approved, event {event_id} emitted")

    except Exception as e:
        conn.rollback()
        print(f"Failed to approve order: {e}")
        raise
    finally:
        cursor.close()
        conn.close()

# Usage
approve_order(order_id='order-123', amount=299.99)

Event Emission с Cleanup

Cleanup outbox сразу после INSERT (Strategy 1):

# Step 2: Emit event + cleanup in same transaction
event_id = str(uuid.uuid4())
cursor.execute("""
    WITH inserted AS (
        INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload)
        VALUES (%s, %s, %s, %s, %s)
        RETURNING id
    )
    DELETE FROM outbox WHERE id = (SELECT id FROM inserted)
""", (event_id, 'Order', order_id, 'OrderApproved', json.dumps({...})))

conn.commit()

Debezium behavior: Обрабатывает INSERT, игнорирует DELETE (default для Outbox Event Router).

Важно: DELETE не доставляется как событие — это чистый cleanup mechanism.

Проверка знаний
Параметр table.expand.json.payload в Outbox Event Router может быть true или false. Какая разница для consumer, и какой вариант рекомендуется?
Ответ
С table.expand.json.payload = false (default) consumer получает экранированную JSON строку, требующую дополнительного JSON.parse(). С true consumer получает structured JSON object напрямую. Рекомендуется true, потому что consumer code проще и type-safe -- не нужен дополнительный этап десериализации для разбора строки.

Consumer Idempotency Pattern

Outbox Pattern гарантирует at-least-once delivery → consumers должны обрабатывать duplicate events.

Python Consumer с Deduplication

from confluent_kafka import Consumer
import json

# In-memory deduplication (production: Redis, database)
processed_events = set()

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe(['outbox.event.Order'])

print("Consuming events from outbox.event.Order...")

try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            print(f"Error: {msg.error()}")
            continue

        # Extract event ID from message headers
        event_id = None
        if msg.headers():
            for header_key, header_value in msg.headers():
                if header_key == 'id':
                    event_id = header_value.decode('utf-8')
                    break

        # Idempotency check
        if event_id and event_id in processed_events:
            print(f"[DUPLICATE] Event {event_id} already processed, skipping")
            consumer.commit()
            continue

        # Parse event
        event_type = None
        if msg.headers():
            for header_key, header_value in msg.headers():
                if header_key == 'type':
                    event_type = header_value.decode('utf-8')
                    break

        event_data = json.loads(msg.value().decode('utf-8'))

        # Process event based on type
        if event_type == 'OrderApproved':
            handle_order_approved(event_data)
        elif event_type == 'OrderCancelled':
            handle_order_cancelled(event_data)
        else:
            print(f"Unknown event type: {event_type}")

        # Mark as processed
        if event_id:
            processed_events.add(event_id)

        # Commit offset
        consumer.commit()

except KeyboardInterrupt:
    print("Shutting down consumer...")
finally:
    consumer.close()

def handle_order_approved(event):
    """Process OrderApproved event (idempotent)"""
    order_id = event['orderId']
    amount = event['amount']

    print(f"✅ Order {order_id} approved for ${amount}")

    # Send notification email, update analytics, etc.
    # This code MUST be idempotent (safe to execute multiple times)

def handle_order_cancelled(event):
    """Process OrderCancelled event (idempotent)"""
    order_id = event['orderId']
    print(f"❌ Order {order_id} cancelled")

Production Deduplication: Database

In-memory processed_events set теряется при consumer restart. Production-версия:

import psycopg2

def is_already_processed(event_id: str) -> bool:
    """Check if event was already processed (database-backed)"""
    conn = psycopg2.connect(...)
    cursor = conn.cursor()

    cursor.execute("""
        SELECT EXISTS(
            SELECT 1 FROM processed_events WHERE event_id = %s
        )
    """, (event_id,))

    exists = cursor.fetchone()[0]
    cursor.close()
    conn.close()

    return exists

def mark_as_processed(event_id: str):
    """Mark event as processed (database-backed)"""
    conn = psycopg2.connect(...)
    cursor = conn.cursor()

    cursor.execute("""
        INSERT INTO processed_events (event_id, processed_at)
        VALUES (%s, NOW())
        ON CONFLICT (event_id) DO NOTHING
    """, (event_id,))

    conn.commit()
    cursor.close()
    conn.close()

# Usage in consumer
if is_already_processed(event_id):
    print(f"Event {event_id} already processed")
    continue

handle_order_approved(event_data)
mark_as_processed(event_id)

processed_events table schema:

CREATE TABLE processed_events (
    event_id UUID PRIMARY KEY,
    processed_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_processed_events_timestamp ON processed_events(processed_at);

Cleanup strategy: Periodically удаляйте старые записи (processed_at < NOW() - INTERVAL '30 days') для контроля размера таблицы.

Lab: Полная реализация Outbox Pattern

Теперь соберем все вместе в live demo.

Шаг 1: Создайте outbox-таблицу

cd labs

# Создайте outbox-таблицу в PostgreSQL
docker exec -i postgres psql -U postgres -d inventory < schemas/outbox-table.sql

Проверьте:

docker exec -it postgres psql -U postgres -d inventory -c "\d outbox"

Expected output:

                          Table "public.outbox"
    Column     |              Type              | Modifiers
---------------+--------------------------------+-----------
 id            | uuid                           | not null default gen_random_uuid()
 aggregatetype | character varying(255)         | not null
 aggregateid   | character varying(255)         | not null
 type          | character varying(255)         | not null
 payload       | jsonb                          | not null
 created_at    | timestamp without time zone    | not null default now()

Шаг 2: Deploy Outbox Connector

Создайте connector configuration:

# connectors/outbox-connector.json
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "outbox-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "tasks.max": "1",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "inventory",
      "database.server.name": "outbox-server",
      "table.include.list": "public.outbox",
      "publication.autocreate.mode": "filtered",
      "slot.name": "outbox_slot",
      "plugin.name": "pgoutput",
      "transforms": "outbox",
      "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
      "transforms.outbox.table.field.event.id": "id",
      "transforms.outbox.table.field.event.key": "aggregateid",
      "transforms.outbox.table.field.event.payload": "payload",
      "transforms.outbox.route.by.field": "aggregatetype",
      "transforms.outbox.route.topic.replacement": "outbox.event.${routedByValue}",
      "transforms.outbox.table.expand.json.payload": "true"
    }
  }'

Проверьте status:

curl http://localhost:8083/connectors/outbox-connector/status | jq

Expected output:

{
  "name": "outbox-connector",
  "connector": {
    "state": "RUNNING"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING"
    }
  ]
}

Шаг 3: Emit Test Event

Вставьте тестовое событие в outbox:

docker exec -it postgres psql -U postgres -d inventory -c "
INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload)
VALUES (
  gen_random_uuid(),
  'Order',
  'order-123',
  'OrderApproved',
  '{\"orderId\": \"order-123\", \"amount\": 299.99, \"approvedAt\": \"2026-02-01T10:30:15Z\"}'::jsonb
);
"

Шаг 4: Verify Topic Created

docker exec -it kafka kafka-topics --bootstrap-server kafka:9092 --list | grep outbox

Expected output:

outbox.event.Order

Шаг 5: Consume Event

В одном терминале:

docker exec -it kafka kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic outbox.event.Order \
  --from-beginning \
  --property print.headers=true \
  --property print.key=true

Expected output:

id:550e8400-e29b-41d4-a716-446655440000,type:OrderApproved    order-123    {"orderId":"order-123","amount":299.99,"approvedAt":"2026-02-01T10:30:15Z"}

Разбор:

  • Headers: id=..., type=OrderApproved
  • Key: order-123 (aggregateid)
  • Value: Clean JSON payload

Success! Outbox Event Router преобразовал CDC event в domain event.

Шаг 6: Проверьте Cleanup

Если вы использовали DELETE в той же транзакции:

docker exec -it postgres psql -U postgres -d inventory -c "SELECT COUNT(*) FROM outbox;"

Expected output:

 count
-------
     0

Outbox пустая — события удалены после захвата Debezium.

Advanced: Multiple Aggregates

Outbox Pattern поддерживает разные aggregate types в одной таблице:

-- Order events → outbox.event.Order
INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload)
VALUES (gen_random_uuid(), 'Order', 'order-123', 'OrderCreated', '...'::jsonb);

-- Customer events → outbox.event.Customer
INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload)
VALUES (gen_random_uuid(), 'Customer', 'customer-456', 'CustomerRegistered', '...'::jsonb);

-- Payment events → outbox.event.Payment
INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload)
VALUES (gen_random_uuid(), 'Payment', 'payment-789', 'PaymentProcessed', '...'::jsonb);

Результат:

outbox.event.Order topic:
  - OrderCreated events
  - OrderApproved events
  - OrderCancelled events

outbox.event.Customer topic:
  - CustomerRegistered events
  - CustomerUpdated events

outbox.event.Payment topic:
  - PaymentProcessed events
  - PaymentFailed events

Single outbox table, multiple topics — routing по aggregatetype.

Configuration Troubleshooting

Problem: Topic не создается

Симптом: События вставляются в outbox, но topic outbox.event.Order не появляется.

Диагностика:

  1. Проверьте connector status:

    curl http://localhost:8083/connectors/outbox-connector/status
  2. Проверьте logs:

    docker logs connect | grep outbox
  3. Проверьте table.include.list:

    "table.include.list": "public.outbox"

    Если outbox в другой schema — укажите полное имя.

Problem: Message value — JSON string, а не object

Симптом: Kafka message value:

"{\"orderId\": \"123\"}"

Вместо:

{"orderId": "123"}

Решение: Добавьте table.expand.json.payload = true в SMT config.

Problem: Duplicate events

Симптом: Consumer обрабатывает одно событие несколько раз.

Причина: At-least-once delivery — это нормально для Outbox Pattern.

Решение: Implement idempotency в consumer (см. раздел Consumer Idempotency Pattern).

Production Checklist

Перед деплоем Outbox Pattern в production:

  • Outbox table создана с индексом на created_at
  • Connector deployed с table.include.list = "public.outbox"
  • Outbox Event Router SMT настроен с table.expand.json.payload = true
  • Cleanup strategy выбрана (DELETE в транзакции или external job)
  • Consumer idempotency реализована (database-backed deduplication)
  • Monitoring настроен (lag metrics для outbox connector)
  • Error handling в application (rollback при failure)
  • Event schema documented (aggregatetype, type, payload structure)

Что дальше?

Вы реализовали complete Outbox Pattern с Debezium Outbox Event Router SMT. Теперь ваши микросервисы могут reliably публиковать события с transactional guarantees.

Следующий advanced паттерн — Schema Registry integration для schema evolution и structured serialization (Avro). Это позволит контролировать совместимость событий между producer и consumer.

Ключевые выводы

  1. Outbox Event Router SMT преобразует CDC events в domain events
  2. table.include.list = “public.outbox” — захватываем ТОЛЬКО outbox-таблицу
  3. table.expand.json.payload = true — structured JSON вместо string
  4. aggregatetype → topic routing, aggregateid → Kafka key
  5. Event ID в header для consumer deduplication
  6. Cleanup strategy: DELETE в транзакции (simple) или external job (scale)
  7. Consumer idempotency обязательна — database-backed deduplication для production
  8. Single outbox table может обрабатывать multiple aggregate types
  9. Labs DDL: labs/schemas/outbox-table.sql для быстрого развертывания
  10. Production checklist — проверьте все перед деплоем

Check Your Understanding

Score: 0 of 0
Analytical
Question 1 of 3. Outbox Event Router SMT по умолчанию удаляет (игнорирует) DELETE-события из outbox-таблицы. Почему это правильное поведение, и что произойдет, если router будет обрабатывать DELETE-события?

Finished the lesson?

Mark it as complete to track your progress