Learning Platform
Глоссарий Troubleshooting
Урок 18.03 · 25 мин
Продвинутый
AvroProtobufThriftSchema EvolutionSchema RegistryWire Format

Avro, Protobuf, Thrift

POJO и Kryo — внутренние сериализаторы Flink, оптимизированные для intermediate state и cross-operator shuffle. Когда же речь идёт о data, которая пересекает границу job-а — приходит из Kafka, уходит в Iceberg, обменивается между сервисами — почти всегда используется один из трёх внешних форматов: Avro, Protobuf или Thrift.

Этот урок про их различия, performance, schema evolution и выбор для конкретного use case. Будем смотреть на байты wire format и реальные throughput numbers.

Avro deep-dive в контексте Kafka Schema Registry Protobuf и JSON Schema в Schema Registry Parquet, ORC, Avro: форматы файлов для аналитики

Avro: schema evolution first-class

Apache Avro был спроектирован для Hadoop-ecosystem с одной целью: данные должны жить дольше, чем код, который их создал. Это означает аггрессивный фокус на schema evolution. Avro отделяет schema от data: каждое сообщение в потоке не содержит typing-информацию, она хранится отдельно (в файле header для batch, в Schema Registry для streaming).

Wire format одного record-а минималистичный:

[length-prefixed fields в порядке schema]
[без markers, без field IDs, без type tags]
[NULL поля кодируются через Union с index]

Пример: record {userId: "U-42", amount: 199, ts: 1715900000} под schema {userId: string, amount: long, ts: long}:

[03] [U] [-] [4] [2]   // var-int len + utf8 bytes
[02] [c7]              // var-int amount (zigzag encoded)
[ca] [d8] [e3] [a5] [0a]  // var-int ts

Итого 11 байт vs ~24 байта в JSON. Avro экономит трафик и место примерно в 2-3 раза.

Schema evolution в Avro работает через резолюцию reader/writer schemas:

  • Writer schema — это schema, под которой данные были записаны.
  • Reader schema — это schema, под которой код читает.

Avro правила резолюции:

  • Добавление поля с default value — обратно совместимо.
  • Удаление поля с default value в reader — совместимо.
  • Переименование (через alias) — совместимо.
  • Изменение типа в narrowing/widening — частично совместимо (int -> long OK, long -> int нет).
  • Изменение default — совместимо.

В streaming-контексте Schema Registry (Confluent или Aiven) хранит все версии schema и выдаёт по schema ID, прикреплённому к каждому сообщению как 5-byte prefix.

// Flink Kafka connector с Avro
KafkaSource{'<'}Order{'>'} source = KafkaSource.{'<'}Order{'>'}builder()
    .setValueOnlyDeserializer(
        ConfluentRegistryAvroDeserializationSchema.forSpecific(
            Order.class,
            "https://schema-registry:8081"
        )
    )
    .build();

Protobuf: footprint и speed

Google Protocol Buffers (Protobuf) был сделан для RPC: миллионы запросов в секунду между сервисами с минимальной latency. Это даёт другие приоритеты: максимальная скорость encoding/decoding, варьируемая длина integer-ов (varint), embedded type tags.

Wire format Protobuf хранит каждое поле как (tag, type, value):

[field 1: tag=1, type=length-delim] [length: 4] [U-42]
[field 2: tag=2, type=varint] [199 as varint]
[field 3: tag=3, type=varint] [ts as varint]

Tag-и встроены в поток, что добавляет 1-2 байта overhead на каждое поле, но даёт две killer features: schema evolution через field numbers (поля номеруются, можно добавлять и удалять без поломки) и forward compatibility — старый код может прочитать новое сообщение и проигнорировать незнакомые tag-и.

Производительность Protobuf обычно на 20-50% быстрее Avro для encoding/decoding. Это потому что Protobuf генерирует statically-compiled классы (через protoc), которые работают без reflection. Avro может работать через GenericRecord (reflection-based) или SpecificRecord (generated). SpecificRecord догоняет Protobuf, GenericRecord медленнее в 2-3 раза.

