Learning Platform
Глоссарий Troubleshooting
Урок 07.02 · 30 мин
Продвинутый
AvroSchema DefinitionSchema EvolutionWire FormatConfluent Wire FormatGenericRecordSpecificRecord

Avro Deep-Dive

Apache Avro — это бинарный формат сериализации данных с поддержкой схем. В контексте Kafka и Schema Registry Avro является форматом де-факто: он компактен, хорошо поддерживает эволюцию схем и имеет первоклассную интеграцию с Confluent Schema Registry. В этом уроке разбираем Avro до уровня wire format — байт за байтом.


Почему Avro — выбор по умолчанию для Kafka

Avro обладает несколькими свойствами, которые делают его идеальным для Kafka:

Компактное бинарное кодирование. В отличие от JSON, Avro не включает имена полей в каждое сообщение — они определены в схеме. Числа кодируются в varint-формате (переменная длина). Для типичного сообщения Avro-encoding на 3-10x компактнее JSON.

Схема обязательна для декодирования. Avro-байты без схемы — это просто байты. Это отличает Avro от самоописывающих форматов (XML, JSON) и делает Schema Registry необходимым компонентом. Взамен: нет overhead схемы в каждом сообщении.

Сильная поддержка эволюции. Avro определяет правила совместимости для каждого типа изменения (добавление поля, удаление, изменение типа). Эти правила реализуются через reader/writer schema resolution: при декодировании данных Avro применяет writer-схему (с которой данные были написаны) против reader-схемы (текущей схемы потребителя).

Первоклассная поддержка Schema Registry. KafkaAvroSerializer и KafkaAvroDeserializer из confluent-kafka — это reference-реализации. Весь Confluent Platform строится вокруг Avro + Schema Registry как стека по умолчанию.


Avro Schema Definition

Схема Avro записывается в JSON-формате. Верхний уровень — объект с обязательными полями type, name, namespace, fields.

{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.kafka",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "product", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
    {"name": "metadata", "type": ["null", "string"], "default": null}
  ]
}

Примитивные типы Avro: null, boolean, int, long, float, double, string, bytes.

Сложные типы: record (вложенная запись), array (однотипный массив), map (строковые ключи), enum (перечисление), fixed (фиксированный размер в байтах), union (один из нескольких типов).

Логические типы (logicalType) добавляют семантику к примитивам: timestamp-millis (long → миллисекунды от Unix epoch), date (int → дни от Unix epoch), decimal (bytes → точное число с масштабом), uuid (string → UUID).


Union-тип и опциональные поля

Самый важный сложный тип для эволюции схем — union. Union записывается как JSON-массив допустимых типов:

{"name": "metadata", "type": ["null", "string"], "default": null}

Правило: если первый тип в union — null, то null является значением по умолчанию (если задан "default": null). Это стандартный паттерн для опциональных полей в Avro.

Важное ограничение: в union нельзя использовать два одинаковых типа (["int", "int"] — ошибка). Нельзя использовать два record-типа с одним именем.

Порядок типов в union влияет на кодирование: Avro кодирует union как индекс типа (0, 1, 2…) + значение. Для ["null", "string"] при null-значении кодируется только 0 (varint). При string-значении — 1 + длина строки + байты строки.


Правила эволюции Avro

Понимание правил эволюции критично для правильного выбора режима совместимости. Вот полный набор правил:

Добавление поля с default — BACKWARD совместимо. Старые потребители (reader-схема без нового поля) просто игнорируют лишнее поле в данных. Новые данные содержат поле, старые — нет, но reader берёт default при чтении старых данных.

{"name": "newField", "type": ["null", "string"], "default": null}

Добавление поля без default — только FORWARD совместимо. Старые reader-схемы игнорируют неизвестное поле (если оно есть в данных). Но новые reader-схемы не могут прочитать старые данные, в которых поле отсутствует — нет default для подстановки. BACKWARD нарушен.

Удаление поля с default — FORWARD совместимо. Новые данные не содержат поле. Старые потребители с reader-схемой, в которой поле есть, используют default. Новые потребители просто не видят поля.

Удаление поля без default — ТОЛЬКО совместимо при NONE. Если поле требуется old reader и нет default — old reader не может прочитать новые данные. Это breaking change.

Переименование поля — через aliases. Прямое переименование несовместимо. Правильный путь: добавить в новую схему "aliases": ["oldName"]. Avro reader resolution сопоставляет поля по имени и alias.

Изменение типа — только допустимые promotions. Avro поддерживает type promotion: intlong, intfloat, intdouble, longfloat, longdouble, floatdouble. Остальные изменения типов (string → int, long → int и т.д.) несовместимы.

