Требуемые знания:
- module-4/02-predicates-filtering
Маскирование PII данных с помощью SMT
CDC захватывает все изменения в таблице — включая персональные данные (PII): номера социального страхования, email, телефоны, адреса. Но что если downstream-системе эти данные не нужны в открытом виде? Или вы обязаны соблюдать GDPR и минимизировать раскрытие PII?
MaskField SMT — встроенная трансформация Kafka Connect для маскирования чувствительных полей прямо в CDC pipeline, до попадания данных в Kafka.
Зачем маскировать данные в CDC?
1. Требования безопасности и compliance
GDPR Article 25 (Data Protection by Design): Минимизация обработки персональных данных.
Если аналитическая система использует CDC для построения витрины заказов, ей не нужны email и телефоны клиентов в открытом виде. Маскирование на уровне pipeline снижает риск утечки.
2. Ограничения downstream-систем
Некоторые системы (data warehouse, аналитические платформы) имеют более низкие требования безопасности, чем production база данных. Публикация PII в такие системы увеличивает attack surface.
3. Разделение ответственности команд
Команда аналитики может не иметь clearance для работы с PII, но нуждается в остальных полях (order_id, amount, status). MaskField позволяет дать доступ к событиям без раскрытия чувствительных данных.
Маскировка PII полей для GDPR compliance
MaskField SMT: Основы
MaskField — это стандартная трансформация Apache Kafka Connect (не специфичная для Debezium), которая заменяет указанные поля фиксированным значением.
Базовая конфигурация
{
"name": "customers-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "inventory",
"topic.prefix": "inventory",
"table.include.list": "public.customers",
"transforms": "mask",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "ssn,credit_card,email",
"transforms.mask.replacement": "***MASKED***"
}
}
Параметры:
transforms.mask.type— класс трансформации ($Valueдля маскирования значений записи,$Keyдля ключей)transforms.mask.fields— список полей через запятую (без пробелов!)transforms.mask.replacement— значение для замены (опционально, по умолчанию null)
Результат маскирования
Исходное событие:
{
"payload": {
"op": "c",
"after": {
"id": 1,
"name": "John Doe",
"email": "[email protected]",
"ssn": "123-45-6789",
"phone": "+1-555-0123"
}
}
}
После MaskField:
{
"payload": {
"op": "c",
"after": {
"id": 1,
"name": "John Doe",
"email": "***MASKED***",
"ssn": "***MASKED***",
"phone": "+1-555-0123"
}
}
}
Поле phone не маскировано, т.к. не указано в transforms.mask.fields.
Типы полей и поведение маскирования
MaskField работает по-разному в зависимости от типа данных поля:
| Тип поля | Поведение маскирования | Пример |
|---|---|---|
| String | Заменяется на replacement | "[email protected]" → "***MASKED***" |
| Numeric (int, long, float, double) | Заменяется на 0 или numeric replacement | 123456 → 0 |
| Boolean | Заменяется на false | true → false |
| Date/Timestamp | Заменяется на epoch (0) | 2024-01-15T10:00:00Z → 1970-01-01T00:00:00Z |
| Array/Struct | Заменяется на null | [1, 2, 3] → null |
Пример с разными типами
Конфигурация:
{
"transforms": "mask",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "email,age,birth_date,is_premium",
"transforms.mask.replacement": "***MASKED***"
}
Исходные данные:
{
"email": "[email protected]",
"age": 28,
"birth_date": "1996-05-15",
"is_premium": true
}
После маскирования:
{
"email": "***MASKED***",
"age": 0,
"birth_date": "1970-01-01",
"is_premium": false
}
Важно: Для числовых полей
replacementигнорируется — всегда используется0. Для boolean — всегдаfalse.
Проверка знанийMaskField заменяет строковое поле email на '***MASKED***'. Что произойдет с числовым полем age и boolean полем is_premium при маскировании?
Комбинация с ExtractNewRecordState
В реальных сценариях MaskField часто используется вместе с ExtractNewRecordState для маскирования после flattening событий.
Порядок трансформаций
{
"transforms": "unwrap,mask",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "ssn,credit_card,email"
}
Порядок имеет значение!
Debezium envelope формат
Flat JSON формат
Результат:
{
"id": 1,
"name": "Alice",
"email": "***MASKED***",
"ssn": "***MASKED***",
"__op": "c",
"__table": "customers",
"__source.ts_ms": 1706745600000
}
Правило: Всегда применяйте MaskField после unwrap, чтобы маскировать финальные данные, которые попадут в Kafka.
MaskField для ключей (MaskField$Key)
Иногда нужно маскировать ключ записи (key), а не только значение (value).
Сценарий: Debezium использует primary key таблицы как Kafka message key. Если primary key — email или SSN, его нужно маскировать.
Конфигурация
{
"transforms": "maskKey,maskValue",
"transforms.maskKey.type": "org.apache.kafka.connect.transforms.MaskField$Key",
"transforms.maskKey.fields": "email",
"transforms.maskValue.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskValue.fields": "ssn,credit_card"
}
Результат:
Key: { "email": "***MASKED***" }
Value: { "id": 1, "name": "Alice", "ssn": "***MASKED***", "credit_card": "***MASKED***" }
Предупреждение: Маскирование ключа нарушает log compaction! Если topic использует
cleanup.policy=compact, все записи с одинаковым маскированным ключом будут схлопываться в одну. Используйте MaskField$Key только для non-compacted topics.
Селективное маскирование с предикатами
Проблема: Вы хотите маскировать PII только для таблицы customers, но не для orders.
Решение: Используйте предикаты для применения MaskField только к определенным событиям.
{
"transforms": "mask",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "email,phone",
"transforms.mask.predicate": "IsCustomersTable",
"predicates": "IsCustomersTable",
"predicates.IsCustomersTable.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsCustomersTable.pattern": "inventory.public.customers"
}
Как работает:
- События из топика
inventory.public.customers→ маскируются - События из топика
inventory.public.orders→ не маскируются
Negate предикат для обратной логики
Сценарий: Маскировать все таблицы кроме audit_log.
{
"transforms": "mask",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "email,ssn",
"transforms.mask.predicate": "IsNotAuditLog",
"predicates": "IsNotAuditLog",
"predicates.IsNotAuditLog.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsNotAuditLog.pattern": "inventory.public.audit_log",
"predicates.IsNotAuditLog.negate": "true"
}
negate: true инвертирует предикат — теперь маскируются все топики кроме audit_log.
Lab: Маскирование PII полей в таблице customers
Давайте настроим коннектор с маскированием email, SSN и телефона.
Шаг 1: Создайте таблицу с PII данными
CREATE TABLE public.customers_pii (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) NOT NULL,
ssn VARCHAR(11) NOT NULL,
phone VARCHAR(20),
address TEXT,
created_at TIMESTAMP DEFAULT NOW()
);
INSERT INTO public.customers_pii (name, email, ssn, phone, address) VALUES
('Alice Johnson', '[email protected]', '123-45-6789', '+1-555-0101', '123 Main St'),
('Bob Smith', '[email protected]', '987-65-4321', '+1-555-0202', '456 Oak Ave'),
('Charlie Brown', '[email protected]', '111-22-3333', '+1-555-0303', '789 Pine Rd');
Шаг 2: Создайте коннектор с маскированием
{
"name": "customers-pii-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "inventory",
"topic.prefix": "inventory",
"table.include.list": "public.customers_pii",
"plugin.name": "pgoutput",
"transforms": "unwrap,mask",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.add.fields": "op,table",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "email,ssn,phone",
"transforms.mask.replacement": "***MASKED***"
}
}
Шаг 3: Зарегистрируйте коннектор
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @customers-pii-connector.json
Шаг 4: Потребите события с маскированными данными
from confluent_kafka import Consumer
consumer = Consumer({
'bootstrap.servers': 'kafka:9092',
'group.id': 'pii-consumer',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['inventory.public.customers_pii'])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
value = json.loads(msg.value().decode('utf-8'))
print(f"ID: {value['id']}")
print(f"Name: {value['name']}")
print(f"Email: {value['email']}") # ***MASKED***
print(f"SSN: {value['ssn']}") # ***MASKED***
print(f"Phone: {value['phone']}") # ***MASKED***
print(f"Address: {value['address']}") # НЕ замаскировано
print(f"Operation: {value['__op']}")
print("---")
Вывод:
ID: 1
Name: Alice Johnson
Email: ***MASKED***
SSN: ***MASKED***
Phone: ***MASKED***
Address: 123 Main St
Operation: r
---
Проверка знанийПочему MaskField рекомендуется использовать после ExtractNewRecordState (unwrap), а не до него? Что произойдет, если MaskField применить к Debezium envelope напрямую?
Важные ограничения MaskField
1. Маскирование происходит после захвата WAL
MaskField трансформирует события в Kafka Connect, а не в PostgreSQL. Это означает:
- Исходные данные все еще в WAL — оригинальные PII доступны в базе данных
- WAL replication slot хранит незамаскированные события — до применения трансформации
- WAL retention сохраняет PII — пока не истечет
max_slot_wal_keep_sizeилиwal_level
Security callout: MaskField не шифрует данные и не удаляет их из источника. Это только downstream protection — защита данных в Kafka и downstream-системах. Для полного compliance требуется шифрование на уровне БД.
2. MaskField не поддерживает условное маскирование
Вы не можете замаскировать email только для определенных записей (например, только для EU пользователей).
Что НЕ работает:
// Такого синтаксиса НЕ существует!
"transforms.mask.fields": "email",
"transforms.mask.condition": "value.region == 'EU'"
Решение: Используйте два отдельных коннектора:
- Коннектор для EU таблиц — с маскированием
- Коннектор для non-EU таблиц — без маскирования
3. Производительность SMT
MaskField выполняется синхронно в потоке Kafka Connect worker. Для больших нагрузок (>10K events/sec) замена полей добавляет overhead ~1-5ms на событие.
Рекомендация: Используйте MaskField для простого маскирования. Для сложной логики (частичное маскирование, tokenization) используйте Kafka Streams или внешний сервис.
Production рекомендации
1. Всегда используйте unwrap + mask
"transforms": "unwrap,mask"
Не маскируйте Debezium envelope (before, after), т.к. это усложняет debugging.
2. Документируйте маскированные поля
В описании коннектора явно перечислите замаскированные поля:
{
"name": "customers-connector",
"config": {
// ... connector config ...
"transforms.mask.fields": "email,ssn,phone,credit_card",
"// PII_MASKING_COMMENT": "email, ssn, phone, credit_card are masked with ***MASKED***"
}
}
3. Тестируйте маскирование в pre-production
Создайте тестовый коннектор с sample данными и проверьте, что маскируются только нужные поля. Легко случайно замаскировать критическое бизнес-поле с похожим названием.
4. Координируйте с downstream-командами
Если downstream consumer ожидает реальные email для notification, а вы их замаскировали — система сломается. Заранее согласуйте, какие поля доступны в каких топиках.
Альтернативы MaskField
| Подход | Плюсы | Минусы |
|---|---|---|
| MaskField SMT | Простота, встроенная функция | Только полная замена, нет частичного маскирования |
| Custom SMT | Гибкая логика (частичное маскирование, tokenization) | Требует разработки, развертывания, поддержки |
| Kafka Streams processor | Stateful трансформации, сложная логика | Дополнительная инфраструктура, latency |
| PostgreSQL column encryption | Данные шифруются в источнике | Performance overhead на БД, сложность key management |
| Field-level ACLs в Kafka | Granular access control | Требует Confluent Platform или custom auth, не скрывает данные на уровне record |
Что мы узнали
- MaskField SMT заменяет указанные поля фиксированным значением для compliance и безопасности
- Типы полей обрабатываются по-разному: string → replacement, numeric → 0, boolean → false
- MaskFieldKey для ключей (осторожно с compaction!)
- Комбинация unwrap + mask — стандартный паттерн для маскирования после flattening
- Предикаты позволяют маскировать селективно (только определенные таблицы)
- Ограничения: Исходные данные остаются в WAL, нет условного маскирования
- Production: Документируйте маскированные поля, тестируйте, координируйте с downstream
Что дальше?
В следующем уроке мы изучим content-based routing — маршрутизацию событий в разные топики на основе значений полей. Это полезно для multi-tenant систем, региональной изоляции данных (EU vs US) и приоритезации событий.
Проверьте понимание
Закончили урок?
Отметьте его как пройденный, чтобы отслеживать свой прогресс