Skip to content
Learning Platform
Intermediate
25 minutes
pii masking gdpr smt security

Prerequisites:

  • module-4/02-predicates-filtering

Маскирование PII данных с помощью SMT

CDC захватывает все изменения в таблице — включая персональные данные (PII): номера социального страхования, email, телефоны, адреса. Но что если downstream-системе эти данные не нужны в открытом виде? Или вы обязаны соблюдать GDPR и минимизировать раскрытие PII?

MaskField SMT — встроенная трансформация Kafka Connect для маскирования чувствительных полей прямо в CDC pipeline, до попадания данных в Kafka.

Зачем маскировать данные в CDC?

1. Требования безопасности и compliance

GDPR Article 25 (Data Protection by Design): Минимизация обработки персональных данных.

Если аналитическая система использует CDC для построения витрины заказов, ей не нужны email и телефоны клиентов в открытом виде. Маскирование на уровне pipeline снижает риск утечки.

2. Ограничения downstream-систем

Некоторые системы (data warehouse, аналитические платформы) имеют более низкие требования безопасности, чем production база данных. Публикация PII в такие системы увеличивает attack surface.

3. Разделение ответственности команд

Команда аналитики может не иметь clearance для работы с PII, но нуждается в остальных полях (order_id, amount, status). MaskField позволяет дать доступ к событиям без раскрытия чувствительных данных.

MaskField Transformation

Маскировка PII полей для GDPR compliance

Input Record
id: 1
name: "Alice"
phone: "+1-555-0100"
ssn: "123-45-6789"
⚠ PII fields exposed
MaskField SMT
MaskField SMT
Маскирует:
email, phone, ssn
masked
Output Record
id: 1
name: "Alice"
email: null
phone: null
ssn: null
✓ PII masked
Альтернатива: MaskField может использовать replacement="***MASKED***" вместо null

MaskField SMT: Основы

MaskField — это стандартная трансформация Apache Kafka Connect (не специфичная для Debezium), которая заменяет указанные поля фиксированным значением.

Базовая конфигурация

{
  "name": "customers-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "topic.prefix": "inventory",
    "table.include.list": "public.customers",

    "transforms": "mask",
    "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.mask.fields": "ssn,credit_card,email",
    "transforms.mask.replacement": "***MASKED***"
  }
}

Параметры:

  • transforms.mask.type — класс трансформации ($Value для маскирования значений записи, $Key для ключей)
  • transforms.mask.fields — список полей через запятую (без пробелов!)
  • transforms.mask.replacement — значение для замены (опционально, по умолчанию null)

Результат маскирования

Исходное событие:

{
  "payload": {
    "op": "c",
    "after": {
      "id": 1,
      "name": "John Doe",
      "email": "[email protected]",
      "ssn": "123-45-6789",
      "phone": "+1-555-0123"
    }
  }
}

После MaskField:

{
  "payload": {
    "op": "c",
    "after": {
      "id": 1,
      "name": "John Doe",
      "email": "***MASKED***",
      "ssn": "***MASKED***",
      "phone": "+1-555-0123"
    }
  }
}

Поле phone не маскировано, т.к. не указано в transforms.mask.fields.

Типы полей и поведение маскирования

MaskField работает по-разному в зависимости от типа данных поля:

Тип поляПоведение маскированияПример
StringЗаменяется на replacement"[email protected]""***MASKED***"
Numeric (int, long, float, double)Заменяется на 0 или numeric replacement1234560
BooleanЗаменяется на falsetruefalse
Date/TimestampЗаменяется на epoch (0)2024-01-15T10:00:00Z1970-01-01T00:00:00Z
Array/StructЗаменяется на null[1, 2, 3]null

Пример с разными типами

Конфигурация:

{
  "transforms": "mask",
  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "email,age,birth_date,is_premium",
  "transforms.mask.replacement": "***MASKED***"
}

Исходные данные:

{
  "email": "[email protected]",
  "age": 28,
  "birth_date": "1996-05-15",
  "is_premium": true
}

После маскирования:

{
  "email": "***MASKED***",
  "age": 0,
  "birth_date": "1970-01-01",
  "is_premium": false
}

Важно: Для числовых полей replacement игнорируется — всегда используется 0. Для boolean — всегда false.

Проверка знаний
MaskField заменяет строковое поле email на '***MASKED***'. Что произойдет с числовым полем age и boolean полем is_premium при маскировании?
Ответ
MaskField обрабатывает типы по-разному: строковые поля заменяются на значение replacement, но числовые поля (int, long, float) всегда заменяются на 0 (replacement игнорируется), а boolean поля всегда заменяются на false. Downstream-системы должны учитывать, что 0 и false могут означать замаскированные данные, а не реальные значения.

Комбинация с ExtractNewRecordState

В реальных сценариях MaskField часто используется вместе с ExtractNewRecordState для маскирования после flattening событий.

