Learning Platform
Глоссарий Troubleshooting
Урок 18.04 · 25 мин
Продвинутый
POJO RegistrationCustom SerializerJMH BenchmarkObject ReuseSnapshot Compatibility

Serializer tuning

Предыдущие уроки разобрали выбор сериализатора. Теперь — что делать, когда выбор сделан, но performance всё равно не дотягивает. Сериализатор это горячая точка любого Flink-job-а: на каждый event, на каждый shuffle, на каждое state read/write он вызывается. Микросекунды overhead-а здесь умножаются на миллионы events/sec.

Этот урок про последний рубеж оптимизации: явная регистрация, custom serializers, object reuse, profiling tooling. Мы дойдём до уровня “что точно делать в production-job под нагрузкой”.

Сравнение форматов: Parquet, ORC, Avro

Регистрация POJO classes

Flink при первой встрече с типом анализирует его через TypeExtractor (см. урок 17.1). Этот анализ — затратная reflection-операция, происходящая при построении JobGraph. Для класса с 30 полями она занимает несколько миллисекунд. На запуске больших job-ов с десятками типов это складывается в секунды холодного старта.

Регистрация типов кэширует результат анализа:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Регистрация POJO — однократный анализ
env.getConfig().registerPojoType(Order.class);
env.getConfig().registerPojoType(User.class);
env.getConfig().registerPojoType(Address.class);
env.getConfig().registerPojoType(Payment.class);

Эффект:

  • Holdstart job-а ускоряется (для крупных job-ов экономия 1-3 секунды).
  • TypeInformation для зарегистрированных классов попадает в hot cache при первом использовании.
  • Stable ordering полей между перезапусками — предсказуемый byte layout.

В production-стеке регистрация делается в общем helper-е:

public class TypeRegistry {
    public static void registerAll(StreamExecutionEnvironment env) {
        // Domain events
        env.getConfig().registerPojoType(Order.class);
        env.getConfig().registerPojoType(OrderItem.class);
        env.getConfig().registerPojoType(Payment.class);
        env.getConfig().registerPojoType(User.class);

        // State value types
        env.getConfig().registerPojoType(UserAggregate.class);
        env.getConfig().registerPojoType(SessionState.class);

        // Output sinks
        env.getConfig().registerPojoType(EnrichedOrder.class);
    }
}

// В драйвере каждого job-а
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeRegistry.registerAll(env);

Этот паттерн особенно важен, когда у вас 50+ payload-классов в большом стриминговом репо.


Custom TypeSerializer

Иногда стандартный PojoSerializer не оптимален. Примеры:

  • Класс с большим immutable массивом, который вы знаете, как сжать (delta encoding, dictionary);
  • Класс с дорогим parsing — например, сложный composite ID, который можно хранить как два int-а вместо string;
  • Класс с временным полем, которое всегда Instant.now() — можно делать delta от reference timestamp.

Для таких кейсов есть custom TypeSerializer{'<'}T{'>'}. Это абстрактный класс из flink-core с методами serialize, deserialize, copy, duplicate, snapshotConfiguration.

public class DeltaTimestampSerializer extends TypeSerializer{'<'}EventBatch{'>'} {

    private static final long REFERENCE_EPOCH = 1700000000L * 1000L; // 2023-11-14 ms

    @Override
    public void serialize(EventBatch batch, DataOutputView out) throws IOException {
        // userId — обычная UTF8
        out.writeUTF(batch.userId);
        // timestamps хранятся как int delta от reference, экономия 4 bytes на event
        out.writeInt(batch.events.length);
        for (Event e : batch.events) {
            int delta = (int) (e.ts - REFERENCE_EPOCH);
            out.writeInt(delta);
            out.writeUTF(e.kind);
        }
    }

    @Override
    public EventBatch deserialize(DataInputView in) throws IOException {
        String userId = in.readUTF();
        int count = in.readInt();
        Event[] events = new Event[count];
        for (int i = 0; i {'<'} count; i++) {
            long ts = REFERENCE_EPOCH + in.readInt();
            String kind = in.readUTF();
            events[i] = new Event(ts, kind);
        }
        return new EventBatch(userId, events);
    }

