Schema evolution — POJO, Avro, миграция
Любой production Flink-job рано или поздно сталкивается с необходимостью изменить schema state. Вы добавляете новое поле в UserProfile POJO, переименовываете property, или хотите перейти с Java POJO на Avro для better evolution. Без поддержки schema evolution каждое такое изменение требует полного рестарта job-а с потерей state — что для большинства production систем неприемлемо.
Flink поддерживает schema evolution для нескольких сериализаторов с разными ограничениями. POJO — самый ограниченный (add/remove fields, не change types). Avro — самый гибкий (полная Avro compatibility rules). В этом уроке разбираем правила evolution для каждого сериализатора, как Flink detect-ит migration, и какие patterns работают в production.
Schema Registry: режимы совместимости Avro-схем Schema evolution для state: POJO, Avro и UIDЗачем нужна schema evolution
Без schema evolution каждое изменение state class требует full reprocessing:
- Take savepoint с old schema.
- Modify class.
- Try restore -> deserialization fails (старый byte format не матчится новому class).
- Drop savepoint, restart с нуля.
Это работает для маленьких job-ов, но для production с большим state и event history это означает days of reprocessing. Schema evolution позволяет:
- Add new fields с default value (старый state читается, новое поле получает default).
- Remove fields (старые значения игнорируются при read).
- Reorder fields (mapped по name, не position).
- Sometimes change types через explicit migration.
Flink determines migration through TypeSerializerSnapshot — каждый serializer хранит описание своего схемы в savepoint metadata. При restore Flink сравнивает old snapshot с new serializer и решает: compatible without migration, compatible with migration (нужно конвертировать), or incompatible (restore failes).
TypeSerializerSnapshot: контракт
Когда Flink делает snapshot, для каждого State Descriptor он сохраняет:
- Bytes state (фактические данные).
- TypeSerializerSnapshot — описание сериализатора, который использовался.
TypeSerializerSnapshot — это metadata, описывающее format. Для POJO snapshot содержит:
- Class name.
- Field names and types в order, в котором они были при snapshot.
- Per-field TypeSerializerSnapshot.
При restore Flink создаёт new serializer из current code (current POJO class) и спрашивает: resolveSchemaCompatibility(oldSnapshot). Метод возвращает один из:
- COMPATIBLE_AS_IS — нет миграции, читать как есть.
- COMPATIBLE_AFTER_MIGRATION — нужно прочитать с old serializer и записать с new (full pass через state).
- COMPATIBLE_WITH_RECONFIGURED_SERIALIZER — new serializer был reconfigured (например, для backwards compat field ordering).
- INCOMPATIBLE — fail restore.
public abstract class TypeSerializerSnapshot<T> {
public abstract TypeSerializerSchemaCompatibility<T>
resolveSchemaCompatibility(TypeSerializer<T> newSerializer);
}
Это та абстракция, через которую Flink unifies evolution rules для разных типов state.
POJO evolution rules
POJO — самый распространённый serializer для Flink-state (когда вы пишете class UserProfile { String name; int age; }). Правила evolution:
Разрешено:
- Добавлять new fields. Старые snapshot не имеют эти поля — при read получают default value (null, 0, false).
- Удалять fields. Старые snapshot имеют эти поля — при read они skipп-ятся.
- Переставлять fields. POJO serializer matches by field name, не position.
- Менять access modifiers (public/private), getters/setters.
Запрещено:
- Менять type существующего field.
int age -> long age— INCOMPATIBLE. - Менять nested class type (если ваш field был
Addressи сталString). - Удалять field, который был part of equality (POJO supports primary key concept не напрямую, but if state ID changes meaning может стать broken).
Особенный case: nullable to non-nullable. Если в old schema field был Integer age (nullable), а в new schema стал int age (non-nullable), null values из old state могут привести к NullPointerException при auto-unboxing. Flink не предупреждает об этом — нужно осторожность.
Пример migration:
// Old POJO
public class UserProfile {
public String userId;
public String name;
public int age;
}
// New POJO (compatible evolution: added email, removed nothing)
public class UserProfile {
public String userId;
public String name;
public int age;
public String email; // NEW field
}
При restore: existing UserProfile entries загрузятся с email = null (default for String). Новые puts будут писать с email. Никаких проблем.
// New POJO (INCOMPATIBLE: changed type)
public class UserProfile {
public String userId;
public String name;
public long age; // int -> long, INCOMPATIBLE
public String email;
}
При restore: INCOMPATIBLE -> job fails. Нужна migration через State Processor API: read с старым serializer, transform, write с новым.
Avro evolution rules
Avro имеет formal schema language и rich evolution rules. Avro state хранится в binary Avro format, и при restore Flink использует Avro schema resolution.
Правила Avro evolution (subset, относящийся к Flink state):
Разрешено:
- Добавлять field с default value (default обязателен для evolution).
- Удалять field (если он не required).
- Переименовать field через aliases.
- Изменить type через promotion:
int -> long,int -> float,long -> double,float -> double. Promotions безопасны (no loss). - Добавить значение в enum (с default fallback).
- Сделать field nullable через union type.
Запрещено:
- Удалить required field без default.
- Сменить type non-promotably:
string -> int. - Удалить значение из enum.
Avro evolution более liberal, чем POJO, потому что Avro schema explicitly versioned и schema resolution — built-in feature.
Для использования Avro в Flink state:
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
ValueState<UserProfileAvro> state = getRuntimeContext().getState(
new ValueStateDescriptor<>(
"user-profile",
AvroTypeInfo.of(UserProfileAvro.class) // explicit Avro
)
);
Или для generic schemas:
ValueState<GenericRecord> state = getRuntimeContext().getState(
new ValueStateDescriptor<>(
"user-profile",
new GenericRecordAvroTypeInfo(schema)
)
);
В production Avro-based state часто используется для evolutionary-heavy workloads — например, event-driven system, где event schema постоянно растёт.
Operator UID как контракт schema
В прошлых уроках мы упоминали важность .uid() для restore. Здесь это становится особо критично — UID работает как stable identifier, который связывает state в savepoint с operator в новом DAG.
Контракт: UID never changes during product lifetime. UID — это implementation detail, не user-visible. Не меняйте его, даже если хотите rename класса или operator-а. Если нужно действительно migrate state на новый UID — делайте через State Processor API (read state с old UID, write с new UID).
Migration через State Processor API
Когда automatic schema evolution не работает (incompatible change), используется manual migration:
// Read с old schema
SavepointReader reader = SavepointReader.read(env, oldSavepoint, new EmbeddedRocksDBStateBackend());
DataStream<OldUserProfile> oldProfiles = reader.readKeyedState(
OperatorIdentifier.forUid("fraud-detector"),
new OldProfileReader()
);
// Transform к new schema
DataStream<NewUserProfile> newProfiles = oldProfiles.map(old ->
new NewUserProfile(
old.userId,
old.name,
(long) old.age, // int -> long
deriveEmail(old.userId) // populate new field
)
);
// Write новый savepoint с new schema
SavepointWriter writer = SavepointWriter
.newSavepoint(env, new EmbeddedRocksDBStateBackend(), 128)
.withOperator(
OperatorIdentifier.forUid("fraud-detector"),
OperatorTransformation
.bootstrapWith(newProfiles)
.keyBy(NewUserProfile::getUserId)
.transform(new NewProfileBootstrapFunction())
);
writer.write(newSavepoint);
Это тяжёлая операция, но универсальная — works for any schema change.
Production patterns
Pattern 1: Defensive new fields. Когда добавляете field в production POJO state, всегда обрабатывайте null case:
public class UserProfile {
public String email;
public String getEmailOrDefault() {
return email == null ? "[email protected]" : email;
}
}
Это критично, потому что после restore старые entries будут иметь email = null.
Pattern 2: Versioned migration. Если у вас несколько evolution step-ов:
- v1 -> v2: added email (compatible).
- v2 -> v3: changed age type int -> long (incompatible).
Don’t try to go v1 -> v3 directly. Сначала restore from v1 savepoint в v2 code (compatible evolution), take savepoint v2, потом manual migration v2 -> v3 через State Processor API.
Pattern 3: Avro для heavy evolution. Если ваш state class регулярно меняется (новые events, новые fields), переключитесь на Avro early. POJO evolution rules слишком restrictive для long-term evolution.
Pattern 4: Testing schema evolution. Перед deploying changes к state classes, run integration test: create savepoint с old class, restore с new class, verify entries readable. Это часто-overlooked test, который ловит INCOMPATIBLE changes до production.
Pattern 5: Separate state operations. Если есть много state с разной evolution velocity, разделите их по operators. Один operator — stable schema, другой — frequently evolving. Это limit-ит blast radius при incompatible changes.
Kryo serializer (Flink fallback для non-POJO non-Avro types) не поддерживает schema evolution. Если вы добавляете field в class, который сериализуется через Kryo — restore fails. Решение: для evolution-heavy state используйте explicit POJO или Avro. Если уже на Kryo — мigrate через State Processor API при следующем breaking change.
Trade-offs serializers
| Serializer | Performance | Size | Evolution | Use case |
|---|---|---|---|---|
| POJO | Fast | Medium | Limited (add/remove only) | Most common, default |
| Avro | Medium | Compact | Excellent | Heavy evolution, schema registry integration |
| Kryo | Slow | Large | None | Fallback, AVOID for state |
| Tuple | Fastest | Smallest | None | Performance-critical, fixed schema |
| String/Long/Int (primitives) | Fastest | Smallest | N/A | Primitives обходят сериализацию |
В production:
- Primitives для key и value, когда возможно.
- POJO для structured state с low evolution rate.
- Avro для event-driven systems с frequent schema changes.
- Never Kryo для production state.
Чтение source
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot— interface для schema metadata.org.apache.flink.api.java.typeutils.runtime.PojoSerializer— POJO serializer implementation.org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot— snapshot для POJO.org.apache.flink.formats.avro.typeutils.AvroSerializerSnapshot— snapshot для Avro.- FLIP-31 (State schema migration) в Apache Flink Wiki — original design.
Попробуй сам
-
Test POJO evolution. Создайте job с POJO state, run, take savepoint. Modify POJO (add field), restore. Verify, что existing entries readable с null field. Затем change field type, restore — должно fail.
-
Avro state experiment. Перепишите job на Avro state. Take savepoint, modify Avro schema (compatibly — add field с default), restore. Compare с POJO experience.
-
Manual migration через API. Симулируйте incompatible change (int -> long). Используйте SavepointReader+SavepointWriter для convert old savepoint в new format. Это тренировка для real production migration.