Single Message Transforms
В реальных конвейерах редко когда данные из источника идеально подходят для приёмника без изменений. Нужно добавить метаданные (среду, версию), переименовать поля (legacy_field → canonical_field), замаскировать чувствительные данные (SSN, номер карты), направить запись в правильный топик.
Single Message Transforms (SMT) — это лёгкие трансформации, применяемые к каждой записи внутри Connect Worker, до записи в Kafka (для source connector) или до записи в целевую систему (для sink connector). Никакого отдельного сервиса потоковой обработки не требуется.
Что такое SMT и где они выполняются
SMT выполняются в потоке задачи (task thread) Connect Worker. Это синхронная, однозапросная операция: запись поступает в SMT, выходит трансформированной, продолжает путь дальше.
Позиция SMT в pipeline:
Source connector Sink connector
poll() consume from Kafka
↓ ↓
[SMT chain] [SMT chain]
↓ ↓
serialize (converter) SinkTask.put()
↓ ↓
write to Kafka write to target
Source Record
Исходная запись: SourceRecord от poll(). Содержит сырые данные из источника (базы данных, файла). Поля могут иметь не тот формат, содержать чувствительные данные, иметь legacy-имена.InsertField
SMT 1: InsertField — добавляет статическое поле 'environment' = 'production'. Применяется первым в цепочке. Выходная запись содержит все поля исходной + новое поле environment.MaskField
SMT 2: MaskField — маскирует поля 'ssn' и 'credit_card' нулевыми/пустыми значениями. Применяется к результату InsertField. Чувствительные данные не попадут в Kafka.TimestampRouter
SMT 3: TimestampRouter — изменяет имя целевого топика, добавляя суффикс с датой из timestamp сообщения. Запись с ts=2024-01-15 попадёт в топик 'events-20240115'.Transformed Record
Трансформированная запись: добавлено поле environment, замаскированы SSN и credit_card, назначен топик с датой. Именно эту запись converter сериализует и публикует в Kafka.Синтаксис конфигурации SMT
SMT задаются в конфигурации коннектора через параметр transforms — список имён трансформаций через запятую. Для каждого имени задаётся тип и параметры с префиксом transforms.{name}.:
{
"name": "my-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"transforms": "addEnv,maskPII,routeByDate",
"transforms.addEnv.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addEnv.static.field": "environment",
"transforms.addEnv.static.value": "production",
"transforms.maskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskPII.fields": "ssn,credit_card",
"transforms.routeByDate.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.routeByDate.topic.format": "${topic}-${timestamp}",
"transforms.routeByDate.timestamp.format": "yyyyMMdd"
}
}
Трансформации применяются слева направо в том порядке, в котором они перечислены в transforms.
Встроенные SMT: полный справочник
Все встроенные SMT находятся в пакете org.apache.kafka.connect.transforms. Суффикс \$Value применяет трансформацию к value (телу) сообщения, \$Key — к ключу.
InsertField
Добавляет новое поле со статическим значением или метаданными записи.
transforms.addMeta.type=org.apache.kafka.connect.transforms.InsertField$Value
# Статическое поле
transforms.addMeta.static.field=source_env
transforms.addMeta.static.value=production
# Или метаданные записи:
# transforms.addMeta.offset.field=kafka_offset
# transforms.addMeta.partition.field=kafka_partition
# transforms.addMeta.timestamp.field=ingestion_ts
# transforms.addMeta.topic.field=source_topic
ReplaceField
Переименовывает, включает или исключает поля.
transforms.rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
# Переименование: old_name:new_name
transforms.rename.renames=legacy_id:id,legacy_name:full_name
# Включить только эти поля
# transforms.rename.include=id,amount,created_at
# Исключить эти поля
# transforms.rename.exclude=internal_field,debug_info
MaskField
Заменяет значение поля нулевым/пустым значением того же типа. Строка → "", число → 0, boolean → false.
transforms.mask.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.mask.fields=ssn,credit_card,password
# Опционально: заменить на конкретное значение вместо нуля
# transforms.mask.replacement=***REDACTED***
ValueToKey
Заменяет ключ сообщения одним или несколькими полями из значения.
transforms.setKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.setKey.fields=user_id,order_id
ExtractField
Извлекает одно поле из структуры, заменяя всю структуру этим полем.
transforms.extractId.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extractId.field=id
# Если value = {"id": 42, "name": "Alice"} → value = 42
TimestampRouter
Изменяет имя целевого топика на основе timestamp сообщения.
transforms.route.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.route.topic.format=${topic}-${timestamp}
transforms.route.timestamp.format=yyyyMMdd
# Топик "events" + ts 2024-01-15 → "events-20240115"
RegexRouter
Изменяет имя топика через регулярное выражение.
transforms.routeClean.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.routeClean.regex=(.*)_raw
transforms.routeClean.replacement=$1_processed
# Топик "orders_raw" → "orders_processed"
TimestampConverter
Конвертирует формат timestamp-поля.
transforms.tsConvert.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.tsConvert.field=created_at
transforms.tsConvert.target.type=string
transforms.tsConvert.format=yyyy-MM-dd HH:mm:ss
# Unix ms timestamp → строка "2024-01-15 10:30:00"
Flatten
Разворачивает вложенные структуры в плоские поля с разделителем.
transforms.flat.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flat.delimiter=_
# {"user": {"id": 1, "name": "Alice"}} → {"user_id": 1, "user_name": "Alice"}
Cast
Приводит тип поля.
transforms.castAmount.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.castAmount.spec=amount:float64,quantity:int32
Filter
Отбрасывает записи, соответствующие (или не соответствующие) условию.
transforms.dropDeletes.type=org.apache.kafka.connect.transforms.Filter
transforms.dropDeletes.predicate=isDelete
predicates=isDelete
predicates.isDelete.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
Предикаты: условное применение SMT
Предикат позволяет применять SMT только к записям, удовлетворяющим условию. Используется совместно с Filter или через negate:
{
"transforms": "routeErrors",
"transforms.routeErrors.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeErrors.regex": ".*",
"transforms.routeErrors.replacement": "errors-${topic}",
"transforms.routeErrors.predicate": "isErrorTopic",
"predicates": "isErrorTopic",
"predicates.isErrorTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isErrorTopic.pattern": ".*-error"
}
Встроенные предикаты:
TopicNameMatches— проверяет имя топика по regexHasHeaderKey— проверяет наличие заголовка с именемRecordIsTombstone— проверяет, является ли запись tombstone (null value)
Добавьте transforms.{name}.negate=true для инверсии предиката.
Цепочки SMT: порядок имеет значение
Трансформации применяются строго в том порядке, в котором они перечислены в transforms. Порядок критически важен:
Правильный порядок:
transforms=addEnv,maskPII
addEnvдобавляет полеenvironment=productionmaskPIIмаскирует поляssn,credit_card— полеenvironmentне трогает
Неправильный порядок (если maskPII должен обработать поле, добавленное insertField):
transforms=maskPII,addEnv
maskPIIмаскируетssn,credit_card— полеenvironmentещё не существуетaddEnvдобавляетenvironment— но оно уже не будет замаскировано
SMT предназначены для простых, per-record трансформаций. Для сложной бизнес-логики, требующей состояния или работы с несколькими записями одновременно (join между топиками, агрегации, windowing), используйте Kafka Streams (Модуль 07). Попытка реализовать stateful логику в SMT невозможна по архитектурным причинам.
Анатомия custom SMT
Если встроенных SMT недостаточно, можно написать собственный. Custom SMT реализует интерфейс Transformation<R extends ConnectRecord<R>>.
Структура класса (концептуально):
public class AddSchemaVersion<R extends ConnectRecord<R>>
implements Transformation<R> {
private String schemaVersion;
// Вызывается при старте коннектора — применяет конфигурацию
@Override
public void configure(Map<String, ?> configs) {
this.schemaVersion = (String) configs.get("schema.version");
}
// Вызывается для каждой записи
@Override
public R apply(R record) {
// Создаём новую запись с дополнительным полем
// ConnectRecord неизменяем — создаём новый объект
return record.newRecord(
record.topic(), record.kafkaPartition(),
record.keySchema(), record.key(),
newSchema, newValue,
record.timestamp()
);
}
// Возвращает спецификацию конфигурационных параметров
@Override
public ConfigDef config() {
return new ConfigDef()
.define("schema.version", ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, "Schema version to insert");
}
// Вызывается при остановке коннектора — освобождает ресурсы
@Override
public void close() {}
}
Custom SMT упаковывается в JAR и помещается в директорию plugin.path Connect Worker. После перезапуска воркера SMT доступен по полному имени класса.
Производительность SMT
SMT выполняются синхронно в потоке задачи. Производительность:
- Простые трансформации (InsertField, ReplaceField, MaskField): субмикросекундные операции, пренебрежимый overhead.
- RegexRouter: компиляция regex происходит при
configure(), не при каждомapply(). Но само matching имеет overhead для сложных regex на каждую запись — используйте простые паттерны. - Flatten с глубокой вложенностью: рекурсивный обход структуры — может быть заметен при очень высоком throughput.
- Последовательные цепочки: каждый SMT добавляет вызов функции. Цепочка из 10 SMT всё равно быстрее, чем внешняя обработка через отдельный сервис.
Ключевые выводы
- SMT — per-record трансформации внутри Connect Worker. Применяются до записи в Kafka (source) или до записи в целевую систему (sink).
- Конфигурация:
transforms=a,b,c— порядок слева направо. - Встроенные SMT покрывают большинство потребностей: InsertField, ReplaceField, MaskField, TimestampRouter, RegexRouter, Filter + Predicates.
- Порядок трансформаций критичен — применяются последовательно, каждая получает результат предыдущей.
- Для stateful логики (join, aggregate, window) — Kafka Streams, не SMT.
- Custom SMT: реализовать
Transformation<R>, упаковать в JAR, добавить вplugin.path.