    // Регистрация через configure
    @Override
    public TypeSerializerSnapshot{'<'}EventBatch{'>'} snapshotConfiguration() {
        return new DeltaTimestampSerializerSnapshot();
    }
}

// Применение
env.getConfig().registerTypeSerializer(EventBatch.class, new DeltaTimestampSerializer());

Custom serializer-ы дают точечный 2-5x speedup на специфичных типах. Это микро-оптимизация — она нужна для top 1% hot path, не для общего кода.

WARNING

Custom serializer добавляет maintenance burden. У него должен быть TypeSerializerSnapshot, который поддерживает schema evolution. Если вы измените layout — все существующие checkpoints станут несовместимы. Используйте custom serializer только когда стандартный точно не подходит.


Object reuse

По умолчанию Flink создаёт новый instance объекта для каждого вызова deserialize(). На потоке 1M events/sec это 1M new-операций в секунду на один оператор. GC pressure становится заметным.

Object reuse режим переиспользует один и тот же instance — оператор получает тот же объект каждый раз с новыми field values:

env.getConfig().enableObjectReuse();

Эффект:

  • GC pressure снижается на 40-80% (зависит от рабочей нагрузки).
  • Latency p99 улучшается за счёт меньшего числа GC pauses.
  • Allocation rate в JFR-репорте падает в 2-4 раза.

Но: код оператора должен быть готов. Если вы где-то сохраняете ссылку на input event (например в state без копирования), вы будете получать одну и ту же мутирующуюся ссылку:

// БАГ при enableObjectReuse
private List{'<'}Event{'>'} buffer = new ArrayList{'<'}{'>'}();

@Override
public void processElement(Event e, Context ctx, ...) {
    buffer.add(e);  // bug — все элементы будут одинаковыми (последний)
}

// Фикс
@Override
public void processElement(Event e, Context ctx, ...) {
    buffer.add(new Event(e));  // copy constructor
    // или
    buffer.add(serializer.copy(e));
}

Аналогично для state — если вы делаете state.update(event), серилазация делает копию, но если вы храните в local field или передаёте дальше через collector, нужна явная копия.

Альтернатива — использовать immutable POJO. Если класс immutable, object reuse не имеет эффекта (одна и та же ссылка == один и тот же state), и можно безопасно включать.

TIP

Включать enableObjectReuse — это профессиональный выбор. На production-job это даёт 10-30% throughput improvement, но требует careful code review каждого оператора. Если ваши operator-ы строго stateless functional — включайте. Если есть сомнения — лучше оставить выключенным.


Профилирование сериализатора с JMH

Для конкретных типов и серилазаторов используйте JMH (Java Microbenchmark Harness) — стандарт de-facto для micro-benchmark на JVM.

@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class SerializerBench {

    private PojoSerializer{'<'}Order{'>'} pojoSer;
    private KryoSerializer{'<'}Order{'>'} kryoSer;
    private Order sample;
    private DataOutputSerializer out;

    @Setup
    public void setup() {
        pojoSer = (PojoSerializer{'<'}Order{'>'})
            TypeInformation.of(Order.class).createSerializer(new ExecutionConfig());
        kryoSer = new KryoSerializer{'<'}{'>'}(Order.class, new ExecutionConfig());
        sample = new Order("U-42", 199, Instant.now());
        out = new DataOutputSerializer(1024);
    }

    @Benchmark
    public void pojoSerialize() throws IOException {
        out.clear();
        pojoSer.serialize(sample, out);
    }

    @Benchmark
    public void kryoSerialize() throws IOException {
        out.clear();
        kryoSer.serialize(sample, out);
    }
}

Запуск:

java -jar benchmarks.jar -wi 5 -i 10 -f 1 SerializerBench

Типичный output для нашего Order:

Benchmark                Mode   Cnt      Score      Error  Units
pojoSerialize          thrpt    10  18540192 ± 234567  ops/s
kryoSerialize          thrpt    10   3217845 ±  89231  ops/s

5.8x разница на чистом encoding. На реальном Flink-job-е разница меньше (часть времени уходит на network, state) но всё равно значительна.

