Skip to content
Learning Platform
Intermediate
25 minutes
smt transformations kafka-connect

Prerequisites:

  • module-3/07-disaster-recovery-procedures

Single Message Transformations: Обзор

Вы настроили Debezium коннектор, события потекли в Kafka. Но что если вам нужно отфильтровать часть событий? Или замаскировать PII поля перед публикацией? Или переименовать топики по вашей логике? Single Message Transformations (SMT) — это механизм Kafka Connect для inline обработки событий.

В этом уроке мы изучим, что такое SMT, когда их использовать, какие трансформации доступны из коробки, и как их комбинировать в chains для сложных сценариев.

Зачем нужны SMT?

Рассмотрим типичную проблему CDC pipeline:

Проблема: Сложность на стороне консьюмеров

Проблема: Сложность на стороне консьюмеров

Каждый консьюмер дублирует логику парсинга envelope

PostgreSQL
Debezium
Envelope format
Kafka Topic
Consumer 1Extract after field
Consumer 2Extract after field
Consumer 3Extract after field

Без SMT: Каждый консьюмер должен:

  1. Парсить Debezium envelope (before, after, op, source)
  2. Извлекать нужные поля
  3. Обрабатывать разные типы операций (c/u/d)
  4. Фильтровать события, если не все нужны

Дублирование логики в каждом консьюмере. Изменение формата → обновление всех консьюмеров.

Решение: Трансформации на уровне Kafka Connect

Решение: Трансформации на уровне Kafka ConnectRecommended

SMT централизует логику обработки

PostgreSQL
Debezium
Envelope
SMT Chain
1. Filter
2. Unwrap
3. Mask
Flat JSON
Kafka Topic
Consumer 1Simple parsing
Consumer 2Simple parsing
Consumer 3Simple parsing

С SMT: Трансформация выполняется до публикации в Kafka:

  • Консьюмеры получают готовый формат данных
  • Логика обработки centralized в коннекторе
  • Изменение формата — один конфиг коннектора

Production истина: SMT — это “сервер рендеринга” для CDC событий. Подготовьте данные один раз в Connect, а не в каждом консьюмере.

Что такое SMT?

Single Message Transformation (SMT) — это Java-класс, который принимает Kafka Connect record и возвращает модифицированный record.

Execution Model

Execution Model: SMT внутри Kafka Connect

SMT выполняется синхронно в том же потоке

Kafka Connect Worker
Source Connector(Debezium)
SourceRecord
SMT 1: Filter
Filtered record
SMT 2: ExtractNewRecordState
Flattened record
SMT 3: MaskField
Masked record
Kafka Producer
Kafka Broker

Ключевые свойства:

  1. Синхронное выполнение — SMT выполняется в том же потоке, что и чтение источника
  2. Ordered processing — SMT применяются в порядке объявления в конфиге
  3. Per-message — каждое сообщение обрабатывается независимо
  4. Stateless (обычно) — SMT не должен хранить состояние между сообщениями

Встроенные Debezium SMTs

Debezium 2.5.4 предоставляет набор production-ready SMT для типичных сценариев.

1. ExtractNewRecordState — Event Flattening

Назначение: Упрощает Debezium envelope, извлекает payload.

Проблема:

// Debezium envelope (сложный для консьюмеров)
{
  "before": null,
  "after": {"id": 1, "name": "Alice"},
  "op": "c",
  "source": { ... },
  "ts_ms": 1706745600000
}

Решение:

// После ExtractNewRecordState
{
  "id": 1,
  "name": "Alice",
  "__op": "c",
  "__ts_ms": 1706745600000
}

Конфигурация:

{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.add.fields": "op,table,source.ts_ms",
  "transforms.unwrap.add.fields.prefix": "__"
}

2. Filter — Event Filtering

Назначение: Удаляет события по условию (Groovy/JavaScript).

Применение: Отбрасывать DELETE события, фильтровать по значению поля.

