Learning Platform
Глоссарий Troubleshooting
Урок 04.05 · 18 мин
Средний
SerializationPOJOTupleAvroKryoTypeInformation

Сериализация: POJO, Tuple, Avro, Kryo

Каждый раз, когда событие пересекает границу между subtasks (shuffle), пишется в state backend, или включается в checkpoint, оно сериализуется. Если сериализация медленная — производительность падает в разы. Если она через Kryo fallback — это часто означает, что вы сделали что-то “неправильно” с точки зрения типов.

Этот урок — про то, какие типы сериализации в Flink, как Flink выбирает между ними, и как избегать самой худшей опции (Kryo fallback). К концу урока вы будете писать классы для Flink “правильно” с первого раза.


TypeInformation: метаданные о типе

Чтобы сериализовать тип, Flink нужно знать его структуру. Эту информацию несёт TypeInformation<T>. Когда вы делаете stream.map(x -> ...), Flink пытается вывести (infer) TypeInformation возвращаемого типа автоматически.

Если он не может вывести (из-за Java generic erasure), вы получаете либо warning, либо fallback на Kryo (плохо), либо exception (“could not determine type of return value”). Поэтому часто видим:

DataStream<Event> events = source
    .map(json -> parseEvent(json))
    .returns(TypeInformation.of(Event.class));  // явный hint

Или с TypeHint для generics:

DataStream<Tuple2<String, List<Long>>> stream = source
    .map(x -> ...)
    .returns(new TypeHint<Tuple2<String, List<Long>>>() {});

TypeInformation — это объект runtime, содержащий info о типе: classes, field types, serializer. Через TypeInformation.getKey(...) и подобные методы он используется внутри Flink для всего.


Flink выбирает serializer на основе типа. Иерархия (от лучшего к худшему):

Иерархия типов в Flink: производительность
1. Primitives и боксированныеint, long, double, boolean и их box equivalents (Integer, Long, ...). Самые быстрые — нативная сериализация JVM. Используйте, когда возможно.
2. Tuple types (Tuple0..Tuple25)Flink-специфические классы Tuple1 of A, Tuple2 of A,B, ..., Tuple25. Используются исторически. Быстрые, потому что Flink знает их структуру.
3. POJO typesPlain Old Java Objects, соответствующие POJO правилам (public class, public no-arg constructor, public/getter+setter поля). Flink анализирует через reflection, генерирует эффективный serializer.
4. Avro / Protobuf / ThriftВнешние schema-aware форматы. Поддерживаются через специальные TypeInformation. Хороши для inter-service compatibility и schema evolution.
5. Kryo (fallback)Generic Java serializer. Используется когда тип не подходит ни в одну категорию выше. Медленный (5-10x), большой output (3x), не schema-evolvable. ИЗБЕГАЙТЕ!

POJO: рекомендованный путь

POJO в Flink-смысле — это класс, удовлетворяющий правилам:

  1. Public класс (не nested non-static, не anonymous).
  2. Public no-arg constructor.
  3. Все поля либо public, либо имеют public getter/setter соответствующего naming pattern.

Пример правильного POJO:

public class Transaction {
    public String id;
    public String userId;
    public BigDecimal amount;
    public long timestamp;
    
    public Transaction() {}  // public no-arg constructor — обязателен
    
    public Transaction(String id, String userId, BigDecimal amount, long timestamp) {
        this.id = id;
        this.userId = userId;
        this.amount = amount;
        this.timestamp = timestamp;
    }
}

Или с приватными полями + getters/setters:

public class Transaction {
    private String id;
    private String userId;
    private BigDecimal amount;
    private long timestamp;
    
    public Transaction() {}
    
    // Getters и setters для каждого поля
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    // и так далее
}

Java records (Java 17+) — тоже работают как POJO в Flink 2.x:

public record Transaction(String id, String userId, BigDecimal amount, long timestamp) {}

Records автоматически дают: final fields, getter методы (имя поля), equals/hashCode, toString. Flink распознаёт records как POJOs. Это рекомендованный путь для новых типов в Flink 2.x.

Что портит POJO

Часто люди создают “почти POJO”, и Flink сваливается в Kryo:

// НЕ POJO — нет public no-arg constructor
public class Transaction {
    public final String id;
    public Transaction(String id) { this.id = id; }
}

// НЕ POJO — приватное поле БЕЗ getter
public class Transaction {
    private String id;
    public String getRandomName() { return id; }  // wrong naming
}