JMH bench даёт стабильные числа за счёт JIT warmup, fork изоляции, statistical analysis. На “просто измерить через System.nanoTime” не полагайтесь — это даёт случайные результаты.


Snapshot compatibility и schema evolution

Каждый TypeSerializer производит TypeSerializerSnapshot — это сериализуемое описание состояния сериализатора, которое сохраняется в каждый checkpoint. При restore Flink сравнивает snapshot со старой версией и принимает решение о compatibility:

  • Compatible as-is — десериализатор может читать старые байты без изменений.
  • Compatible after migration — нужна миграция, Flink её выполняет (например, добавлено новое поле с default).
  • Incompatible — restore fails, требуется savepoint migration через State Processor API.

Для PojoSerializer snapshot содержит:

public class PojoSerializerSnapshot{'<'}T{'>'} implements TypeSerializerSnapshot{'<'}T{'>'} {
    private Class{'<'}T{'>'} clazz;
    private LinkedHashMap{'<'}String, TypeSerializerSnapshot{'<'}?{'>'}{'>'} fieldSerializerSnapshots;
    private int version = 5; // версия формата snapshot
    // ...
}

При restore:

  1. Загружается старый snapshot из checkpoint.
  2. Создаётся новый snapshot текущего сериализатора.
  3. resolveSchemaCompatibility(old) сравнивает — добавлены ли поля, изменился ли тип.
  4. Если совместимо — Flink использует MigratingSerializer (читает старые байты, пишет новые).

Для custom TypeSerializer вы должны написать свой TypeSerializerSnapshot. Самый простой вариант — extend SimpleTypeSerializerSnapshot:

public class DeltaTimestampSerializerSnapshot
    extends SimpleTypeSerializerSnapshot{'<'}EventBatch{'>'} {

    public DeltaTimestampSerializerSnapshot() {
        super(DeltaTimestampSerializer::new);
    }
}

Это говорит Flink: “новый snapshot создаётся через указанный constructor, совместимость есть только если оба snapshot имеют тот же serializer class”.

Для evolution custom serializer-а используйте CompositeTypeSerializerSnapshot, который хранит nested snapshots и реализует custom compatibility logic.


Serializer tuning decision tree
Сериализация — bottleneckStarting point: измерили перформанс job-а, нашли что сериализация занимает значимую долю CPU (через async-profiler). Дальше decision tree какое именно действие применить.
1. Type checkПервая проверка: какой именно сериализатор используется? POJO быстрее всего, Kryo медленнее в 5-10x. Если Kryo — фикс через рефакторинг типов или strict-pojo-validation.
2. Object reuseЕсли все типы — POJO, следующий шаг: object reuse. Снижает GC pressure на 40-80% за счёт переиспользования instances. Требует careful review кода — нельзя сохранять ссылки без копирования.
3. Custom serializerЕсли POJO + reuse не дотягивают: посмотреть на конкретные hot types. Если для них есть domain-specific оптимизация (delta encoding, dictionary) — custom serializer даёт 2-5x speedup.
4. Register typesРегистрация типов через registerPojoType: убыстряет cold start, фиксирует field order. Полезно для job-ов с большим числом типов. Маленький wins, но free.
5. JMH benchmarkJMH бенчмарки конкретных serializer-ов на ваших данных. Сравнить варианты — POJO, custom, тогда вы знаете точные числа для трейдоффов.
6. External formatsФинальный шаг — переключение на binary форматы (Avro/Protobuf) для external boundaries. Это не оптимизация сериализации Flink, но оптимизация всего pipeline ingestion + egress.

Real production tuning: ride-sharing case

Production-кейс из крупной ride-sharing платформы. Job обрабатывал GPS-сигналы от водителей: ~80K events/sec на TaskManager, parallelism 200, общий throughput 16M events/sec.

Baseline (Flink 1.19, до тюнинга):

  • p99 latency: 600ms
  • CPU per TaskManager: 4 core
  • GC pause time: 8% wall clock

После tuning:

Step 1: Audit типов. Driver-helper выводил TypeInformation для всех payload classes. Найдено: 2 типа имели Kryo fallback из-за Map{'<'}String, Object{'>'} полей. Фикс — заменить на Map{'<'}String, String{'>'} для metadata, и отдельные strongly-typed классы для остального.

Step 2: Регистрация типов. 30+ payload classes зарегистрированы через единый registry.

Step 3: Object reuse. Включён enableObjectReuse. Code review показал 2 места с unsafe reference saving — исправлены через explicit copy.

Step 4: Custom serializer для главного hot type. GpsBatch — массив до 100 GPS points в одном event. Custom serializer с delta encoding для timestamps (15 bytes на point вместо 24 byte raw). Подобран после JMH benchmark — 4.2x speedup на этом конкретном типе.

После tuning:

  • p99 latency: 220ms (2.7x улучшение)
  • CPU per TaskManager: 1.8 core (2.2x улучшение)
  • GC pause time: 2% wall clock (4x улучшение)

Job стал работать на 60% меньшем кластере. Это типичный ROI от serializer tuning в production.


Antipatterns

Несколько частых ошибок в попытках тюнинга:

1. Custom serializer для всего. Дико затратное по maintenance, мало выгоды для большинства типов. Custom serializer оправдан только для top 1% hot types с domain-specific оптимизациями.

2. Object reuse без code review. Включить флаг и получить молчаливые баги в state — частая история. Сначала надо понять каждое место, где хранится reference на event.

3. Premature optimization. Тюнинг сериализатора до того, как вы доказали что он bottleneck. Async-profiler — обязательное доказательство перед началом работ.

4. Изменение custom serializer-а без version bump. Если вы поменяли byte layout, а TypeSerializerSnapshot не отражает изменений, restore из старого checkpoint молча даст corrupted data. Всегда version-ируйте.


Попробуй сам

  1. JMH бенчмарк POJO vs Kryo. Возьмите ваш реальный payload class, напишите JMH benchmark по примеру выше. Сравните throughput, allocation rate. Зафиксируйте baseline.

  2. Custom serializer для одного типа. Найдите в вашем job-е тип с array/list полем (events, samples, points). Напишите custom serializer с delta encoding или dictionary. JMH benchmark до и после.

  3. Object reuse audit. Включите enableObjectReuse в dev-окружении. Сделайте code review каждого processElement/flatMap/map — есть ли где-то сохранение references? Запустите unit-tests, ищите регрессии.

Проверка знанийKnowledge check
Вы запустили JMH benchmark на двух сериализаторах. PojoSerializer выдаёт 18M ops/sec, custom serializer 22M ops/sec. Custom serializer экономит 30% bytes на wire. Но при попытке restore job-а из checkpoint, сделанного со старым сериализатором, restore падает с UnsupportedOperationException. Что произошло и какой workflow правильный для миграции на custom serializer в production-job?
ОтветAnswer
Произошло следующее: custom serializer вернул TypeSerializerSnapshot, который Flink не смог сматчить с старым PojoSerializerSnapshot из checkpoint. resolveSchemaCompatibility вернул INCOMPATIBLE, restore upal. Это правильное поведение — Flink не должен молча менять формат данных, иначе можно получить corrupted state. Правильный workflow для миграции: (1) Подготовка — написать custom serializer + TypeSerializerSnapshot с CompositeTypeSerializerSnapshot, который умеет читать PojoSerializer старого формата. Это означает: новый TypeSerializerSnapshot.resolveSchemaCompatibility должен возвращать COMPATIBLE_AFTER_MIGRATION когда видит старый PojoSnapshot, и MigratingSerializer должен использовать PojoSerializer для чтения, а custom для записи. (2) Альтернатива через State Processor API — взять savepoint, прочитать всё состояние через State Processor API, переписать с новым сериализатором, запустить job из нового savepoint. Это работает всегда, но требует downtime и отдельный batch job. (3) Альтернатива: запустить новый job parallel со старым, переключить ingestion, дать новому job-у накопить state с нуля, killing старый. Подходит когда state восстанавливается из source быстро. Production deploy custom serializer-а никогда не делается hot-swap без миграционного плана.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Что делает env.getConfig().enableObjectReuse() и какие риски при его использовании?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4