Learning Platform
Глоссарий Troubleshooting
Урок 18.02 · 25 мин
Продвинутый
PojoSerializerKryoSerializerStrict POJO ValidationPerformanceSerializer Profiling

POJO vs Kryo performance

В предыдущем уроке мы разобрали, что Flink выбирает сериализатор на стадии построения JobGraph и что Kryo используется как fallback. Теперь конкретные числа: насколько именно дороже Kryo, как поймать его в production и что делать, когда поймали.

Этот урок про практическую инструментацию. Мы посмотрим на байты, наносекунды, флаги и логи — то, что отличает работающий streaming-job от такого, который ест в 5 раз больше CPU, чем должен.

Cache hierarchy: L1/L2/L3 и RAM

Что делает PojoSerializer

PojoSerializer держит массив under-the-hood: список полей класса, и для каждого поля — соответствующий TypeSerializer. На запись он итерирует по полям и вызывает суб-сериализаторы в строгом порядке (alphabetical by field name — это даёт детерминированность между запусками).

Byte layout для нашего Order(String userId, long amount, Instant ts):

[isNull marker: 1 byte]
[var-int: длина userId в utf8]
[utf8 bytes userId]
[long amount: 8 bytes big-endian]
[long ts.epochSecond: 8 bytes]
[int ts.nano: 4 bytes]

Никаких меток типов, никаких имён полей в потоке — schema сохраняется через TypeSerializerSnapshot в чекпойнте, а сам поток это plain bytes. На декодирование PojoSerializer знает, сколько байт читать для каждого поля, и десериализует строго по очереди.

Производительность: на современном CPU (3.5 GHz) сериализация типичного POJO с 5-10 полями занимает 50-200 нс. На потоке 1M events/sec это 5-20% CPU на одном core. На потоке 10M events/sec — половина core.

// Pseudocode внутри PojoSerializer.serialize(T value, DataOutputView target)
for (Field field : sortedFields) {
    Object value = field.get(instance);  // setAccessible(true) — один раз при init
    fieldSerializers[i].serialize(value, target);
}

Reflection вызывается, но field.setAccessible(true) делается один раз при инициализации сериализатора, а field.get(instance) через unsafe-API в современной JVM это десятки наносекунд.


Что делает KryoSerializer

Kryo — внешняя библиотека (Esoteric Software Kryo, версия 5.5 в Flink 2.2). Она сериализует произвольные Java-объекты через reflection, но с большим overhead-ом по сравнению с PojoSerializer:

  • Каждый класс в потоке имеет numeric ID (var-int) или полное имя класса при первой встрече.
  • Поля сериализуются в порядке reflection-discovery (не детерминированный между JVM-версиями).
  • Reference tracking (для циклических ссылок) добавляет per-object overhead.
  • Default constructor вызывается через Reflection.newInstance, что в Java 17+ требует deep reflection access.

Byte layout того же Order под Kryo:

[var-int: class registration ID]  -- 1 byte для known, 4+ для unknown
[reference marker: 1 byte]
[utf8 marker]
[var-int: длина userId]
[utf8 bytes userId]
[fixed marker]
[long amount: 8 bytes variable-length encoded]
[class registration ID для Instant]
[long epochSecond + int nano с encoding]

Плюс: Kryo делает variable-length encoding для long-ов (маленькие числа короче), что экономит place. Минус: эти кодирования стоят CPU.

Производительность: 500-2000 нс на тот же объект. То есть в 5-10 раз медленнее POJO.

// Внутри KryoSerializer.serialize() — реальный путь
kryo.writeClassAndObject(output, value);
// -> лезет в reflection cache
// -> итерирует поля в reflection-discovery order
// -> для каждого поля вызывает соответствующий FieldSerializer
// -> для unknown types падает на DefaultClassResolver и regs new ID
POJO vs Kryo: путь одного события через сериализатор
Order(userId=‘U-42’, amount=199, ts=Instant.now()) — обычный event
PojoSerializerPojoSerializer hot path: уже знает field offsets, sorted alphabetically. Каждый field вызывает свой sub-serializer. Никакого class metadata в bytes — schema хранится в snapshot чекпойнта. Total: 80-150 нс на event.
80-150 ns
~24 bytesByte output: ~24 bytes compact. 1 byte null mask + 2 byte var-int len + 4 bytes 'U-42' + 8 bytes amount + 12 bytes Instant. Никаких class meta — десериализатор знает структуру из snapshot.
KryoSerializerKryoSerializer hot path: reflection cache lookup, reference tracking init, поле за полем через FieldSerializer-ы. На каждый class — registration ID или полное имя класса (если не registered). Total: 500-1500 нс на event — 5-10x medlennее.
500-1500 ns
~38-60 bytesByte output: ~38-60 bytes (зависит от registration состояния). Class ID, reference markers, field markers, variable-length encoding. Больше bytes — больше bandwidth на сеть, больше disk I/O для state.
5-10x CPUThroughput multiplier: на одинаковой нагрузке Kryo требует 5-10x больше CPU. На 1M events/sec POJO ест 10% core, Kryo ест половину core. На 10M events/sec — POJO 1 core, Kryo требует 5-10 core.
2-3x bytesNetwork impact: на тех же 10M events/sec Kryo выдаёт ~600 MB/s vs ~240 MB/s POJO. Это разница между 10G линком и 1G — то есть Kryo job может упереться в network bandwidth там, где POJO job свободно дышит.
2-3x stateState size impact: тот же multiplier применяется к RocksDB. Job с Kryo в state хранит 2-3x больше bytes, что означает 2-3x больше disk I/O, 2-3x длиннее compaction, 2-3x больше checkpoint size. Каскадный effect.