// Flink Kafka с Protobuf
KafkaSource{'<'}Order{'>'} source = KafkaSource.{'<'}Order{'>'}builder()
    .setValueOnlyDeserializer(
        new ProtobufDeserializationSchema{'<'}{'>'}(Order.parser())
    )
    .build();

Минусы Protobuf:

  • schema evolution через field numbers требует discipline (нельзя переиспользовать удалённые номера);
  • нет nullability в proto2 (только optional/required), в proto3 все поля optional но это меняет семантику default;
  • ecosystem менее богат на data tools — Iceberg/Parquet нативно понимают Avro, для Proto нужна конвертация.

Thrift: legacy choice

Apache Thrift родом из Facebook (теперь Meta), сделан в ту же эпоху что и Protobuf (2007). Он покрывает и сериализацию, и RPC framework. В большинстве компаний Thrift существует только потому, что Facebook/Twitter/Foursquare его использовали и legacy services его требуют.

Wire format Thrift похож на Protobuf — (tag, type, value) — но более verbose:

[type byte] [field id: 2 bytes] [value]
[type byte] [field id: 2 bytes] [value]
[stop marker]

Без varint оптимизации field IDs (2 bytes vs Protobuf 1-2 bytes). Speed и footprint обычно на 10-20% хуже Protobuf при идентичных workload-ах.

Thrift предоставляет несколько protocol-ов:

  • BinaryProtocol — стандартный, описанный выше;
  • CompactProtocol — варьируемая длина, похожий на Protobuf по footprint;
  • TupleProtocol — для known schemas без type tags (наименьший footprint, но без forward compat).

В новых стримах Thrift редко выбирается. Его используют либо для совместимости с существующими Thrift services, либо в специфических ecosystem-ах (HBase, Cassandra тулинг иногда зависит от Thrift).


Wire format side-by-side

Wire format одного события {'{'} userId='U-42', amount=199 {'}'}
JSON: 38 bytesJSON baseline: human-readable, no schema, full field names в каждом сообщении. На потоке 1M events/sec это в 4-5x больше bytes чем binary formats. Используется только для control plane, debug streams.
Avro: 11 bytesAvro: schema стоит отдельно (Schema Registry). На wire только raw values в порядке schema. Минимальный footprint среди трёх форматов. Идеален для high-volume streams с stable schema.
Protobuf: 14 bytesProtobuf: field tags в каждом сообщении (1-2 bytes overhead per field). Слегка больше чем Avro, но faster encoding/decoding и без зависимости от Schema Registry.
Thrift: 18 bytesThrift BinaryProtocol: 2-byte field IDs, fixed type bytes. Verbose даже для compact protocol. Choosing Thrift today обычно legacy reasons.
Throughput (encode 1M Order objects, single core)
JSON: 200K/sJSON: 200K ops/sec. Самый медленный, особенно при decoding (parsing требует tokenization). Подходит только для low-volume streams.
Avro: 1.2M/sAvro SpecificRecord (codegen): 1.2M ops/sec. GenericRecord (reflection) был бы 400K. Generated code критичен для performance.
Protobuf: 1.6M/sProtobuf: 1.6M ops/sec. Лучший speed thanks to generated code и varint optimization. Самый предсказуемый performance среди трёх форматов.
Thrift: 1.0M/sThrift CompactProtocol: 1.0M ops/sec. Slightly slower than Protobuf за счёт overhead protocol layer. BinaryProtocol ещё медленнее.

Сравнительная таблица

