TypeInformation система
Flink — это движок на JVM, но он не доверяет стандартной Java-сериализации. Когда вы пишете DataStream{'<'}MyEvent{'>'}, Flink не вызывает ObjectOutputStream.writeObject(event) где-то под капотом. Вместо этого он на этапе построения JobGraph решает: какой именно сериализатор использовать для этого типа, какие поля у него есть, можно ли применить zero-copy при шаффле. Эта система называется TypeInformation, и её решения определяют пропускную способность вашего pipeline настолько же сильно, как параллелизм или backpressure.
Урок разбирает, как Flink выводит TypeInformation, какая иерархия типов есть в системе, и почему TypeHint иногда становится единственным способом сохранить производительность.
Зачем Flink собственная система типов
Стандартная Java-сериализация (Serializable + ObjectOutputStream) даёт два неприемлемых для streaming-движка свойства. Во-первых, она пишет имена классов в каждое сообщение — это десятки байт overhead на каждый event. Во-вторых, она использует reflection в runtime для каждой записи, что добавляет микросекунды на event. На потоке в миллион событий в секунду это превращается в неприемлемую CPU-нагрузку и удваивает объём данных, проходящих по сети.
Flink решает обе проблемы через статический анализ типов. На этапе построения JobGraph (ещё до старта job-а) фреймворк рекурсивно проходит по каждому типу, определяет его структуру и подбирает оптимальный сериализатор. В runtime сериализатор работает напрямую с полями — без reflection, без записи имён классов, без BSON-подобных tag-ов.
DataStream{'<'}Order{'>'} orders = env.fromSource(kafkaSource, ...);
// На этапе построения JobGraph Flink вычислил:
// - Order это POJO с полями (String userId, long amount, Instant ts)
// - сериализатор = PojoSerializer{'<'}Order{'>'} с 3 field-сериализаторами
// - schema evolution возможна (новые поля добавляются без поломки старых state)
// - byte layout: var-int len + utf8 userId + 8 bytes long + 8 bytes long
Эта информация прикрепляется к каждому stream в графе и передаётся каждому оператору. На уровне runtime сериализатор уже фиксирован — никаких решений в hot path не принимается.
TypeInformation решается на этапе клиента (driver), который строит JobGraph. Это происходит ДО передачи job-а в JobManager. Если на клиенте отсутствует класс события (например он находится только в JAR-е TaskManager-а), TypeExtractor упадёт с ClassNotFoundException ещё на стадии построения графа.
Иерархия TypeInformation
Flink делит все типы на семь категорий, и для каждой категории есть свой сериализатор с предсказуемым layout:
TypeInformation (abstract root)
TypeInformation abstract класс — корень иерархии. Каждый конкретный subtype знает, как создать соответствующий TypeSerializer и TypeComparator для своих экземпляров. Сериализатор отвечает за write/read, comparator — за сравнения для keyBy и сортировки.Каждая категория имеет собственный сериализатор: IntSerializer, LongSerializer, PojoSerializer{'<'}T{'>'}, TupleSerializer{'<'}T{'>'}, …, KryoSerializer{'<'}T{'>'}. Производительность падает по мере спуска вниз по этой иерархии. Базовые типы — десятки наносекунд на event. POJO — сотни наносекунд. Kryo — микросекунды.
Как работает TypeExtractor
Когда вы пишете лямбду map(event -{'>'} new Output(event.userId)), Flink должен вывести TypeInformation для Output без вашей явной подсказки. Это делает класс TypeExtractor — рекурсивный анализатор Java reflection и generic signatures.
Алгоритм такой. TypeExtractor получает на вход Type (из java.lang.reflect.Type — это может быть Class, ParameterizedType, TypeVariable). Дальше:
- Если тип примитивный, или один из known basic types — возвращает соответствующий
BasicTypeInfo. - Если тип это
Tuple1-Tuple25— рекурсивно извлекает типы параметров и оборачивает вTupleTypeInfo. - Если тип проходит проверки на POJO — пытается построить
PojoTypeInfo, рекурсивно обходя все поля. - Если ничего не сработало — возвращает
GenericTypeInfoс Kryo fallback.
POJO-критерии строгие. Класс должен:
- иметь
public(или package-private с no-args) default constructor; - все поля должны быть
publicили иметьpublicgetter/setter с правильными именами; - поля сериализуемых типов (любой supported тип, включая другие POJO рекурсивно);
- класс не должен реализовать
java.io.Serializableс customwriteObject(это убивает POJO-detection в старых версиях, в Flink 2.2 — менее критично, но всё ещё рискованно).
// POJO — будет распознан как PojoTypeInfo
public static class Order {
public String userId; // public field — OK
public long amount;
private Instant ts; // private — нужен getter/setter
public Instant getTs() { return ts; }
public void setTs(Instant ts) { this.ts = ts; }
public Order() {} // default constructor — обязателен
public Order(String u, long a, Instant t) { ... }
}
// НЕ POJO — нет default constructor, упадёт в Kryo
public static class FailedPojo {
public final String userId;
public final long amount;
public FailedPojo(String u, long a) { this.userId = u; this.amount = a; }
}
Самая частая причина падения в Kryo — отсутствие default constructor (особенно у data classes из Kotlin/Scala). В Kotlin POJO выглядит так: data class Order(var userId: String = "", var amount: Long = 0L) — обязательно var и значения по умолчанию для каждого поля.
TypeHint и преодоление type erasure
Java стирает generic-типы во время компиляции — это называется type erasure. Когда вы пишете map(event -{'>'} Tuple2.of(event.userId, 1L)), в bytecode возвращаемый тип лямбды это просто Tuple2, без параметров. TypeExtractor не может извлечь {'<'}String, Long{'>'} из лямбды — информация была стёрта.
В таких случаях Flink бросает исключение с сообщением “The return type of function ‘X’ could not be determined automatically”. Решение — TypeHint:
DataStream{'<'}Tuple2{'<'}String, Long{'>'}{'>'} result = orders
.map(order -{'>'} Tuple2.of(order.userId, 1L))
.returns(new TypeHint{'<'}Tuple2{'<'}String, Long{'>'}{'>'}() {});
TypeHint это anonymous subclass — компилятор сохраняет generic-параметры в его суперклассе, и Flink может извлечь их через reflection в runtime. Это единственный способ передать generic-параметры через границу type erasure.
Альтернатива — явное Types.TUPLE:
.returns(Types.TUPLE(Types.STRING, Types.LONG))
Этот синтаксис менее декларативный, но не использует anonymous class, что иногда проще для генерируемого кода или для случаев, когда тип определяется динамически.
TypeInformation на уровне DataStream API
Каждый DataStream{'<'}T{'>'} несёт TypeInformation{'<'}T{'>'}. Вы можете получить его явно:
TypeInformation{'<'}Order{'>'} typeInfo = orders.getType();
System.out.println(typeInfo);
// Выведет: PojoType{'<'}com.example.Order, fields = [amount: Long, ts: Instant, userId: String]{'>'}
Это полезно для дебага: если у вас неожиданно медленная пайплайн, первым делом проверьте — все ли operator-output типы являются POJO/Basic/Tuple. Если хоть один промежуточный поток показывает GenericType{'<'}...{'>'}, у вас Kryo fallback.
Можно перехватить решение TypeExtractor через явное указание типа:
DataStream{'<'}MyType{'>'} typed = stream
.map(rawString -{'>'} parse(rawString))
.returns(TypeInformation.of(MyType.class));
Или через Types:
.returns(Types.POJO(MyType.class))
.returns(Types.ROW_NAMED(new String[]{"id", "amount"}, Types.STRING, Types.LONG))
TypeInformation в Table/SQL API
В Table API типы выражаются через DataType и LogicalType — это новая система типов, которая моделирует SQL-семантику (NULL allowable, precision, time semantics). Преобразование между DataStream API и Table API происходит через TypeInformation как мост:
// Table -{'>'} DataStream: конвертация LogicalType -{'>'} TypeInformation
Table table = tableEnv.sqlQuery("SELECT user_id, count(*) FROM orders GROUP BY user_id");
DataStream{'<'}Row{'>'} stream = tableEnv.toChangelogStream(table);
// stream.getType() будет ExternalTypeInfo{'<'}Row{'>'} с soft DataType reference
// DataStream -{'>'} Table: конвертация TypeInformation -{'>'} DataType
DataStream{'<'}Order{'>'} orders = env.fromSource(...);
Table ordersTable = tableEnv.fromDataStream(orders);
// Поля Order автоматически становятся столбцами таблицы
В Flink 2.2 был доработан мост: tableEnv.fromDataStream(stream, Schema.newBuilder()...) позволяет явно указать SQL-схему даже для streams с генерируемыми типами. Это критично для пайплайнов, которые смешивают DataStream и Table API в одном job-е.
Когда TypeExtractor молча проигрывает
Самый болезненный сценарий: TypeExtractor не падает, но возвращает GenericTypeInfo вместо ожидаемого PojoTypeInfo. Job запускается, работает, но в 5-10 раз медленнее, чем мог бы. Без явной диагностики это никогда не всплывёт.
Случаи, когда POJO-detection ломается:
- Inner class без статической ссылки —
static classнужен для top-level POJO; nested non-static несут implicit reference на outer instance. - Final fields без конструктора-сеттера — Flink не может set value через reflection в final field (в Java 17+ это ещё больше затруднено).
- Поля типа
Objectили generic-неинстанцированный тип — TypeExtractor не знает, что туда писать. - Поля с private getters/setters или несовпадающим naming pattern (
getFooбезsetFooили с разным типом). - Класс из external library, помеченный как
@Generatedили с агрессивной обфускацией.
Диагностика — флаг JVM -Dlog4j2.flink.typeinfo.level=DEBUG, или явная проверка через TypeInformation.of(MyType.class).toString() в driver-коде:
TypeInformation{'<'}MyType{'>'} info = TypeInformation.of(MyType.class);
if (info instanceof GenericTypeInfo) {
throw new RuntimeException(
"MyType fell back to Kryo: " + info +
". Check class is POJO-compatible."
);
}
В Flink 2.0+ появился флаг pipeline.strict-pojo-validation (доработан в 2.2 — в следующем уроке). Включает обязательное падение при Kryo fallback, что является production-grade safety net.
Попробуй сам
-
Проверить TypeInformation своих типов. В драйвер-коде вашего job-а вставьте
TypeInformation.of(YourType.class).toString()для каждого payload type. Убедитесь, что нигде не возвращаетсяGenericType. -
Сломать POJO detection. Возьмите рабочий POJO и удалите default constructor. Запустите job, посмотрите на логи: вы увидите warning о Kryo fallback. Сравните throughput до и после на одинаковом потоке.
-
TypeHint для Tuple. Напишите job, который мапит в
Tuple2{'<'}String, Long{'>'}без.returns(...). ПоймайтеInvalidTypesException. Добавьте TypeHint — убедитесь, что job стартует.