Перейти к содержанию
Learning Platform
Средний
30 минут
predicates filtering smt groovy

Требуемые знания:

  • module-4/01-smt-overview

Predicates и фильтрация событий

В предыдущем уроке мы изучили SMT и построили transformation chain: Filter → Unwrap → Mask. Но есть проблема: не все сообщения подходят для каждого SMT.

Попробуйте применить ExtractNewRecordState к heartbeat сообщению — коннектор упадет с ошибкой “Field ‘after’ not found”. Heartbeat не имеет поля after, а SMT ожидает его.

Predicates решают эту проблему — позволяют применять SMT селективно, только к подходящим сообщениям.

В этом уроке мы изучим систему предикатов Kafka Connect, познакомимся с встроенными предикатами, и научимся использовать Filter SMT с Groovy для сложной фильтрации.

Проблема: SMT падают на нестандартных сообщениях

Сценарий 1: Heartbeat сообщения

Вы настроили heartbeat для продвижения replication slot:

{
  "heartbeat.interval.ms": "10000",
  "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)"
}

И добавили ExtractNewRecordState SMT:

{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}

Результат: Коннектор падает каждые 10 секунд:

ERROR Field 'after' not found in record: {
  "source": { ... },
  "op": "h",  // ← heartbeat operation
  "ts_ms": 1706745600000
}

Heartbeat сообщение не имеет поля after — это не DML событие.

Сценарий 2: Tombstone сообщения

Debezium генерирует tombstone (message с value = null) после DELETE события для Kafka log compaction.

// DELETE event
{"before": {"id": 1}, "after": null, "op": "d"}

// Tombstone (для компактации)
{"key": {"id": 1}, "value": null}  // ← нет полей вообще

Если SMT ожидает поля в value — он упадет на tombstone.

Сценарий 3: Schema change events

PostgreSQL DDL операции генерируют специальные события:

{
  "source": { ... },
  "historyRecord": "{\"ddl\": \"ALTER TABLE customers ADD COLUMN age INT\"}",
  "ts_ms": 1706745600000
}

Нет before, after, op — только метаданные о DDL.

Решение: Predicates — условное применение SMT

Predicate — это условие (condition), которое определяет, применять ли SMT к конкретному сообщению.

Predicate условное применение

SMT применяется только если predicate вернул true

Incoming Message
topic: dbserver1.inventory.customers
Predicate: IsDataEvent
pattern: dbserver1\.inventory\..*
TRUE
Apply SMT
Unwrap envelope
FALSE
Skip SMT
Pass through
To Kafka

Идея: SMT применяется только если predicate вернул true.

Конфигурация с predicate

{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.predicate": "IsDataEvent",

  "predicates": "IsDataEvent",
  "predicates.IsDataEvent.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsDataEvent.pattern": "dbserver1\\.inventory\\..*"
}

Что происходит:

  1. Debezium генерирует сообщение с topic dbserver1.inventory.customers
  2. Predicate IsDataEvent проверяет: топик соответствует regex dbserver1\.inventory\..*?
  3. Если TRUE → применяется SMT unwrap
  4. Если FALSE → сообщение проходит без изменений

Heartbeat публикуется в топик __debezium-heartbeat.dbserver1 (не соответствует pattern) → SMT пропускается → нет ошибки.

Встроенные предикаты Kafka Connect

Kafka Connect (требуется версия 2.6+) предоставляет три стандартных predicate.

1. TopicNameMatches — Фильтр по имени топика

Назначение: Проверить, соответствует ли топик regex-шаблону.

Класс: org.apache.kafka.connect.transforms.predicates.TopicNameMatches

Конфигурация:

{
  "predicates": "IsInventoryTopic",
  "predicates.IsInventoryTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsInventoryTopic.pattern": "dbserver1\\.inventory\\..*"
}

Применение: Применять SMT только к определенным топикам (например, исключить heartbeat).

Пример использования:

{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.predicate": "IsInventoryTopic",

  "predicates": "IsInventoryTopic",
  "predicates.IsInventoryTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsInventoryTopic.pattern": "dbserver1\\.inventory\\..*"
}

Regex синтаксис: Java regex (\\. для точки, .* для любых символов).

