Skip to content
Learning Platform
Intermediate
30 minutes
routing smt multi-tenant regional

Prerequisites:

  • module-4/03-pii-masking

Content-Based Event Routing

Вы настроили CDC, события поступают в Kafka. По умолчанию каждая таблица создает один топик — все изменения в customers попадают в inventory.public.customers. Но что если вам нужно:

  • Multi-tenant SaaS: Каждый tenant получает свой топик для изоляции данных
  • Региональное compliance: EU данные → EU топик, US данные → US топик
  • Priority queues: VIP заказы → приоритетный топик для ускоренной обработки

Content-based routing — набор SMT трансформаций, которые маршрутизируют события в разные топики на основе содержимого полей.

Почему нужна маршрутизация событий?

Сценарий 1: Multi-tenant SaaS

У вас единая база данных с колонкой tenant_id. Каждый tenant должен получать только свои события из отдельного топика.

Multi-Tenant Routing: Изоляция данных

Каждый tenant получает свой топик

orders (multi-tenant)tenant_id: 'acme' | 'globex'
CDC capture
ContentBasedRouterRoute by tenant_id
tenant_id='acme'
orders-acme
tenant_id='globex'
orders-globex

Без routing: Один топик orders, consumers фильтруют по tenant_id → избыточный трафик. С routing: Отдельные топики, каждый consumer подписан только на нужный топик → эффективно.

Сценарий 2: Региональное compliance (GDPR)

GDPR требует: EU персональные данные не должны покидать EU region.

Решение: Маршрутизировать EU события в EU Kafka cluster, US события в US Kafka cluster.

// Routing based on region field
"topic.expression": "value.after.region == 'EU' ? 'events-eu' : 'events-us'"

Сценарий 3: Consolidation разрозненных таблиц

У вас sharded таблицы (customers_shard_0, customers_shard_1, …, customers_shard_9). Downstream нужен единый топик customers для удобства обработки.

Content-Based Router: Принцип работы

Маршрутизация событий на основе содержимого полей

customersregion: 'EU' | 'US'
CDC events
Debezium CDC
CDC envelope
ContentBasedRouterif (region == 'EU')
→ events-eu
region='EU'
events-eu
region='US'
events-us

ByLogicalTableRouter: Regex-based Routing

ByLogicalTableRouter — Debezium SMT для переименования топиков с помощью regex паттернов.

Базовый пример: Consolidation sharded таблиц

Проблема: Таблицы customers_us, customers_eu создают топики inventory.public.customers_us и inventory.public.customers_eu. Цель: Объединить в один топик inventory.public.customers.

{
  "name": "customers-consolidated-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "table.include.list": "public.customers_us,public.customers_eu",
    "topic.prefix": "inventory",

    "transforms": "route",
    "transforms.route.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.route.topic.regex": "inventory.public.customers_(.*)",
    "transforms.route.topic.replacement": "inventory.public.customers"
  }
}

Как работает:

  • topic.regex: inventory.public.customers_(.*) — захватывает суффикс (us, eu) в группу $1
  • topic.replacement: inventory.public.customers — заменяет имя топика (игнорирует $1)

Результат:

  • События из customers_us → топик inventory.public.customers
  • События из customers_eu → топик inventory.public.customers

Использование regex групп

Вы можете использовать захваченные группы в replacement:

{
  "transforms.route.topic.regex": "(.*).public.customers_(.*)",
  "transforms.route.topic.replacement": "$1.customers.$2"
}

Преобразование:

  • inventory.public.customers_usinventory.customers.us
  • inventory.public.customers_euinventory.customers.eu

Key uniqueness для merged топиков

Проблема: При объединении таблиц в один топик ключи могут пересекаться.

Пример:

  • customers_us имеет запись id=1
  • customers_eu имеет запись id=1

Оба события попадут в топик customers с ключом {"id": 1}, что вызовет collision в log compaction.

Решение: Добавить key.enforce.uniqueness для префиксирования ключа именем таблицы.

{
  "transforms": "route",
  "transforms.route.type": "io.debezium.transforms.ByLogicalTableRouter",
  "transforms.route.topic.regex": "inventory.public.customers_(.*)",
  "transforms.route.topic.replacement": "inventory.public.customers",
  "transforms.route.key.enforce.uniqueness": "true",
  "transforms.route.key.field.name": "_source_table"
}

Результат ключа:

{
  "id": 1,
  "_source_table": "customers_us"
}

