Serializer tuning
Предыдущие уроки разобрали выбор сериализатора. Теперь — что делать, когда выбор сделан, но performance всё равно не дотягивает. Сериализатор это горячая точка любого Flink-job-а: на каждый event, на каждый shuffle, на каждое state read/write он вызывается. Микросекунды overhead-а здесь умножаются на миллионы events/sec.
Этот урок про последний рубеж оптимизации: явная регистрация, custom serializers, object reuse, profiling tooling. Мы дойдём до уровня “что точно делать в production-job под нагрузкой”.
Сравнение форматов: Parquet, ORC, AvroРегистрация POJO classes
Flink при первой встрече с типом анализирует его через TypeExtractor (см. урок 17.1). Этот анализ — затратная reflection-операция, происходящая при построении JobGraph. Для класса с 30 полями она занимает несколько миллисекунд. На запуске больших job-ов с десятками типов это складывается в секунды холодного старта.
Регистрация типов кэширует результат анализа:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Регистрация POJO — однократный анализ
env.getConfig().registerPojoType(Order.class);
env.getConfig().registerPojoType(User.class);
env.getConfig().registerPojoType(Address.class);
env.getConfig().registerPojoType(Payment.class);
Эффект:
- Holdstart job-а ускоряется (для крупных job-ов экономия 1-3 секунды).
- TypeInformation для зарегистрированных классов попадает в hot cache при первом использовании.
- Stable ordering полей между перезапусками — предсказуемый byte layout.
В production-стеке регистрация делается в общем helper-е:
public class TypeRegistry {
public static void registerAll(StreamExecutionEnvironment env) {
// Domain events
env.getConfig().registerPojoType(Order.class);
env.getConfig().registerPojoType(OrderItem.class);
env.getConfig().registerPojoType(Payment.class);
env.getConfig().registerPojoType(User.class);
// State value types
env.getConfig().registerPojoType(UserAggregate.class);
env.getConfig().registerPojoType(SessionState.class);
// Output sinks
env.getConfig().registerPojoType(EnrichedOrder.class);
}
}
// В драйвере каждого job-а
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeRegistry.registerAll(env);
Этот паттерн особенно важен, когда у вас 50+ payload-классов в большом стриминговом репо.
Custom TypeSerializer
Иногда стандартный PojoSerializer не оптимален. Примеры:
- Класс с большим immutable массивом, который вы знаете, как сжать (delta encoding, dictionary);
- Класс с дорогим parsing — например, сложный composite ID, который можно хранить как два int-а вместо string;
- Класс с временным полем, которое всегда
Instant.now()— можно делать delta от reference timestamp.
Для таких кейсов есть custom TypeSerializer{'<'}T{'>'}. Это абстрактный класс из flink-core с методами serialize, deserialize, copy, duplicate, snapshotConfiguration.
public class DeltaTimestampSerializer extends TypeSerializer{'<'}EventBatch{'>'} {
private static final long REFERENCE_EPOCH = 1700000000L * 1000L; // 2023-11-14 ms
@Override
public void serialize(EventBatch batch, DataOutputView out) throws IOException {
// userId — обычная UTF8
out.writeUTF(batch.userId);
// timestamps хранятся как int delta от reference, экономия 4 bytes на event
out.writeInt(batch.events.length);
for (Event e : batch.events) {
int delta = (int) (e.ts - REFERENCE_EPOCH);
out.writeInt(delta);
out.writeUTF(e.kind);
}
}
@Override
public EventBatch deserialize(DataInputView in) throws IOException {
String userId = in.readUTF();
int count = in.readInt();
Event[] events = new Event[count];
for (int i = 0; i {'<'} count; i++) {
long ts = REFERENCE_EPOCH + in.readInt();
String kind = in.readUTF();
events[i] = new Event(ts, kind);
}
return new EventBatch(userId, events);
}
// Регистрация через configure
@Override
public TypeSerializerSnapshot{'<'}EventBatch{'>'} snapshotConfiguration() {
return new DeltaTimestampSerializerSnapshot();
}
}
// Применение
env.getConfig().registerTypeSerializer(EventBatch.class, new DeltaTimestampSerializer());
Custom serializer-ы дают точечный 2-5x speedup на специфичных типах. Это микро-оптимизация — она нужна для top 1% hot path, не для общего кода.
Custom serializer добавляет maintenance burden. У него должен быть TypeSerializerSnapshot, который поддерживает schema evolution. Если вы измените layout — все существующие checkpoints станут несовместимы. Используйте custom serializer только когда стандартный точно не подходит.
Object reuse
По умолчанию Flink создаёт новый instance объекта для каждого вызова deserialize(). На потоке 1M events/sec это 1M new-операций в секунду на один оператор. GC pressure становится заметным.
Object reuse режим переиспользует один и тот же instance — оператор получает тот же объект каждый раз с новыми field values:
env.getConfig().enableObjectReuse();
Эффект:
- GC pressure снижается на 40-80% (зависит от рабочей нагрузки).
- Latency p99 улучшается за счёт меньшего числа GC pauses.
- Allocation rate в JFR-репорте падает в 2-4 раза.
Но: код оператора должен быть готов. Если вы где-то сохраняете ссылку на input event (например в state без копирования), вы будете получать одну и ту же мутирующуюся ссылку:
// БАГ при enableObjectReuse
private List{'<'}Event{'>'} buffer = new ArrayList{'<'}{'>'}();
@Override
public void processElement(Event e, Context ctx, ...) {
buffer.add(e); // bug — все элементы будут одинаковыми (последний)
}
// Фикс
@Override
public void processElement(Event e, Context ctx, ...) {
buffer.add(new Event(e)); // copy constructor
// или
buffer.add(serializer.copy(e));
}
Аналогично для state — если вы делаете state.update(event), серилазация делает копию, но если вы храните в local field или передаёте дальше через collector, нужна явная копия.
Альтернатива — использовать immutable POJO. Если класс immutable, object reuse не имеет эффекта (одна и та же ссылка == один и тот же state), и можно безопасно включать.
Включать enableObjectReuse — это профессиональный выбор. На production-job это даёт 10-30% throughput improvement, но требует careful code review каждого оператора. Если ваши operator-ы строго stateless functional — включайте. Если есть сомнения — лучше оставить выключенным.
Профилирование сериализатора с JMH
Для конкретных типов и серилазаторов используйте JMH (Java Microbenchmark Harness) — стандарт de-facto для micro-benchmark на JVM.
@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class SerializerBench {
private PojoSerializer{'<'}Order{'>'} pojoSer;
private KryoSerializer{'<'}Order{'>'} kryoSer;
private Order sample;
private DataOutputSerializer out;
@Setup
public void setup() {
pojoSer = (PojoSerializer{'<'}Order{'>'})
TypeInformation.of(Order.class).createSerializer(new ExecutionConfig());
kryoSer = new KryoSerializer{'<'}{'>'}(Order.class, new ExecutionConfig());
sample = new Order("U-42", 199, Instant.now());
out = new DataOutputSerializer(1024);
}
@Benchmark
public void pojoSerialize() throws IOException {
out.clear();
pojoSer.serialize(sample, out);
}
@Benchmark
public void kryoSerialize() throws IOException {
out.clear();
kryoSer.serialize(sample, out);
}
}
Запуск:
java -jar benchmarks.jar -wi 5 -i 10 -f 1 SerializerBench
Типичный output для нашего Order:
Benchmark Mode Cnt Score Error Units
pojoSerialize thrpt 10 18540192 ± 234567 ops/s
kryoSerialize thrpt 10 3217845 ± 89231 ops/s
5.8x разница на чистом encoding. На реальном Flink-job-е разница меньше (часть времени уходит на network, state) но всё равно значительна.
JMH bench даёт стабильные числа за счёт JIT warmup, fork изоляции, statistical analysis. На “просто измерить через System.nanoTime” не полагайтесь — это даёт случайные результаты.
Snapshot compatibility и schema evolution
Каждый TypeSerializer производит TypeSerializerSnapshot — это сериализуемое описание состояния сериализатора, которое сохраняется в каждый checkpoint. При restore Flink сравнивает snapshot со старой версией и принимает решение о compatibility:
- Compatible as-is — десериализатор может читать старые байты без изменений.
- Compatible after migration — нужна миграция, Flink её выполняет (например, добавлено новое поле с default).
- Incompatible — restore fails, требуется savepoint migration через State Processor API.
Для PojoSerializer snapshot содержит:
public class PojoSerializerSnapshot{'<'}T{'>'} implements TypeSerializerSnapshot{'<'}T{'>'} {
private Class{'<'}T{'>'} clazz;
private LinkedHashMap{'<'}String, TypeSerializerSnapshot{'<'}?{'>'}{'>'} fieldSerializerSnapshots;
private int version = 5; // версия формата snapshot
// ...
}
При restore:
- Загружается старый snapshot из checkpoint.
- Создаётся новый snapshot текущего сериализатора.
resolveSchemaCompatibility(old)сравнивает — добавлены ли поля, изменился ли тип.- Если совместимо — Flink использует MigratingSerializer (читает старые байты, пишет новые).
Для custom TypeSerializer вы должны написать свой TypeSerializerSnapshot. Самый простой вариант — extend SimpleTypeSerializerSnapshot:
public class DeltaTimestampSerializerSnapshot
extends SimpleTypeSerializerSnapshot{'<'}EventBatch{'>'} {
public DeltaTimestampSerializerSnapshot() {
super(DeltaTimestampSerializer::new);
}
}
Это говорит Flink: “новый snapshot создаётся через указанный constructor, совместимость есть только если оба snapshot имеют тот же serializer class”.
Для evolution custom serializer-а используйте CompositeTypeSerializerSnapshot, который хранит nested snapshots и реализует custom compatibility logic.
Real production tuning: ride-sharing case
Production-кейс из крупной ride-sharing платформы. Job обрабатывал GPS-сигналы от водителей: ~80K events/sec на TaskManager, parallelism 200, общий throughput 16M events/sec.
Baseline (Flink 1.19, до тюнинга):
- p99 latency: 600ms
- CPU per TaskManager: 4 core
- GC pause time: 8% wall clock
После tuning:
Step 1: Audit типов. Driver-helper выводил TypeInformation для всех payload classes. Найдено: 2 типа имели Kryo fallback из-за Map{'<'}String, Object{'>'} полей. Фикс — заменить на Map{'<'}String, String{'>'} для metadata, и отдельные strongly-typed классы для остального.
Step 2: Регистрация типов. 30+ payload classes зарегистрированы через единый registry.
Step 3: Object reuse. Включён enableObjectReuse. Code review показал 2 места с unsafe reference saving — исправлены через explicit copy.
Step 4: Custom serializer для главного hot type. GpsBatch — массив до 100 GPS points в одном event. Custom serializer с delta encoding для timestamps (15 bytes на point вместо 24 byte raw). Подобран после JMH benchmark — 4.2x speedup на этом конкретном типе.
После tuning:
- p99 latency: 220ms (2.7x улучшение)
- CPU per TaskManager: 1.8 core (2.2x улучшение)
- GC pause time: 2% wall clock (4x улучшение)
Job стал работать на 60% меньшем кластере. Это типичный ROI от serializer tuning в production.
Antipatterns
Несколько частых ошибок в попытках тюнинга:
1. Custom serializer для всего. Дико затратное по maintenance, мало выгоды для большинства типов. Custom serializer оправдан только для top 1% hot types с domain-specific оптимизациями.
2. Object reuse без code review. Включить флаг и получить молчаливые баги в state — частая история. Сначала надо понять каждое место, где хранится reference на event.
3. Premature optimization. Тюнинг сериализатора до того, как вы доказали что он bottleneck. Async-profiler — обязательное доказательство перед началом работ.
4. Изменение custom serializer-а без version bump. Если вы поменяли byte layout, а TypeSerializerSnapshot не отражает изменений, restore из старого checkpoint молча даст corrupted data. Всегда version-ируйте.
Попробуй сам
-
JMH бенчмарк POJO vs Kryo. Возьмите ваш реальный payload class, напишите JMH benchmark по примеру выше. Сравните throughput, allocation rate. Зафиксируйте baseline.
-
Custom serializer для одного типа. Найдите в вашем job-е тип с array/list полем (events, samples, points). Напишите custom serializer с delta encoding или dictionary. JMH benchmark до и после.
-
Object reuse audit. Включите enableObjectReuse в dev-окружении. Сделайте code review каждого
processElement/flatMap/map— есть ли где-то сохранение references? Запустите unit-tests, ищите регрессии.