Learning Platform
Глоссарий Troubleshooting
Урок 08.04 · 26 мин
Продвинутый
Schema EvolutionPOJO SerializerAvro CompatibilityTypeSerializerSnapshotState Migration

Schema evolution — POJO, Avro, миграция

Любой production Flink-job рано или поздно сталкивается с необходимостью изменить schema state. Вы добавляете новое поле в UserProfile POJO, переименовываете property, или хотите перейти с Java POJO на Avro для better evolution. Без поддержки schema evolution каждое такое изменение требует полного рестарта job-а с потерей state — что для большинства production систем неприемлемо.

Flink поддерживает schema evolution для нескольких сериализаторов с разными ограничениями. POJO — самый ограниченный (add/remove fields, не change types). Avro — самый гибкий (полная Avro compatibility rules). В этом уроке разбираем правила evolution для каждого сериализатора, как Flink detect-ит migration, и какие patterns работают в production.

Schema Registry: режимы совместимости Avro-схем Schema evolution для state: POJO, Avro и UID

Зачем нужна schema evolution

Без schema evolution каждое изменение state class требует full reprocessing:

  1. Take savepoint с old schema.
  2. Modify class.
  3. Try restore -> deserialization fails (старый byte format не матчится новому class).
  4. Drop savepoint, restart с нуля.

Это работает для маленьких job-ов, но для production с большим state и event history это означает days of reprocessing. Schema evolution позволяет:

  • Add new fields с default value (старый state читается, новое поле получает default).
  • Remove fields (старые значения игнорируются при read).
  • Reorder fields (mapped по name, не position).
  • Sometimes change types через explicit migration.

Flink determines migration through TypeSerializerSnapshot — каждый serializer хранит описание своего схемы в savepoint metadata. При restore Flink сравнивает old snapshot с new serializer и решает: compatible without migration, compatible with migration (нужно конвертировать), or incompatible (restore failes).


TypeSerializerSnapshot: контракт

Когда Flink делает snapshot, для каждого State Descriptor он сохраняет:

  • Bytes state (фактические данные).
  • TypeSerializerSnapshot — описание сериализатора, который использовался.

TypeSerializerSnapshot — это metadata, описывающее format. Для POJO snapshot содержит:

  • Class name.
  • Field names and types в order, в котором они были при snapshot.
  • Per-field TypeSerializerSnapshot.

При restore Flink создаёт new serializer из current code (current POJO class) и спрашивает: resolveSchemaCompatibility(oldSnapshot). Метод возвращает один из:

  • COMPATIBLE_AS_IS — нет миграции, читать как есть.
  • COMPATIBLE_AFTER_MIGRATION — нужно прочитать с old serializer и записать с new (full pass через state).
  • COMPATIBLE_WITH_RECONFIGURED_SERIALIZER — new serializer был reconfigured (например, для backwards compat field ordering).
  • INCOMPATIBLE — fail restore.
public abstract class TypeSerializerSnapshot<T> {
    public abstract TypeSerializerSchemaCompatibility<T> 
        resolveSchemaCompatibility(TypeSerializer<T> newSerializer);
}

Это та абстракция, через которую Flink unifies evolution rules для разных типов state.


POJO evolution rules

POJO — самый распространённый serializer для Flink-state (когда вы пишете class UserProfile { String name; int age; }). Правила evolution:

Разрешено:

  • Добавлять new fields. Старые snapshot не имеют эти поля — при read получают default value (null, 0, false).
  • Удалять fields. Старые snapshot имеют эти поля — при read они skipп-ятся.
  • Переставлять fields. POJO serializer matches by field name, не position.
  • Менять access modifiers (public/private), getters/setters.

Запрещено:

  • Менять type существующего field. int age -> long age — INCOMPATIBLE.
  • Менять nested class type (если ваш field был Address и стал String).
  • Удалять field, который был part of equality (POJO supports primary key concept не напрямую, but if state ID changes meaning может стать broken).

Особенный case: nullable to non-nullable. Если в old schema field был Integer age (nullable), а в new schema стал int age (non-nullable), null values из old state могут привести к NullPointerException при auto-unboxing. Flink не предупреждает об этом — нужно осторожность.

Пример migration:

// Old POJO
public class UserProfile {
    public String userId;
    public String name;
    public int age;
}

// New POJO (compatible evolution: added email, removed nothing)
public class UserProfile {
    public String userId;
    public String name;
    public int age;
    public String email;  // NEW field
}

При restore: existing UserProfile entries загрузятся с email = null (default for String). Новые puts будут писать с email. Никаких проблем.

