Skip to content
Learning Platform
Intermediate
35 minutes
schema-registry avro serialization

Prerequisites:

  • module-4/06-outbox-implementation

Schema Registry и Avro сериализация

До этого момента мы использовали JSON для передачи CDC событий в Kafka. JSON — простой, читаемый, но не без недостатков: большой размер сообщений, отсутствие schema enforcement, ручное управление эволюцией схем.

В production CDC системах часто используют Apache Avro с Confluent Schema Registry — комбинация, которая решает проблемы JSON и добавляет централизованное управление схемами.

В этом уроке мы изучим, почему Avro критичен для production, как интегрировать Schema Registry с Debezium, и какие особенности есть в Debezium 2.x.

Почему Schema Registry критически важен для CDC

CDC pipeline генерирует миллионы событий. Каждое событие должно быть корректным по структуре, иначе downstream потребители сломаются. Schema Registry решает эту проблему.

Проблемы с JSON подходом:

  1. Нет schema enforcement — producer может отправить некорректные данные
  2. Большой размер — каждое сообщение содержит полные имена полей
  3. Ручная эволюция схем — при добавлении поля нужно координировать producers и consumers
  4. Нет версионирования — невозможно отследить изменения схемы

Что дает Schema Registry:

  • Centralized schema storage — единый источник истины для всех схем
  • Compatibility enforcement — невозможно зарегистрировать несовместимую схему
  • Compact serialization — Avro использует schema ID вместо имен полей
  • Schema evolution — автоматическое управление backward/forward compatibility

Production истина: В компаниях с десятками CDC коннекторов Schema Registry — единственный способ избежать хаоса. Без него каждое изменение схемы становится проектом на неделю.

Avro vs JSON: Сравнение

Давайте посмотрим на конкретные числа.

АспектJSONAvro + Schema Registry
Размер сообщения100% (baseline)~50% (2x меньше)
Скорость десериализации1x2x быстрее
Schema enforcementНет (implicit)Да (explicit)
Эволюция схемРучная координацияManaged via compatibility modes
ВерсионированиеНетАвтоматическое (schema ID)
ЧитаемостьВысокая (text format)Низкая (binary format)

Пример JSON сообщения:

{
  "before": null,
  "after": {
    "id": 1001,
    "first_name": "Sally",
    "last_name": "Thomas",
    "email": "[email protected]"
  },
  "source": {
    "version": "2.5.4.Final",
    "connector": "postgresql",
    "name": "dbserver1",
    "ts_ms": 1559033904863,
    "db": "inventory",
    "schema": "public",
    "table": "customers"
  },
  "op": "c",
  "ts_ms": 1559033904870
}

Тот же payload в Avro (binary):

magic_byte schema_id                  binary_data
   \0        \x00\x00\x00\x01    [binary representation of data]

Размер:

  • JSON: ~350 bytes
  • Avro: ~180 bytes (без учета сжатия Kafka)

Для pipeline с 1 миллионом событий в час экономия составляет ~170 MB/час только на размере сообщений.

Архитектура Schema Registry с Debezium

Schema Registry Architecture с Debezium

Centralized schema management для producer и consumer

PostgreSQL
WAL capture
Debezium + Avro Serializer
Register schema
Schema RegistryConfluent / Apicurio
Schema ID: 123
[0x00][123][binary data]
Kafka Topic
Consumer + Avro Deserializer
GET schema by ID
Schema Registry
Return schema
Deserialized Object
Ключевое преимущество:

Schema хранится один раз в Schema Registry. Сообщения содержат только 4-byte schema ID. Экономия: ~300 bytes на сообщение, что критично для CDC pipeline с миллионами событий.

Поток работы:

  1. Producer (Debezium):

    • Читает WAL, получает event
    • Передает event в AvroConverter
    • AvroConverter извлекает schema из CDC события
    • Проверяет Schema Registry: зарегистрирована ли эта schema?
    • Если нет — регистрирует новую schema, получает schema ID
    • Сериализует данные в binary Avro с schema ID в начале
  2. Schema Registry:

    • Хранит schemas в Kafka topic _schemas
    • Назначает schema ID (auto-increment)
    • Проверяет compatibility mode перед регистрацией
  3. Consumer:

    • Читает binary message из Kafka
    • Извлекает schema ID из первых 5 bytes
    • Запрашивает schema по ID из Schema Registry (с кешированием)
    • Десериализует binary data используя schema