2. RecordIsTombstone — Проверка tombstone

Назначение: Проверить, является ли сообщение tombstone (value = null).

Класс: org.apache.kafka.connect.transforms.predicates.RecordIsTombstone

Конфигурация:

{
  "predicates": "IsTombstone",
  "predicates.IsTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
}

Применение: Пропустить tombstone сообщения при обработке SMT.

Пример использования:

{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.predicate": "IsNotTombstone",

  "predicates": "IsNotTombstone",
  "predicates.IsNotTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
  "predicates.IsNotTombstone.negate": "true"
}

Negate: negate: true инвертирует результат (применить SMT к не-tombstone сообщениям).

3. HasHeaderKey — Проверка наличия header

Назначение: Проверить, существует ли указанный header в сообщении.

Класс: org.apache.kafka.connect.transforms.predicates.HasHeaderKey

Конфигурация:

{
  "predicates": "HasEventId",
  "predicates.HasEventId.type": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
  "predicates.HasEventId.name": "event_id"
}

Применение: Обрабатывать только сообщения с определенным header (например, outbox events).

Пример использования:

{
  "transforms": "route",
  "transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
  "transforms.route.predicate": "HasEventId",

  "predicates": "HasEventId",
  "predicates.HasEventId.type": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
  "predicates.HasEventId.name": "id"
}
Проверка знаний
В чем разница между предикатом и Filter SMT? Когда использовать каждый инструмент?
Ответ
Предикат определяет, применять ли конкретный SMT к сообщению (условное применение трансформации). Filter SMT удаляет сообщение целиком, если условие не выполнено. Предикат используется, когда нужно пропустить SMT для определенных сообщений (например, не применять unwrap к heartbeat). Filter используется, когда нужно отбросить сообщение из pipeline (например, удалить DELETE-события).

Negate — Инверсия предиката

Любой predicate можно инвертировать с помощью параметра negate.

Пример: Применить SMT ко всем сообщениям, КРОМЕ tombstones

{
  "transforms": "mask",
  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "email",
  "transforms.mask.predicate": "IsNotTombstone",

  "predicates": "IsNotTombstone",
  "predicates.IsNotTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
  "predicates.IsNotTombstone.negate": "true"
}

Логика: Применить MaskField если RecordIsTombstone вернул false (т.е. это НЕ tombstone).

Комбинирование нескольких предикатов

Вы можете объявить несколько предикатов и использовать их для разных SMT.

Пример: Разные предикаты для unwrap и mask

{
  "transforms": "unwrap,mask",

  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.predicate": "IsDataEvent",

  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "email,phone",
  "transforms.mask.predicate": "IsCustomerTable",

  "predicates": "IsDataEvent,IsCustomerTable",

  "predicates.IsDataEvent.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsDataEvent.pattern": "dbserver1\\.inventory\\..*",

  "predicates.IsCustomerTable.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsCustomerTable.pattern": ".*customers"
}

Что происходит:

  1. unwrap применяется только к топикам dbserver1.inventory.* (исключая heartbeat)
  2. mask применяется только к топику customers (не маскируем поля в orders)
Комбинирование нескольких предикатов

Разные предикаты для разных SMT в одном коннекторе

Message
topic: dbserver1.inventory.customers
SMT 1: unwrap
Predicate: IsDataEvent
pattern: dbserver1.inventory.*
TRUE →
ExtractNewRecordState
flat JSON
SMT 2: mask
Predicate: IsCustomerTable
pattern: .*customers
TRUE →
MaskField
To Kafka
Разные форматы по предикатам

Filter SMT с Groovy — Продвинутая фильтрация

Предикаты полезны, но ограничены: проверка топика, tombstone, header. Что если нужна фильтрация по содержимому?

Filter SMT позволяет удалить сообщение на основе сложного условия, написанного на Groovy (или JavaScript).

Что такое Filter SMT?

Filter SMT — это Debezium SMT, который отбрасывает (drops) сообщения, если условие вернуло false.

Класс: io.debezium.transforms.Filter

Поддерживаемые языки:

  • Groovy (jsr223.groovy) — рекомендуется для production
  • JavaScript (jsr223.graal.js) — требует GraalVM
  • Go/WASM — экспериментально