Strict POJO validation

В Flink 2.2 финализирован флаг pipeline.strict-pojo-validation (FLIP-389), который превращает Kryo fallback из silent warning в обязательную ошибку. Конфигурация:

# flink-conf.yaml
pipeline.strict-pojo-validation: true

Или per-job через ExecutionConfig:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setStrictPojoValidation(true);

Эффект: если TypeExtractor пытается вернуть GenericTypeInfo, билд JobGraph падает с подробным сообщением, какой именно тип не прошёл валидацию и почему:

org.apache.flink.api.common.functions.InvalidTypesException:
Type 'com.example.Order' is being treated as a generic type (GenericTypeInfo)
but strict POJO validation is enabled.

Reasons it failed POJO checks:
  - Field 'tags' of type 'java.util.List' is missing a concrete generic parameter
  - Field 'amount' is final and has no setter

To fix:
  - Use a concrete type instead of List<?>
  - Remove 'final' modifier or provide a setter
  - Or explicitly register a custom serializer via env.getConfig().registerTypeSerializer(...)

Это game changer для production-job. Раньше Kryo fallback находили только при performance review — теперь fail-fast на этапе CI. Команды, переехавшие на Flink 2.2, ставят этот флаг по умолчанию в shared конфигурации.

WARNING

strict-pojo-validation breaking change для jobs, которые исторически полагались на Kryo fallback. Перед включением сделайте audit: запустите job в dev с флагом, посмотрите failures, либо зарегистрируйте custom serializers, либо рефакторите классы. Не включайте флаг бездумно на production-кластере.


Поиск Kryo fallback в работающем job

Если strict-validation не доступен (legacy кластер на 1.x), используйте Flink Metrics для post-hoc detection. Для каждого оператора Flink публикует метрики:

  • Serialize Time per record (приблизительно через numRecordsOutPerSecond и custom timing)
  • Bytes per record через numBytesOut / numRecordsOut

Если bytes/record для какого-то stream превышает ваши ожидания (например 60+ bytes для типичного 3-field POJO), это сигнал. Дальше используйте JVM-флаги для подробной inspection:

# При старте TaskManager
-Dorg.apache.flink.api.common.typeutils.SerializerProfiler=DEBUG

Или async-profiler с allocation profiling — он покажет, где идут allocation-ы внутри KryoSerializer:

./profiler.sh -d 30 -e alloc -f kryo-alloc.html {taskmanager-pid}

В flame graph вы увидите hot path вида KryoSerializer.serialize -> Kryo.writeClassAndObject -> ReflectionUtil.... Если этот путь занимает значимое место — у вас Kryo в hot path.

Третий путь — exception trick:

TypeInformation{'<'}MyType{'>'} info = TypeInformation.of(MyType.class);
if (info.getClass().getSimpleName().contains("Generic")) {
    log.warn("Type {} is GenericType (Kryo): {}",
        MyType.class.getName(), info);
}

Вставьте такой чек в начало драйвер-кода для каждого важного типа. Срабатывает на этапе сборки JobGraph — production-safe.


Real production impact: case study

Реальный случай из production-кластера большого ride-sharing сервиса (анонимизированно). Job обрабатывал GPS-сигналы с водителей: ~50K events/sec на TaskManager. После релиза нового data class Location performance деградировала на 40%: backpressure стал хроническим, lag вырос с 200ms до 4s.

Расследование:

  1. Flame graph через async-profiler показал 35% CPU времени в KryoSerializer.serialize.
  2. Driver code: TypeInformation.of(Location.class).toString() вернул GenericType{'<'}Location{'>'}.
  3. Причина — в Location появилось поле Map{'<'}String, Object{'>'} extras. Object стирал generic, POJO-validation падал.

Фикс:

// Было
public class Location {
    public double lat;
    public double lon;
    public Map{'<'}String, Object{'>'} extras;  // killed POJO
}

// Стало
public class Location {
    public double lat;
    public double lon;
    public Map{'<'}String, String{'>'} extras;  // concrete, POJO OK
    // или передавать extras через отдельный stream side-output
}

После фикса: CPU вернулся к baseline, lag упал обратно до 200ms. Простой фикс одного поля дал 40% throughput improvement. Это типичная история — Object в payload classes это silent killer.


Когда Kryo оправдан

Не всегда Kryo это враг. Есть кейсы, где он осознанный выбор:

  • Polymorphic types в state. Если в одном ValueState могут лежать разные subclass-ы (Event abstract, и subclasses LoginEvent, PurchaseEvent, …), POJO такой полиморфизм не поддерживает. Kryo с registered subclasses работает корректно.
  • Snapshot-friendly evolution через Kryo serializer registration. Когда вы знаете, что schema будет меняться непредсказуемо и не хочется управлять MigrationStrategies.
  • Прототипирование и dev-окружение — пока job не в production, Kryo overhead приемлем для скорости итераций.

Для production hot path POJO всегда win. Для cold storage (long-term state, archives) и low-volume sources (config streams, control messages) Kryo приемлем.

TIP

Если нужен полиморфизм в state — рассмотрите Avro Union types или Protobuf oneof. Они дают type-safe полиморфизм с лучшей производительностью чем Kryo и нормальной schema evolution. Это тема следующего урока.


Регистрация типов в Kryo для ускорения

Если Kryo неизбежен, его можно ускорить регистрацией типов. По умолчанию каждый встретившийся класс получает 4-byte ID и записывается полным именем при первой встрече per-stream. С регистрацией:

env.getConfig().registerKryoType(LoginEvent.class);
env.getConfig().registerKryoType(PurchaseEvent.class);
env.getConfig().registerKryoType(LogoutEvent.class);

Это даёт каждому типу 1-byte ID и убирает class name из каждого сообщения. Speedup: 1.5-2x для Kryo path. Не приближает к POJO, но снижает overhead.

Альтернативно, можно зарегистрировать custom Kryo serializer для конкретного типа:

env.getConfig().registerTypeWithKryoSerializer(
    JodaDateTime.class,
    JodaDateTimeSerializer.class
);

Это полезно для типов, которые трудно сделать POJO (immutable types из external library), но критичны для throughput.


Snapshot compatibility

PojoSerializer и KryoSerializer оба производят TypeSerializerSnapshot — это binary-форма descriptor сериализатора, которая сохраняется в каждый чекпойнт. При restore Flink десериализует snapshot и проверяет совместимость со старой версией.

PojoSerializer snapshot содержит:

  • список field names и их типы;
  • информацию о field order (sorted);
  • recursive snapshot для каждого field serializer.

При schema evolution (новое поле в POJO) Flink сравнивает snapshot со старым и применяет MigrationStrategy: новое поле инициализируется default value, удалённое поле игнорируется при чтении старых state-bytes.

KryoSerializer snapshot практически не поддерживает evolution — изменение полей класса часто приводит к UnsupportedOperationException при restore. Это ещё одна причина избегать Kryo для state.


Попробуй сам

  1. Benchmark на собственном POJO. Напишите simple JMH benchmark: создайте 100K instances Order, сериализуйте через PojoSerializer и через KryoSerializer (явно), измерьте total time. Ожидаемый результат — 5-10x в пользу POJO.

  2. Включить strict-pojo-validation. На dev-кластере включите флаг и попробуйте запустить ваши существующие job-ы. Соберите список всех типов, которые упали — это ваш todo на рефакторинг.

  3. Найти Kryo в metrics. Запустите job на staging, дайте ему поработать час, экспортируйте metrics в Prometheus. Сравните bytes/record для разных streams. Аномально высокие значения — потенциальные Kryo fallback.

Проверка знанийKnowledge check
Вы запустили production Flink job на Flink 2.2. После выкатки увидели, что throughput упал в 4 раза, а checkpoint size вырос в 2.5 раза. В коде была добавлена одна новая фича — поле metadata типа HashMap<String, Object> в основной payload class. Объясните корневую причину деградации, опишите как её локализовать и как исправить.
ОтветAnswer
Корневая причина — Object в Map<String, Object> сломал POJO-detection для основного payload class. TypeExtractor не может определить generic параметр Object, поэтому фолбэк в GenericTypeInfo с Kryo. Все события теперь сериализуются через Kryo, что в 5-10 раз медленнее (отсюда 4x throughput drop) и даёт в 2-3x больше bytes (отсюда 2.5x checkpoint size growth). Локализация: (1) в driver-коде вызвать TypeInformation.of(MyPayload.class).toString() — должно показать GenericType вместо PojoType; (2) async-profiler покажет KryoSerializer.serialize в hot path; (3) с включённым strict-pojo-validation билд JobGraph упадёт ещё на CI с указанием конкретного поля. Фикс: заменить Map<String, Object> на конкретный тип — Map<String, String> или Map<String, MetadataValue> (где MetadataValue — POJO с union полей). Если нужен полиморфизм — рассмотреть Avro Union или вынести разные типы данных в side-outputs. После фикса добавить strict-pojo-validation в CI для всех job-ов чтобы такая регрессия не прошла снова.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какие конкретные performance differences между PojoSerializer и KryoSerializer в Flink 2.x?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4