Learning Platform
Глоссарий Troubleshooting
Урок 17.01 · 22 мин
Средний
Schema EvolutionPOJOAvroOperator UIDStateSerialization

Schema evolution для state: POJO, Avro и UID

Стримовый джоб живёт месяцами и годами. За это время бизнес-требования меняются: добавляется поле в события, переименовывается, меняется тип. Если бы Flink-стейт был хрупким — каждое изменение требовало бы полного передеплоя с обнулением, потерей агрегаций, истории. К счастью, Flink поддерживает schema evolution — возможность изменять структуру state-классов и продолжать работу с уже накопленным стейтом.

Но есть подводные камни. Главный — это operator UID. Потерять UID = потерять state. Этот урок — про правила эволюции POJO и Avro, и почему UID нужно проставлять ВСЕМ stateful операторам с первого дня.


Что такое schema evolution для state

Когда оператор пишет state (ValueState, ListState, MapState), Flink сериализует объект через TypeSerializer. При checkpoint этот сериализованный stream сохраняется в state backend (RocksDB), а затем в S3. При восстановлении из savepoint — десериализуется обратно.

Schema evolution — это способность Flink восстановить state при изменении класса:

  • Добавлено новое поле (значение по умолчанию для старых записей).
  • Удалено поле.
  • Переименовано поле (для Avro через aliases).
  • Изменён primitive тип (int -> long, с проверкой совместимости).

Без schema evolution: меняете класс — старый сериализатор не может десериализовать новые данные, или наоборот. Джоб не стартует, savepoint не восстанавливается.

В Flink сериализаторы делятся на категории:

  • PojoSerializer для POJO-классов (стандартный Java/Kotlin класс с getters/setters).
  • AvroSerializer для Avro-классов (генерированных из .avsc схем).
  • KryoSerializer — fallback для всего, что не POJO и не Avro. Кошмарен для schema evolution.
Avro schema evolution: backward и forward compatibility
Serializer-ы и их поведение при evolution
PojoSerializerСтандартный для POJO-классов. Поддерживает: добавление и удаление полей. Не поддерживает: изменение типа поля, изменение базового класса.
AvroSerializerЛучшая поддержка evolution: aliases, default values, type promotion (int->long, float->double). Используется через генерацию из .avsc.
KryoSerializerFallback. НЕ поддерживает schema evolution. Любое изменение класса = corrupted state.
Custom TypeSerializerМожно написать свой. Полный контроль, но и полная ответственность за обратную совместимость.

POJO: что можно и что нельзя

POJO в Flink — это Java/Kotlin класс с публичными полями ИЛИ с getter/setter pair-ами, имеющий public no-args constructor. Flink определяет POJO через type analysis: если класс удовлетворяет требованиям — используется PojoSerializer.

Что можно (forward compatible):

// Версия 1
public class Order {
    public long orderId;
    public String userId;
    public double amount;
}

// Версия 2: добавили поле
public class Order {
    public long orderId;
    public String userId;
    public double amount;
    public String currency;  // NEW
}

Это работает. При восстановлении старых записей currency будет null (для objects) или 0 (для primitives). Можно добавлять любое количество новых полей.

// Версия 3: удалили поле amount
public class Order {
    public long orderId;
    public String userId;
    public String currency;
}

Тоже работает. Старое поле amount игнорируется при десериализации.

Что НЕЛЬЗЯ (breaking change):

  • Изменить тип поля (long -> String).
  • Переименовать поле (Flink видит это как удаление+добавление, но потеряет данные).
  • Изменить класс на не-POJO (например, добавить final field без default value).
  • Поменять Order на наследника или интерфейс.

При попытке restore с несовместимым классом джоб упадёт с StateMigrationException.

WARNING

Переименование полей в POJO — это операция, которая молча теряет данные старого поля. Если переименование критично, делайте это через две версии: сначала добавить новое поле и копировать данные через ProcessFunction, потом удалить старое.


Avro: золотой стандарт evolution

Если у вас сложная эволюция (часто меняющиеся события), используйте Avro классы для state. Avro spec изначально проектировался для schema evolution, имеет богатые инструменты:

  • aliases: новое поле может ссылаться на старое имя.
  • default values: значение по умолчанию для новых полей при восстановлении старых записей.
  • type promotion: int -> long, float -> double, и т.д.
  • union types: nullable через ["null", "string"].