КритерийAvroProtobufThrift
Footprint (10-field record)100% (baseline)110-130%130-150%
Encoding speedMedium (Specific) / Slow (Generic)FastMedium
Decoding speedMedium / SlowFastMedium
Schema locationExternal (Registry)Embedded tagsEmbedded tags
Schema evolutionFirst-class (resolution rules)Field numbersField IDs
Forward compatibilityYes (с reader schema)Yes (skip unknown tags)Yes
NullabilityUnion with nullOptional (proto3)Optional
Iceberg/Parquet ecosystemNativeЧерез conversionЧерез conversion
Flink connectorExcellent (avro-registry)Good (proto plugin)Limited
Use caseLong-lived data, data lakeRPC, microservices, streamsLegacy systems

Выбор по принципу:

  • Avro — если data попадёт в data lake (Iceberg, Paimon), если у вас Confluent stack, если важна максимальная footprint efficiency, если schema evolution критична.
  • Protobuf — если нужна максимальная скорость encoding/decoding, если у вас уже есть Protobuf services, если streams не уходят в data lake (или вы готовы конвертировать).
  • Thrift — только для legacy совместимости. Для нового кода не выбирайте.
TIP

В новых проектах с data lake тенденция — Avro для Kafka topics + Avro в Iceberg. Это даёт zero-conversion от ingestion до query layer. Если вы выбираете Protobuf для скорости — будьте готовы конвертировать в Avro или Parquet при записи в lake.


Schema Registry — это external сервис (Confluent, Aiven, Apicurio), который хранит все версии schema и выдаёт по schema ID. Каждое сообщение в Kafka содержит 5-byte prefix: magic byte + 4 bytes schema ID. Десериализатор использует ID для fetch schema из Registry (с кешированием).

// Avro + Confluent Schema Registry
Map{'<'}String, Object{'>'} schemaRegConfig = new HashMap{'<'}{'>'}();
schemaRegConfig.put("schema.registry.url", "https://registry:8081");

KafkaSource{'<'}Order{'>'} source = KafkaSource.{'<'}Order{'>'}builder()
    .setTopics("orders")
    .setValueOnlyDeserializer(
        ConfluentRegistryAvroDeserializationSchema.forSpecific(
            Order.class,
            schemaRegConfig
        )
    )
    .build();

Schema Registry даёт:

  • Compatibility checks — при registration новой schema проверяется обратная/прямая совместимость со старыми версиями.
  • Single source of truth — все consumers видят one canonical schema.
  • Schema discovery — можно подключиться к топику не зная schema наперёд.

Минусы:

  • External dependency — Registry должен быть HA;
  • Latency для fetch первого сообщения с новой schema (~10-50ms);
  • Coordination overhead между teams для schema changes.

В Flink 2.2 встроен retry + cache layer для Schema Registry, что снижает impact от Registry downtime. Job не падает мгновенно при network blip — он продолжает работать с last-known schema до восстановления.


Protobuf без Registry

Protobuf обычно не требует Registry — schema распространяется через codegen (proto files compiled в Java classes, deployed вместе с приложением). Это даёт self-contained jars без external runtime dependency.

// order.proto
syntax = "proto3";
package com.example;

message Order {
    string user_id = 1;
    int64 amount = 2;
    int64 ts = 3;
}
protoc --java_out=src/main/java order.proto

В Flink:

KafkaSource{'<'}Order{'>'} source = KafkaSource.{'<'}Order{'>'}builder()
    .setTopics("orders")
    .setValueOnlyDeserializer(
        new ProtobufDeserializationSchema{'<'}Order{'>'}(Order.parser())
    )
    .build();

Schema evolution через discipline: никогда не переиспользовать удалённые field numbers, новые поля только с новыми числами, удалённые поля marked as reserved 4;.


Schema evolution scenarios

Какой формат лучше для конкретного evolution scenario?

Сценарий 1: добавление nullable поля

ФорматCompatibility
AvroПолная (с default value)
ProtobufПолная (proto3 optional с default)
ThriftПолная (optional field)

Все три справляются хорошо. Самый простой и common evolution.

Сценарий 2: переименование поля