Теперь ключи уникальны: (id=1, _source_table=customers_us)(id=1, _source_table=customers_eu).

ContentBasedRouter: Field Value Routing

ContentBasedRouter — более гибкий SMT, маршрутизирующий на основе значений полей с использованием Groovy expressions.

Установка Groovy в Debezium

ContentBasedRouter требует JSR 223 scripting engine (Groovy). В Debezium 2.5.4 Groovy не включен по умолчанию.

Добавление Groovy в Docker-образ:

# labs/Dockerfile.debezium-with-groovy
FROM quay.io/debezium/connect:2.5.4.Final

# Download Groovy JSR 223 engine
USER root
RUN mkdir -p /kafka/connect/debezium-connector-postgres/groovy && \
    cd /tmp && \
    wget https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-jsr223/3.0.19/groovy-jsr223-3.0.19.jar && \
    wget https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.19/groovy-3.0.19.jar && \
    mv groovy*.jar /kafka/connect/debezium-connector-postgres/groovy/

USER kafka

Пересоберите образ и перезапустите контейнер.

Базовый пример: Regional routing

Цель: Маршрутизировать события на основе поля region.

{
  "name": "orders-regional-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "table.include.list": "public.orders",
    "topic.prefix": "inventory",

    "transforms": "unwrap,route",

    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",

    "transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
    "transforms.route.language": "jsr223.groovy",
    "transforms.route.topic.expression": "value.after.region == 'EU' ? 'orders-eu' : 'orders-us'"
  }
}

Как работает:

Region-Based Routing: GDPR Compliance

Маршрутизация по регионам для compliance

usersregion: 'EU' | 'US' | 'APAC'
CDC events
RegionRouterRoute by region field
EU
users-eu
US
users-us
APAC
users-apac
Когда использовать?
  • GDPR compliance (EU data residency)
  • Региональная изоляция для latency optimization
  • Multi-region Kafka clusters

Результат:

  • Событие с "region": "EU" → топик orders-eu
  • Событие с "region": "US" → топик orders-us

Важно: transforms.route должен идти после unwrap, чтобы иметь доступ к плоскому полю region (а не вложенному after.region).

Groovy expression синтаксис

В expression доступны:

  • value — десериализованное значение записи (Struct)
  • value.after — поле after (для Debezium envelope)
  • value.fieldName — любое поле в record
  • key — ключ записи
  • topic — исходный топик
  • headers — message headers

Примеры выражений:

// Routing по типу события
value.op == 'c' ? 'creates' : 'updates'

// Multi-tenant routing
'orders-tenant-' + value.tenant_id

// Conditional с default
value.priority == 'HIGH' ? 'orders-priority' : 'orders-standard'

// Null safety
value.region != null && value.region == 'EU' ? 'orders-eu' : 'orders-default'

Пример: Multi-tenant SaaS

Схема таблицы:

CREATE TABLE public.events (
    id SERIAL PRIMARY KEY,
    tenant_id VARCHAR(50) NOT NULL,
    event_type VARCHAR(50),
    payload JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);

Конфигурация:

{
  "transforms": "unwrap,route",

  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",

  "transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
  "transforms.route.language": "jsr223.groovy",
  "transforms.route.topic.expression": "'events-' + value.tenant_id"
}

Результат:

  • tenant_id = 'acme' → топик events-acme
  • tenant_id = 'globex' → топик events-globex
Проверка знаний
Чем отличается ByLogicalTableRouter от ContentBasedRouter? В каком сценарии нужен каждый?
Ответ
ByLogicalTableRouter маршрутизирует по имени топика (regex на имя таблицы) -- используется для объединения sharded таблиц в один топик. ContentBasedRouter маршрутизирует по значению поля в событии (Groovy expression) -- используется для multi-tenant routing, региональной маршрутизации, priority queues. Например, consolidation customers_shard_0/1/2 в один топик -- ByLogicalTableRouter, а routing по region EU/US -- ContentBasedRouter.

Предикаты: Routing с защитой от ошибок

Проблема: ContentBasedRouter применяется ко всем событиям, включая heartbeat и tombstone, у которых нет поля after.

Симптом: Ошибка NullPointerException при оценке value.after.region.

Решение: Используйте предикаты для применения routing только к data events.