// orders.avsc - версия 1
{
  "type": "record",
  "name": "Order",
  "namespace": "com.acme.events",
  "fields": [
    {"name": "orderId", "type": "long"},
    {"name": "userId", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}

// Версия 2: добавили currency с default, переименовали amount -> totalAmount через alias
{
  "type": "record",
  "name": "Order",
  "namespace": "com.acme.events",
  "fields": [
    {"name": "orderId", "type": "long"},
    {"name": "userId", "type": "string"},
    {"name": "totalAmount", "type": "double", "aliases": ["amount"]},
    {"name": "currency", "type": "string", "default": "USD"}
  ]
}

Avro плагин (avro-maven-plugin или gradle) генерирует Java-класс Order. Flink-у нужно указать AvroSerializer для этого типа:

DataStream<Order> orders = ...;
TypeInformation<Order> typeInfo = TypeInformation.of(new TypeHint<Order>(){});
// Avro classes по default используют AvroSerializer

При restore из savepoint v1 в код v2:

  • Старые записи (без currency) получают "USD" по умолчанию.
  • Старое поле amount мапится на новое totalAmount через alias.

Это и есть зрелая evolution. В крупных проектах с быстро меняющимися событиями Avro — практически дефолтный выбор для state.


Operator UID: критическая роль

Теперь самое главное. Каждый оператор в DataStream API получает operator UID — уникальный идентификатор внутри ExecutionGraph. По UID Flink сопоставляет state в savepoint с операторами в новой версии графа.

DataStream<Order> orders = env.fromSource(...)
    .uid("kafka-source-orders");  // <-- UID

DataStream<EnrichedOrder> enriched = orders
    .keyBy(o -> o.userId)
    .process(new EnrichmentFunction())
    .uid("enrichment-by-user");  // <-- UID

DataStream<AggregatedOrder> agg = enriched
    .keyBy(o -> o.region)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new SumAggregate())
    .uid("region-window-aggregate");  // <-- UID

agg.sinkTo(...)
   .uid("iceberg-sink-aggregates");  // <-- UID

Без явного UID Flink генерирует UID по хешу графа — то есть UID меняется при любом изменении до этого оператора в графе. Добавили map() в середине пайплайна — все downstream UID-ы поменялись — все downstream state потеряны.

Правило: проставляйте UID всем stateful операторам с первого дня. Это бесплатно и спасает в будущем.

DANGER

Если вы не проставили UID на stateful операторах и катите изменение графа (даже добавление безобидного map) — Flink не сможет восстановить state, потому что хеш-UID-ы поменялись. При restore из savepoint с allowNonRestoredState=true джоб запустится, но state пуст. С allowNonRestoredState=false (default) — джоб не стартует. Это часто становится драматичной потерей agg-данных в production.


Что делать, если забыли UID

Допустим, джоб уже в production без UID-ов. Стейт накоплен. Что делать?

Шаг 1: НЕ менять граф. Любое изменение — потеря state.

Шаг 2: Сделать savepoint текущего состояния (с авто-генерированными UID-ами).

Шаг 3: Изучить map UID -> state в savepoint. Используйте Flink State Processor API (см. Flink docs) для чтения metadata savepoint-а — вы увидите, какие UID-ы автогенерированы.

Шаг 4: Добавить явные UID-ы в код, СОВПАДАЮЩИЕ с автогенерированными.

// До: без UID, autogen UID = 5a7f3b2c1d4e9a8b
.process(new EnrichmentFunction())

// После: тот же UID явно
.process(new EnrichmentFunction()).uid("5a7f3b2c1d4e9a8b")

Это сохраняет state как был. Дальше можно постепенно переходить на читаемые UID-ы через специальные миграции (это уже advanced topic, см. State Processor API).

Шаг 5: Не делать так больше. В team conventions проставляйте UID на все .process(), .keyBy().window(), .fromSource(), .sinkTo() — то есть все stateful operations.


Savepoint-ы Flink версионно совместимы. Saved в Flink 1.18 — restore in 2.2 работает. Но есть нюансы:

  • Canonical format (default до Flink 2.0): savepoint в backend-агностичном формате. Медленнее, но переносим между RocksDB и HashMap.
  • Native format (default с Flink 2.0): savepoint в формате backend-а. Быстрее (incremental), но привязан к backend-у.

Для миграции версий Flink или backend-а — используйте canonical format при создании savepoint:

flink savepoint :jobId :path -type canonical

Или в FlinkDeployment:

flinkConfiguration:
  state.savepoints.format: canonical  # для миграции
  # или native для regular upgrades в той же версии

Между минорными версиями Flink (1.18 -> 1.19 -> 1.20) обычно нет проблем. Major (1.x -> 2.x) требует canonical format и тестирования.


Pre-flight check для production деплоя

Перед каждым production деплоем со state changes:

  1. Локально: restore savepoint в dev-кластер с новым кодом. Если не стартует — фикс перед деплоем.

  2. Schema validation: для Avro — mvn avro:compile валидирует совместимость с предыдущей версией .avsc (если в git есть прошлая версия — добавьте в pom check на compatibility).

  3. Test against minicluster: интеграционный тест с savepoint предыдущей версии (см. урок 4 этого модуля про CI/CD).

  4. UID audit: автоматизированно скриптом грепать все .process(), .keyBy().window(), .fromSource(), проверять, что у каждого .uid() рядом. Можно сделать ArchUnit-правилом.

# Простой grep audit
grep -rE "\.process\(|\.window\(|\.fromSource\(|\.sinkTo\(" src/main/java | \
  grep -v "\.uid("
# Должно быть пусто
  1. Rollback plan: знать savepoint для отката, путь зарегистрирован в external store.

Ключевые выводы

  1. Schema evolution в Flink возможна для POJO (add/remove полей) и Avro (полная поддержка: aliases, defaults, type promotion). KryoSerializer schema evolution НЕ поддерживает.

  2. POJO правила: можно добавлять/удалять поля. НЕЛЬЗЯ менять тип, переименовывать, менять иерархию.

  3. Avro — золотой стандарт для сложной evolution. Aliases для переименования, defaults для новых полей, type promotion.

  4. Operator UID — критичен. Без UID Flink генерирует хеш по графу, любое изменение = потеря state. Проставляйте .uid() всем stateful операторам ВСЕГДА.

  5. Восстановление без UID возможно через State Processor API: читаете autogen UID-ы из savepoint metadata, проставляете их явно в код, продолжаете жить.

  6. Pre-flight check: restore в dev, valid Avro schema, integration test, UID audit, rollback plan. Без этого production-деплои со state changes — лотерея.

  7. Между версиями Flink используйте canonical format savepoint для миграции major-версий. Native — для regular weekly upgrades.

Проверка знанийKnowledge check
Команда запускает новую версию Flink-джоба. Изменения: добавили operator filter() в начало графа, изменили POJO State class из class Order { public long id; public double amount; } в class Order { public long id; public Long amount; }. Какие проблемы при restore из savepoint и как фиксить?
ОтветAnswer
Две независимые проблемы, обе серьёзные. Проблема 1: добавили filter() в начало графа БЕЗ UID на последующих операторах. Без явных UID Flink генерирует хеш-UID по позиции в графе. Filter в начале графа = все downstream UID-ы изменились. При restore Flink ищет state по UID, не находит совпадений - выбрасывает StateMigrationException. С allowNonRestoredState=true джоб запустится с пустым state (потеря всех агрегаций). Фикс: проставить явные UID на все stateful операторы (.uid("enrichment"), .uid("window-agg") и т.д.). Если в проде уже без UID - перед добавлением filter сначала: (1) сделать savepoint текущего состояния; (2) использовать State Processor API для чтения autogen UID-ов из savepoint metadata; (3) проставить эти UID-ы явно в коде; (4) пересабмитить с новым кодом - state восстанавливается; (5) только потом можно добавить filter. Проблема 2: изменили тип amount с primitive double на boxed Long (заметьте, не Long, а Long - typo? double->Long тоже несовместимо). PojoSerializer НЕ поддерживает изменение типа поля. double и Long - разные типы, разные сериализации. При restore: ClassCastException или StateMigrationException. Если предположить, что хотели Double вместо double - это тоже type change (primitive vs boxed) - не поддерживается. Фикс: либо вернуть double, либо мигрировать через две версии. Версия N: добавить новое поле amountLong как Long (default null), скопировать данные из amount в amountLong через ProcessFunction. Восстановить state. Версия N+1: удалить amount, использовать только amountLong. Альтернатива - переписать класс на Avro с aliases, тогда evolution бесплатна. Общий вывод: оба изменения нужно катить отдельными pull request-ами, тестировать restore из prev savepoint в dev, и НИКОГДА не делать оба изменения вместе.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Production Flink-джоб без явных UID на операторах работает уже 6 месяцев со state 80 ГБ. Команда хочет добавить filter() в начале графа (отбросить test events). Что произойдёт при kubectl apply?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5