Learning Platform
Глоссарий Troubleshooting
Урок 18.01 · 25 мин
Продвинутый
TypeInformationTypeHintTypeExtractorPOJOGeneric TypeSerializer Selection

TypeInformation система

Flink — это движок на JVM, но он не доверяет стандартной Java-сериализации. Когда вы пишете DataStream{'<'}MyEvent{'>'}, Flink не вызывает ObjectOutputStream.writeObject(event) где-то под капотом. Вместо этого он на этапе построения JobGraph решает: какой именно сериализатор использовать для этого типа, какие поля у него есть, можно ли применить zero-copy при шаффле. Эта система называется TypeInformation, и её решения определяют пропускную способность вашего pipeline настолько же сильно, как параллелизм или backpressure.

Урок разбирает, как Flink выводит TypeInformation, какая иерархия типов есть в системе, и почему TypeHint иногда становится единственным способом сохранить производительность.

Tungsten: unsafe memory и code generation

Стандартная Java-сериализация (Serializable + ObjectOutputStream) даёт два неприемлемых для streaming-движка свойства. Во-первых, она пишет имена классов в каждое сообщение — это десятки байт overhead на каждый event. Во-вторых, она использует reflection в runtime для каждой записи, что добавляет микросекунды на event. На потоке в миллион событий в секунду это превращается в неприемлемую CPU-нагрузку и удваивает объём данных, проходящих по сети.

Flink решает обе проблемы через статический анализ типов. На этапе построения JobGraph (ещё до старта job-а) фреймворк рекурсивно проходит по каждому типу, определяет его структуру и подбирает оптимальный сериализатор. В runtime сериализатор работает напрямую с полями — без reflection, без записи имён классов, без BSON-подобных tag-ов.

DataStream{'<'}Order{'>'} orders = env.fromSource(kafkaSource, ...);

// На этапе построения JobGraph Flink вычислил:
// - Order это POJO с полями (String userId, long amount, Instant ts)
// - сериализатор = PojoSerializer{'<'}Order{'>'} с 3 field-сериализаторами
// - schema evolution возможна (новые поля добавляются без поломки старых state)
// - byte layout: var-int len + utf8 userId + 8 bytes long + 8 bytes long

Эта информация прикрепляется к каждому stream в графе и передаётся каждому оператору. На уровне runtime сериализатор уже фиксирован — никаких решений в hot path не принимается.

NOTE

TypeInformation решается на этапе клиента (driver), который строит JobGraph. Это происходит ДО передачи job-а в JobManager. Если на клиенте отсутствует класс события (например он находится только в JAR-е TaskManager-а), TypeExtractor упадёт с ClassNotFoundException ещё на стадии построения графа.


Иерархия TypeInformation

Flink делит все типы на семь категорий, и для каждой категории есть свой сериализатор с предсказуемым layout:

Иерархия TypeInformation в Flink

TypeInformation (abstract root)

TypeInformation abstract класс — корень иерархии. Каждый конкретный subtype знает, как создать соответствующий TypeSerializer и TypeComparator для своих экземпляров. Сериализатор отвечает за write/read, comparator — за сравнения для keyBy и сортировки.
BasicTypeInfoBasicTypeInfo: int, long, double, String, Boolean — встроенные типы JDK. Сериализуются напрямую как fixed-width bytes (или var-int для строк). Самый быстрый путь — zero overhead.
TupleTypeInfoTupleTypeInfo: Tuple1-Tuple25 из flink-core. Сериализуется как concat полей: каждое поле использует свой суб-сериализатор. Compact byte layout, хорош для intermediate представлений.
PojoTypeInfoPojoTypeInfo: Java POJO с public default constructor и public/getter+setter полями. Поля сортируются по имени для детерминированного layout. Поддерживает schema evolution — добавление/удаление полей не ломает state restore.
RowTypeInfoRowTypeInfo: Row из Table/SQL API — позиционный набор полей с types определёнными в LogicalType. Используется внутри Table planner для intermediate представлений.
CompositeTypesListTypeInfo / MapTypeInfo / ArrayTypeInfo: композитные коллекции с указанным TypeInformation для элементов. Сериализуются как size + N сериализованных элементов.
GenericTypeInfo (Kryo)GenericTypeInfo: fallback для всего, что не подпало под другие категории. Использует Kryo для сериализации, что в 5-10 раз медленнее POJO. КРАСНЫЙ ФЛАГ — если ваш main payload type оказался GenericType, производительность будет деградирована.

Каждая категория имеет собственный сериализатор: IntSerializer, LongSerializer, PojoSerializer{'<'}T{'>'}, TupleSerializer{'<'}T{'>'}, …, KryoSerializer{'<'}T{'>'}. Производительность падает по мере спуска вниз по этой иерархии. Базовые типы — десятки наносекунд на event. POJO — сотни наносекунд. Kryo — микросекунды.