ФорматCompatibility
AvroВозможна через alias в reader schema
ProtobufНевозможна (field number — единственный ID, name косметический)
ThriftНевозможна (тоже field ID-based)

Avro здесь побеждает за счёт aliases.

Сценарий 3: изменение типа integer-а (int -> long)

ФорматCompatibility
AvroПолная widening (int -> long), обратной нет
ProtobufПолная (wire-compatible типы)
ThriftЧастичная (зависит от protocol)

Сценарий 4: добавление nested message с required полями

ФорматCompatibility
AvroOK если nested имеет default
ProtobufOK (новые messages optional в proto3)
ThriftЗависит от required vs optional

Compression поверх форматов

Все три формата дают binary output, который дополнительно сжимается на уровне Kafka (snappy, lz4, zstd) или Iceberg (zstd, gzip). Compression efficiency зависит от данных:

  • Repeating values (user IDs, status codes) — отлично сжимаются (10-20x ratio).
  • Random data (UUIDs, hashes) — слабо сжимается (1.5-2x).
  • Numeric series (timestamps, counters) — хорошо сжимаются delta/dictionary (5-10x).

Avro имеет встроенный sync marker и block-level compression в Avro Object Container Files (для batch). Для streaming compression обычно делается на уровне транспорта (Kafka), а не сериализатора.


Попробуй сам

  1. Размер сообщения side-by-side. Возьмите ваш текущий event class и сериализуйте 1000 instances через JSON, Avro Specific, Protobuf, Thrift. Сравните total bytes и время encoding. Ожидаемый результат — Avro/Protobuf в 3-4x меньше JSON, скорость в 5-8x быстрее.

  2. Schema Registry experiment. Поднимите Confluent Schema Registry локально (Docker), зарегистрируйте две версии вашей schema (с добавленным nullable полем). Прочитайте старые сообщения новым consumer-ом через Avro reader resolution. Убедитесь, что новое поле получает default value.

  3. Forward compatibility в Protobuf. Создайте Order с field number 4 (новое поле). Запишите 100 сообщений с новым полем. Прочитайте старым parser (без поля 4) — должно работать без ошибок, unknown поле скипается.

Проверка знанийKnowledge check
Вы планируете архитектуру стриминговой платформы. Kafka topics будут читаться Flink-ом и записываться в Iceberg lake для долгосрочной аналитики. Объём — 5M events/sec, 50 разных типов событий, schemas будут эволюционировать (новые поля каждый квартал). Команда сейчас использует Protobuf для существующих микросервисов. Какой формат сериализации выбрать для новой платформы и почему?
ОтветAnswer
Выбрать Avro для Kafka topics в этой платформе, несмотря на существующий Protobuf в микросервисах. Обоснование: (1) Iceberg/Paimon нативно понимают Avro, поэтому zero-conversion от ingestion до query layer — это сэкономит CPU и операционную сложность. (2) Avro даёт минимальный footprint (на ~15-25% меньше Protobuf), что при 5M events/sec означает существенную экономию network bandwidth и disk space в lake. (3) Schema Registry с compatibility checks даёт guardrails против ломающих schema changes — критично когда 50 типов событий эволюционируют квартально. (4) Avro aliases поддерживают переименование полей, что Protobuf не умеет. Trade-off: микросервисы продолжают работать с Protobuf для RPC (это правильный выбор для low-latency request/response), а на границе с Kafka делается конверсия Proto->Avro в продюсере. Это minor overhead на producer side, но даёт огромные benefits на ingestion и lake side. Альтернатива — оставить Protobuf для согласованности с микросервисами — приемлема, но потребует Proto->Parquet converter перед записью в Iceberg, что добавляет operational complexity и CPU cost.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Какой формат сериализации лучше выбрать для нового streaming pipeline с Iceberg lake destination, 5M events/sec, 50 типов событий, schema evolution каждый квартал? Существующие микросервисы используют Protobuf.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4