Проверка знаний
Какие две ключевые проблемы JSON-сериализации решает комбинация Avro + Schema Registry? Почему они критичны для production CDC pipeline?
Ответ
(1) Отсутствие schema enforcement: JSON позволяет producer отправить некорректные данные (пропущенные поля, неверные типы), и ошибка обнаружится только при десериализации на стороне consumer. Avro + Schema Registry проверяют schema при регистрации. (2) Большой размер сообщений: каждое JSON-сообщение повторяет имена полей, увеличивая размер на ~50%. Avro сериализует в binary с schema ID (5 bytes вместо полных имен). Для pipeline с миллионами событий экономия значительна.

Debezium 2.x: Установка Avro Converter

Критическое изменение в Debezium 2.0+: Confluent Avro converter JARs более не включены в дистрибутив Debezium из-за лицензионных ограничений (Confluent Community License).

В Debezium 1.x Avro converter был “из коробки”. В Debezium 2.x вы должны установить JAR файлы вручную.

Необходимые JAR файлы

Для работы Avro serialization требуется 5 библиотек:

  1. kafka-connect-avro-converter — Kafka Connect converter
  2. kafka-connect-avro-data — Avro data structures
  3. kafka-avro-serializer — Avro serializer/deserializer
  4. kafka-schema-serializer — Generic schema serializer
  5. kafka-schema-registry-client — Client для взаимодействия с Registry

Процесс установки

Шаг 1: Скачать JARs из Maven Central

# Определить версию Confluent Platform (используйте ту же, что Schema Registry)
CONFLUENT_VERSION=7.8.1

# Скачать каждый JAR
cd /tmp
curl -O https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-converter/${CONFLUENT_VERSION}/kafka-connect-avro-converter-${CONFLUENT_VERSION}.jar
curl -O https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-data/${CONFLUENT_VERSION}/kafka-connect-avro-data-${CONFLUENT_VERSION}.jar
curl -O https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/${CONFLUENT_VERSION}/kafka-avro-serializer-${CONFLUENT_VERSION}.jar
curl -O https://packages.confluent.io/maven/io/confluent/kafka-schema-serializer/${CONFLUENT_VERSION}/kafka-schema-serializer-${CONFLUENT_VERSION}.jar
curl -O https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/${CONFLUENT_VERSION}/kafka-schema-registry-client-${CONFLUENT_VERSION}.jar

Шаг 2: Скопировать в Kafka Connect plugins directory

# Путь к Debezium connector plugins
PLUGIN_PATH=/kafka/connect/debezium-connector-postgresql

# Копировать все JARs
cp kafka-connect-avro-*.jar ${PLUGIN_PATH}/
cp kafka-avro-serializer-*.jar ${PLUGIN_PATH}/
cp kafka-schema-*.jar ${PLUGIN_PATH}/

Шаг 3: Перезапустить Kafka Connect

docker-compose restart connect

Шаг 4: Проверить доступность converter

# Проверить loaded plugins
curl http://localhost:8083/connector-plugins | jq

# Должен быть в списке:
# "class": "io.confluent.connect.avro.AvroConverter"

Важно: Версия Confluent JARs должна совпадать с версией Schema Registry. Несовпадение версий может вызвать NoSuchMethodError или ClassNotFoundException.

Альтернатива: Apicurio Registry

Если вы хотите избежать зависимости от Confluent licensing, рассмотрите Apicurio Registry — open-source альтернатива с Apache 2.0 лицензией.

Apicurio поддерживает Avro, Protobuf, JSON Schema и совместим с Confluent API.

Конфигурация Debezium коннектора для Avro

Теперь, когда JARs установлены, настроим коннектор для Avro serialization.

Базовая конфигурация