Как работает TypeExtractor

Когда вы пишете лямбду map(event -{'>'} new Output(event.userId)), Flink должен вывести TypeInformation для Output без вашей явной подсказки. Это делает класс TypeExtractor — рекурсивный анализатор Java reflection и generic signatures.

Алгоритм такой. TypeExtractor получает на вход Type (из java.lang.reflect.Type — это может быть Class, ParameterizedType, TypeVariable). Дальше:

  1. Если тип примитивный, или один из known basic types — возвращает соответствующий BasicTypeInfo.
  2. Если тип это Tuple1-Tuple25 — рекурсивно извлекает типы параметров и оборачивает в TupleTypeInfo.
  3. Если тип проходит проверки на POJO — пытается построить PojoTypeInfo, рекурсивно обходя все поля.
  4. Если ничего не сработало — возвращает GenericTypeInfo с Kryo fallback.

POJO-критерии строгие. Класс должен:

  • иметь public (или package-private с no-args) default constructor;
  • все поля должны быть public или иметь public getter/setter с правильными именами;
  • поля сериализуемых типов (любой supported тип, включая другие POJO рекурсивно);
  • класс не должен реализовать java.io.Serializable с custom writeObject (это убивает POJO-detection в старых версиях, в Flink 2.2 — менее критично, но всё ещё рискованно).
// POJO — будет распознан как PojoTypeInfo
public static class Order {
    public String userId;     // public field — OK
    public long amount;
    private Instant ts;        // private — нужен getter/setter
    public Instant getTs() { return ts; }
    public void setTs(Instant ts) { this.ts = ts; }

    public Order() {}          // default constructor — обязателен
    public Order(String u, long a, Instant t) { ... }
}

// НЕ POJO — нет default constructor, упадёт в Kryo
public static class FailedPojo {
    public final String userId;
    public final long amount;
    public FailedPojo(String u, long a) { this.userId = u; this.amount = a; }
}
WARNING

Самая частая причина падения в Kryo — отсутствие default constructor (особенно у data classes из Kotlin/Scala). В Kotlin POJO выглядит так: data class Order(var userId: String = "", var amount: Long = 0L) — обязательно var и значения по умолчанию для каждого поля.


TypeHint и преодоление type erasure

Java стирает generic-типы во время компиляции — это называется type erasure. Когда вы пишете map(event -{'>'} Tuple2.of(event.userId, 1L)), в bytecode возвращаемый тип лямбды это просто Tuple2, без параметров. TypeExtractor не может извлечь {'<'}String, Long{'>'} из лямбды — информация была стёрта.

В таких случаях Flink бросает исключение с сообщением “The return type of function ‘X’ could not be determined automatically”. Решение — TypeHint:

DataStream{'<'}Tuple2{'<'}String, Long{'>'}{'>'} result = orders
    .map(order -{'>'} Tuple2.of(order.userId, 1L))
    .returns(new TypeHint{'<'}Tuple2{'<'}String, Long{'>'}{'>'}() {});

TypeHint это anonymous subclass — компилятор сохраняет generic-параметры в его суперклассе, и Flink может извлечь их через reflection в runtime. Это единственный способ передать generic-параметры через границу type erasure.

Альтернатива — явное Types.TUPLE:

.returns(Types.TUPLE(Types.STRING, Types.LONG))

Этот синтаксис менее декларативный, но не использует anonymous class, что иногда проще для генерируемого кода или для случаев, когда тип определяется динамически.


TypeInformation на уровне DataStream API

Каждый DataStream{'<'}T{'>'} несёт TypeInformation{'<'}T{'>'}. Вы можете получить его явно:

TypeInformation{'<'}Order{'>'} typeInfo = orders.getType();
System.out.println(typeInfo);
// Выведет: PojoType{'<'}com.example.Order, fields = [amount: Long, ts: Instant, userId: String]{'>'}

Это полезно для дебага: если у вас неожиданно медленная пайплайн, первым делом проверьте — все ли operator-output типы являются POJO/Basic/Tuple. Если хоть один промежуточный поток показывает GenericType{'<'}...{'>'}, у вас Kryo fallback.

Можно перехватить решение TypeExtractor через явное указание типа:

DataStream{'<'}MyType{'>'} typed = stream
    .map(rawString -{'>'} parse(rawString))
    .returns(TypeInformation.of(MyType.class));

Или через Types:

.returns(Types.POJO(MyType.class))
.returns(Types.ROW_NAMED(new String[]{"id", "amount"}, Types.STRING, Types.LONG))
Жизненный цикл TypeInformation: от лямбды до сериализатора

