Требуемые знания:
- module-4/03-pii-masking
Content-Based Event Routing
Вы настроили CDC, события поступают в Kafka. По умолчанию каждая таблица создает один топик — все изменения в customers попадают в inventory.public.customers. Но что если вам нужно:
- Multi-tenant SaaS: Каждый tenant получает свой топик для изоляции данных
- Региональное compliance: EU данные → EU топик, US данные → US топик
- Priority queues: VIP заказы → приоритетный топик для ускоренной обработки
Content-based routing — набор SMT трансформаций, которые маршрутизируют события в разные топики на основе содержимого полей.
Почему нужна маршрутизация событий?
Сценарий 1: Multi-tenant SaaS
У вас единая база данных с колонкой tenant_id. Каждый tenant должен получать только свои события из отдельного топика.
Каждый tenant получает свой топик
Без routing: Один топик orders, consumers фильтруют по tenant_id → избыточный трафик.
С routing: Отдельные топики, каждый consumer подписан только на нужный топик → эффективно.
Сценарий 2: Региональное compliance (GDPR)
GDPR требует: EU персональные данные не должны покидать EU region.
Решение: Маршрутизировать EU события в EU Kafka cluster, US события в US Kafka cluster.
// Routing based on region field
"topic.expression": "value.after.region == 'EU' ? 'events-eu' : 'events-us'"
Сценарий 3: Consolidation разрозненных таблиц
У вас sharded таблицы (customers_shard_0, customers_shard_1, …, customers_shard_9). Downstream нужен единый топик customers для удобства обработки.
Маршрутизация событий на основе содержимого полей
→ events-eu
ByLogicalTableRouter: Regex-based Routing
ByLogicalTableRouter — Debezium SMT для переименования топиков с помощью regex паттернов.
Базовый пример: Consolidation sharded таблиц
Проблема: Таблицы customers_us, customers_eu создают топики inventory.public.customers_us и inventory.public.customers_eu.
Цель: Объединить в один топик inventory.public.customers.
{
"name": "customers-consolidated-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"table.include.list": "public.customers_us,public.customers_eu",
"topic.prefix": "inventory",
"transforms": "route",
"transforms.route.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.route.topic.regex": "inventory.public.customers_(.*)",
"transforms.route.topic.replacement": "inventory.public.customers"
}
}
Как работает:
- topic.regex:
inventory.public.customers_(.*)— захватывает суффикс (us,eu) в группу$1 - topic.replacement:
inventory.public.customers— заменяет имя топика (игнорирует$1)
Результат:
- События из
customers_us→ топикinventory.public.customers - События из
customers_eu→ топикinventory.public.customers
Использование regex групп
Вы можете использовать захваченные группы в replacement:
{
"transforms.route.topic.regex": "(.*).public.customers_(.*)",
"transforms.route.topic.replacement": "$1.customers.$2"
}
Преобразование:
inventory.public.customers_us→inventory.customers.usinventory.public.customers_eu→inventory.customers.eu
Key uniqueness для merged топиков
Проблема: При объединении таблиц в один топик ключи могут пересекаться.
Пример:
customers_usимеет записьid=1customers_euимеет записьid=1
Оба события попадут в топик customers с ключом {"id": 1}, что вызовет collision в log compaction.
Решение: Добавить key.enforce.uniqueness для префиксирования ключа именем таблицы.
{
"transforms": "route",
"transforms.route.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.route.topic.regex": "inventory.public.customers_(.*)",
"transforms.route.topic.replacement": "inventory.public.customers",
"transforms.route.key.enforce.uniqueness": "true",
"transforms.route.key.field.name": "_source_table"
}
Результат ключа:
{
"id": 1,
"_source_table": "customers_us"
}
Теперь ключи уникальны: (id=1, _source_table=customers_us) ≠ (id=1, _source_table=customers_eu).
ContentBasedRouter: Field Value Routing
ContentBasedRouter — более гибкий SMT, маршрутизирующий на основе значений полей с использованием Groovy expressions.
Установка Groovy в Debezium
ContentBasedRouter требует JSR 223 scripting engine (Groovy). В Debezium 2.5.4 Groovy не включен по умолчанию.
Добавление Groovy в Docker-образ:
# labs/Dockerfile.debezium-with-groovy
FROM quay.io/debezium/connect:2.5.4.Final
# Download Groovy JSR 223 engine
USER root
RUN mkdir -p /kafka/connect/debezium-connector-postgres/groovy && \
cd /tmp && \
wget https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-jsr223/3.0.19/groovy-jsr223-3.0.19.jar && \
wget https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.19/groovy-3.0.19.jar && \
mv groovy*.jar /kafka/connect/debezium-connector-postgres/groovy/
USER kafka
Пересоберите образ и перезапустите контейнер.
Базовый пример: Regional routing
Цель: Маршрутизировать события на основе поля region.
{
"name": "orders-regional-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"table.include.list": "public.orders",
"topic.prefix": "inventory",
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
"transforms.route.language": "jsr223.groovy",
"transforms.route.topic.expression": "value.after.region == 'EU' ? 'orders-eu' : 'orders-us'"
}
}
Как работает:
Маршрутизация по регионам для compliance
- GDPR compliance (EU data residency)
- Региональная изоляция для latency optimization
- Multi-region Kafka clusters
Результат:
- Событие с
"region": "EU"→ топикorders-eu - Событие с
"region": "US"→ топикorders-us
Важно:
transforms.routeдолжен идти послеunwrap, чтобы иметь доступ к плоскому полюregion(а не вложенномуafter.region).
Groovy expression синтаксис
В expression доступны:
value— десериализованное значение записи (Struct)value.after— полеafter(для Debezium envelope)value.fieldName— любое поле в recordkey— ключ записиtopic— исходный топикheaders— message headers
Примеры выражений:
// Routing по типу события
value.op == 'c' ? 'creates' : 'updates'
// Multi-tenant routing
'orders-tenant-' + value.tenant_id
// Conditional с default
value.priority == 'HIGH' ? 'orders-priority' : 'orders-standard'
// Null safety
value.region != null && value.region == 'EU' ? 'orders-eu' : 'orders-default'
Пример: Multi-tenant SaaS
Схема таблицы:
CREATE TABLE public.events (
id SERIAL PRIMARY KEY,
tenant_id VARCHAR(50) NOT NULL,
event_type VARCHAR(50),
payload JSONB,
created_at TIMESTAMP DEFAULT NOW()
);
Конфигурация:
{
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
"transforms.route.language": "jsr223.groovy",
"transforms.route.topic.expression": "'events-' + value.tenant_id"
}
Результат:
tenant_id = 'acme'→ топикevents-acmetenant_id = 'globex'→ топикevents-globex
Проверка знанийЧем отличается ByLogicalTableRouter от ContentBasedRouter? В каком сценарии нужен каждый?
Предикаты: Routing с защитой от ошибок
Проблема: ContentBasedRouter применяется ко всем событиям, включая heartbeat и tombstone, у которых нет поля after.
Симптом: Ошибка NullPointerException при оценке value.after.region.
Решение: Используйте предикаты для применения routing только к data events.
{
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.predicate": "IsDataEvent",
"transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
"transforms.route.language": "jsr223.groovy",
"transforms.route.topic.expression": "value.region == 'EU' ? 'orders-eu' : 'orders-us'",
"transforms.route.predicate": "IsDataEvent",
"predicates": "IsDataEvent",
"predicates.IsDataEvent.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsDataEvent.pattern": "inventory.public.orders"
}
Как работает:
- Heartbeat события (топик
__debezium-heartbeat.*) → не обрабатываются routing SMT - Data события (топик
inventory.public.orders) → обрабатываются routing SMT
Комбинация нескольких routing стратегий
Вы можете комбинировать ByLogicalTableRouter и ContentBasedRouter:
{
"transforms": "consolidate,route",
"transforms.consolidate.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.consolidate.topic.regex": "inventory.public.orders_(.*)",
"transforms.consolidate.topic.replacement": "inventory.public.orders",
"transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
"transforms.route.language": "jsr223.groovy",
"transforms.route.topic.expression": "value.after.region == 'EU' ? 'orders-eu' : 'orders-us'"
}
Порядок выполнения:
consolidate— объединяетorders_shard_0,orders_shard_1→ordersroute— маршрутизирует объединенные события →orders-euилиorders-us
Lab: Regional routing для заказов
Давайте настроим content-based routing для разделения заказов по регионам.
Шаг 1: Создайте таблицу с региональными данными
CREATE TABLE public.orders_regional (
id SERIAL PRIMARY KEY,
customer_id INT NOT NULL,
region VARCHAR(10) NOT NULL CHECK (region IN ('EU', 'US', 'APAC')),
amount DECIMAL(10, 2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT NOW()
);
INSERT INTO public.orders_regional (customer_id, region, amount, status) VALUES
(101, 'EU', 150.00, 'PENDING'),
(102, 'US', 200.50, 'APPROVED'),
(103, 'APAC', 99.99, 'PENDING'),
(104, 'EU', 500.00, 'SHIPPED');
Шаг 2: Создайте коннектор с regional routing
{
"name": "orders-regional-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "inventory",
"topic.prefix": "inventory",
"table.include.list": "public.orders_regional",
"plugin.name": "pgoutput",
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
"transforms.route.language": "jsr223.groovy",
"transforms.route.topic.expression": "'orders-' + value.region.toLowerCase()"
}
}
Expression: 'orders-' + value.region.toLowerCase()
region='EU'→ топикorders-euregion='US'→ топикorders-usregion='APAC'→ топикorders-apac
Шаг 3: Зарегистрируйте коннектор
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @orders-regional-connector.json
Шаг 4: Проверьте созданные топики
kafka-topics --bootstrap-server kafka:9092 --list | grep orders
# Ожидаемый вывод:
# orders-eu
# orders-us
# orders-apac
Шаг 5: Потребите события из региональных топиков
EU consumer:
from confluent_kafka import Consumer
import json
consumer = Consumer({
'bootstrap.servers': 'kafka:9092',
'group.id': 'eu-orders-processor',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders-eu'])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
order = json.loads(msg.value().decode('utf-8'))
print(f"[EU] Order {order['id']}: {order['amount']} - {order['status']}")
US consumer:
consumer.subscribe(['orders-us'])
# ... аналогично для US топика
Преимущество: Каждый региональный consumer обрабатывает только релевантные события, без фильтрации в application logic.
Проверка знанийПочему null safety критична для ContentBasedRouter? Что произойдет с выражением value.region == 'EU', если поле region содержит NULL?
Важные особенности и ограничения
1. Groovy execution overhead
ContentBasedRouter выполняет Groovy expression для каждого события. Сложные expressions добавляют latency.
Бенчмарк (Debezium 2.5.4, 10K events/sec):
- Простое expression (
value.region == 'EU') — +1-2ms overhead - Сложное expression (несколько условий, функции) — +5-10ms overhead
Рекомендация: Держите expressions простыми. Для сложной логики используйте Kafka Streams.
2. Topic autocreation
Если Kafka настроен с auto.create.topics.enable=true, ContentBasedRouter создаст топики автоматически при первом событии.
Проблема: Топики создаются с default настройками (partitions, replication factor), которые могут не подходить для production.
Решение: Предварительно создайте топики с правильными настройками:
kafka-topics --bootstrap-server kafka:9092 --create \
--topic orders-eu \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=604800000
3. Null safety в expressions
Если поле region может быть null, expression упадет с NullPointerException.
Небезопасное expression:
value.region == 'EU' ? 'orders-eu' : 'orders-us'
// Падает если region == null!
Безопасное expression:
value.region != null && value.region == 'EU' ? 'orders-eu' : 'orders-default'
4. Schema changes в routing field
Если вы добавите constraint CHECK (region IN ('EU', 'US', 'APAC')) после запуска коннектора, старые события без constraint могут иметь недопустимые значения.
Рекомендация: Добавляйте else fallback в routing expression:
value.region == 'EU' ? 'orders-eu' :
value.region == 'US' ? 'orders-us' :
value.region == 'APAC' ? 'orders-apac' :
'orders-unknown' // Fallback для неожиданных значений
Production паттерны
1. Default topic для unknown values
"transforms.route.topic.expression": "value.priority == 'HIGH' ? 'orders-priority' : 'orders-standard'"
Если добавится новое значение priority = 'CRITICAL', события попадут в orders-standard (что может быть неверно).
Улучшенная версия:
value.priority == 'HIGH' ? 'orders-priority' :
value.priority == 'CRITICAL' ? 'orders-critical' :
'orders-standard'
2. Логирование routing decisions
ContentBasedRouter не логирует, в какой топик попало событие. Для отладки добавьте метаданные:
{
"transforms": "unwrap,addMetadata,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "table,op",
"transforms.addMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addMetadata.static.field": "_routed_by",
"transforms.addMetadata.static.value": "ContentBasedRouter",
"transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
"transforms.route.language": "jsr223.groovy",
"transforms.route.topic.expression": "value.region == 'EU' ? 'orders-eu' : 'orders-us'"
}
Теперь каждое событие содержит "_routed_by": "ContentBasedRouter" для observability.
3. Тестирование routing logic
Перед production deployment:
- Создайте test connector с sample данными
- Запустите события через разные routing paths
- Проверьте, что события попали в правильные топики
# Проверка топиков после snapshot
kafka-console-consumer --bootstrap-server kafka:9092 \
--topic orders-eu \
--from-beginning \
--max-messages 10
Alternatives: Когда НЕ использовать routing SMT
| Сценарий | Вместо SMT используйте |
|---|---|
| Сложная stateful маршрутизация (требуется state store, joins) | Kafka Streams |
| External service lookup (получение tenant_id из API) | Kafka Connect SMT не поддерживает external calls; используйте Kafka Streams или consumer-side logic |
| Post-processing маршрутизация (routing после агрегации) | Kafka Streams topology |
| A/B testing маршрутизация (random split) | Consumer-side logic или Kafka Streams |
Правило: Используйте routing SMT для простых, deterministic, field-based решений. Для всего остального — Kafka Streams.
Что мы узнали
- Content-based routing маршрутизирует события в разные топики на основе содержимого полей
- ByLogicalTableRouter использует regex для consolidation sharded таблиц
- key.enforce.uniqueness предотвращает key collision в merged топиках
- ContentBasedRouter использует Groovy expressions для flexible routing
- Предикаты обязательны для защиты от ошибок на heartbeat/tombstone событиях
- Groovy требует JSR 223 engine — установите groovy-jsr223.jar в Connect plugin directory
- Null safety критична — используйте
value.field != nullchecks - Production: Pre-create топики, тестируйте routing logic, добавляйте fallback defaults
Что дальше?
В следующем уроке мы изучим Outbox Event Router — специализированную SMT для реализации Outbox pattern в микросервисных архитектурах. Это позволяет публиковать domain events transactionally вместе с бизнес-логикой.
Проверьте понимание
Закончили урок?
Отметьте его как пройденный, чтобы отслеживать свой прогресс