Avro Deep-Dive
Apache Avro — это бинарный формат сериализации данных с поддержкой схем. В контексте Kafka и Schema Registry Avro является форматом де-факто: он компактен, хорошо поддерживает эволюцию схем и имеет первоклассную интеграцию с Confluent Schema Registry. В этом уроке разбираем Avro до уровня wire format — байт за байтом.
Почему Avro — выбор по умолчанию для Kafka
Avro обладает несколькими свойствами, которые делают его идеальным для Kafka:
Компактное бинарное кодирование. В отличие от JSON, Avro не включает имена полей в каждое сообщение — они определены в схеме. Числа кодируются в varint-формате (переменная длина). Для типичного сообщения Avro-encoding на 3-10x компактнее JSON.
Схема обязательна для декодирования. Avro-байты без схемы — это просто байты. Это отличает Avro от самоописывающих форматов (XML, JSON) и делает Schema Registry необходимым компонентом. Взамен: нет overhead схемы в каждом сообщении.
Сильная поддержка эволюции. Avro определяет правила совместимости для каждого типа изменения (добавление поля, удаление, изменение типа). Эти правила реализуются через reader/writer schema resolution: при декодировании данных Avro применяет writer-схему (с которой данные были написаны) против reader-схемы (текущей схемы потребителя).
Первоклассная поддержка Schema Registry. KafkaAvroSerializer и KafkaAvroDeserializer из confluent-kafka — это reference-реализации. Весь Confluent Platform строится вокруг Avro + Schema Registry как стека по умолчанию.
Avro Schema Definition
Схема Avro записывается в JSON-формате. Верхний уровень — объект с обязательными полями type, name, namespace, fields.
{
"type": "record",
"name": "Order",
"namespace": "com.example.kafka",
"fields": [
{"name": "id", "type": "int"},
{"name": "product", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
{"name": "metadata", "type": ["null", "string"], "default": null}
]
}
Примитивные типы Avro: null, boolean, int, long, float, double, string, bytes.
Сложные типы: record (вложенная запись), array (однотипный массив), map (строковые ключи), enum (перечисление), fixed (фиксированный размер в байтах), union (один из нескольких типов).
Логические типы (logicalType) добавляют семантику к примитивам: timestamp-millis (long → миллисекунды от Unix epoch), date (int → дни от Unix epoch), decimal (bytes → точное число с масштабом), uuid (string → UUID).
Union-тип и опциональные поля
Самый важный сложный тип для эволюции схем — union. Union записывается как JSON-массив допустимых типов:
{"name": "metadata", "type": ["null", "string"], "default": null}
Правило: если первый тип в union — null, то null является значением по умолчанию (если задан "default": null). Это стандартный паттерн для опциональных полей в Avro.
Важное ограничение: в union нельзя использовать два одинаковых типа (["int", "int"] — ошибка). Нельзя использовать два record-типа с одним именем.
Порядок типов в union влияет на кодирование: Avro кодирует union как индекс типа (0, 1, 2…) + значение. Для ["null", "string"] при null-значении кодируется только 0 (varint). При string-значении — 1 + длина строки + байты строки.
Правила эволюции Avro
Понимание правил эволюции критично для правильного выбора режима совместимости. Вот полный набор правил:
Добавление поля с default — BACKWARD совместимо. Старые потребители (reader-схема без нового поля) просто игнорируют лишнее поле в данных. Новые данные содержат поле, старые — нет, но reader берёт default при чтении старых данных.
{"name": "newField", "type": ["null", "string"], "default": null}
Добавление поля без default — только FORWARD совместимо. Старые reader-схемы игнорируют неизвестное поле (если оно есть в данных). Но новые reader-схемы не могут прочитать старые данные, в которых поле отсутствует — нет default для подстановки. BACKWARD нарушен.
Удаление поля с default — FORWARD совместимо. Новые данные не содержат поле. Старые потребители с reader-схемой, в которой поле есть, используют default. Новые потребители просто не видят поля.
Удаление поля без default — ТОЛЬКО совместимо при NONE. Если поле требуется old reader и нет default — old reader не может прочитать новые данные. Это breaking change.
Переименование поля — через aliases.
Прямое переименование несовместимо. Правильный путь: добавить в новую схему "aliases": ["oldName"]. Avro reader resolution сопоставляет поля по имени и alias.
Изменение типа — только допустимые promotions.
Avro поддерживает type promotion: int → long, int → float, int → double, long → float, long → double, float → double. Остальные изменения типов (string → int, long → int и т.д.) несовместимы.
Добавление обязательного поля (без default) — это самая частая ошибка при работе с Avro и Schema Registry. Под BACKWARD-совместимостью (режим по умолчанию) такое добавление будет отклонено Schema Registry. Всегда добавляйте поля через union с null и default.
Confluent Wire Format
Это самая важная концепция для понимания взаимодействия Kafka, Schema Registry и десериализаторов. Каждое сообщение, сериализованное через KafkaAvroSerializer (или Protobuf/JSON Schema аналог), содержит 5-байтный заголовок перед payload:
Итого overhead: ровно 5 байт на каждое сообщение независимо от размера payload
Почему схема не встраивается в каждое сообщение? В standalone Avro-файлах схема включена в заголовок файла один раз на весь файл. В Kafka каждое сообщение независимо — если включать схему в каждое сообщение, это десятки или сотни байт overhead на каждое. При миллионах сообщений в секунду это неприемлемо.
Schema Registry решает это через единый справочник: схема хранится один раз, идентифицируется коротким integer ID, который помещается в 4 байта заголовка. При работающем кэше десериализатор не делает HTTP-запросы на каждое сообщение.
kafka_sim.py использует JSON внутри wire format вместо бинарного Avro — это педагогически эквивалентно для изучения Schema Registry, но в production используется настоящий Avro binary encoding. AvroSerializer и AvroDeserializer в kafka_sim.py реализуют ту же 5-байтную структуру заголовка, что и production KafkaAvroSerializer.
GenericRecord vs SpecificRecord
Avro предоставляет два способа работы с данными на стороне Java/Scala:
GenericRecord — динамическое представление. Работает с любой схемой без предварительной кодогенерации. Доступ к полям через строковые имена: record.get("field_name"). Тип возвращаемого значения — Object, нужен каст. Используется в большинстве Kafka Consumer-приложений с Schema Registry, потому что не требует генерации кода для каждой схемы.
SpecificRecord — статическое представление. Требует предварительной кодогенерации (avro-tools или avro-maven-plugin). Генерирует Java-классы с типизированными геттерами. Используется когда нужна type-safety и IDE-поддержка — например, в приложениях-продюсерах, где схема известна заранее.
В Kafka Streams оба подхода поддерживаются через GenericAvroSerde и SpecificAvroSerde.
Пример: kafka_sim.py с AvroSerializer
kafka_sim.py предоставляет AvroSerializer и AvroDeserializer, которые реализуют Confluent wire format:
from kafka import AvroSerializer, AvroDeserializer
schema_str = '''{
"type": "record",
"name": "Order",
"namespace": "com.example.kafka",
"fields": [
{"name": "id", "type": "int"},
{"name": "product", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "metadata", "type": ["null", "string"], "default": null}
]
}'''
# schema_id назначается Schema Registry при регистрации
serializer = AvroSerializer(schema_str=schema_str, schema_id=1)
encoded = serializer.encode({"id": 42, "product": "widget", "amount": 9.99, "metadata": None})
# encoded: b'\x00\x00\x00\x00\x01' + JSON payload (в kafka_sim.py вместо binary Avro)
decoded = AvroDeserializer().decode(encoded)
# decoded: {"id": 42, "product": "widget", "amount": 9.99, "metadata": None}
05-module-5/07-schema-registry-avro
Avro: type system на уровне формата