Learning Platform
Глоссарий Troubleshooting
Урок 06.04 · 25 мин
Продвинутый
SMTInsertFieldReplaceFieldMaskFieldTimestampRouterRegexRouterChaining

Single Message Transforms

В реальных конвейерах редко когда данные из источника идеально подходят для приёмника без изменений. Нужно добавить метаданные (среду, версию), переименовать поля (legacy_field → canonical_field), замаскировать чувствительные данные (SSN, номер карты), направить запись в правильный топик.

Single Message Transforms (SMT) — это лёгкие трансформации, применяемые к каждой записи внутри Connect Worker, до записи в Kafka (для source connector) или до записи в целевую систему (для sink connector). Никакого отдельного сервиса потоковой обработки не требуется.


Что такое SMT и где они выполняются

SMT выполняются в потоке задачи (task thread) Connect Worker. Это синхронная, однозапросная операция: запись поступает в SMT, выходит трансформированной, продолжает путь дальше.

Позиция SMT в pipeline:

Source connector            Sink connector
  poll()                      consume from Kafka
    ↓                            ↓
  [SMT chain]              [SMT chain]
    ↓                            ↓
  serialize (converter)       SinkTask.put()
    ↓                            ↓
  write to Kafka             write to target
SMT Chain в Source Connector

Source Record

Исходная запись: SourceRecord от poll(). Содержит сырые данные из источника (базы данных, файла). Поля могут иметь не тот формат, содержать чувствительные данные, иметь legacy-имена.

InsertField

SMT 1: InsertField — добавляет статическое поле 'environment' = 'production'. Применяется первым в цепочке. Выходная запись содержит все поля исходной + новое поле environment.

MaskField

SMT 2: MaskField — маскирует поля 'ssn' и 'credit_card' нулевыми/пустыми значениями. Применяется к результату InsertField. Чувствительные данные не попадут в Kafka.

TimestampRouter

SMT 3: TimestampRouter — изменяет имя целевого топика, добавляя суффикс с датой из timestamp сообщения. Запись с ts=2024-01-15 попадёт в топик 'events-20240115'.

Transformed Record

Трансформированная запись: добавлено поле environment, замаскированы SSN и credit_card, назначен топик с датой. Именно эту запись converter сериализует и публикует в Kafka.

Синтаксис конфигурации SMT

SMT задаются в конфигурации коннектора через параметр transforms — список имён трансформаций через запятую. Для каждого имени задаётся тип и параметры с префиксом transforms.{name}.:

{
  "name": "my-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "transforms": "addEnv,maskPII,routeByDate",

    "transforms.addEnv.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addEnv.static.field": "environment",
    "transforms.addEnv.static.value": "production",

    "transforms.maskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.maskPII.fields": "ssn,credit_card",

    "transforms.routeByDate.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.routeByDate.topic.format": "${topic}-${timestamp}",
    "transforms.routeByDate.timestamp.format": "yyyyMMdd"
  }
}

Трансформации применяются слева направо в том порядке, в котором они перечислены в transforms.


Встроенные SMT: полный справочник

Все встроенные SMT находятся в пакете org.apache.kafka.connect.transforms. Суффикс \$Value применяет трансформацию к value (телу) сообщения, \$Key — к ключу.

InsertField

Добавляет новое поле со статическим значением или метаданными записи.

transforms.addMeta.type=org.apache.kafka.connect.transforms.InsertField$Value

# Статическое поле
transforms.addMeta.static.field=source_env
transforms.addMeta.static.value=production

# Или метаданные записи:
# transforms.addMeta.offset.field=kafka_offset
# transforms.addMeta.partition.field=kafka_partition
# transforms.addMeta.timestamp.field=ingestion_ts
# transforms.addMeta.topic.field=source_topic

ReplaceField

Переименовывает, включает или исключает поля.

transforms.rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value

# Переименование: old_name:new_name
transforms.rename.renames=legacy_id:id,legacy_name:full_name

# Включить только эти поля
# transforms.rename.include=id,amount,created_at

# Исключить эти поля
# transforms.rename.exclude=internal_field,debug_info

MaskField

Заменяет значение поля нулевым/пустым значением того же типа. Строка → "", число → 0, boolean → false.

transforms.mask.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.mask.fields=ssn,credit_card,password
# Опционально: заменить на конкретное значение вместо нуля
# transforms.mask.replacement=***REDACTED***

ValueToKey

Заменяет ключ сообщения одним или несколькими полями из значения.

transforms.setKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.setKey.fields=user_id,order_id

ExtractField

Извлекает одно поле из структуры, заменяя всю структуру этим полем.

transforms.extractId.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extractId.field=id
# Если value = {"id": 42, "name": "Alice"} → value = 42

TimestampRouter

Изменяет имя целевого топика на основе timestamp сообщения.

transforms.route.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.route.topic.format=${topic}-${timestamp}
transforms.route.timestamp.format=yyyyMMdd
# Топик "events" + ts 2024-01-15 → "events-20240115"

RegexRouter

Изменяет имя топика через регулярное выражение.

transforms.routeClean.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.routeClean.regex=(.*)_raw
transforms.routeClean.replacement=$1_processed
# Топик "orders_raw" → "orders_processed"

TimestampConverter

Конвертирует формат timestamp-поля.

transforms.tsConvert.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.tsConvert.field=created_at
transforms.tsConvert.target.type=string
transforms.tsConvert.format=yyyy-MM-dd HH:mm:ss
# Unix ms timestamp → строка "2024-01-15 10:30:00"

Flatten

Разворачивает вложенные структуры в плоские поля с разделителем.

transforms.flat.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flat.delimiter=_
# {"user": {"id": 1, "name": "Alice"}} → {"user_id": 1, "user_name": "Alice"}

Cast