Базовый пример: Фильтровать только INSERT и UPDATE

Задача: Отбрасывать DELETE события (op == 'd').

{
  "transforms": "filter",
  "transforms.filter.type": "io.debezium.transforms.Filter",
  "transforms.filter.language": "jsr223.groovy",
  "transforms.filter.condition": "value.op == 'c' || value.op == 'u'"
}

Как это работает:

  1. Filter SMT получает сообщение с Debezium envelope
  2. Выполняет Groovy expression: value.op == 'c' || value.op == 'u'
  3. Если TRUE → сообщение проходит дальше
  4. Если FALSE → сообщение отбрасывается (не публикуется в Kafka)

Доступные переменные в Groovy:

  • value — значение сообщения (Debezium envelope)
  • key — ключ сообщения
  • topic — имя топика
  • timestamp — timestamp сообщения

Пример 2: Фильтровать по значению поля

Задача: Публиковать только события для активных пользователей (status == 'ACTIVE').

{
  "transforms": "filter",
  "transforms.filter.type": "io.debezium.transforms.Filter",
  "transforms.filter.language": "jsr223.groovy",
  "transforms.filter.condition": "value.after != null && value.after.status == 'ACTIVE'"
}

Проверка value.after != null: Обязательна, потому что DELETE события имеют after = null.

Пример 3: Фильтровать по нескольким условиям

Задача: Публиковать только UPDATE события для таблицы customers с country == 'US'.

{
  "transforms": "filter",
  "transforms.filter.type": "io.debezium.transforms.Filter",
  "transforms.filter.language": "jsr223.groovy",
  "transforms.filter.condition": "value.op == 'u' && value.source.table == 'customers' && value.after.country == 'US'"
}

Важные нюансы Filter SMT

1. Filter работает с Debezium envelope

Filter SMT получает полный Debezium envelope, не flattened payload. Это значит:

  • Используйте value.after.field_name, не value.field_name
  • Проверяйте value.op для типа операции
  • Доступна метаданные: value.source.table, value.source.ts_ms

2. Filter выполняется ДО unwrap

Стандартный порядок SMT chain:

Filter → Unwrap → Route → Mask

Пример конфигурации:

{
  "transforms": "filter,unwrap",
  "transforms.filter.type": "io.debezium.transforms.Filter",
  "transforms.filter.language": "jsr223.groovy",
  "transforms.filter.condition": "value.op == 'c' || value.op == 'u'",

  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}

Почему Filter ДО unwrap?

  • Уменьшает объем данных для unwrap (отброшенные сообщения не обрабатываются)
  • Filter работает с envelope (есть доступ к value.op)

3. Комбинирование Filter с предикатами

Вы можете применять Filter только к определенным топикам:

{
  "transforms": "filter",
  "transforms.filter.type": "io.debezium.transforms.Filter",
  "transforms.filter.language": "jsr223.groovy",
  "transforms.filter.condition": "value.after.amount > 1000",
  "transforms.filter.predicate": "IsOrdersTable",

  "predicates": "IsOrdersTable",
  "predicates.IsOrdersTable.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsOrdersTable.pattern": ".*orders"
}

Логика: Применить Filter только к топику orders, фильтровать по amount > 1000.

Decision Tree: Predicate vs Filter SMT

Когда использовать предикаты, а когда Filter SMT?

Decision Tree: Predicate vs Filter SMT

Когда использовать предикаты, а когда Filter SMT

Нужна фильтрация?
Условие зависит от содержимого message?
НЕТ
(только topic/tombstone/header)
Используйте Predicate
Примеры:
- TopicNameMatches
- RecordIsTombstone
- HasHeaderKey
ДА
(нужен доступ к value.after)
Простое условие (1-2 поля)?
ДА
Filter SMT
+ Groovy condition
Примеры:
- value.op == 'c'
- value.after.status == 'ACTIVE'
НЕТ
(сложная логика)
Kafka Streams
или ksqlDB
Для:
- Сложная логика
- Агрегации
- Joins

Рекомендации:

