Avro, Protobuf, Thrift
POJO и Kryo — внутренние сериализаторы Flink, оптимизированные для intermediate state и cross-operator shuffle. Когда же речь идёт о data, которая пересекает границу job-а — приходит из Kafka, уходит в Iceberg, обменивается между сервисами — почти всегда используется один из трёх внешних форматов: Avro, Protobuf или Thrift.
Этот урок про их различия, performance, schema evolution и выбор для конкретного use case. Будем смотреть на байты wire format и реальные throughput numbers.
Avro deep-dive в контексте Kafka Schema Registry Protobuf и JSON Schema в Schema Registry Parquet, ORC, Avro: форматы файлов для аналитикиAvro: schema evolution first-class
Apache Avro был спроектирован для Hadoop-ecosystem с одной целью: данные должны жить дольше, чем код, который их создал. Это означает аггрессивный фокус на schema evolution. Avro отделяет schema от data: каждое сообщение в потоке не содержит typing-информацию, она хранится отдельно (в файле header для batch, в Schema Registry для streaming).
Wire format одного record-а минималистичный:
[length-prefixed fields в порядке schema]
[без markers, без field IDs, без type tags]
[NULL поля кодируются через Union с index]
Пример: record {userId: "U-42", amount: 199, ts: 1715900000} под schema {userId: string, amount: long, ts: long}:
[03] [U] [-] [4] [2] // var-int len + utf8 bytes
[02] [c7] // var-int amount (zigzag encoded)
[ca] [d8] [e3] [a5] [0a] // var-int ts
Итого 11 байт vs ~24 байта в JSON. Avro экономит трафик и место примерно в 2-3 раза.
Schema evolution в Avro работает через резолюцию reader/writer schemas:
- Writer schema — это schema, под которой данные были записаны.
- Reader schema — это schema, под которой код читает.
Avro правила резолюции:
- Добавление поля с default value — обратно совместимо.
- Удаление поля с default value в reader — совместимо.
- Переименование (через alias) — совместимо.
- Изменение типа в narrowing/widening — частично совместимо (int -> long OK, long -> int нет).
- Изменение default — совместимо.
В streaming-контексте Schema Registry (Confluent или Aiven) хранит все версии schema и выдаёт по schema ID, прикреплённому к каждому сообщению как 5-byte prefix.
// Flink Kafka connector с Avro
KafkaSource{'<'}Order{'>'} source = KafkaSource.{'<'}Order{'>'}builder()
.setValueOnlyDeserializer(
ConfluentRegistryAvroDeserializationSchema.forSpecific(
Order.class,
"https://schema-registry:8081"
)
)
.build();
Protobuf: footprint и speed
Google Protocol Buffers (Protobuf) был сделан для RPC: миллионы запросов в секунду между сервисами с минимальной latency. Это даёт другие приоритеты: максимальная скорость encoding/decoding, варьируемая длина integer-ов (varint), embedded type tags.
Wire format Protobuf хранит каждое поле как (tag, type, value):
[field 1: tag=1, type=length-delim] [length: 4] [U-42]
[field 2: tag=2, type=varint] [199 as varint]
[field 3: tag=3, type=varint] [ts as varint]
Tag-и встроены в поток, что добавляет 1-2 байта overhead на каждое поле, но даёт две killer features: schema evolution через field numbers (поля номеруются, можно добавлять и удалять без поломки) и forward compatibility — старый код может прочитать новое сообщение и проигнорировать незнакомые tag-и.
Производительность Protobuf обычно на 20-50% быстрее Avro для encoding/decoding. Это потому что Protobuf генерирует statically-compiled классы (через protoc), которые работают без reflection. Avro может работать через GenericRecord (reflection-based) или SpecificRecord (generated). SpecificRecord догоняет Protobuf, GenericRecord медленнее в 2-3 раза.
// Flink Kafka с Protobuf
KafkaSource{'<'}Order{'>'} source = KafkaSource.{'<'}Order{'>'}builder()
.setValueOnlyDeserializer(
new ProtobufDeserializationSchema{'<'}{'>'}(Order.parser())
)
.build();
Минусы Protobuf:
- schema evolution через field numbers требует discipline (нельзя переиспользовать удалённые номера);
- нет nullability в proto2 (только optional/required), в proto3 все поля optional но это меняет семантику default;
- ecosystem менее богат на data tools — Iceberg/Parquet нативно понимают Avro, для Proto нужна конвертация.
Thrift: legacy choice
Apache Thrift родом из Facebook (теперь Meta), сделан в ту же эпоху что и Protobuf (2007). Он покрывает и сериализацию, и RPC framework. В большинстве компаний Thrift существует только потому, что Facebook/Twitter/Foursquare его использовали и legacy services его требуют.
Wire format Thrift похож на Protobuf — (tag, type, value) — но более verbose:
[type byte] [field id: 2 bytes] [value]
[type byte] [field id: 2 bytes] [value]
[stop marker]
Без varint оптимизации field IDs (2 bytes vs Protobuf 1-2 bytes). Speed и footprint обычно на 10-20% хуже Protobuf при идентичных workload-ах.
Thrift предоставляет несколько protocol-ов:
- BinaryProtocol — стандартный, описанный выше;
- CompactProtocol — варьируемая длина, похожий на Protobuf по footprint;
- TupleProtocol — для known schemas без type tags (наименьший footprint, но без forward compat).
В новых стримах Thrift редко выбирается. Его используют либо для совместимости с существующими Thrift services, либо в специфических ecosystem-ах (HBase, Cassandra тулинг иногда зависит от Thrift).
Wire format side-by-side
Сравнительная таблица
| Критерий | Avro | Protobuf | Thrift |
|---|---|---|---|
| Footprint (10-field record) | 100% (baseline) | 110-130% | 130-150% |
| Encoding speed | Medium (Specific) / Slow (Generic) | Fast | Medium |
| Decoding speed | Medium / Slow | Fast | Medium |
| Schema location | External (Registry) | Embedded tags | Embedded tags |
| Schema evolution | First-class (resolution rules) | Field numbers | Field IDs |
| Forward compatibility | Yes (с reader schema) | Yes (skip unknown tags) | Yes |
| Nullability | Union with null | Optional (proto3) | Optional |
| Iceberg/Parquet ecosystem | Native | Через conversion | Через conversion |
| Flink connector | Excellent (avro-registry) | Good (proto plugin) | Limited |
| Use case | Long-lived data, data lake | RPC, microservices, streams | Legacy systems |
Выбор по принципу:
- Avro — если data попадёт в data lake (Iceberg, Paimon), если у вас Confluent stack, если важна максимальная footprint efficiency, если schema evolution критична.
- Protobuf — если нужна максимальная скорость encoding/decoding, если у вас уже есть Protobuf services, если streams не уходят в data lake (или вы готовы конвертировать).
- Thrift — только для legacy совместимости. Для нового кода не выбирайте.
В новых проектах с data lake тенденция — Avro для Kafka topics + Avro в Iceberg. Это даёт zero-conversion от ingestion до query layer. Если вы выбираете Protobuf для скорости — будьте готовы конвертировать в Avro или Parquet при записи в lake.
Schema Registry в Flink
Schema Registry — это external сервис (Confluent, Aiven, Apicurio), который хранит все версии schema и выдаёт по schema ID. Каждое сообщение в Kafka содержит 5-byte prefix: magic byte + 4 bytes schema ID. Десериализатор использует ID для fetch schema из Registry (с кешированием).
// Avro + Confluent Schema Registry
Map{'<'}String, Object{'>'} schemaRegConfig = new HashMap{'<'}{'>'}();
schemaRegConfig.put("schema.registry.url", "https://registry:8081");
KafkaSource{'<'}Order{'>'} source = KafkaSource.{'<'}Order{'>'}builder()
.setTopics("orders")
.setValueOnlyDeserializer(
ConfluentRegistryAvroDeserializationSchema.forSpecific(
Order.class,
schemaRegConfig
)
)
.build();
Schema Registry даёт:
- Compatibility checks — при registration новой schema проверяется обратная/прямая совместимость со старыми версиями.
- Single source of truth — все consumers видят one canonical schema.
- Schema discovery — можно подключиться к топику не зная schema наперёд.
Минусы:
- External dependency — Registry должен быть HA;
- Latency для fetch первого сообщения с новой schema (~10-50ms);
- Coordination overhead между teams для schema changes.
В Flink 2.2 встроен retry + cache layer для Schema Registry, что снижает impact от Registry downtime. Job не падает мгновенно при network blip — он продолжает работать с last-known schema до восстановления.
Protobuf без Registry
Protobuf обычно не требует Registry — schema распространяется через codegen (proto files compiled в Java classes, deployed вместе с приложением). Это даёт self-contained jars без external runtime dependency.
// order.proto
syntax = "proto3";
package com.example;
message Order {
string user_id = 1;
int64 amount = 2;
int64 ts = 3;
}
protoc --java_out=src/main/java order.proto
В Flink:
KafkaSource{'<'}Order{'>'} source = KafkaSource.{'<'}Order{'>'}builder()
.setTopics("orders")
.setValueOnlyDeserializer(
new ProtobufDeserializationSchema{'<'}Order{'>'}(Order.parser())
)
.build();
Schema evolution через discipline: никогда не переиспользовать удалённые field numbers, новые поля только с новыми числами, удалённые поля marked as reserved 4;.
Schema evolution scenarios
Какой формат лучше для конкретного evolution scenario?
Сценарий 1: добавление nullable поля
| Формат | Compatibility |
|---|---|
| Avro | Полная (с default value) |
| Protobuf | Полная (proto3 optional с default) |
| Thrift | Полная (optional field) |
Все три справляются хорошо. Самый простой и common evolution.
Сценарий 2: переименование поля
| Формат | Compatibility |
|---|---|
| Avro | Возможна через alias в reader schema |
| Protobuf | Невозможна (field number — единственный ID, name косметический) |
| Thrift | Невозможна (тоже field ID-based) |
Avro здесь побеждает за счёт aliases.
Сценарий 3: изменение типа integer-а (int -> long)
| Формат | Compatibility |
|---|---|
| Avro | Полная widening (int -> long), обратной нет |
| Protobuf | Полная (wire-compatible типы) |
| Thrift | Частичная (зависит от protocol) |
Сценарий 4: добавление nested message с required полями
| Формат | Compatibility |
|---|---|
| Avro | OK если nested имеет default |
| Protobuf | OK (новые messages optional в proto3) |
| Thrift | Зависит от required vs optional |
Compression поверх форматов
Все три формата дают binary output, который дополнительно сжимается на уровне Kafka (snappy, lz4, zstd) или Iceberg (zstd, gzip). Compression efficiency зависит от данных:
- Repeating values (user IDs, status codes) — отлично сжимаются (10-20x ratio).
- Random data (UUIDs, hashes) — слабо сжимается (1.5-2x).
- Numeric series (timestamps, counters) — хорошо сжимаются delta/dictionary (5-10x).
Avro имеет встроенный sync marker и block-level compression в Avro Object Container Files (для batch). Для streaming compression обычно делается на уровне транспорта (Kafka), а не сериализатора.
Попробуй сам
-
Размер сообщения side-by-side. Возьмите ваш текущий event class и сериализуйте 1000 instances через JSON, Avro Specific, Protobuf, Thrift. Сравните total bytes и время encoding. Ожидаемый результат — Avro/Protobuf в 3-4x меньше JSON, скорость в 5-8x быстрее.
-
Schema Registry experiment. Поднимите Confluent Schema Registry локально (Docker), зарегистрируйте две версии вашей schema (с добавленным nullable полем). Прочитайте старые сообщения новым consumer-ом через Avro reader resolution. Убедитесь, что новое поле получает default value.
-
Forward compatibility в Protobuf. Создайте Order с field number 4 (новое поле). Запишите 100 сообщений с новым полем. Прочитайте старым parser (без поля 4) — должно работать без ошибок, unknown поле скипается.