// НЕ POJO — final поле без proper getter naming
public class Transaction {
    private final String id;
    // нет getId()
}

Как проверить, что класс — POJO: запустите job в local mode, в логах увидите либо “GenericType” (Kryo fallback) либо “PojoType” (POJO recognized). Также можно явно:

TypeInformation<Transaction> typeInfo = TypeInformation.of(Transaction.class);
System.out.println(typeInfo);  // "PojoType<Transaction>" или "GenericType<Transaction>"

Tuple: legacy, но рабочий

Flink имеет Tuple1, Tuple2, …, Tuple25 — generic classes для tuples:

DataStream<Tuple2<String, Long>> counts = words
    .map(w -> Tuple2.of(w, 1L))
    .returns(Types.TUPLE(Types.STRING, Types.LONG))
    .keyBy(t -> t.f0)
    .sum(1);

Tuple — это POJO с public полями f0, f1, …, fN. Flink имеет встроенный TupleTypeInfo, очень быстрый serializer.

Когда использовать Tuple:

  • Быстрый prototyping.
  • Очень простые случаи, где named POJO избыточен.
  • Legacy code.

Когда НЕ использовать Tuple:

  • Production code с осмысленными доменами — POJO с named fields читается намного лучше.
  • Когда полей больше 4-5 — tuple.f0, tuple.f1, tuple.f2 непонятно.

Best practice: используйте POJO/records для production, Tuple для prototyping.


Avro: для schema evolution и cross-service

Avro — schema-aware формат. Используется широко в Kafka-стеке (через Confluent Schema Registry).

Avro Deep-Dive в Kafka Schema Registry

В Flink есть полноценная поддержка Avro:

// Avro generated class (через Avro IDL or Schema)
public class UserEvent extends SpecificRecordBase {
    private String userId;
    private long timestamp;
    private String eventType;
    // generated code
}

// Использование в Flink
DataStream<UserEvent> events = env
    .fromSource(
        kafkaSourceWithAvroSchema(),
        ws,
        "Kafka Avro Source"
    );

Avro generated classes автоматически распознаются Flink (если есть зависимость на flink-avro). Используется AvroSerializer, который быстрый и schema-evolution friendly.

Кейсы:

  • Inter-service compatibility (multiple producers/consumers одного топика).
  • Schema evolution (добавление/удаление полей без breakage).
  • Большие enterprise pipelines с governance.

Цена: дополнительная сложность (schema registry, code generation). Для одиночного Flink-сервиса POJO проще.


Kryo: fallback, которого надо избегать

Если Flink не может распознать тип как POJO, Tuple, Avro и так далее, он fallback на Kryo — generic Java сериализатор.

Проблемы Kryo:

  • Медленный: 5-10x slower, чем POJO serializer.
  • Большой output: 2-3x больше байт, чем POJO.
  • Не schema-evolvable: добавил поле в класс — старые savepoints/checkpoints невалидны.
  • Skipping changes при schema mismatch — данные могут читаться неправильно без warning.

Когда Flink fallback на Kryo:

  • Класс — не POJO (нарушены правила, например, private field без getter).
  • Класс — third-party (не контролируете), не POJO-compatible.
  • Java collection types в state (List, Map of complex types).
  • Generic типы с erasure (например, Object).

Как обнаружить Kryo fallback

  1. Логи: при старте job логи показывают “Cannot use Kryo serializer for…” или “Falling back to Kryo serialization for…”.
  2. Web UI Configuration tab: показывает type information для каждого оператора.
  3. Программно: TypeInformation.of(MyClass.class).toString() — если “GenericType<MyClass>” — это Kryo.
  4. Метрики: высокий time spent in serialization (через flame graphs).

Как избегать Kryo

  1. Сделайте класс POJO-compatible: добавьте public no-arg constructor, getters/setters.
  2. Используйте records: автоматически POJO-compatible.
  3. Используйте Avro/Protobuf для cross-service.
  4. Register custom serializer, если класс действительно не может быть POJO:
env.getConfig().registerKryoType(MyClass.class);
env.getConfig().addDefaultKryoSerializer(MyClass.class, MyCustomSerializer.class);

Это всё ещё Kryo, но с вашей кастомной логикой — может быть быстрее, чем generic Kryo.

  1. Disable Kryo fallback (для catching проблем в dev):