СценарийИнструментПример
Исключить heartbeatPredicate (TopicNameMatches)pattern: "dbserver1\\.inventory\\..*"
Исключить tombstonesPredicate (RecordIsTombstone)negate: true
Фильтр по opFilter SMTvalue.op == 'c' || value.op == 'u'
Фильтр по значению поляFilter SMTvalue.after.status == 'ACTIVE'
Агрегация, joinKafka StreamsgroupBy(), join()
Проверка знаний
Почему Groovy-выражение value.after.status в Filter SMT вызывает NullPointerException на DELETE-событиях? Как написать безопасную версию?
Ответ
DELETE-события в Debezium имеют after = null, потому что после удаления записи нет нового состояния. Обращение к value.after.status при after = null вызывает NullPointerException. Безопасные варианты: (1) явная проверка value.after != null && value.after.status == 'ACTIVE', или (2) Groovy safe navigation operator value?.after?.status == 'ACTIVE'.

Pitfall: Filter на DELETE событиях

Проблема: DELETE события имеют after = null. Если вы пишете:

value.after.status == 'ACTIVE'

Для DELETE события Groovy выбросит NullPointerException → коннектор упадет.

Решение: Всегда проверяйте value.after != null:

value.after != null && value.after.status == 'ACTIVE'

Или используйте оператор безопасной навигации Groovy (safe navigation):

value?.after?.status == 'ACTIVE'

Lab: Настройка Filter SMT для INSERT/UPDATE

Давайте настроим реальный коннектор с фильтрацией.

Задача

Настроить PostgreSQL коннектор, который:

  1. Захватывает таблицу customers
  2. Фильтрует только INSERT и UPDATE события (отбрасывает DELETE)
  3. Разворачивает envelope (ExtractNewRecordState)
  4. Маскирует поле email

Шаг 1: Подготовьте lab-среду

cd labs
docker-compose up -d

# Убедитесь, что PostgreSQL доступен
docker-compose exec postgres psql -U postgres -d inventory -c "SELECT count(*) FROM customers;"

Шаг 2: Создайте конфигурацию коннектора

Создайте файл labs/connectors/filtered-connector.json:

{
  "name": "filtered-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",

    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",

    "topic.prefix": "filtered",
    "table.include.list": "public.customers",
    "plugin.name": "pgoutput",

    "transforms": "filter,unwrap,mask",

    "transforms.filter.type": "io.debezium.transforms.Filter",
    "transforms.filter.language": "jsr223.groovy",
    "transforms.filter.condition": "value.op == 'c' || value.op == 'u'",

    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.add.fields": "op,table,source.ts_ms",
    "transforms.unwrap.add.fields.prefix": "__",

    "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.mask.fields": "email"
  }
}

Шаг 3: Создайте коннектор

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @labs/connectors/filtered-connector.json

Шаг 4: Проверьте результат в Kafka

Откройте первый терминал для чтения Kafka:

docker-compose exec kafka kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic filtered.public.customers \
  --from-beginning \
  --property print.key=true

Шаг 5: Выполните DML операции

Откройте второй терминал:

# INSERT (будет опубликован)
docker-compose exec postgres psql -U postgres -d inventory -c \
  "INSERT INTO customers (first_name, last_name, email) VALUES ('Alice', 'Smith', '[email protected]');"

# UPDATE (будет опубликован)
docker-compose exec postgres psql -U postgres -d inventory -c \
  "UPDATE customers SET first_name = 'Alice Updated' WHERE email = '[email protected]';"

# DELETE (будет отброшен Filter SMT)
docker-compose exec postgres psql -U postgres -d inventory -c \
  "DELETE FROM customers WHERE email = '[email protected]';"

Шаг 6: Проверьте вывод

В терминале с Kafka consumer вы увидите:

// INSERT event
{"id": 5}	{"id": 5, "first_name": "Alice", "last_name": "Smith", "email": null, "__op": "c", "__table": "customers", "__source.ts_ms": 1706745600000}

// UPDATE event
{"id": 5}	{"id": 5, "first_name": "Alice Updated", "last_name": "Smith", "email": null, "__op": "u", "__table": "customers", "__source.ts_ms": 1706745601000}