TypeInformation в Table/SQL API

В Table API типы выражаются через DataType и LogicalType — это новая система типов, которая моделирует SQL-семантику (NULL allowable, precision, time semantics). Преобразование между DataStream API и Table API происходит через TypeInformation как мост:

// Table -{'>'} DataStream: конвертация LogicalType -{'>'} TypeInformation
Table table = tableEnv.sqlQuery("SELECT user_id, count(*) FROM orders GROUP BY user_id");
DataStream{'<'}Row{'>'} stream = tableEnv.toChangelogStream(table);
// stream.getType() будет ExternalTypeInfo{'<'}Row{'>'} с soft DataType reference

// DataStream -{'>'} Table: конвертация TypeInformation -{'>'} DataType
DataStream{'<'}Order{'>'} orders = env.fromSource(...);
Table ordersTable = tableEnv.fromDataStream(orders);
// Поля Order автоматически становятся столбцами таблицы

В Flink 2.2 был доработан мост: tableEnv.fromDataStream(stream, Schema.newBuilder()...) позволяет явно указать SQL-схему даже для streams с генерируемыми типами. Это критично для пайплайнов, которые смешивают DataStream и Table API в одном job-е.


Когда TypeExtractor молча проигрывает

Самый болезненный сценарий: TypeExtractor не падает, но возвращает GenericTypeInfo вместо ожидаемого PojoTypeInfo. Job запускается, работает, но в 5-10 раз медленнее, чем мог бы. Без явной диагностики это никогда не всплывёт.

Случаи, когда POJO-detection ломается:

  • Inner class без статической ссылкиstatic class нужен для top-level POJO; nested non-static несут implicit reference на outer instance.
  • Final fields без конструктора-сеттера — Flink не может set value через reflection в final field (в Java 17+ это ещё больше затруднено).
  • Поля типа Object или generic-неинстанцированный тип — TypeExtractor не знает, что туда писать.
  • Поля с private getters/setters или несовпадающим naming pattern (getFoo без setFoo или с разным типом).
  • Класс из external library, помеченный как @Generated или с агрессивной обфускацией.

Диагностика — флаг JVM -Dlog4j2.flink.typeinfo.level=DEBUG, или явная проверка через TypeInformation.of(MyType.class).toString() в driver-коде:

TypeInformation{'<'}MyType{'>'} info = TypeInformation.of(MyType.class);
if (info instanceof GenericTypeInfo) {
    throw new RuntimeException(
        "MyType fell back to Kryo: " + info +
        ". Check class is POJO-compatible."
    );
}

В Flink 2.0+ появился флаг pipeline.strict-pojo-validation (доработан в 2.2 — в следующем уроке). Включает обязательное падение при Kryo fallback, что является production-grade safety net.


Попробуй сам

  1. Проверить TypeInformation своих типов. В драйвер-коде вашего job-а вставьте TypeInformation.of(YourType.class).toString() для каждого payload type. Убедитесь, что нигде не возвращается GenericType.

  2. Сломать POJO detection. Возьмите рабочий POJO и удалите default constructor. Запустите job, посмотрите на логи: вы увидите warning о Kryo fallback. Сравните throughput до и после на одинаковом потоке.

  3. TypeHint для Tuple. Напишите job, который мапит в Tuple2{'<'}String, Long{'>'} без .returns(...). Поймайте InvalidTypesException. Добавьте TypeHint — убедитесь, что job стартует.

Проверка знанийKnowledge check
Вы написали Flink job: stream.map(order -> new Result(order.userId, order.amount, order.tags)). Где Result — POJO с public default constructor и тремя public полями (String, long, List<String>). Job запускается, но throughput в 6 раз ниже ожидаемого. Что проверить первым делом и почему?
ОтветAnswer
Первым делом проверить TypeInformation поля tags — скорее всего там GenericType. List<String> — это generic тип, и TypeExtractor может потерять параметр (String) если объявление поля сделано без явной аннотации. Когда параметр generic-а потерян, TypeExtractor использует GenericType с Kryo, что в 5-10 раз медленнее POJO. Шесть-кратная деградация именно об этом. Решения: (1) использовать конкретный тип вместо List<String> — например ArrayList<String>; (2) явно указать TypeInformation для Result в .returns(Types.POJO(Result.class)); (3) добавить @TypeInfo аннотацию на поле tags для подсказки TypeExtractor; (4) проверить через TypeInformation.of(Result.class).toString() — там должно быть PojoType с tags: BasicArrayTypeInfo<String>, а не GenericType<List>.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Когда именно Flink принимает решение о выборе TypeSerializer для конкретного типа?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4