Prerequisites:
- module-4/06-outbox-implementation
Schema Registry и Avro сериализация
До этого момента мы использовали JSON для передачи CDC событий в Kafka. JSON — простой, читаемый, но не без недостатков: большой размер сообщений, отсутствие schema enforcement, ручное управление эволюцией схем.
В production CDC системах часто используют Apache Avro с Confluent Schema Registry — комбинация, которая решает проблемы JSON и добавляет централизованное управление схемами.
В этом уроке мы изучим, почему Avro критичен для production, как интегрировать Schema Registry с Debezium, и какие особенности есть в Debezium 2.x.
Почему Schema Registry критически важен для CDC
CDC pipeline генерирует миллионы событий. Каждое событие должно быть корректным по структуре, иначе downstream потребители сломаются. Schema Registry решает эту проблему.
Проблемы с JSON подходом:
- Нет schema enforcement — producer может отправить некорректные данные
- Большой размер — каждое сообщение содержит полные имена полей
- Ручная эволюция схем — при добавлении поля нужно координировать producers и consumers
- Нет версионирования — невозможно отследить изменения схемы
Что дает Schema Registry:
- Centralized schema storage — единый источник истины для всех схем
- Compatibility enforcement — невозможно зарегистрировать несовместимую схему
- Compact serialization — Avro использует schema ID вместо имен полей
- Schema evolution — автоматическое управление backward/forward compatibility
Production истина: В компаниях с десятками CDC коннекторов Schema Registry — единственный способ избежать хаоса. Без него каждое изменение схемы становится проектом на неделю.
Avro vs JSON: Сравнение
Давайте посмотрим на конкретные числа.
| Аспект | JSON | Avro + Schema Registry |
|---|---|---|
| Размер сообщения | 100% (baseline) | ~50% (2x меньше) |
| Скорость десериализации | 1x | 2x быстрее |
| Schema enforcement | Нет (implicit) | Да (explicit) |
| Эволюция схем | Ручная координация | Managed via compatibility modes |
| Версионирование | Нет | Автоматическое (schema ID) |
| Читаемость | Высокая (text format) | Низкая (binary format) |
Пример JSON сообщения:
{
"before": null,
"after": {
"id": 1001,
"first_name": "Sally",
"last_name": "Thomas",
"email": "[email protected]"
},
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "dbserver1",
"ts_ms": 1559033904863,
"db": "inventory",
"schema": "public",
"table": "customers"
},
"op": "c",
"ts_ms": 1559033904870
}
Тот же payload в Avro (binary):
magic_byte schema_id binary_data
\0 \x00\x00\x00\x01 [binary representation of data]
Размер:
- JSON: ~350 bytes
- Avro: ~180 bytes (без учета сжатия Kafka)
Для pipeline с 1 миллионом событий в час экономия составляет ~170 MB/час только на размере сообщений.
Архитектура Schema Registry с Debezium
Centralized schema management для producer и consumer
Schema хранится один раз в Schema Registry. Сообщения содержат только 4-byte schema ID. Экономия: ~300 bytes на сообщение, что критично для CDC pipeline с миллионами событий.
Поток работы:
-
Producer (Debezium):
- Читает WAL, получает event
- Передает event в AvroConverter
- AvroConverter извлекает schema из CDC события
- Проверяет Schema Registry: зарегистрирована ли эта schema?
- Если нет — регистрирует новую schema, получает schema ID
- Сериализует данные в binary Avro с schema ID в начале
-
Schema Registry:
- Хранит schemas в Kafka topic
_schemas - Назначает schema ID (auto-increment)
- Проверяет compatibility mode перед регистрацией
- Хранит schemas в Kafka topic
-
Consumer:
- Читает binary message из Kafka
- Извлекает schema ID из первых 5 bytes
- Запрашивает schema по ID из Schema Registry (с кешированием)
- Десериализует binary data используя schema
Проверка знанийКакие две ключевые проблемы JSON-сериализации решает комбинация Avro + Schema Registry? Почему они критичны для production CDC pipeline?
Debezium 2.x: Установка Avro Converter
Критическое изменение в Debezium 2.0+: Confluent Avro converter JARs более не включены в дистрибутив Debezium из-за лицензионных ограничений (Confluent Community License).
В Debezium 1.x Avro converter был “из коробки”. В Debezium 2.x вы должны установить JAR файлы вручную.
Необходимые JAR файлы
Для работы Avro serialization требуется 5 библиотек:
- kafka-connect-avro-converter — Kafka Connect converter
- kafka-connect-avro-data — Avro data structures
- kafka-avro-serializer — Avro serializer/deserializer
- kafka-schema-serializer — Generic schema serializer
- kafka-schema-registry-client — Client для взаимодействия с Registry
Процесс установки
Шаг 1: Скачать JARs из Maven Central
# Определить версию Confluent Platform (используйте ту же, что Schema Registry)
CONFLUENT_VERSION=7.8.1
# Скачать каждый JAR
cd /tmp
curl -O https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-converter/${CONFLUENT_VERSION}/kafka-connect-avro-converter-${CONFLUENT_VERSION}.jar
curl -O https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-data/${CONFLUENT_VERSION}/kafka-connect-avro-data-${CONFLUENT_VERSION}.jar
curl -O https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/${CONFLUENT_VERSION}/kafka-avro-serializer-${CONFLUENT_VERSION}.jar
curl -O https://packages.confluent.io/maven/io/confluent/kafka-schema-serializer/${CONFLUENT_VERSION}/kafka-schema-serializer-${CONFLUENT_VERSION}.jar
curl -O https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/${CONFLUENT_VERSION}/kafka-schema-registry-client-${CONFLUENT_VERSION}.jar
Шаг 2: Скопировать в Kafka Connect plugins directory
# Путь к Debezium connector plugins
PLUGIN_PATH=/kafka/connect/debezium-connector-postgresql
# Копировать все JARs
cp kafka-connect-avro-*.jar ${PLUGIN_PATH}/
cp kafka-avro-serializer-*.jar ${PLUGIN_PATH}/
cp kafka-schema-*.jar ${PLUGIN_PATH}/
Шаг 3: Перезапустить Kafka Connect
docker-compose restart connect
Шаг 4: Проверить доступность converter
# Проверить loaded plugins
curl http://localhost:8083/connector-plugins | jq
# Должен быть в списке:
# "class": "io.confluent.connect.avro.AvroConverter"
Важно: Версия Confluent JARs должна совпадать с версией Schema Registry. Несовпадение версий может вызвать
NoSuchMethodErrorилиClassNotFoundException.
Альтернатива: Apicurio Registry
Если вы хотите избежать зависимости от Confluent licensing, рассмотрите Apicurio Registry — open-source альтернатива с Apache 2.0 лицензией.
Apicurio поддерживает Avro, Protobuf, JSON Schema и совместим с Confluent API.
Конфигурация Debezium коннектора для Avro
Теперь, когда JARs установлены, настроим коннектор для Avro serialization.
Базовая конфигурация
{
"name": "inventory-connector-avro",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "inventory",
"database.server.name": "dbserver1",
"table.include.list": "public.customers",
"plugin.name": "pgoutput",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
Ключевые параметры:
key.converter— AvroConverter для ключа сообщения (обычно primary key)value.converter— AvroConverter для value (CDC payload)schema.registry.url— URL Schema Registry (internal Docker hostname)
Опциональные параметры
{
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",
"key.converter.auto.register.schemas": "true",
"value.converter.auto.register.schemas": "true",
"value.converter.use.latest.version": "false"
}
Параметры:
schemas.enable=true— включить schema в сообщение (нужно для Avro)auto.register.schemas=true— автоматически регистрировать новые schemasuse.latest.version=false— использовать конкретную версию schema (не latest)
Best practice: Установите
auto.register.schemas=falseв production, чтобы предотвратить случайную регистрацию некорректных schemas. Регистрируйте schemas вручную или через CI/CD.
Проверка знанийЧто произойдет, если Schema Registry недоступен при запуске Debezium коннектора с AvroConverter? Будет ли коннектор использовать JSON как fallback?
Subject Naming Strategies
Schema Registry хранит schemas под subjects (ключи для идентификации схем). Debezium поддерживает несколько naming strategies.
TopicNameStrategy (default)
Схема называется по имени топика.
Subject format: {topic}-key, {topic}-value
Пример:
- Topic:
dbserver1.inventory.customers - Key subject:
dbserver1.inventory.customers-key - Value subject:
dbserver1.inventory.customers-value
Когда использовать: Когда один topic содержит один тип событий.
RecordNameStrategy
Схема называется по имени Avro record (полное имя типа).
Subject format: {namespace}.{name}
Пример:
- Avro record:
io.debezium.connector.postgresql.Customer - Subject:
io.debezium.connector.postgresql.Customer
Когда использовать: Когда один topic содержит несколько типов records (multi-tenant topics).
TopicRecordNameStrategy
Комбинация topic + record name.
Subject format: {topic}-{namespace}.{name}
Пример:
- Topic:
dbserver1.inventory.customers - Record:
io.debezium.connector.postgresql.Customer - Subject:
dbserver1.inventory.customers-io.debezium.connector.postgresql.Customer
Когда использовать: Гибрид для обеспечения namespace изоляции в multi-topic окружениях.
Конфигурация naming strategy:
{
"value.converter.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
}
Просмотр зарегистрированных схем
Schema Registry предоставляет REST API для управления schemas.
Получить список subjects
curl http://localhost:8081/subjects
Вывод:
[
"dbserver1.inventory.customers-key",
"dbserver1.inventory.customers-value"
]
Получить все версии schema для subject
curl http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions
Вывод:
[1, 2, 3]
Получить latest schema
curl http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/latest
Вывод:
{
"subject": "dbserver1.inventory.customers-value",
"version": 3,
"id": 5,
"schema": "{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"dbserver1.inventory.customers\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"}]}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.postgresql\",\"fields\":[...]}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null}]}"
}
Получить schema по ID
curl http://localhost:8081/schemas/ids/5
Когда использовать: Consumer десериализует сообщение и видит schema ID = 5 в binary payload. Запрашивает schema по ID.
Lab: Конфигурация Avro serialization
Давайте настроим Avro сериализацию для таблицы customers.
Предварительные требования
Убедитесь, что в вашем docker-compose.yml есть Schema Registry:
schema-registry:
image: confluentinc/cp-schema-registry:7.8.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
Шаг 1: Запустить Schema Registry
cd labs
docker-compose up -d schema-registry
Проверить доступность:
curl http://localhost:8081/subjects
# Должен вернуть: []
Шаг 2: Установить Avro converter JARs (если еще не сделано)
См. раздел “Установка Avro Converter” выше.
Шаг 3: Создать Avro-enabled коннектор
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "inventory-connector-avro",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "inventory",
"database.server.name": "dbserver1",
"table.include.list": "public.customers",
"plugin.name": "pgoutput",
"publication.autocreate.mode": "filtered",
"slot.name": "debezium_avro",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}'
Шаг 4: Проверить зарегистрированные schemas
# Список subjects
curl http://localhost:8081/subjects
# Должен вернуть:
# ["dbserver1.inventory.customers-key","dbserver1.inventory.customers-value"]
# Получить value schema
curl http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/latest | jq
Шаг 5: Вставить тестовую запись
docker exec -it postgres psql -U postgres -d inventory -c \
"INSERT INTO customers (first_name, last_name, email) VALUES ('John', 'Doe', '[email protected]');"
Шаг 6: Проверить binary сообщение в Kafka
docker exec -it kafka kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic dbserver1.inventory.customers \
--from-beginning \
--max-messages 1
Вывод будет binary garbage — это нормально! Avro binary format не читается в plain text.
Шаг 7: Десериализовать с Avro consumer
docker exec -it schema-registry kafka-avro-console-consumer \
--bootstrap-server kafka:9092 \
--topic dbserver1.inventory.customers \
--from-beginning \
--property schema.registry.url=http://schema-registry:8081
Теперь вы увидите читаемый JSON — kafka-avro-console-consumer автоматически десериализует binary используя Schema Registry.
Типичные ошибки при настройке Avro
ClassNotFoundException: AvroConverter
Ошибка:
Caused by: java.lang.ClassNotFoundException: io.confluent.connect.avro.AvroConverter
Причина: JAR файлы AvroConverter не установлены в Kafka Connect plugins directory.
Решение: Скачать и скопировать 5 необходимых JARs (см. раздел “Установка”).
Schema Registry недоступен
Ошибка:
Failed to connect to Schema Registry at http://schema-registry:8081
Причина:
- Schema Registry не запущен
- Неверный hostname (используйте internal Docker name, не localhost)
- Network isolation (Connect и Registry в разных Docker networks)
Решение:
# Проверить статус
docker-compose ps schema-registry
# Проверить network connectivity
docker exec -it connect curl http://schema-registry:8081/subjects
Incompatible schema version
Ошибка:
Schema being registered is incompatible with an earlier schema
Причина: Новая схема несовместима с compatibility mode (например, удалили required field при BACKWARD mode).
Решение: См. следующий урок “Schema Evolution and Compatibility” для понимания safe schema changes.
Когда использовать Avro vs JSON
Используйте Avro:
- Production системы с высоким throughput
- Когда размер сообщений критичен (cost optimization)
- Когда нужен strict schema enforcement
- Multi-team окружения (Schema Registry как контракт)
Используйте JSON:
- Development/testing
- Когда читаемость важнее эффективности
- Простые pipeline без schema evolution
- Debugging (легко инспектировать сообщения)
Практический совет: Начните с JSON для прототипирования, затем мигрируйте на Avro для production. Schema Registry можно добавить без изменения consumers — просто обновите connector config.
Что дальше?
Вы настроили Avro serialization с Schema Registry. Следующий критический вопрос: как управлять изменениями схем в production?
В следующем уроке мы изучим schema evolution — как безопасно добавлять/удалять поля, какие изменения ломают consumers, и как использовать compatibility modes для предотвращения breaking changes.
Ключевые выводы
- Avro + Schema Registry — production standard для CDC (2x меньше размер, schema enforcement)
- Debezium 2.x требует ручной установки Confluent JAR файлов (5 библиотек)
- AvroConverter настраивается через
key.converterиvalue.converterв connector config - Subject naming strategies определяют, как schemas именуются в Registry
- Schema Registry REST API позволяет инспектировать зарегистрированные schemas
- ClassNotFoundException = забыли установить JARs в plugins directory
- Binary Avro нечитаем в plain text — используйте
kafka-avro-console-consumerдля debugging - auto.register.schemas=false в production для контроля schema changes
Check Your Understanding
Finished the lesson?
Mark it as complete to track your progress