// DELETE event НЕ появляется (отфильтрован)

Обратите внимание:

  • email замаскирован (null) из-за MaskField
  • __op, __table, __source.ts_ms добавлены из-за add.fields
  • DELETE событие не появилось (отброшено Filter SMT)

Продвинутые паттерны предикатов

Pattern 1: Комбинирование TopicNameMatches с negate

Задача: Применить unwrap ко ВСЕМ топикам, КРОМЕ heartbeat.

{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.predicate": "IsNotHeartbeat",

  "predicates": "IsNotHeartbeat",
  "predicates.IsNotHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsNotHeartbeat.pattern": "__debezium-heartbeat\\..*",
  "predicates.IsNotHeartbeat.negate": "true"
}

Pattern 2: Разные SMT для разных таблиц

Задача: Маскировать email в customers, но не в orders.

{
  "transforms": "unwrap,mask",

  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",

  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "email",
  "transforms.mask.predicate": "IsCustomersTable",

  "predicates": "IsCustomersTable",
  "predicates.IsCustomersTable.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsCustomersTable.pattern": ".*customers"
}

Pattern 3: Filter + Predicate для специфичных сценариев

Задача: Фильтровать по значению поля только в одной таблице.

{
  "transforms": "filter,unwrap",

  "transforms.filter.type": "io.debezium.transforms.Filter",
  "transforms.filter.language": "jsr223.groovy",
  "transforms.filter.condition": "value.after.amount > 1000",
  "transforms.filter.predicate": "IsOrdersTable",

  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",

  "predicates": "IsOrdersTable",
  "predicates.IsOrdersTable.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsOrdersTable.pattern": ".*orders"
}

Логика:

  1. Filter применяется только к топику orders
  2. В orders фильтруются только заказы с amount > 1000
  3. Для customers фильтр не применяется (predicate вернул false)

Производительность Filter SMT

Filter SMT выполняет Groovy expression для каждого сообщения. Это overhead.

Типичные latency

Groovy expressionLatencyПример
Простое сравнениеменее 1msvalue.op == 'c'
Доступ к полю1-2msvalue.after.status == 'ACTIVE'
Множественные условия2-5msvalue.op == 'u' && value.after.amount > 1000 && value.after.country == 'US'
Регулярные выражения5-10msvalue.after.email =~ /.*@example\\.com/

Мониторинг Filter impact

После добавления Filter SMT проверьте метрику:

debezium_metrics_MilliSecondsBehindSource{connector="filtered-connector"}

Если lag вырос значительно — Filter слишком медленный.

Оптимизация

  1. Фильтруйте рано — Filter ДО unwrap уменьшает объем обработки
  2. Используйте простые условия — избегайте regex, если возможно
  3. Применяйте предикаты — Filter только к нужным топикам
  4. Profiling Groovy — используйте System.currentTimeMillis() для замера

Что дальше?

Теперь вы умеете:

  • Применять SMT селективно с предикатами
  • Фильтровать события по содержимому с Filter SMT
  • Комбинировать предикаты и Filter для сложных сценариев

В следующем уроке мы изучим Outbox Pattern — как использовать Debezium для event-driven microservices архитектуры с transactional guarantees.

Ключевые выводы

  1. Predicates позволяют применять SMT селективно к подходящим сообщениям
  2. TopicNameMatches — основной predicate для исключения heartbeat и системных топиков
  3. RecordIsTombstone — для обработки tombstone сообщений (с negate: true)
  4. HasHeaderKey — для проверки наличия headers (полезно для Outbox pattern)
  5. Filter SMT использует Groovy для фильтрации по содержимому сообщения
  6. Filter работает с envelope — доступны value.op, value.after, value.source
  7. Всегда проверяйте value.after != null в Groovy для DELETE событий
  8. Стандартный порядок: Filter → Unwrap (Filter до unwrap для эффективности)
  9. Predicates комбинируются — можно использовать разные предикаты для разных SMT
  10. Filter SMT добавляет overhead — профилируйте через MilliSecondsBehindSource

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какую возможность предоставляют предикаты (predicates) в Kafka Connect, которую нельзя реализовать обычной SMT chain без предикатов?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс