POJO vs Kryo performance
В предыдущем уроке мы разобрали, что Flink выбирает сериализатор на стадии построения JobGraph и что Kryo используется как fallback. Теперь конкретные числа: насколько именно дороже Kryo, как поймать его в production и что делать, когда поймали.
Этот урок про практическую инструментацию. Мы посмотрим на байты, наносекунды, флаги и логи — то, что отличает работающий streaming-job от такого, который ест в 5 раз больше CPU, чем должен.
Cache hierarchy: L1/L2/L3 и RAMЧто делает PojoSerializer
PojoSerializer держит массив under-the-hood: список полей класса, и для каждого поля — соответствующий TypeSerializer. На запись он итерирует по полям и вызывает суб-сериализаторы в строгом порядке (alphabetical by field name — это даёт детерминированность между запусками).
Byte layout для нашего Order(String userId, long amount, Instant ts):
[isNull marker: 1 byte]
[var-int: длина userId в utf8]
[utf8 bytes userId]
[long amount: 8 bytes big-endian]
[long ts.epochSecond: 8 bytes]
[int ts.nano: 4 bytes]
Никаких меток типов, никаких имён полей в потоке — schema сохраняется через TypeSerializerSnapshot в чекпойнте, а сам поток это plain bytes. На декодирование PojoSerializer знает, сколько байт читать для каждого поля, и десериализует строго по очереди.
Производительность: на современном CPU (3.5 GHz) сериализация типичного POJO с 5-10 полями занимает 50-200 нс. На потоке 1M events/sec это 5-20% CPU на одном core. На потоке 10M events/sec — половина core.
// Pseudocode внутри PojoSerializer.serialize(T value, DataOutputView target)
for (Field field : sortedFields) {
Object value = field.get(instance); // setAccessible(true) — один раз при init
fieldSerializers[i].serialize(value, target);
}
Reflection вызывается, но field.setAccessible(true) делается один раз при инициализации сериализатора, а field.get(instance) через unsafe-API в современной JVM это десятки наносекунд.
Что делает KryoSerializer
Kryo — внешняя библиотека (Esoteric Software Kryo, версия 5.5 в Flink 2.2). Она сериализует произвольные Java-объекты через reflection, но с большим overhead-ом по сравнению с PojoSerializer:
- Каждый класс в потоке имеет numeric ID (var-int) или полное имя класса при первой встрече.
- Поля сериализуются в порядке reflection-discovery (не детерминированный между JVM-версиями).
- Reference tracking (для циклических ссылок) добавляет per-object overhead.
- Default constructor вызывается через Reflection.newInstance, что в Java 17+ требует deep reflection access.
Byte layout того же Order под Kryo:
[var-int: class registration ID] -- 1 byte для known, 4+ для unknown
[reference marker: 1 byte]
[utf8 marker]
[var-int: длина userId]
[utf8 bytes userId]
[fixed marker]
[long amount: 8 bytes variable-length encoded]
[class registration ID для Instant]
[long epochSecond + int nano с encoding]
Плюс: Kryo делает variable-length encoding для long-ов (маленькие числа короче), что экономит place. Минус: эти кодирования стоят CPU.
Производительность: 500-2000 нс на тот же объект. То есть в 5-10 раз медленнее POJO.
// Внутри KryoSerializer.serialize() — реальный путь
kryo.writeClassAndObject(output, value);
// -> лезет в reflection cache
// -> итерирует поля в reflection-discovery order
// -> для каждого поля вызывает соответствующий FieldSerializer
// -> для unknown types падает на DefaultClassResolver и regs new ID
Strict POJO validation
В Flink 2.2 финализирован флаг pipeline.strict-pojo-validation (FLIP-389), который превращает Kryo fallback из silent warning в обязательную ошибку. Конфигурация:
# flink-conf.yaml
pipeline.strict-pojo-validation: true
Или per-job через ExecutionConfig:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setStrictPojoValidation(true);
Эффект: если TypeExtractor пытается вернуть GenericTypeInfo, билд JobGraph падает с подробным сообщением, какой именно тип не прошёл валидацию и почему:
org.apache.flink.api.common.functions.InvalidTypesException:
Type 'com.example.Order' is being treated as a generic type (GenericTypeInfo)
but strict POJO validation is enabled.
Reasons it failed POJO checks:
- Field 'tags' of type 'java.util.List' is missing a concrete generic parameter
- Field 'amount' is final and has no setter
To fix:
- Use a concrete type instead of List<?>
- Remove 'final' modifier or provide a setter
- Or explicitly register a custom serializer via env.getConfig().registerTypeSerializer(...)
Это game changer для production-job. Раньше Kryo fallback находили только при performance review — теперь fail-fast на этапе CI. Команды, переехавшие на Flink 2.2, ставят этот флаг по умолчанию в shared конфигурации.
strict-pojo-validation breaking change для jobs, которые исторически полагались на Kryo fallback. Перед включением сделайте audit: запустите job в dev с флагом, посмотрите failures, либо зарегистрируйте custom serializers, либо рефакторите классы. Не включайте флаг бездумно на production-кластере.
Поиск Kryo fallback в работающем job
Если strict-validation не доступен (legacy кластер на 1.x), используйте Flink Metrics для post-hoc detection. Для каждого оператора Flink публикует метрики:
Serialize Time per record(приблизительно черезnumRecordsOutPerSecondи custom timing)Bytes per recordчерезnumBytesOut / numRecordsOut
Если bytes/record для какого-то stream превышает ваши ожидания (например 60+ bytes для типичного 3-field POJO), это сигнал. Дальше используйте JVM-флаги для подробной inspection:
# При старте TaskManager
-Dorg.apache.flink.api.common.typeutils.SerializerProfiler=DEBUG
Или async-profiler с allocation profiling — он покажет, где идут allocation-ы внутри KryoSerializer:
./profiler.sh -d 30 -e alloc -f kryo-alloc.html {taskmanager-pid}
В flame graph вы увидите hot path вида KryoSerializer.serialize -> Kryo.writeClassAndObject -> ReflectionUtil.... Если этот путь занимает значимое место — у вас Kryo в hot path.
Третий путь — exception trick:
TypeInformation{'<'}MyType{'>'} info = TypeInformation.of(MyType.class);
if (info.getClass().getSimpleName().contains("Generic")) {
log.warn("Type {} is GenericType (Kryo): {}",
MyType.class.getName(), info);
}
Вставьте такой чек в начало драйвер-кода для каждого важного типа. Срабатывает на этапе сборки JobGraph — production-safe.
Real production impact: case study
Реальный случай из production-кластера большого ride-sharing сервиса (анонимизированно). Job обрабатывал GPS-сигналы с водителей: ~50K events/sec на TaskManager. После релиза нового data class Location performance деградировала на 40%: backpressure стал хроническим, lag вырос с 200ms до 4s.
Расследование:
- Flame graph через async-profiler показал 35% CPU времени в
KryoSerializer.serialize. - Driver code:
TypeInformation.of(Location.class).toString()вернулGenericType{'<'}Location{'>'}. - Причина — в
Locationпоявилось полеMap{'<'}String, Object{'>'} extras. Object стирал generic, POJO-validation падал.
Фикс:
// Было
public class Location {
public double lat;
public double lon;
public Map{'<'}String, Object{'>'} extras; // killed POJO
}
// Стало
public class Location {
public double lat;
public double lon;
public Map{'<'}String, String{'>'} extras; // concrete, POJO OK
// или передавать extras через отдельный stream side-output
}
После фикса: CPU вернулся к baseline, lag упал обратно до 200ms. Простой фикс одного поля дал 40% throughput improvement. Это типичная история — Object в payload classes это silent killer.
Когда Kryo оправдан
Не всегда Kryo это враг. Есть кейсы, где он осознанный выбор:
- Polymorphic types в state. Если в одном
ValueStateмогут лежать разные subclass-ы (Event abstract, и subclasses LoginEvent, PurchaseEvent, …), POJO такой полиморфизм не поддерживает. Kryo с registered subclasses работает корректно. - Snapshot-friendly evolution через Kryo serializer registration. Когда вы знаете, что schema будет меняться непредсказуемо и не хочется управлять MigrationStrategies.
- Прототипирование и dev-окружение — пока job не в production, Kryo overhead приемлем для скорости итераций.
Для production hot path POJO всегда win. Для cold storage (long-term state, archives) и low-volume sources (config streams, control messages) Kryo приемлем.
Если нужен полиморфизм в state — рассмотрите Avro Union types или Protobuf oneof. Они дают type-safe полиморфизм с лучшей производительностью чем Kryo и нормальной schema evolution. Это тема следующего урока.
Регистрация типов в Kryo для ускорения
Если Kryo неизбежен, его можно ускорить регистрацией типов. По умолчанию каждый встретившийся класс получает 4-byte ID и записывается полным именем при первой встрече per-stream. С регистрацией:
env.getConfig().registerKryoType(LoginEvent.class);
env.getConfig().registerKryoType(PurchaseEvent.class);
env.getConfig().registerKryoType(LogoutEvent.class);
Это даёт каждому типу 1-byte ID и убирает class name из каждого сообщения. Speedup: 1.5-2x для Kryo path. Не приближает к POJO, но снижает overhead.
Альтернативно, можно зарегистрировать custom Kryo serializer для конкретного типа:
env.getConfig().registerTypeWithKryoSerializer(
JodaDateTime.class,
JodaDateTimeSerializer.class
);
Это полезно для типов, которые трудно сделать POJO (immutable types из external library), но критичны для throughput.
Snapshot compatibility
PojoSerializer и KryoSerializer оба производят TypeSerializerSnapshot — это binary-форма descriptor сериализатора, которая сохраняется в каждый чекпойнт. При restore Flink десериализует snapshot и проверяет совместимость со старой версией.
PojoSerializer snapshot содержит:
- список field names и их типы;
- информацию о field order (sorted);
- recursive snapshot для каждого field serializer.
При schema evolution (новое поле в POJO) Flink сравнивает snapshot со старым и применяет MigrationStrategy: новое поле инициализируется default value, удалённое поле игнорируется при чтении старых state-bytes.
KryoSerializer snapshot практически не поддерживает evolution — изменение полей класса часто приводит к UnsupportedOperationException при restore. Это ещё одна причина избегать Kryo для state.
Попробуй сам
-
Benchmark на собственном POJO. Напишите simple JMH benchmark: создайте 100K instances
Order, сериализуйте через PojoSerializer и через KryoSerializer (явно), измерьте total time. Ожидаемый результат — 5-10x в пользу POJO. -
Включить strict-pojo-validation. На dev-кластере включите флаг и попробуйте запустить ваши существующие job-ы. Соберите список всех типов, которые упали — это ваш todo на рефакторинг.
-
Найти Kryo в metrics. Запустите job на staging, дайте ему поработать час, экспортируйте metrics в Prometheus. Сравните bytes/record для разных streams. Аномально высокие значения — потенциальные Kryo fallback.