Приводит тип поля.

transforms.castAmount.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.castAmount.spec=amount:float64,quantity:int32

Filter

Отбрасывает записи, соответствующие (или не соответствующие) условию.

transforms.dropDeletes.type=org.apache.kafka.connect.transforms.Filter
transforms.dropDeletes.predicate=isDelete

predicates=isDelete
predicates.isDelete.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone

Предикаты: условное применение SMT

Предикат позволяет применять SMT только к записям, удовлетворяющим условию. Используется совместно с Filter или через negate:

{
  "transforms": "routeErrors",
  "transforms.routeErrors.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.routeErrors.regex": ".*",
  "transforms.routeErrors.replacement": "errors-${topic}",
  "transforms.routeErrors.predicate": "isErrorTopic",

  "predicates": "isErrorTopic",
  "predicates.isErrorTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isErrorTopic.pattern": ".*-error"
}

Встроенные предикаты:

  • TopicNameMatches — проверяет имя топика по regex
  • HasHeaderKey — проверяет наличие заголовка с именем
  • RecordIsTombstone — проверяет, является ли запись tombstone (null value)

Добавьте transforms.{name}.negate=true для инверсии предиката.


Цепочки SMT: порядок имеет значение

Трансформации применяются строго в том порядке, в котором они перечислены в transforms. Порядок критически важен:

Правильный порядок:

transforms=addEnv,maskPII
  1. addEnv добавляет поле environment=production
  2. maskPII маскирует поля ssn,credit_card — поле environment не трогает

Неправильный порядок (если maskPII должен обработать поле, добавленное insertField):

transforms=maskPII,addEnv
  1. maskPII маскирует ssn,credit_card — поле environment ещё не существует
  2. addEnv добавляет environment — но оно уже не будет замаскировано
WARNING

SMT предназначены для простых, per-record трансформаций. Для сложной бизнес-логики, требующей состояния или работы с несколькими записями одновременно (join между топиками, агрегации, windowing), используйте Kafka Streams (Модуль 07). Попытка реализовать stateful логику в SMT невозможна по архитектурным причинам.


Анатомия custom SMT

Если встроенных SMT недостаточно, можно написать собственный. Custom SMT реализует интерфейс Transformation<R extends ConnectRecord<R>>.

Структура класса (концептуально):

public class AddSchemaVersion<R extends ConnectRecord<R>>
    implements Transformation<R> {

    private String schemaVersion;

    // Вызывается при старте коннектора — применяет конфигурацию
    @Override
    public void configure(Map<String, ?> configs) {
        this.schemaVersion = (String) configs.get("schema.version");
    }

    // Вызывается для каждой записи
    @Override
    public R apply(R record) {
        // Создаём новую запись с дополнительным полем
        // ConnectRecord неизменяем — создаём новый объект
        return record.newRecord(
            record.topic(), record.kafkaPartition(),
            record.keySchema(), record.key(),
            newSchema, newValue,
            record.timestamp()
        );
    }

    // Возвращает спецификацию конфигурационных параметров
    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("schema.version", ConfigDef.Type.STRING,
                    ConfigDef.Importance.HIGH, "Schema version to insert");
    }

    // Вызывается при остановке коннектора — освобождает ресурсы
    @Override
    public void close() {}
}

Custom SMT упаковывается в JAR и помещается в директорию plugin.path Connect Worker. После перезапуска воркера SMT доступен по полному имени класса.


Производительность SMT

SMT выполняются синхронно в потоке задачи. Производительность:

  • Простые трансформации (InsertField, ReplaceField, MaskField): субмикросекундные операции, пренебрежимый overhead.
  • RegexRouter: компиляция regex происходит при configure(), не при каждом apply(). Но само matching имеет overhead для сложных regex на каждую запись — используйте простые паттерны.
  • Flatten с глубокой вложенностью: рекурсивный обход структуры — может быть заметен при очень высоком throughput.
  • Последовательные цепочки: каждый SMT добавляет вызов функции. Цепочка из 10 SMT всё равно быстрее, чем внешняя обработка через отдельный сервис.

Ключевые выводы

  1. SMT — per-record трансформации внутри Connect Worker. Применяются до записи в Kafka (source) или до записи в целевую систему (sink).
  2. Конфигурация: transforms=a,b,c — порядок слева направо.
  3. Встроенные SMT покрывают большинство потребностей: InsertField, ReplaceField, MaskField, TimestampRouter, RegexRouter, Filter + Predicates.
  4. Порядок трансформаций критичен — применяются последовательно, каждая получает результат предыдущей.
  5. Для stateful логики (join, aggregate, window) — Kafka Streams, не SMT.
  6. Custom SMT: реализовать Transformation<R>, упаковать в JAR, добавить в plugin.path.
Проверка знанийKnowledge check
В цепочке SMT: transforms=maskSensitive,addTimestamp. MaskSensitive маскирует поле 'ssn'. AddTimestamp добавляет поле 'processed_at'. Теперь требование изменилось: нужно также замаскировать поле 'processed_at'. Как изменить цепочку, чтобы 'processed_at' был добавлен, а затем замаскирован?
ОтветAnswer
Нужно переставить порядок: transforms=addTimestamp,maskSensitive. Тогда: (1) addTimestamp добавляет поле 'processed_at', (2) maskSensitive маскирует 'ssn' И 'processed_at' (добавив его в список fields). Текущий порядок maskSensitive,addTimestamp не сработает: maskSensitive выполняется первым, когда поля 'processed_at' ещё не существует — оно не будет замаскировано. Затем addTimestamp добавляет поле уже после маскировки. Вывод: SMT применяются строго последовательно; поле можно обработать только в той SMT, которая стоит после SMT, создавшей это поле.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. В какой момент SMT выполняются в pipeline source connector?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7