{
  "transforms": "filter",
  "transforms.filter.type": "io.debezium.transforms.Filter",
  "transforms.filter.language": "jsr223.groovy",
  "transforms.filter.condition": "value.op == 'c' || value.op == 'u'"
}

Важно: Filter SMT работает до unwrap. Если нужна фильтрация после unwrap — используйте предикаты.

3. ByLogicalTableRouter — Topic Routing

Назначение: Переименовывает топики по regex-шаблону.

Применение: Объединение sharded таблиц в один топик.

{
  "transforms": "route",
  "transforms.route.type": "io.debezium.transforms.ByLogicalTableRouter",
  "transforms.route.topic.regex": "(.*)customers_shard(.*)",
  "transforms.route.topic.replacement": "$1customers_all_shards"
}

Пример:

  • Входной топик: dbserver1.inventory.customers_shard1
  • Выходной топик: dbserver1.inventory.customers_all_shards

4. ContentBasedRouter — Routing по значению поля

Назначение: Маршрутизация событий в разные топики по содержимому.

Применение: Multi-tenant systems, региональная маршрутизация.

{
  "transforms": "route",
  "transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
  "transforms.route.language": "jsr223.groovy",
  "transforms.route.topic.expression": "value.after.region == 'EU' ? 'events-eu' : 'events-us'"
}

5. OutboxEventRouter — Outbox Pattern

Назначение: Преобразует outbox таблицу в domain events.

Применение: Microservices event-driven architecture.

{
  "transforms": "outbox",
  "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
  "transforms.outbox.table.field.event.key": "aggregateid",
  "transforms.outbox.table.field.event.payload": "payload",
  "transforms.outbox.route.by.field": "aggregatetype",
  "transforms.outbox.route.topic.replacement": "outbox.event.${routedByValue}"
}

Outbox Pattern мы изучим подробно в отдельном уроке (Module 4, Lesson 4).

6. MaskField — PII Masking

Назначение: Маскирует чувствительные поля (GDPR, compliance).

Важно: MaskField — это Kafka Connect SMT, не Debezium. Работает после unwrap.

{
  "transforms": "unwrap,mask",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "ssn,credit_card,email",
  "transforms.mask.replacement": "***MASKED***"
}
Проверка знаний
Какие четыре обязанности обычно лежат на каждом консьюмере без SMT, и как SMT устраняет это дублирование?
Ответ
Без SMT каждый консьюмер должен: (1) парсить Debezium envelope (before/after/op/source), (2) извлекать нужные поля, (3) обрабатывать разные типы операций (c/u/d), (4) фильтровать ненужные события. SMT выполняет эти задачи централизованно на уровне Kafka Connect до публикации в Kafka, и все консьюмеры получают готовый формат данных.

SMT Chaining — Комбинирование трансформаций

Реальные pipeline требуют нескольких трансформаций в последовательности.

Правило порядка SMT

Критически важно: Порядок SMT имеет значение. Каждый SMT получает output предыдущего.

Стандартный порядок SMT Chain

Порядок SMT имеет значение — каждый получает output предыдущего

1. Filter
Работает с envelope
отфильтрованные события
2. Unwrap
Flatten payload
flat JSON
3. Route
Работает с flat data
переименованный топик
4. Mask
Работает с flat data

Стандартный порядок:

  1. Filter — фильтрация раньше всего (уменьшает объем данных для downstream SMT)
  2. Unwrap — разворачивание envelope (большинство последующих SMT работают с flat data)
  3. Route — маршрутизация после unwrap (routing logic часто нужен доступ к after fields)
  4. Mask — маскировка в конце (работает с уже обработанными данными)

Полный пример SMT Chain