// New POJO (INCOMPATIBLE: changed type)
public class UserProfile {
    public String userId;
    public String name;
    public long age;  // int -> long, INCOMPATIBLE
    public String email;
}

При restore: INCOMPATIBLE -> job fails. Нужна migration через State Processor API: read с старым serializer, transform, write с новым.


Avro evolution rules

Avro имеет formal schema language и rich evolution rules. Avro state хранится в binary Avro format, и при restore Flink использует Avro schema resolution.

Правила Avro evolution (subset, относящийся к Flink state):

Разрешено:

  • Добавлять field с default value (default обязателен для evolution).
  • Удалять field (если он не required).
  • Переименовать field через aliases.
  • Изменить type через promotion: int -> long, int -> float, long -> double, float -> double. Promotions безопасны (no loss).
  • Добавить значение в enum (с default fallback).
  • Сделать field nullable через union type.

Запрещено:

  • Удалить required field без default.
  • Сменить type non-promotably: string -> int.
  • Удалить значение из enum.

Avro evolution более liberal, чем POJO, потому что Avro schema explicitly versioned и schema resolution — built-in feature.

Для использования Avro в Flink state:

import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;

ValueState<UserProfileAvro> state = getRuntimeContext().getState(
    new ValueStateDescriptor<>(
        "user-profile",
        AvroTypeInfo.of(UserProfileAvro.class)  // explicit Avro
    )
);

Или для generic schemas:

ValueState<GenericRecord> state = getRuntimeContext().getState(
    new ValueStateDescriptor<>(
        "user-profile",
        new GenericRecordAvroTypeInfo(schema)
    )
);

В production Avro-based state часто используется для evolutionary-heavy workloads — например, event-driven system, где event schema постоянно растёт.


Operator UID как контракт schema

В прошлых уроках мы упоминали важность .uid() для restore. Здесь это становится особо критично — UID работает как stable identifier, который связывает state в savepoint с operator в новом DAG.

Operator UID как контракт между versions
v1 codeOld version code, deployed в production. UID 'fraud-detector' зафиксирован, state size 5 GB.
savepoint
savepoint ASavepoint содержит OperatorID(hash('fraud-detector')) -> state file.
v2 codeNew version. UID сохранён 'fraud-detector', изменения только в state class (added email field в POJO).
restore
state loadedOperatorID matches -> state loaded -> POJO evolution detected -> email = null для existing entries.
v3 codeBad version. UID accidentally changed на 'fraud-detector-v2' для clarity. Это breaking change.
restore
state LOSTNew OperatorID hash('fraud-detector-v2') не найден в savepoint. State пустой. Если без --allowNonRestoredState — fail. С флагом — empty state. Data loss.

Контракт: UID never changes during product lifetime. UID — это implementation detail, не user-visible. Не меняйте его, даже если хотите rename класса или operator-а. Если нужно действительно migrate state на новый UID — делайте через State Processor API (read state с old UID, write с new UID).


Migration через State Processor API

Когда automatic schema evolution не работает (incompatible change), используется manual migration:

// Read с old schema
SavepointReader reader = SavepointReader.read(env, oldSavepoint, new EmbeddedRocksDBStateBackend());

DataStream<OldUserProfile> oldProfiles = reader.readKeyedState(
    OperatorIdentifier.forUid("fraud-detector"),
    new OldProfileReader()
);

// Transform к new schema
DataStream<NewUserProfile> newProfiles = oldProfiles.map(old -> 
    new NewUserProfile(
        old.userId,
        old.name,
        (long) old.age,  // int -> long
        deriveEmail(old.userId)  // populate new field
    )
);

// Write новый savepoint с new schema
SavepointWriter writer = SavepointWriter
    .newSavepoint(env, new EmbeddedRocksDBStateBackend(), 128)
    .withOperator(
        OperatorIdentifier.forUid("fraud-detector"),
        OperatorTransformation
            .bootstrapWith(newProfiles)
            .keyBy(NewUserProfile::getUserId)
            .transform(new NewProfileBootstrapFunction())
    );

writer.write(newSavepoint);

Это тяжёлая операция, но универсальная — works for any schema change.


Production patterns

Pattern 1: Defensive new fields. Когда добавляете field в production POJO state, всегда обрабатывайте null case:

public class UserProfile {
    public String email;
    
    public String getEmailOrDefault() {
        return email == null ? "[email protected]" : email;
    }
}

Это критично, потому что после restore старые entries будут иметь email = null.

Pattern 2: Versioned migration. Если у вас несколько evolution step-ов:

  • v1 -> v2: added email (compatible).
  • v2 -> v3: changed age type int -> long (incompatible).