{
  "name": "inventory-connector-avro",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "database.server.name": "dbserver1",
    "table.include.list": "public.customers",
    "plugin.name": "pgoutput",

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Ключевые параметры:

  • key.converter — AvroConverter для ключа сообщения (обычно primary key)
  • value.converter — AvroConverter для value (CDC payload)
  • schema.registry.url — URL Schema Registry (internal Docker hostname)

Опциональные параметры

{
  "key.converter.schemas.enable": "true",
  "value.converter.schemas.enable": "true",
  "key.converter.auto.register.schemas": "true",
  "value.converter.auto.register.schemas": "true",
  "value.converter.use.latest.version": "false"
}

Параметры:

  • schemas.enable=true — включить schema в сообщение (нужно для Avro)
  • auto.register.schemas=true — автоматически регистрировать новые schemas
  • use.latest.version=false — использовать конкретную версию schema (не latest)

Best practice: Установите auto.register.schemas=false в production, чтобы предотвратить случайную регистрацию некорректных schemas. Регистрируйте schemas вручную или через CI/CD.

Проверка знаний
Что произойдет, если Schema Registry недоступен при запуске Debezium коннектора с AvroConverter? Будет ли коннектор использовать JSON как fallback?
Ответ
Нет, AvroConverter не поддерживает automatic fallback на JSON. При недоступности Schema Registry коннектор не сможет зарегистрировать schema и получить schema ID, необходимый для binary Avro формата (magic byte + schema ID + data). Сериализация невозможна, и Kafka Connect task перейдет в FAILED state. В production необходимо обеспечить высокую доступность Schema Registry.

Subject Naming Strategies

Schema Registry хранит schemas под subjects (ключи для идентификации схем). Debezium поддерживает несколько naming strategies.

TopicNameStrategy (default)

Схема называется по имени топика.

Subject format: {topic}-key, {topic}-value

Пример:

  • Topic: dbserver1.inventory.customers
  • Key subject: dbserver1.inventory.customers-key
  • Value subject: dbserver1.inventory.customers-value

Когда использовать: Когда один topic содержит один тип событий.

RecordNameStrategy

Схема называется по имени Avro record (полное имя типа).

Subject format: {namespace}.{name}

Пример:

  • Avro record: io.debezium.connector.postgresql.Customer
  • Subject: io.debezium.connector.postgresql.Customer

Когда использовать: Когда один topic содержит несколько типов records (multi-tenant topics).

TopicRecordNameStrategy

Комбинация topic + record name.

Subject format: {topic}-{namespace}.{name}

Пример:

  • Topic: dbserver1.inventory.customers
  • Record: io.debezium.connector.postgresql.Customer
  • Subject: dbserver1.inventory.customers-io.debezium.connector.postgresql.Customer

Когда использовать: Гибрид для обеспечения namespace изоляции в multi-topic окружениях.

Конфигурация naming strategy:

{
  "value.converter.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
}

Просмотр зарегистрированных схем

Schema Registry предоставляет REST API для управления schemas.

Получить список subjects

curl http://localhost:8081/subjects

Вывод:

[
  "dbserver1.inventory.customers-key",
  "dbserver1.inventory.customers-value"
]

Получить все версии schema для subject

curl http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions

Вывод:

[1, 2, 3]

Получить latest schema

curl http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/latest

Вывод:

{
  "subject": "dbserver1.inventory.customers-value",
  "version": 3,
  "id": 5,
  "schema": "{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"dbserver1.inventory.customers\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"}]}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.postgresql\",\"fields\":[...]}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null}]}"
}

Получить schema по ID

curl http://localhost:8081/schemas/ids/5

Когда использовать: Consumer десериализует сообщение и видит schema ID = 5 в binary payload. Запрашивает schema по ID.

Lab: Конфигурация Avro serialization

Давайте настроим Avro сериализацию для таблицы customers.

Предварительные требования

Убедитесь, что в вашем docker-compose.yml есть Schema Registry:

schema-registry:
  image: confluentinc/cp-schema-registry:7.8.1
  hostname: schema-registry
  container_name: schema-registry
  depends_on:
    - kafka
  ports:
    - "8081:8081"
  environment:
    SCHEMA_REGISTRY_HOST_NAME: schema-registry
    SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
    SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

Шаг 1: Запустить Schema Registry

cd labs
docker-compose up -d schema-registry

Проверить доступность:

curl http://localhost:8081/subjects
# Должен вернуть: []

Шаг 2: Установить Avro converter JARs (если еще не сделано)

См. раздел “Установка Avro Converter” выше.

Шаг 3: Создать Avro-enabled коннектор

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "inventory-connector-avro",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "inventory",
      "database.server.name": "dbserver1",
      "table.include.list": "public.customers",
      "plugin.name": "pgoutput",
      "publication.autocreate.mode": "filtered",
      "slot.name": "debezium_avro",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "key.converter.schema.registry.url": "http://schema-registry:8081",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
  }'