{
  "name": "postgres-inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "topic.prefix": "dbserver1",
    "table.include.list": "public.customers,public.orders",

    "transforms": "filter,unwrap,route,mask",

    "transforms.filter.type": "io.debezium.transforms.Filter",
    "transforms.filter.language": "jsr223.groovy",
    "transforms.filter.condition": "value.op == 'c' || value.op == 'u'",

    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.add.fields": "op,table,source.ts_ms",
    "transforms.unwrap.add.fields.prefix": "__",

    "transforms.route.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.route.topic.regex": "(.*)customers(.*)",
    "transforms.route.topic.replacement": "$1customer_events",

    "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.mask.fields": "email,phone"
  }
}

Что происходит:

  1. Filter: Отбрасываем DELETE события (op == 'd')
  2. Unwrap: Разворачиваем envelope → flat JSON
  3. Route: dbserver1.public.customersdbserver1.public.customer_events
  4. Mask: Поля email и phone заменяются на null

Когда использовать SMT?

✅ Good Use Cases

СценарийSMTОбоснование
Упрощение envelopeExtractNewRecordStateСтандартная операция для всех консьюмеров
Фильтрация по opFilterПростое условие, дешевая операция
PII maskingMaskFieldCompliance требование на уровне pipeline
Объединение shardsByLogicalTableRouterЛогическая группировка таблиц
Добавление timestampInsertFieldПростая метаданные

❌ Anti-Patterns

СценарийПочему плохоАльтернатива
Агрегации (sum, count)SMT stateless, нет агрегационной логикиKafka Streams, Flink
Joins между таблицамиSMT обрабатывает одно сообщениеKafka Streams, ksqlDB
External service callsСинхронный блокинг → bottleneckKafka Streams с async lookup
Сложная бизнес-логикаSMT не место для business rulesDownstream service

Production правило: Если трансформация требует более 10ms на сообщение — она не подходит для SMT. Используйте Kafka Streams или Flink.

Производительность SMT

Overhead Measurement

SMT выполняются синхронно в потоке Kafka Connect. Каждая миллисекунда — это задержка публикации в Kafka.

Типичные overhead:

  • Filter: менее 1ms (простое условие Groovy)
  • ExtractNewRecordState: 1-2ms (JSON processing)
  • MaskField: менее 1ms (field replacement)
  • ByLogicalTableRouter: менее 1ms (regex match)

Проблема: Если у вас 10 SMT по 2ms каждый → 20ms overhead на сообщение.

Мониторинг SMT Performance

Debezium 2.5.4 не предоставляет метрики per-SMT. Но вы можете измерить общий impact:

# Lag метрика (включает SMT processing)
debezium_metrics_MilliSecondsBehindSource{connector="inventory-connector"}

# Throughput метрика
rate(debezium_metrics_TotalNumberOfEventsSeen[5m])

Если после добавления SMT lag вырос с 50ms до 200ms — SMT добавляет ~150ms overhead.

Оптимизация SMT Performance

  1. Минимизируйте количество SMT — каждый SMT = overhead
  2. Используйте predicates — применяйте SMT только к нужным сообщениям
  3. Filter early — чем раньше отбросите события, тем меньше обрабатывается downstream SMT
  4. Профилируйте Groovy — сложные Groovy expressions могут быть медленными
  5. Offload complex logic — если SMT не справляется → Kafka Streams
Проверка знаний
Почему стандартный порядок SMT chain именно Filter -> Unwrap -> Route -> Mask? Что произойдет, если поменять местами Filter и Unwrap?
Ответ
Filter идет первым, чтобы уменьшить объем данных для downstream SMT и потому что Filter работает с Debezium envelope (доступен value.op). Unwrap разворачивает envelope в flat JSON, после чего Route и Mask работают с плоскими полями. Если поменять на Unwrap -> Filter, то Filter после unwrap потеряет доступ к value.op (оно либо удалено, либо перемещено в добавленное поле), и фильтрация по типу операции не сработает.

Когда НЕ использовать SMT

SMT — это не универсальный инструмент. Есть задачи, для которых SMT не подходит.

