Prerequisites:
- module-4/01-smt-overview
Predicates и фильтрация событий
В предыдущем уроке мы изучили SMT и построили transformation chain: Filter → Unwrap → Mask. Но есть проблема: не все сообщения подходят для каждого SMT.
Попробуйте применить ExtractNewRecordState к heartbeat сообщению — коннектор упадет с ошибкой “Field ‘after’ not found”. Heartbeat не имеет поля after, а SMT ожидает его.
Predicates решают эту проблему — позволяют применять SMT селективно, только к подходящим сообщениям.
В этом уроке мы изучим систему предикатов Kafka Connect, познакомимся с встроенными предикатами, и научимся использовать Filter SMT с Groovy для сложной фильтрации.
Проблема: SMT падают на нестандартных сообщениях
Сценарий 1: Heartbeat сообщения
Вы настроили heartbeat для продвижения replication slot:
{
"heartbeat.interval.ms": "10000",
"heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)"
}
И добавили ExtractNewRecordState SMT:
{
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
Результат: Коннектор падает каждые 10 секунд:
ERROR Field 'after' not found in record: {
"source": { ... },
"op": "h", // ← heartbeat operation
"ts_ms": 1706745600000
}
Heartbeat сообщение не имеет поля after — это не DML событие.
Сценарий 2: Tombstone сообщения
Debezium генерирует tombstone (message с value = null) после DELETE события для Kafka log compaction.
// DELETE event
{"before": {"id": 1}, "after": null, "op": "d"}
// Tombstone (для компактации)
{"key": {"id": 1}, "value": null} // ← нет полей вообще
Если SMT ожидает поля в value — он упадет на tombstone.
Сценарий 3: Schema change events
PostgreSQL DDL операции генерируют специальные события:
{
"source": { ... },
"historyRecord": "{\"ddl\": \"ALTER TABLE customers ADD COLUMN age INT\"}",
"ts_ms": 1706745600000
}
Нет before, after, op — только метаданные о DDL.
Решение: Predicates — условное применение SMT
Predicate — это условие (condition), которое определяет, применять ли SMT к конкретному сообщению.
SMT применяется только если predicate вернул true
Идея: SMT применяется только если predicate вернул true.
Конфигурация с predicate
{
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.predicate": "IsDataEvent",
"predicates": "IsDataEvent",
"predicates.IsDataEvent.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsDataEvent.pattern": "dbserver1\\.inventory\\..*"
}
Что происходит:
- Debezium генерирует сообщение с topic
dbserver1.inventory.customers - Predicate
IsDataEventпроверяет: топик соответствует regexdbserver1\.inventory\..*? - Если TRUE → применяется SMT
unwrap - Если FALSE → сообщение проходит без изменений
Heartbeat публикуется в топик __debezium-heartbeat.dbserver1 (не соответствует pattern) → SMT пропускается → нет ошибки.
Встроенные предикаты Kafka Connect
Kafka Connect (требуется версия 2.6+) предоставляет три стандартных predicate.
1. TopicNameMatches — Фильтр по имени топика
Назначение: Проверить, соответствует ли топик regex-шаблону.
Класс: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
Конфигурация:
{
"predicates": "IsInventoryTopic",
"predicates.IsInventoryTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsInventoryTopic.pattern": "dbserver1\\.inventory\\..*"
}
Применение: Применять SMT только к определенным топикам (например, исключить heartbeat).
Пример использования:
{
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.predicate": "IsInventoryTopic",
"predicates": "IsInventoryTopic",
"predicates.IsInventoryTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsInventoryTopic.pattern": "dbserver1\\.inventory\\..*"
}
Regex синтаксис: Java regex (\\. для точки, .* для любых символов).
2. RecordIsTombstone — Проверка tombstone
Назначение: Проверить, является ли сообщение tombstone (value = null).
Класс: org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
Конфигурация:
{
"predicates": "IsTombstone",
"predicates.IsTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
}
Применение: Пропустить tombstone сообщения при обработке SMT.
Пример использования:
{
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.predicate": "IsNotTombstone",
"predicates": "IsNotTombstone",
"predicates.IsNotTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
"predicates.IsNotTombstone.negate": "true"
}
Negate: negate: true инвертирует результат (применить SMT к не-tombstone сообщениям).
3. HasHeaderKey — Проверка наличия header
Назначение: Проверить, существует ли указанный header в сообщении.
Класс: org.apache.kafka.connect.transforms.predicates.HasHeaderKey
Конфигурация:
{
"predicates": "HasEventId",
"predicates.HasEventId.type": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
"predicates.HasEventId.name": "event_id"
}
Применение: Обрабатывать только сообщения с определенным header (например, outbox events).
Пример использования:
{
"transforms": "route",
"transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
"transforms.route.predicate": "HasEventId",
"predicates": "HasEventId",
"predicates.HasEventId.type": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
"predicates.HasEventId.name": "id"
}
Проверка знанийВ чем разница между предикатом и Filter SMT? Когда использовать каждый инструмент?
Negate — Инверсия предиката
Любой predicate можно инвертировать с помощью параметра negate.
Пример: Применить SMT ко всем сообщениям, КРОМЕ tombstones
{
"transforms": "mask",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "email",
"transforms.mask.predicate": "IsNotTombstone",
"predicates": "IsNotTombstone",
"predicates.IsNotTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
"predicates.IsNotTombstone.negate": "true"
}
Логика: Применить MaskField если RecordIsTombstone вернул false (т.е. это НЕ tombstone).
Комбинирование нескольких предикатов
Вы можете объявить несколько предикатов и использовать их для разных SMT.
Пример: Разные предикаты для unwrap и mask
{
"transforms": "unwrap,mask",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.predicate": "IsDataEvent",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "email,phone",
"transforms.mask.predicate": "IsCustomerTable",
"predicates": "IsDataEvent,IsCustomerTable",
"predicates.IsDataEvent.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsDataEvent.pattern": "dbserver1\\.inventory\\..*",
"predicates.IsCustomerTable.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsCustomerTable.pattern": ".*customers"
}
Что происходит:
unwrapприменяется только к топикамdbserver1.inventory.*(исключая heartbeat)maskприменяется только к топикуcustomers(не маскируем поля вorders)
Разные предикаты для разных SMT в одном коннекторе
Filter SMT с Groovy — Продвинутая фильтрация
Предикаты полезны, но ограничены: проверка топика, tombstone, header. Что если нужна фильтрация по содержимому?
Filter SMT позволяет удалить сообщение на основе сложного условия, написанного на Groovy (или JavaScript).
Что такое Filter SMT?
Filter SMT — это Debezium SMT, который отбрасывает (drops) сообщения, если условие вернуло false.
Класс: io.debezium.transforms.Filter
Поддерживаемые языки:
- Groovy (jsr223.groovy) — рекомендуется для production
- JavaScript (jsr223.graal.js) — требует GraalVM
- Go/WASM — экспериментально
Базовый пример: Фильтровать только INSERT и UPDATE
Задача: Отбрасывать DELETE события (op == 'd').
{
"transforms": "filter",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.op == 'c' || value.op == 'u'"
}
Как это работает:
- Filter SMT получает сообщение с Debezium envelope
- Выполняет Groovy expression:
value.op == 'c' || value.op == 'u' - Если TRUE → сообщение проходит дальше
- Если FALSE → сообщение отбрасывается (не публикуется в Kafka)
Доступные переменные в Groovy:
value— значение сообщения (Debezium envelope)key— ключ сообщенияtopic— имя топикаtimestamp— timestamp сообщения
Пример 2: Фильтровать по значению поля
Задача: Публиковать только события для активных пользователей (status == 'ACTIVE').
{
"transforms": "filter",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.after != null && value.after.status == 'ACTIVE'"
}
Проверка value.after != null: Обязательна, потому что DELETE события имеют after = null.
Пример 3: Фильтровать по нескольким условиям
Задача: Публиковать только UPDATE события для таблицы customers с country == 'US'.
{
"transforms": "filter",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.op == 'u' && value.source.table == 'customers' && value.after.country == 'US'"
}
Важные нюансы Filter SMT
1. Filter работает с Debezium envelope
Filter SMT получает полный Debezium envelope, не flattened payload. Это значит:
- Используйте
value.after.field_name, неvalue.field_name - Проверяйте
value.opдля типа операции - Доступна метаданные:
value.source.table,value.source.ts_ms
2. Filter выполняется ДО unwrap
Стандартный порядок SMT chain:
Filter → Unwrap → Route → Mask
Пример конфигурации:
{
"transforms": "filter,unwrap",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.op == 'c' || value.op == 'u'",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
Почему Filter ДО unwrap?
- Уменьшает объем данных для unwrap (отброшенные сообщения не обрабатываются)
- Filter работает с envelope (есть доступ к
value.op)
3. Комбинирование Filter с предикатами
Вы можете применять Filter только к определенным топикам:
{
"transforms": "filter",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.after.amount > 1000",
"transforms.filter.predicate": "IsOrdersTable",
"predicates": "IsOrdersTable",
"predicates.IsOrdersTable.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsOrdersTable.pattern": ".*orders"
}
Логика: Применить Filter только к топику orders, фильтровать по amount > 1000.
Decision Tree: Predicate vs Filter SMT
Когда использовать предикаты, а когда Filter SMT?
Когда использовать предикаты, а когда Filter SMT
Рекомендации:
| Сценарий | Инструмент | Пример |
|---|---|---|
| Исключить heartbeat | Predicate (TopicNameMatches) | pattern: "dbserver1\\.inventory\\..*" |
| Исключить tombstones | Predicate (RecordIsTombstone) | negate: true |
Фильтр по op | Filter SMT | value.op == 'c' || value.op == 'u' |
| Фильтр по значению поля | Filter SMT | value.after.status == 'ACTIVE' |
| Агрегация, join | Kafka Streams | groupBy(), join() |
Проверка знанийПочему Groovy-выражение value.after.status в Filter SMT вызывает NullPointerException на DELETE-событиях? Как написать безопасную версию?
Pitfall: Filter на DELETE событиях
Проблема: DELETE события имеют after = null. Если вы пишете:
value.after.status == 'ACTIVE'
Для DELETE события Groovy выбросит NullPointerException → коннектор упадет.
Решение: Всегда проверяйте value.after != null:
value.after != null && value.after.status == 'ACTIVE'
Или используйте оператор безопасной навигации Groovy (safe navigation):
value?.after?.status == 'ACTIVE'
Lab: Настройка Filter SMT для INSERT/UPDATE
Давайте настроим реальный коннектор с фильтрацией.
Задача
Настроить PostgreSQL коннектор, который:
- Захватывает таблицу
customers - Фильтрует только INSERT и UPDATE события (отбрасывает DELETE)
- Разворачивает envelope (ExtractNewRecordState)
- Маскирует поле
email
Шаг 1: Подготовьте lab-среду
cd labs
docker-compose up -d
# Убедитесь, что PostgreSQL доступен
docker-compose exec postgres psql -U postgres -d inventory -c "SELECT count(*) FROM customers;"
Шаг 2: Создайте конфигурацию коннектора
Создайте файл labs/connectors/filtered-connector.json:
{
"name": "filtered-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "inventory",
"topic.prefix": "filtered",
"table.include.list": "public.customers",
"plugin.name": "pgoutput",
"transforms": "filter,unwrap,mask",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.op == 'c' || value.op == 'u'",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.unwrap.add.fields.prefix": "__",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "email"
}
}
Шаг 3: Создайте коннектор
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @labs/connectors/filtered-connector.json
Шаг 4: Проверьте результат в Kafka
Откройте первый терминал для чтения Kafka:
docker-compose exec kafka kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic filtered.public.customers \
--from-beginning \
--property print.key=true
Шаг 5: Выполните DML операции
Откройте второй терминал:
# INSERT (будет опубликован)
docker-compose exec postgres psql -U postgres -d inventory -c \
"INSERT INTO customers (first_name, last_name, email) VALUES ('Alice', 'Smith', '[email protected]');"
# UPDATE (будет опубликован)
docker-compose exec postgres psql -U postgres -d inventory -c \
"UPDATE customers SET first_name = 'Alice Updated' WHERE email = '[email protected]';"
# DELETE (будет отброшен Filter SMT)
docker-compose exec postgres psql -U postgres -d inventory -c \
"DELETE FROM customers WHERE email = '[email protected]';"
Шаг 6: Проверьте вывод
В терминале с Kafka consumer вы увидите:
// INSERT event
{"id": 5} {"id": 5, "first_name": "Alice", "last_name": "Smith", "email": null, "__op": "c", "__table": "customers", "__source.ts_ms": 1706745600000}
// UPDATE event
{"id": 5} {"id": 5, "first_name": "Alice Updated", "last_name": "Smith", "email": null, "__op": "u", "__table": "customers", "__source.ts_ms": 1706745601000}
// DELETE event НЕ появляется (отфильтрован)
Обратите внимание:
emailзамаскирован (null) из-заMaskField__op,__table,__source.ts_msдобавлены из-заadd.fields- DELETE событие не появилось (отброшено Filter SMT)
Продвинутые паттерны предикатов
Pattern 1: Комбинирование TopicNameMatches с negate
Задача: Применить unwrap ко ВСЕМ топикам, КРОМЕ heartbeat.
{
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.predicate": "IsNotHeartbeat",
"predicates": "IsNotHeartbeat",
"predicates.IsNotHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsNotHeartbeat.pattern": "__debezium-heartbeat\\..*",
"predicates.IsNotHeartbeat.negate": "true"
}
Pattern 2: Разные SMT для разных таблиц
Задача: Маскировать email в customers, но не в orders.
{
"transforms": "unwrap,mask",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "email",
"transforms.mask.predicate": "IsCustomersTable",
"predicates": "IsCustomersTable",
"predicates.IsCustomersTable.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsCustomersTable.pattern": ".*customers"
}
Pattern 3: Filter + Predicate для специфичных сценариев
Задача: Фильтровать по значению поля только в одной таблице.
{
"transforms": "filter,unwrap",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.after.amount > 1000",
"transforms.filter.predicate": "IsOrdersTable",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"predicates": "IsOrdersTable",
"predicates.IsOrdersTable.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsOrdersTable.pattern": ".*orders"
}
Логика:
- Filter применяется только к топику
orders - В
ordersфильтруются только заказы сamount > 1000 - Для
customersфильтр не применяется (predicate вернулfalse)
Производительность Filter SMT
Filter SMT выполняет Groovy expression для каждого сообщения. Это overhead.
Типичные latency
| Groovy expression | Latency | Пример |
|---|---|---|
| Простое сравнение | менее 1ms | value.op == 'c' |
| Доступ к полю | 1-2ms | value.after.status == 'ACTIVE' |
| Множественные условия | 2-5ms | value.op == 'u' && value.after.amount > 1000 && value.after.country == 'US' |
| Регулярные выражения | 5-10ms | value.after.email =~ /.*@example\\.com/ |
Мониторинг Filter impact
После добавления Filter SMT проверьте метрику:
debezium_metrics_MilliSecondsBehindSource{connector="filtered-connector"}
Если lag вырос значительно — Filter слишком медленный.
Оптимизация
- Фильтруйте рано — Filter ДО unwrap уменьшает объем обработки
- Используйте простые условия — избегайте regex, если возможно
- Применяйте предикаты — Filter только к нужным топикам
- Profiling Groovy — используйте
System.currentTimeMillis()для замера
Что дальше?
Теперь вы умеете:
- Применять SMT селективно с предикатами
- Фильтровать события по содержимому с Filter SMT
- Комбинировать предикаты и Filter для сложных сценариев
В следующем уроке мы изучим Outbox Pattern — как использовать Debezium для event-driven microservices архитектуры с transactional guarantees.
Ключевые выводы
- Predicates позволяют применять SMT селективно к подходящим сообщениям
- TopicNameMatches — основной predicate для исключения heartbeat и системных топиков
- RecordIsTombstone — для обработки tombstone сообщений (с
negate: true) - HasHeaderKey — для проверки наличия headers (полезно для Outbox pattern)
- Filter SMT использует Groovy для фильтрации по содержимому сообщения
- Filter работает с envelope — доступны
value.op,value.after,value.source - Всегда проверяйте
value.after != nullв Groovy для DELETE событий - Стандартный порядок: Filter → Unwrap (Filter до unwrap для эффективности)
- Predicates комбинируются — можно использовать разные предикаты для разных SMT
- Filter SMT добавляет overhead — профилируйте через
MilliSecondsBehindSource
Check Your Understanding
Finished the lesson?
Mark it as complete to track your progress