env.getConfig().disableGenericTypes();

Это сделает job exception вместо fallback — поможет catch проблемы рано.

WARNING

В production НЕ disable generic types — иначе любой missed POJO bag упадёт job. В DEV — да, для catching. В production — мониторьте через метрики и логи.


Сравнение производительности

Benchmarks из Flink community (числа упрощены, конкретные значения зависят от типа объекта):

SerializerSpeedSizeSchema evolution
Primitive (int, long)100% (baseline)100%N/A
Tuple2<String, Long>95%110%No
POJO (Transaction)90%120%Limited (через registered Pojo evolution)
Avro80%130%Yes (с registry)
Kryo10-20%250-300%No

Вывод: POJO с правилами — sweet spot. Avro когда нужна schema evolution. Kryo — никогда осознанно.


State serialization: особо важно

В state backend (RocksDB, ForSt, HashMap) каждое state read/write — это serialization/deserialization. Если у вас миллион keys, и для каждого state операция требует 5 мс из-за Kryo вместо 0.5 мс из-за POJO — это разница 10x на throughput.

Особенно важно для:

  • MapState с complex value types: каждая операция get/put сериализует value.
  • Checkpoint duration: сериализация state в snapshot — главный contributor.
  • Recovery time: после failure deserialization из snapshot.

Правило: state value types ВСЕГДА должны быть POJOs или Tuples. Никогда не используйте Kryo для state, если есть выбор.


Production checklist

  1. Все custom event/state classes — POJOs или records. Запустите job в dev, проверьте логи на “Kryo fallback”.

  2. Generic types в state — избегайте: вместо ValueState<Object> или MapState<String, Object> определите конкретные types.

  3. Java records для immutable events — отличный выбор в Flink 2.x.

  4. Avro для cross-service events в Kafka topics (схемы в Schema Registry).

  5. disableGenericTypes() в local dev и testing: помогает catch проблемы рано.

  6. Регулярный аудит: после изменений типов проверьте, что они всё ещё распознаются как POJO (logs / TypeInformation.toString()).

  7. Performance test: если throughput неудовлетворителен — проверьте flame graphs на serialization overhead.


Попробуй сам

Создайте простой test:

  1. Намеренно сломайте POJO: убрите public no-arg constructor у Transaction класса в WordCount-like pipeline. Запустите job. Какие сообщения в логах? Какая разница в performance vs нормального POJO?

  2. Перепишите Transaction как Java record:

    public record Transaction(String id, String userId, BigDecimal amount, long timestamp) {}

    Запустите job. Распознан как POJO? Какой serializer используется?

  3. Добавьте env.getConfig().disableGenericTypes() в начале main(). Запустите. Если есть Kryo fallback — exception. Если нет — всё работает быстрее (без overhead reflection).

  4. Bonus: сравните size of state в checkpoint при POJO vs Kryo — для big state может быть существенная разница в размере на disk.

Проверка знанийKnowledge check
У вас Flink job, обрабатывающий 100k events/sec. Heavy ProcessFunction со stateful logic. После добавления нового поля в Event класс (с Map<String, List<Long>> типа), throughput упал до 30k events/sec. Что вероятная причина и как это диагностировать?
ОтветAnswer
Вероятная причина: добавление Map<String, List<Long>> в Event сделало класс несовместимым с POJO правилами (generic collection types сложно распознать), и Flink fallback на Kryo serialization. Kryo медленнее POJO в 5-10x, что объясняет падение throughput с 100k до 30k. Диагностика: (1) Логи JobManager / TaskManager при старте — поиск 'Kryo' или 'GenericType' или 'Cannot use POJO'; (2) Web UI -> оператор -> Configuration tab — смотреть type information; (3) программно print 'TypeInformation.of(Event.class).toString()' — если показывает 'GenericType' — Kryo fallback; (4) Flame graph на heavy operator — много времени в Kryo.write / Kryo.read. Решения: (a) Изменить тип поля — например, Map<String, long[]> вместо List<Long>, или вынести Map в отдельный POJO; (b) Использовать Avro для Event, который handles complex nested types; (c) Если совсем не получается — register custom Kryo serializer для Event класса с оптимизированной логикой; (d) Самое радикальное — flatten Event без nested collections, делая его POJO-compatible.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какие правила должен соблюдать класс, чтобы Flink распознал его как POJO (и использовал быстрый POJO serializer)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5