{
  "transforms": "unwrap,route",

  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.predicate": "IsDataEvent",

  "transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
  "transforms.route.language": "jsr223.groovy",
  "transforms.route.topic.expression": "value.region == 'EU' ? 'orders-eu' : 'orders-us'",
  "transforms.route.predicate": "IsDataEvent",

  "predicates": "IsDataEvent",
  "predicates.IsDataEvent.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsDataEvent.pattern": "inventory.public.orders"
}

Как работает:

  • Heartbeat события (топик __debezium-heartbeat.*) → не обрабатываются routing SMT
  • Data события (топик inventory.public.orders) → обрабатываются routing SMT

Комбинация нескольких routing стратегий

Вы можете комбинировать ByLogicalTableRouter и ContentBasedRouter:

{
  "transforms": "consolidate,route",

  "transforms.consolidate.type": "io.debezium.transforms.ByLogicalTableRouter",
  "transforms.consolidate.topic.regex": "inventory.public.orders_(.*)",
  "transforms.consolidate.topic.replacement": "inventory.public.orders",

  "transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
  "transforms.route.language": "jsr223.groovy",
  "transforms.route.topic.expression": "value.after.region == 'EU' ? 'orders-eu' : 'orders-us'"
}

Порядок выполнения:

  1. consolidate — объединяет orders_shard_0, orders_shard_1orders
  2. route — маршрутизирует объединенные события → orders-eu или orders-us

Lab: Regional routing для заказов

Давайте настроим content-based routing для разделения заказов по регионам.

Шаг 1: Создайте таблицу с региональными данными

CREATE TABLE public.orders_regional (
    id SERIAL PRIMARY KEY,
    customer_id INT NOT NULL,
    region VARCHAR(10) NOT NULL CHECK (region IN ('EU', 'US', 'APAC')),
    amount DECIMAL(10, 2),
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT NOW()
);

INSERT INTO public.orders_regional (customer_id, region, amount, status) VALUES
(101, 'EU', 150.00, 'PENDING'),
(102, 'US', 200.50, 'APPROVED'),
(103, 'APAC', 99.99, 'PENDING'),
(104, 'EU', 500.00, 'SHIPPED');

Шаг 2: Создайте коннектор с regional routing

{
  "name": "orders-regional-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "topic.prefix": "inventory",
    "table.include.list": "public.orders_regional",
    "plugin.name": "pgoutput",

    "transforms": "unwrap,route",

    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",

    "transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
    "transforms.route.language": "jsr223.groovy",
    "transforms.route.topic.expression": "'orders-' + value.region.toLowerCase()"
  }
}

Expression: 'orders-' + value.region.toLowerCase()

  • region='EU' → топик orders-eu
  • region='US' → топик orders-us
  • region='APAC' → топик orders-apac

Шаг 3: Зарегистрируйте коннектор

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @orders-regional-connector.json

Шаг 4: Проверьте созданные топики

kafka-topics --bootstrap-server kafka:9092 --list | grep orders

# Ожидаемый вывод:
# orders-eu
# orders-us
# orders-apac

Шаг 5: Потребите события из региональных топиков

EU consumer:

from confluent_kafka import Consumer
import json

consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'eu-orders-processor',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe(['orders-eu'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue

    order = json.loads(msg.value().decode('utf-8'))
    print(f"[EU] Order {order['id']}: {order['amount']} - {order['status']}")

US consumer:

consumer.subscribe(['orders-us'])
# ... аналогично для US топика

Преимущество: Каждый региональный consumer обрабатывает только релевантные события, без фильтрации в application logic.

Проверка знаний
Почему null safety критична для ContentBasedRouter? Что произойдет с выражением value.region == 'EU', если поле region содержит NULL?
Ответ
Groovy не может выполнить операцию сравнения с null: выражение null == 'EU' вызовет NullPointerException, и коннектор перейдет в FAILED state. Безопасное выражение: value.region != null && value.region == 'EU' ? 'orders-eu' : 'orders-default'. Это распространенная ошибка, потому что разработчики тестируют с валидными данными, но в production встречаются записи с NULL-значениями.

Важные особенности и ограничения

1. Groovy execution overhead

ContentBasedRouter выполняет Groovy expression для каждого события. Сложные expressions добавляют latency.

Бенчмарк (Debezium 2.5.4, 10K events/sec):

  • Простое expression (value.region == 'EU') — +1-2ms overhead
  • Сложное expression (несколько условий, функции) — +5-10ms overhead

Рекомендация: Держите expressions простыми. Для сложной логики используйте Kafka Streams.

2. Topic autocreation

Если Kafka настроен с auto.create.topics.enable=true, ContentBasedRouter создаст топики автоматически при первом событии.

Проблема: Топики создаются с default настройками (partitions, replication factor), которые могут не подходить для production.

Решение: Предварительно создайте топики с правильными настройками:

kafka-topics --bootstrap-server kafka:9092 --create \
  --topic orders-eu \
  --partitions 6 \
  --replication-factor 3 \
  --config retention.ms=604800000

3. Null safety в expressions

Если поле region может быть null, expression упадет с NullPointerException.

Небезопасное expression:

value.region == 'EU' ? 'orders-eu' : 'orders-us'
// Падает если region == null!

Безопасное expression:

value.region != null && value.region == 'EU' ? 'orders-eu' : 'orders-default'

4. Schema changes в routing field

Если вы добавите constraint CHECK (region IN ('EU', 'US', 'APAC')) после запуска коннектора, старые события без constraint могут иметь недопустимые значения.

Рекомендация: Добавляйте else fallback в routing expression:

value.region == 'EU' ? 'orders-eu' :
value.region == 'US' ? 'orders-us' :
value.region == 'APAC' ? 'orders-apac' :
'orders-unknown'  // Fallback для неожиданных значений

Production паттерны

1. Default topic для unknown values

"transforms.route.topic.expression": "value.priority == 'HIGH' ? 'orders-priority' : 'orders-standard'"

Если добавится новое значение priority = 'CRITICAL', события попадут в orders-standard (что может быть неверно).

Улучшенная версия:

value.priority == 'HIGH' ? 'orders-priority' :
value.priority == 'CRITICAL' ? 'orders-critical' :
'orders-standard'

2. Логирование routing decisions

ContentBasedRouter не логирует, в какой топик попало событие. Для отладки добавьте метаданные:

{
  "transforms": "unwrap,addMetadata,route",

  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.add.fields": "table,op",

  "transforms.addMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addMetadata.static.field": "_routed_by",
  "transforms.addMetadata.static.value": "ContentBasedRouter",

  "transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
  "transforms.route.language": "jsr223.groovy",
  "transforms.route.topic.expression": "value.region == 'EU' ? 'orders-eu' : 'orders-us'"
}

Теперь каждое событие содержит "_routed_by": "ContentBasedRouter" для observability.

3. Тестирование routing logic

Перед production deployment:

  1. Создайте test connector с sample данными
  2. Запустите события через разные routing paths
  3. Проверьте, что события попали в правильные топики
# Проверка топиков после snapshot
kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic orders-eu \
  --from-beginning \
  --max-messages 10

Alternatives: Когда НЕ использовать routing SMT

СценарийВместо SMT используйте
Сложная stateful маршрутизация (требуется state store, joins)Kafka Streams
External service lookup (получение tenant_id из API)Kafka Connect SMT не поддерживает external calls; используйте Kafka Streams или consumer-side logic
Post-processing маршрутизация (routing после агрегации)Kafka Streams topology
A/B testing маршрутизация (random split)Consumer-side logic или Kafka Streams

Правило: Используйте routing SMT для простых, deterministic, field-based решений. Для всего остального — Kafka Streams.

Что мы узнали

  1. Content-based routing маршрутизирует события в разные топики на основе содержимого полей
  2. ByLogicalTableRouter использует regex для consolidation sharded таблиц
  3. key.enforce.uniqueness предотвращает key collision в merged топиках
  4. ContentBasedRouter использует Groovy expressions для flexible routing
  5. Предикаты обязательны для защиты от ошибок на heartbeat/tombstone событиях
  6. Groovy требует JSR 223 engine — установите groovy-jsr223.jar в Connect plugin directory
  7. Null safety критична — используйте value.field != null checks
  8. Production: Pre-create топики, тестируйте routing logic, добавляйте fallback defaults

Что дальше?

В следующем уроке мы изучим Outbox Event Router — специализированную SMT для реализации Outbox pattern в микросервисных архитектурах. Это позволяет публиковать domain events transactionally вместе с бизнес-логикой.

Check Your Understanding

Score: 0 of 0
Conceptual
Question 1 of 5. Чем ContentBasedRouter отличается от стандартной маршрутизации топиков в Debezium, при которой каждая таблица получает свой топик?

Finished the lesson?

Mark it as complete to track your progress