Порядок трансформаций

{
  "transforms": "unwrap,mask",

  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.add.fields": "op,table,source.ts_ms",

  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "ssn,credit_card,email"
}

Порядок имеет значение!

Before ExtractNewRecordState

Debezium envelope формат

before: null
after: {
id: 1,
name: "Alice",
}
op: "c"
source: { ... }
ts_ms: 1706745600000
⚠ Сложный формат для консьюмеров
After ExtractNewRecordStateRecommended

Flat JSON формат

id: 1
name: "Alice"
__op: "c"
__table: "customers"
__ts_ms: 1706745600000
✓ Metadata добавлено с prefix __

Результат:

{
  "id": 1,
  "name": "Alice",
  "email": "***MASKED***",
  "ssn": "***MASKED***",
  "__op": "c",
  "__table": "customers",
  "__source.ts_ms": 1706745600000
}

Правило: Всегда применяйте MaskField после unwrap, чтобы маскировать финальные данные, которые попадут в Kafka.

MaskField для ключей (MaskField$Key)

Иногда нужно маскировать ключ записи (key), а не только значение (value).

Сценарий: Debezium использует primary key таблицы как Kafka message key. Если primary key — email или SSN, его нужно маскировать.

Конфигурация

{
  "transforms": "maskKey,maskValue",

  "transforms.maskKey.type": "org.apache.kafka.connect.transforms.MaskField$Key",
  "transforms.maskKey.fields": "email",

  "transforms.maskValue.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskValue.fields": "ssn,credit_card"
}

Результат:

Key:   { "email": "***MASKED***" }
Value: { "id": 1, "name": "Alice", "ssn": "***MASKED***", "credit_card": "***MASKED***" }

Предупреждение: Маскирование ключа нарушает log compaction! Если topic использует cleanup.policy=compact, все записи с одинаковым маскированным ключом будут схлопываться в одну. Используйте MaskField$Key только для non-compacted topics.

Селективное маскирование с предикатами

Проблема: Вы хотите маскировать PII только для таблицы customers, но не для orders.

Решение: Используйте предикаты для применения MaskField только к определенным событиям.

{
  "transforms": "mask",
  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "email,phone",
  "transforms.mask.predicate": "IsCustomersTable",

  "predicates": "IsCustomersTable",
  "predicates.IsCustomersTable.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsCustomersTable.pattern": "inventory.public.customers"
}

Как работает:

  • События из топика inventory.public.customers → маскируются
  • События из топика inventory.public.ordersне маскируются

Negate предикат для обратной логики

Сценарий: Маскировать все таблицы кроме audit_log.

{
  "transforms": "mask",
  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "email,ssn",
  "transforms.mask.predicate": "IsNotAuditLog",

  "predicates": "IsNotAuditLog",
  "predicates.IsNotAuditLog.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.IsNotAuditLog.pattern": "inventory.public.audit_log",
  "predicates.IsNotAuditLog.negate": "true"
}

negate: true инвертирует предикат — теперь маскируются все топики кроме audit_log.

Lab: Маскирование PII полей в таблице customers

Давайте настроим коннектор с маскированием email, SSN и телефона.

Шаг 1: Создайте таблицу с PII данными

CREATE TABLE public.customers_pii (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(100) NOT NULL,
    ssn VARCHAR(11) NOT NULL,
    phone VARCHAR(20),
    address TEXT,
    created_at TIMESTAMP DEFAULT NOW()
);

INSERT INTO public.customers_pii (name, email, ssn, phone, address) VALUES
('Alice Johnson', '[email protected]', '123-45-6789', '+1-555-0101', '123 Main St'),
('Bob Smith', '[email protected]', '987-65-4321', '+1-555-0202', '456 Oak Ave'),
('Charlie Brown', '[email protected]', '111-22-3333', '+1-555-0303', '789 Pine Rd');

Шаг 2: Создайте коннектор с маскированием

{
  "name": "customers-pii-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "topic.prefix": "inventory",
    "table.include.list": "public.customers_pii",
    "plugin.name": "pgoutput",

    "transforms": "unwrap,mask",

    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.add.fields": "op,table",

    "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.mask.fields": "email,ssn,phone",
    "transforms.mask.replacement": "***MASKED***"
  }
}

Шаг 3: Зарегистрируйте коннектор

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @customers-pii-connector.json