Проблема 1: Stateful Processing

Пример: Подсчитать количество UPDATE операций на каждый customer_id.

Почему SMT не подходит: SMT stateless. Нет способа хранить счетчик между сообщениями.

Решение: Kafka Streams с groupBy() и count().

Проблема 2: Join Between Tables

Пример: Обогатить события orders данными из customers.

Почему SMT не подходит: SMT обрабатывает одно сообщение. Нет доступа к данным из других таблиц.

Решение: Kafka Streams KTable.join() или ksqlDB.

Проблема 3: External Service Calls

Пример: Для каждого события вызвать REST API для валидации.

Почему SMT не подходит: SMT синхронные. External call блокирует поток → throughput падает.

Решение: Kafka Streams с async lookup или downstream service.

Decision Framework

Decision Framework: SMT vs Kafka Streams

Когда использовать SMT, а когда Kafka Streams

Нужна трансформация?
Работает с одним сообщением?
ДА
Требует состояния?
НЕТ
Требует external call?
НЕТ
Overhead менее 10ms?
ДА
SMT подходит
НЕТ
Kafka Streams
ДА
Kafka Streams
ДА
Kafka Streams
НЕТ
Kafka Streams

Полный каталог Debezium SMT

Для справки — все SMT, доступные в Debezium 2.5.4:

SMTКлассНазначение
ExtractNewRecordStateio.debezium.transforms.ExtractNewRecordStateFlatten envelope
Filterio.debezium.transforms.FilterEvent filtering
ByLogicalTableRouterio.debezium.transforms.ByLogicalTableRouterTopic routing
ContentBasedRouterio.debezium.transforms.ContentBasedRouterContent-based routing
OutboxEventRouterio.debezium.transforms.outbox.EventRouterOutbox pattern
PartitionRoutingio.debezium.transforms.partitions.PartitionRoutingCustom partitioning
TimestampConverterorg.apache.kafka.connect.transforms.TimestampConverterTimestamp format
InsertFieldorg.apache.kafka.connect.transforms.InsertFieldAdd metadata
MaskFieldorg.apache.kafka.connect.transforms.MaskFieldPII masking
ReplaceFieldorg.apache.kafka.connect.transforms.ReplaceFieldRename/exclude fields

Примечание: ComputePartition SMT удален в Debezium 2.5.0. Используйте PartitionRouting вместо него.

Что дальше?

Теперь вы понимаете, что такое SMT, когда их использовать, и какие трансформации доступны из коробки.

Но есть проблема: Что если SMT не подходит для всех сообщений? Например, ExtractNewRecordState ожидает поле after, но heartbeat сообщения его не имеют → SMT падает с ошибкой.

Решение — Predicates. В следующем уроке мы изучим, как применять SMT селективно, только к нужным типам событий.

Ключевые выводы

  1. SMT — это механизм inline трансформации событий в Kafka Connect до публикации в Kafka
  2. Дебейзиум 2.5.4 включает 6 production-ready SMT: Filter, Unwrap, Router (2 типа), Outbox, PartitionRouting
  3. SMT chaining позволяет комбинировать трансформации; порядок имеет значение
  4. Стандартный порядок: Filter → Unwrap → Route → Mask
  5. SMT подходит для простых stateless операций с overhead менее 10ms
  6. SMT НЕ подходит для агрегаций, joins, external calls — используйте Kafka Streams
  7. ExtractNewRecordState — самый частый SMT, упрощает Debezium envelope
  8. MaskField — для PII compliance, работает после unwrap
  9. SMT выполняются синхронно в том же потоке, что и чтение WAL
  10. Профилируйте SMT через MilliSecondsBehindSource метрику

Check Your Understanding

Score: 0 of 0
Conceptual
Question 1 of 5. Какую основную проблему решают Single Message Transformations (SMT) в Kafka Connect?

Finished the lesson?

Mark it as complete to track your progress