WARNING

Добавление обязательного поля (без default) — это самая частая ошибка при работе с Avro и Schema Registry. Под BACKWARD-совместимостью (режим по умолчанию) такое добавление будет отклонено Schema Registry. Всегда добавляйте поля через union с null и default.


Confluent Wire Format

Это самая важная концепция для понимания взаимодействия Kafka, Schema Registry и десериализаторов. Каждое сообщение, сериализованное через KafkaAvroSerializer (или Protobuf/JSON Schema аналог), содержит 5-байтный заголовок перед payload:

Confluent Wire Format: структура байт
Байт 0Magic byte: всегда 0x00. Идентифицирует сообщение как Schema Registry-закодированное. Десериализатор проверяет этот байт первым — если не 0x00, сообщение не является SR-encoded.
Байты 1-4Schema ID: 4-байтный целочисленный идентификатор схемы в big-endian формате. Десериализатор извлекает этот ID и запрашивает Schema Registry: GET /schemas/ids/{id}. Ответ кэшируется.
Байты 5..NPayload: Avro binary-encoded данные (или Protobuf binary, или JSON-текст для JSON Schema). Длина варьируется. Декодируется с использованием схемы, полученной по Schema ID.

Итого overhead: ровно 5 байт на каждое сообщение независимо от размера payload

Почему схема не встраивается в каждое сообщение? В standalone Avro-файлах схема включена в заголовок файла один раз на весь файл. В Kafka каждое сообщение независимо — если включать схему в каждое сообщение, это десятки или сотни байт overhead на каждое. При миллионах сообщений в секунду это неприемлемо.

Schema Registry решает это через единый справочник: схема хранится один раз, идентифицируется коротким integer ID, который помещается в 4 байта заголовка. При работающем кэше десериализатор не делает HTTP-запросы на каждое сообщение.

NOTE

kafka_sim.py использует JSON внутри wire format вместо бинарного Avro — это педагогически эквивалентно для изучения Schema Registry, но в production используется настоящий Avro binary encoding. AvroSerializer и AvroDeserializer в kafka_sim.py реализуют ту же 5-байтную структуру заголовка, что и production KafkaAvroSerializer.


GenericRecord vs SpecificRecord

Avro предоставляет два способа работы с данными на стороне Java/Scala:

GenericRecord — динамическое представление. Работает с любой схемой без предварительной кодогенерации. Доступ к полям через строковые имена: record.get("field_name"). Тип возвращаемого значения — Object, нужен каст. Используется в большинстве Kafka Consumer-приложений с Schema Registry, потому что не требует генерации кода для каждой схемы.

SpecificRecord — статическое представление. Требует предварительной кодогенерации (avro-tools или avro-maven-plugin). Генерирует Java-классы с типизированными геттерами. Используется когда нужна type-safety и IDE-поддержка — например, в приложениях-продюсерах, где схема известна заранее.

В Kafka Streams оба подхода поддерживаются через GenericAvroSerde и SpecificAvroSerde.


Пример: kafka_sim.py с AvroSerializer

kafka_sim.py предоставляет AvroSerializer и AvroDeserializer, которые реализуют Confluent wire format:

from kafka import AvroSerializer, AvroDeserializer

schema_str = '''{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.kafka",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "product", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "metadata", "type": ["null", "string"], "default": null}
  ]
}'''

# schema_id назначается Schema Registry при регистрации
serializer = AvroSerializer(schema_str=schema_str, schema_id=1)
encoded = serializer.encode({"id": 42, "product": "widget", "amount": 9.99, "metadata": None})
# encoded: b'\x00\x00\x00\x00\x01' + JSON payload (в kafka_sim.py вместо binary Avro)

decoded = AvroDeserializer().decode(encoded)
# decoded: {"id": 42, "product": "widget", "amount": 9.99, "metadata": None}
05-module-5/07-schema-registry-avro Avro: type system на уровне формата
Проверка знанийKnowledge check
Сколько байт составляет overhead Confluent Wire Format на каждое сообщение, и что содержат эти байты?
ОтветAnswer
Ровно 5 байт: 1 байт magic byte (всегда 0x00, идентифицирует SR-encoded сообщение) + 4 байта schema ID в big-endian формате (int32). Этот минимальный overhead позволяет десериализатору найти нужную схему в Schema Registry по ID — без необходимости включать полное описание схемы в каждое сообщение.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Avro-схема V1 содержит поля {id: int, name: string}. Команда хочет добавить поле email: string без default-значения. При режиме совместимости BACKWARD — что произойдёт при попытке зарегистрировать V2?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6