Сериализация: POJO, Tuple, Avro, Kryo
Каждый раз, когда событие пересекает границу между subtasks (shuffle), пишется в state backend, или включается в checkpoint, оно сериализуется. Если сериализация медленная — производительность падает в разы. Если она через Kryo fallback — это часто означает, что вы сделали что-то “неправильно” с точки зрения типов.
Этот урок — про то, какие типы сериализации в Flink, как Flink выбирает между ними, и как избегать самой худшей опции (Kryo fallback). К концу урока вы будете писать классы для Flink “правильно” с первого раза.
TypeInformation: метаданные о типе
Чтобы сериализовать тип, Flink нужно знать его структуру. Эту информацию несёт TypeInformation<T>. Когда вы делаете stream.map(x -> ...), Flink пытается вывести (infer) TypeInformation возвращаемого типа автоматически.
Если он не может вывести (из-за Java generic erasure), вы получаете либо warning, либо fallback на Kryo (плохо), либо exception (“could not determine type of return value”). Поэтому часто видим:
DataStream<Event> events = source
.map(json -> parseEvent(json))
.returns(TypeInformation.of(Event.class)); // явный hint
Или с TypeHint для generics:
DataStream<Tuple2<String, List<Long>>> stream = source
.map(x -> ...)
.returns(new TypeHint<Tuple2<String, List<Long>>>() {});
TypeInformation — это объект runtime, содержащий info о типе: classes, field types, serializer. Через TypeInformation.getKey(...) и подобные методы он используется внутри Flink для всего.
Иерархия типов в Flink
Flink выбирает serializer на основе типа. Иерархия (от лучшего к худшему):
POJO: рекомендованный путь
POJO в Flink-смысле — это класс, удовлетворяющий правилам:
- Public класс (не nested non-static, не anonymous).
- Public no-arg constructor.
- Все поля либо
public, либо имеют public getter/setter соответствующего naming pattern.
Пример правильного POJO:
public class Transaction {
public String id;
public String userId;
public BigDecimal amount;
public long timestamp;
public Transaction() {} // public no-arg constructor — обязателен
public Transaction(String id, String userId, BigDecimal amount, long timestamp) {
this.id = id;
this.userId = userId;
this.amount = amount;
this.timestamp = timestamp;
}
}
Или с приватными полями + getters/setters:
public class Transaction {
private String id;
private String userId;
private BigDecimal amount;
private long timestamp;
public Transaction() {}
// Getters и setters для каждого поля
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
// и так далее
}
Java records (Java 17+) — тоже работают как POJO в Flink 2.x:
public record Transaction(String id, String userId, BigDecimal amount, long timestamp) {}
Records автоматически дают: final fields, getter методы (имя поля), equals/hashCode, toString. Flink распознаёт records как POJOs. Это рекомендованный путь для новых типов в Flink 2.x.
Что портит POJO
Часто люди создают “почти POJO”, и Flink сваливается в Kryo:
// НЕ POJO — нет public no-arg constructor
public class Transaction {
public final String id;
public Transaction(String id) { this.id = id; }
}
// НЕ POJO — приватное поле БЕЗ getter
public class Transaction {
private String id;
public String getRandomName() { return id; } // wrong naming
}
// НЕ POJO — final поле без proper getter naming
public class Transaction {
private final String id;
// нет getId()
}
Как проверить, что класс — POJO: запустите job в local mode, в логах увидите либо “GenericType” (Kryo fallback) либо “PojoType” (POJO recognized). Также можно явно:
TypeInformation<Transaction> typeInfo = TypeInformation.of(Transaction.class);
System.out.println(typeInfo); // "PojoType<Transaction>" или "GenericType<Transaction>"
Tuple: legacy, но рабочий
Flink имеет Tuple1, Tuple2, …, Tuple25 — generic classes для tuples:
DataStream<Tuple2<String, Long>> counts = words
.map(w -> Tuple2.of(w, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.keyBy(t -> t.f0)
.sum(1);
Tuple — это POJO с public полями f0, f1, …, fN. Flink имеет встроенный TupleTypeInfo, очень быстрый serializer.
Когда использовать Tuple:
- Быстрый prototyping.
- Очень простые случаи, где named POJO избыточен.
- Legacy code.
Когда НЕ использовать Tuple:
- Production code с осмысленными доменами — POJO с named fields читается намного лучше.
- Когда полей больше 4-5 —
tuple.f0,tuple.f1,tuple.f2непонятно.
Best practice: используйте POJO/records для production, Tuple для prototyping.
Avro: для schema evolution и cross-service
Avro — schema-aware формат. Используется широко в Kafka-стеке (через Confluent Schema Registry).
Avro Deep-Dive в Kafka Schema RegistryВ Flink есть полноценная поддержка Avro:
// Avro generated class (через Avro IDL or Schema)
public class UserEvent extends SpecificRecordBase {
private String userId;
private long timestamp;
private String eventType;
// generated code
}
// Использование в Flink
DataStream<UserEvent> events = env
.fromSource(
kafkaSourceWithAvroSchema(),
ws,
"Kafka Avro Source"
);
Avro generated classes автоматически распознаются Flink (если есть зависимость на flink-avro). Используется AvroSerializer, который быстрый и schema-evolution friendly.
Кейсы:
- Inter-service compatibility (multiple producers/consumers одного топика).
- Schema evolution (добавление/удаление полей без breakage).
- Большие enterprise pipelines с governance.
Цена: дополнительная сложность (schema registry, code generation). Для одиночного Flink-сервиса POJO проще.
Kryo: fallback, которого надо избегать
Если Flink не может распознать тип как POJO, Tuple, Avro и так далее, он fallback на Kryo — generic Java сериализатор.
Проблемы Kryo:
- Медленный: 5-10x slower, чем POJO serializer.
- Большой output: 2-3x больше байт, чем POJO.
- Не schema-evolvable: добавил поле в класс — старые savepoints/checkpoints невалидны.
- Skipping changes при schema mismatch — данные могут читаться неправильно без warning.
Когда Flink fallback на Kryo:
- Класс — не POJO (нарушены правила, например, private field без getter).
- Класс — third-party (не контролируете), не POJO-compatible.
- Java collection types в state (List, Map of complex types).
- Generic типы с erasure (например,
Object).
Как обнаружить Kryo fallback
- Логи: при старте job логи показывают “Cannot use Kryo serializer for…” или “Falling back to Kryo serialization for…”.
- Web UI Configuration tab: показывает type information для каждого оператора.
- Программно:
TypeInformation.of(MyClass.class).toString()— если “GenericType<MyClass>” — это Kryo. - Метрики: высокий time spent in serialization (через flame graphs).
Как избегать Kryo
- Сделайте класс POJO-compatible: добавьте public no-arg constructor, getters/setters.
- Используйте records: автоматически POJO-compatible.
- Используйте Avro/Protobuf для cross-service.
- Register custom serializer, если класс действительно не может быть POJO:
env.getConfig().registerKryoType(MyClass.class);
env.getConfig().addDefaultKryoSerializer(MyClass.class, MyCustomSerializer.class);
Это всё ещё Kryo, но с вашей кастомной логикой — может быть быстрее, чем generic Kryo.
- Disable Kryo fallback (для catching проблем в dev):
env.getConfig().disableGenericTypes();
Это сделает job exception вместо fallback — поможет catch проблемы рано.
В production НЕ disable generic types — иначе любой missed POJO bag упадёт job. В DEV — да, для catching. В production — мониторьте через метрики и логи.
Сравнение производительности
Benchmarks из Flink community (числа упрощены, конкретные значения зависят от типа объекта):
| Serializer | Speed | Size | Schema evolution |
|---|---|---|---|
| Primitive (int, long) | 100% (baseline) | 100% | N/A |
Tuple2<String, Long> | 95% | 110% | No |
| POJO (Transaction) | 90% | 120% | Limited (через registered Pojo evolution) |
| Avro | 80% | 130% | Yes (с registry) |
| Kryo | 10-20% | 250-300% | No |
Вывод: POJO с правилами — sweet spot. Avro когда нужна schema evolution. Kryo — никогда осознанно.
State serialization: особо важно
В state backend (RocksDB, ForSt, HashMap) каждое state read/write — это serialization/deserialization. Если у вас миллион keys, и для каждого state операция требует 5 мс из-за Kryo вместо 0.5 мс из-за POJO — это разница 10x на throughput.
Особенно важно для:
- MapState с complex value types: каждая операция get/put сериализует value.
- Checkpoint duration: сериализация state в snapshot — главный contributor.
- Recovery time: после failure deserialization из snapshot.
Правило: state value types ВСЕГДА должны быть POJOs или Tuples. Никогда не используйте Kryo для state, если есть выбор.
Production checklist
-
Все custom event/state classes — POJOs или records. Запустите job в dev, проверьте логи на “Kryo fallback”.
-
Generic types в state — избегайте: вместо
ValueState<Object>илиMapState<String, Object>определите конкретные types. -
Java records для immutable events — отличный выбор в Flink 2.x.
-
Avro для cross-service events в Kafka topics (схемы в Schema Registry).
-
disableGenericTypes()в local dev и testing: помогает catch проблемы рано. -
Регулярный аудит: после изменений типов проверьте, что они всё ещё распознаются как POJO (logs / TypeInformation.toString()).
-
Performance test: если throughput неудовлетворителен — проверьте flame graphs на serialization overhead.
Попробуй сам
Создайте простой test:
-
Намеренно сломайте POJO: убрите public no-arg constructor у Transaction класса в WordCount-like pipeline. Запустите job. Какие сообщения в логах? Какая разница в performance vs нормального POJO?
-
Перепишите Transaction как Java record:
public record Transaction(String id, String userId, BigDecimal amount, long timestamp) {}Запустите job. Распознан как POJO? Какой serializer используется?
-
Добавьте
env.getConfig().disableGenericTypes()в начале main(). Запустите. Если есть Kryo fallback — exception. Если нет — всё работает быстрее (без overhead reflection). -
Bonus: сравните size of state в checkpoint при POJO vs Kryo — для big state может быть существенная разница в размере на disk.