Prerequisites:
- module-3/07-disaster-recovery-procedures
Single Message Transformations: Обзор
Вы настроили Debezium коннектор, события потекли в Kafka. Но что если вам нужно отфильтровать часть событий? Или замаскировать PII поля перед публикацией? Или переименовать топики по вашей логике? Single Message Transformations (SMT) — это механизм Kafka Connect для inline обработки событий.
В этом уроке мы изучим, что такое SMT, когда их использовать, какие трансформации доступны из коробки, и как их комбинировать в chains для сложных сценариев.
Зачем нужны SMT?
Рассмотрим типичную проблему CDC pipeline:
Проблема: Сложность на стороне консьюмеров
Каждый консьюмер дублирует логику парсинга envelope
Без SMT: Каждый консьюмер должен:
- Парсить Debezium envelope (
before,after,op,source) - Извлекать нужные поля
- Обрабатывать разные типы операций (c/u/d)
- Фильтровать события, если не все нужны
Дублирование логики в каждом консьюмере. Изменение формата → обновление всех консьюмеров.
Решение: Трансформации на уровне Kafka Connect
SMT централизует логику обработки
С SMT: Трансформация выполняется до публикации в Kafka:
- Консьюмеры получают готовый формат данных
- Логика обработки centralized в коннекторе
- Изменение формата — один конфиг коннектора
Production истина: SMT — это “сервер рендеринга” для CDC событий. Подготовьте данные один раз в Connect, а не в каждом консьюмере.
Что такое SMT?
Single Message Transformation (SMT) — это Java-класс, который принимает Kafka Connect record и возвращает модифицированный record.
Execution Model
SMT выполняется синхронно в том же потоке
Ключевые свойства:
- Синхронное выполнение — SMT выполняется в том же потоке, что и чтение источника
- Ordered processing — SMT применяются в порядке объявления в конфиге
- Per-message — каждое сообщение обрабатывается независимо
- Stateless (обычно) — SMT не должен хранить состояние между сообщениями
Встроенные Debezium SMTs
Debezium 2.5.4 предоставляет набор production-ready SMT для типичных сценариев.
1. ExtractNewRecordState — Event Flattening
Назначение: Упрощает Debezium envelope, извлекает payload.
Проблема:
// Debezium envelope (сложный для консьюмеров)
{
"before": null,
"after": {"id": 1, "name": "Alice"},
"op": "c",
"source": { ... },
"ts_ms": 1706745600000
}
Решение:
// После ExtractNewRecordState
{
"id": 1,
"name": "Alice",
"__op": "c",
"__ts_ms": 1706745600000
}
Конфигурация:
{
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.unwrap.add.fields.prefix": "__"
}
2. Filter — Event Filtering
Назначение: Удаляет события по условию (Groovy/JavaScript).
Применение: Отбрасывать DELETE события, фильтровать по значению поля.
{
"transforms": "filter",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.op == 'c' || value.op == 'u'"
}
Важно: Filter SMT работает до unwrap. Если нужна фильтрация после unwrap — используйте предикаты.
3. ByLogicalTableRouter — Topic Routing
Назначение: Переименовывает топики по regex-шаблону.
Применение: Объединение sharded таблиц в один топик.
{
"transforms": "route",
"transforms.route.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.route.topic.regex": "(.*)customers_shard(.*)",
"transforms.route.topic.replacement": "$1customers_all_shards"
}
Пример:
- Входной топик:
dbserver1.inventory.customers_shard1 - Выходной топик:
dbserver1.inventory.customers_all_shards
4. ContentBasedRouter — Routing по значению поля
Назначение: Маршрутизация событий в разные топики по содержимому.
Применение: Multi-tenant systems, региональная маршрутизация.
{
"transforms": "route",
"transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
"transforms.route.language": "jsr223.groovy",
"transforms.route.topic.expression": "value.after.region == 'EU' ? 'events-eu' : 'events-us'"
}
5. OutboxEventRouter — Outbox Pattern
Назначение: Преобразует outbox таблицу в domain events.
Применение: Microservices event-driven architecture.
{
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.key": "aggregateid",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregatetype",
"transforms.outbox.route.topic.replacement": "outbox.event.${routedByValue}"
}
Outbox Pattern мы изучим подробно в отдельном уроке (Module 4, Lesson 4).
6. MaskField — PII Masking
Назначение: Маскирует чувствительные поля (GDPR, compliance).
Важно: MaskField — это Kafka Connect SMT, не Debezium. Работает после unwrap.
{
"transforms": "unwrap,mask",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "ssn,credit_card,email",
"transforms.mask.replacement": "***MASKED***"
}
Проверка знанийКакие четыре обязанности обычно лежат на каждом консьюмере без SMT, и как SMT устраняет это дублирование?
SMT Chaining — Комбинирование трансформаций
Реальные pipeline требуют нескольких трансформаций в последовательности.
Правило порядка SMT
Критически важно: Порядок SMT имеет значение. Каждый SMT получает output предыдущего.
Порядок SMT имеет значение — каждый получает output предыдущего
Стандартный порядок:
- Filter — фильтрация раньше всего (уменьшает объем данных для downstream SMT)
- Unwrap — разворачивание envelope (большинство последующих SMT работают с flat data)
- Route — маршрутизация после unwrap (routing logic часто нужен доступ к
afterfields) - Mask — маскировка в конце (работает с уже обработанными данными)
Полный пример SMT Chain
{
"name": "postgres-inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "inventory",
"topic.prefix": "dbserver1",
"table.include.list": "public.customers,public.orders",
"transforms": "filter,unwrap,route,mask",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.op == 'c' || value.op == 'u'",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.unwrap.add.fields.prefix": "__",
"transforms.route.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.route.topic.regex": "(.*)customers(.*)",
"transforms.route.topic.replacement": "$1customer_events",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "email,phone"
}
}
Что происходит:
- Filter: Отбрасываем DELETE события (
op == 'd') - Unwrap: Разворачиваем envelope → flat JSON
- Route:
dbserver1.public.customers→dbserver1.public.customer_events - Mask: Поля
emailиphoneзаменяются наnull
Когда использовать SMT?
✅ Good Use Cases
| Сценарий | SMT | Обоснование |
|---|---|---|
| Упрощение envelope | ExtractNewRecordState | Стандартная операция для всех консьюмеров |
Фильтрация по op | Filter | Простое условие, дешевая операция |
| PII masking | MaskField | Compliance требование на уровне pipeline |
| Объединение shards | ByLogicalTableRouter | Логическая группировка таблиц |
| Добавление timestamp | InsertField | Простая метаданные |
❌ Anti-Patterns
| Сценарий | Почему плохо | Альтернатива |
|---|---|---|
| Агрегации (sum, count) | SMT stateless, нет агрегационной логики | Kafka Streams, Flink |
| Joins между таблицами | SMT обрабатывает одно сообщение | Kafka Streams, ksqlDB |
| External service calls | Синхронный блокинг → bottleneck | Kafka Streams с async lookup |
| Сложная бизнес-логика | SMT не место для business rules | Downstream service |
Production правило: Если трансформация требует более 10ms на сообщение — она не подходит для SMT. Используйте Kafka Streams или Flink.
Производительность SMT
Overhead Measurement
SMT выполняются синхронно в потоке Kafka Connect. Каждая миллисекунда — это задержка публикации в Kafka.
Типичные overhead:
- Filter: менее 1ms (простое условие Groovy)
- ExtractNewRecordState: 1-2ms (JSON processing)
- MaskField: менее 1ms (field replacement)
- ByLogicalTableRouter: менее 1ms (regex match)
Проблема: Если у вас 10 SMT по 2ms каждый → 20ms overhead на сообщение.
Мониторинг SMT Performance
Debezium 2.5.4 не предоставляет метрики per-SMT. Но вы можете измерить общий impact:
# Lag метрика (включает SMT processing)
debezium_metrics_MilliSecondsBehindSource{connector="inventory-connector"}
# Throughput метрика
rate(debezium_metrics_TotalNumberOfEventsSeen[5m])
Если после добавления SMT lag вырос с 50ms до 200ms — SMT добавляет ~150ms overhead.
Оптимизация SMT Performance
- Минимизируйте количество SMT — каждый SMT = overhead
- Используйте predicates — применяйте SMT только к нужным сообщениям
- Filter early — чем раньше отбросите события, тем меньше обрабатывается downstream SMT
- Профилируйте Groovy — сложные Groovy expressions могут быть медленными
- Offload complex logic — если SMT не справляется → Kafka Streams
Проверка знанийПочему стандартный порядок SMT chain именно Filter -> Unwrap -> Route -> Mask? Что произойдет, если поменять местами Filter и Unwrap?
Когда НЕ использовать SMT
SMT — это не универсальный инструмент. Есть задачи, для которых SMT не подходит.
Проблема 1: Stateful Processing
Пример: Подсчитать количество UPDATE операций на каждый customer_id.
Почему SMT не подходит: SMT stateless. Нет способа хранить счетчик между сообщениями.
Решение: Kafka Streams с groupBy() и count().
Проблема 2: Join Between Tables
Пример: Обогатить события orders данными из customers.
Почему SMT не подходит: SMT обрабатывает одно сообщение. Нет доступа к данным из других таблиц.
Решение: Kafka Streams KTable.join() или ksqlDB.
Проблема 3: External Service Calls
Пример: Для каждого события вызвать REST API для валидации.
Почему SMT не подходит: SMT синхронные. External call блокирует поток → throughput падает.
Решение: Kafka Streams с async lookup или downstream service.
Decision Framework
Когда использовать SMT, а когда Kafka Streams
Полный каталог Debezium SMT
Для справки — все SMT, доступные в Debezium 2.5.4:
| SMT | Класс | Назначение |
|---|---|---|
| ExtractNewRecordState | io.debezium.transforms.ExtractNewRecordState | Flatten envelope |
| Filter | io.debezium.transforms.Filter | Event filtering |
| ByLogicalTableRouter | io.debezium.transforms.ByLogicalTableRouter | Topic routing |
| ContentBasedRouter | io.debezium.transforms.ContentBasedRouter | Content-based routing |
| OutboxEventRouter | io.debezium.transforms.outbox.EventRouter | Outbox pattern |
| PartitionRouting | io.debezium.transforms.partitions.PartitionRouting | Custom partitioning |
| TimestampConverter | org.apache.kafka.connect.transforms.TimestampConverter | Timestamp format |
| InsertField | org.apache.kafka.connect.transforms.InsertField | Add metadata |
| MaskField | org.apache.kafka.connect.transforms.MaskField | PII masking |
| ReplaceField | org.apache.kafka.connect.transforms.ReplaceField | Rename/exclude fields |
Примечание:
ComputePartitionSMT удален в Debezium 2.5.0. ИспользуйтеPartitionRoutingвместо него.
Что дальше?
Теперь вы понимаете, что такое SMT, когда их использовать, и какие трансформации доступны из коробки.
Но есть проблема: Что если SMT не подходит для всех сообщений? Например, ExtractNewRecordState ожидает поле after, но heartbeat сообщения его не имеют → SMT падает с ошибкой.
Решение — Predicates. В следующем уроке мы изучим, как применять SMT селективно, только к нужным типам событий.
Ключевые выводы
- SMT — это механизм inline трансформации событий в Kafka Connect до публикации в Kafka
- Дебейзиум 2.5.4 включает 6 production-ready SMT: Filter, Unwrap, Router (2 типа), Outbox, PartitionRouting
- SMT chaining позволяет комбинировать трансформации; порядок имеет значение
- Стандартный порядок: Filter → Unwrap → Route → Mask
- SMT подходит для простых stateless операций с overhead менее 10ms
- SMT НЕ подходит для агрегаций, joins, external calls — используйте Kafka Streams
- ExtractNewRecordState — самый частый SMT, упрощает Debezium envelope
- MaskField — для PII compliance, работает после unwrap
- SMT выполняются синхронно в том же потоке, что и чтение WAL
- Профилируйте SMT через
MilliSecondsBehindSourceметрику
Check Your Understanding
Finished the lesson?
Mark it as complete to track your progress