Шаг 4: Потребите события с маскированными данными

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'pii-consumer',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe(['inventory.public.customers_pii'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue

    value = json.loads(msg.value().decode('utf-8'))
    print(f"ID: {value['id']}")
    print(f"Name: {value['name']}")
    print(f"Email: {value['email']}")      # ***MASKED***
    print(f"SSN: {value['ssn']}")          # ***MASKED***
    print(f"Phone: {value['phone']}")      # ***MASKED***
    print(f"Address: {value['address']}")  # НЕ замаскировано
    print(f"Operation: {value['__op']}")
    print("---")

Вывод:

ID: 1
Name: Alice Johnson
Email: ***MASKED***
SSN: ***MASKED***
Phone: ***MASKED***
Address: 123 Main St
Operation: r
---
Проверка знаний
Почему MaskField рекомендуется использовать после ExtractNewRecordState (unwrap), а не до него? Что произойдет, если MaskField применить к Debezium envelope напрямую?
Ответ
MaskField$Value работает на верхнем уровне записи. При применении до unwrap верхний уровень содержит Debezium envelope (before, after, op, source), а не данные строки. MaskField не сможет найти поле email на верхнем уровне, потому что оно вложено внутри after. После unwrap данные из after извлекаются на верхний уровень, и MaskField корректно маскирует email.

Важные ограничения MaskField

1. Маскирование происходит после захвата WAL

MaskField трансформирует события в Kafka Connect, а не в PostgreSQL. Это означает:

  • Исходные данные все еще в WAL — оригинальные PII доступны в базе данных
  • WAL replication slot хранит незамаскированные события — до применения трансформации
  • WAL retention сохраняет PII — пока не истечет max_slot_wal_keep_size или wal_level

Security callout: MaskField не шифрует данные и не удаляет их из источника. Это только downstream protection — защита данных в Kafka и downstream-системах. Для полного compliance требуется шифрование на уровне БД.

2. MaskField не поддерживает условное маскирование

Вы не можете замаскировать email только для определенных записей (например, только для EU пользователей).

Что НЕ работает:

// Такого синтаксиса НЕ существует!
"transforms.mask.fields": "email",
"transforms.mask.condition": "value.region == 'EU'"

Решение: Используйте два отдельных коннектора:

  1. Коннектор для EU таблиц — с маскированием
  2. Коннектор для non-EU таблиц — без маскирования

3. Производительность SMT

MaskField выполняется синхронно в потоке Kafka Connect worker. Для больших нагрузок (>10K events/sec) замена полей добавляет overhead ~1-5ms на событие.

Рекомендация: Используйте MaskField для простого маскирования. Для сложной логики (частичное маскирование, tokenization) используйте Kafka Streams или внешний сервис.

Production рекомендации

1. Всегда используйте unwrap + mask

"transforms": "unwrap,mask"

Не маскируйте Debezium envelope (before, after), т.к. это усложняет debugging.

2. Документируйте маскированные поля

В описании коннектора явно перечислите замаскированные поля:

{
  "name": "customers-connector",
  "config": {
    // ... connector config ...
    "transforms.mask.fields": "email,ssn,phone,credit_card",
    "// PII_MASKING_COMMENT": "email, ssn, phone, credit_card are masked with ***MASKED***"
  }
}

3. Тестируйте маскирование в pre-production

Создайте тестовый коннектор с sample данными и проверьте, что маскируются только нужные поля. Легко случайно замаскировать критическое бизнес-поле с похожим названием.

4. Координируйте с downstream-командами

Если downstream consumer ожидает реальные email для notification, а вы их замаскировали — система сломается. Заранее согласуйте, какие поля доступны в каких топиках.

Альтернативы MaskField

ПодходПлюсыМинусы
MaskField SMTПростота, встроенная функцияТолько полная замена, нет частичного маскирования
Custom SMTГибкая логика (частичное маскирование, tokenization)Требует разработки, развертывания, поддержки
Kafka Streams processorStateful трансформации, сложная логикаДополнительная инфраструктура, latency
PostgreSQL column encryptionДанные шифруются в источникеPerformance overhead на БД, сложность key management
Field-level ACLs в KafkaGranular access controlТребует Confluent Platform или custom auth, не скрывает данные на уровне record

Что мы узнали

  1. MaskField SMT заменяет указанные поля фиксированным значением для compliance и безопасности
  2. Типы полей обрабатываются по-разному: string → replacement, numeric → 0, boolean → false
  3. MaskFieldValueдлязначенийзаписи,MaskFieldValue** для значений записи, **MaskFieldKey для ключей (осторожно с compaction!)
  4. Комбинация unwrap + mask — стандартный паттерн для маскирования после flattening
  5. Предикаты позволяют маскировать селективно (только определенные таблицы)
  6. Ограничения: Исходные данные остаются в WAL, нет условного маскирования
  7. Production: Документируйте маскированные поля, тестируйте, координируйте с downstream

Что дальше?

В следующем уроке мы изучим content-based routing — маршрутизацию событий в разные топики на основе значений полей. Это полезно для multi-tenant систем, региональной изоляции данных (EU vs US) и приоритезации событий.

Check Your Understanding

Score: 0 of 0
Conceptual
Question 1 of 4. MaskField SMT шифрует PII данные в Kafka Connect, обеспечивая полную защиту персональных данных от источника до потребителя.

Finished the lesson?

Mark it as complete to track your progress