Don’t try to go v1 -> v3 directly. Сначала restore from v1 savepoint в v2 code (compatible evolution), take savepoint v2, потом manual migration v2 -> v3 через State Processor API.

Pattern 3: Avro для heavy evolution. Если ваш state class регулярно меняется (новые events, новые fields), переключитесь на Avro early. POJO evolution rules слишком restrictive для long-term evolution.

Pattern 4: Testing schema evolution. Перед deploying changes к state classes, run integration test: create savepoint с old class, restore с new class, verify entries readable. Это часто-overlooked test, который ловит INCOMPATIBLE changes до production.

Pattern 5: Separate state operations. Если есть много state с разной evolution velocity, разделите их по operators. Один operator — stable schema, другой — frequently evolving. Это limit-ит blast radius при incompatible changes.

WARNING

Kryo serializer (Flink fallback для non-POJO non-Avro types) не поддерживает schema evolution. Если вы добавляете field в class, который сериализуется через Kryo — restore fails. Решение: для evolution-heavy state используйте explicit POJO или Avro. Если уже на Kryo — мigrate через State Processor API при следующем breaking change.


Trade-offs serializers

SerializerPerformanceSizeEvolutionUse case
POJOFastMediumLimited (add/remove only)Most common, default
AvroMediumCompactExcellentHeavy evolution, schema registry integration
KryoSlowLargeNoneFallback, AVOID for state
TupleFastestSmallestNonePerformance-critical, fixed schema
String/Long/Int (primitives)FastestSmallestN/APrimitives обходят сериализацию

В production:

  • Primitives для key и value, когда возможно.
  • POJO для structured state с low evolution rate.
  • Avro для event-driven systems с frequent schema changes.
  • Never Kryo для production state.

Чтение source

  • org.apache.flink.api.common.typeutils.TypeSerializerSnapshot — interface для schema metadata.
  • org.apache.flink.api.java.typeutils.runtime.PojoSerializer — POJO serializer implementation.
  • org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot — snapshot для POJO.
  • org.apache.flink.formats.avro.typeutils.AvroSerializerSnapshot — snapshot для Avro.
  • FLIP-31 (State schema migration) в Apache Flink Wiki — original design.

Попробуй сам

  1. Test POJO evolution. Создайте job с POJO state, run, take savepoint. Modify POJO (add field), restore. Verify, что existing entries readable с null field. Затем change field type, restore — должно fail.

  2. Avro state experiment. Перепишите job на Avro state. Take savepoint, modify Avro schema (compatibly — add field с default), restore. Compare с POJO experience.

  3. Manual migration через API. Симулируйте incompatible change (int -> long). Используйте SavepointReader+SavepointWriter для convert old savepoint в new format. Это тренировка для real production migration.

Проверка знанийKnowledge check
У вас production job 6 месяцев на POJO state UserProfile { String userId; String name; int credits; }. Бизнес требует: 1) поддерживать large credits values (overflow int), 2) добавить email field, 3) переименовать credits в balance для clarity. Какой safe migration plan и какие шаги МОГУТ потерять state без правильного подхода?
ОтветAnswer
Безопасный план — incremental, по одному change за раз, с testing между: STEP 1 (add email): добавьте String email field в POJO. Это compatible evolution. Take savepoint v1.0, deploy v1.1, restore — works, existing entries имеют email = null. STEP 2 (rename credits to balance): POJO matches fields by name, поэтому rename = effectively delete credits + add balance. На restore: credits field в old snapshot будет skipped (lost), balance в new schema получит default value 0. Это data loss! Чтобы избежать: использовать @AlternateFieldName аннотацию (если поддерживается) ИЛИ migrate manually: SavepointReader.readKeyedState, transform с copy credits -> balance, SavepointWriter с new schema. STEP 3 (int -> long для balance): incompatible. Это требует manual migration через State Processor API. Read с старым POJO (где balance был int), write с новым (long). Что МОЖЕТ потерять state: a) Делать все changes одновременно в одном release — POJO rename + type change в одном deployment может silent corrupt restore (часть fields skipped, default values вместо real data, и из-за type incompatibility — INCOMPATIBLE fail). b) Использовать --allowNonRestoredState флаг для bypass error — это hide проблему, prod продолжает работать, но с lost data. c) Не testing migration перед deploy: всегда run staging test с production savepoint, verify counts match. Правильный production flow: incremental changes, each tested via integration test (old savepoint + new code = success), separate releases для каждого incompatible change.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Какие изменения POJO state разрешены для compatible schema evolution в Flink?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4