Шаг 4: Проверить зарегистрированные schemas

# Список subjects
curl http://localhost:8081/subjects

# Должен вернуть:
# ["dbserver1.inventory.customers-key","dbserver1.inventory.customers-value"]

# Получить value schema
curl http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/latest | jq

Шаг 5: Вставить тестовую запись

docker exec -it postgres psql -U postgres -d inventory -c \
  "INSERT INTO customers (first_name, last_name, email) VALUES ('John', 'Doe', '[email protected]');"

Шаг 6: Проверить binary сообщение в Kafka

docker exec -it kafka kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic dbserver1.inventory.customers \
  --from-beginning \
  --max-messages 1

Вывод будет binary garbage — это нормально! Avro binary format не читается в plain text.

Шаг 7: Десериализовать с Avro consumer

docker exec -it schema-registry kafka-avro-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic dbserver1.inventory.customers \
  --from-beginning \
  --property schema.registry.url=http://schema-registry:8081

Теперь вы увидите читаемый JSONkafka-avro-console-consumer автоматически десериализует binary используя Schema Registry.

Типичные ошибки при настройке Avro

ClassNotFoundException: AvroConverter

Ошибка:

Caused by: java.lang.ClassNotFoundException: io.confluent.connect.avro.AvroConverter

Причина: JAR файлы AvroConverter не установлены в Kafka Connect plugins directory.

Решение: Скачать и скопировать 5 необходимых JARs (см. раздел “Установка”).

Schema Registry недоступен

Ошибка:

Failed to connect to Schema Registry at http://schema-registry:8081

Причина:

  1. Schema Registry не запущен
  2. Неверный hostname (используйте internal Docker name, не localhost)
  3. Network isolation (Connect и Registry в разных Docker networks)

Решение:

# Проверить статус
docker-compose ps schema-registry

# Проверить network connectivity
docker exec -it connect curl http://schema-registry:8081/subjects

Incompatible schema version

Ошибка:

Schema being registered is incompatible with an earlier schema

Причина: Новая схема несовместима с compatibility mode (например, удалили required field при BACKWARD mode).

Решение: См. следующий урок “Schema Evolution and Compatibility” для понимания safe schema changes.

Когда использовать Avro vs JSON

Используйте Avro:

  • Production системы с высоким throughput
  • Когда размер сообщений критичен (cost optimization)
  • Когда нужен strict schema enforcement
  • Multi-team окружения (Schema Registry как контракт)

Используйте JSON:

  • Development/testing
  • Когда читаемость важнее эффективности
  • Простые pipeline без schema evolution
  • Debugging (легко инспектировать сообщения)

Практический совет: Начните с JSON для прототипирования, затем мигрируйте на Avro для production. Schema Registry можно добавить без изменения consumers — просто обновите connector config.

Что дальше?

Вы настроили Avro serialization с Schema Registry. Следующий критический вопрос: как управлять изменениями схем в production?

В следующем уроке мы изучим schema evolution — как безопасно добавлять/удалять поля, какие изменения ломают consumers, и как использовать compatibility modes для предотвращения breaking changes.

Ключевые выводы

  1. Avro + Schema Registry — production standard для CDC (2x меньше размер, schema enforcement)
  2. Debezium 2.x требует ручной установки Confluent JAR файлов (5 библиотек)
  3. AvroConverter настраивается через key.converter и value.converter в connector config
  4. Subject naming strategies определяют, как schemas именуются в Registry
  5. Schema Registry REST API позволяет инспектировать зарегистрированные schemas
  6. ClassNotFoundException = забыли установить JARs в plugins directory
  7. Binary Avro нечитаем в plain text — используйте kafka-avro-console-consumer для debugging
  8. auto.register.schemas=false в production для контроля schema changes

Check Your Understanding

Score: 0 of 0
Conceptual
Question 1 of 4. Какие две ключевые проблемы JSON-сериализации в CDC pipeline решает комбинация Avro + Schema Registry?

Finished the lesson?

Mark it as complete to track your progress