Schema evolution для state: POJO, Avro и UID
Стримовый джоб живёт месяцами и годами. За это время бизнес-требования меняются: добавляется поле в события, переименовывается, меняется тип. Если бы Flink-стейт был хрупким — каждое изменение требовало бы полного передеплоя с обнулением, потерей агрегаций, истории. К счастью, Flink поддерживает schema evolution — возможность изменять структуру state-классов и продолжать работу с уже накопленным стейтом.
Но есть подводные камни. Главный — это operator UID. Потерять UID = потерять state. Этот урок — про правила эволюции POJO и Avro, и почему UID нужно проставлять ВСЕМ stateful операторам с первого дня.
Что такое schema evolution для state
Когда оператор пишет state (ValueState, ListState, MapState), Flink сериализует объект через TypeSerializer. При checkpoint этот сериализованный stream сохраняется в state backend (RocksDB), а затем в S3. При восстановлении из savepoint — десериализуется обратно.
Schema evolution — это способность Flink восстановить state при изменении класса:
- Добавлено новое поле (значение по умолчанию для старых записей).
- Удалено поле.
- Переименовано поле (для Avro через aliases).
- Изменён primitive тип (int -> long, с проверкой совместимости).
Без schema evolution: меняете класс — старый сериализатор не может десериализовать новые данные, или наоборот. Джоб не стартует, savepoint не восстанавливается.
В Flink сериализаторы делятся на категории:
- PojoSerializer для POJO-классов (стандартный Java/Kotlin класс с getters/setters).
- AvroSerializer для Avro-классов (генерированных из .avsc схем).
- KryoSerializer — fallback для всего, что не POJO и не Avro. Кошмарен для schema evolution.
POJO: что можно и что нельзя
POJO в Flink — это Java/Kotlin класс с публичными полями ИЛИ с getter/setter pair-ами, имеющий public no-args constructor. Flink определяет POJO через type analysis: если класс удовлетворяет требованиям — используется PojoSerializer.
Что можно (forward compatible):
// Версия 1
public class Order {
public long orderId;
public String userId;
public double amount;
}
// Версия 2: добавили поле
public class Order {
public long orderId;
public String userId;
public double amount;
public String currency; // NEW
}
Это работает. При восстановлении старых записей currency будет null (для objects) или 0 (для primitives). Можно добавлять любое количество новых полей.
// Версия 3: удалили поле amount
public class Order {
public long orderId;
public String userId;
public String currency;
}
Тоже работает. Старое поле amount игнорируется при десериализации.
Что НЕЛЬЗЯ (breaking change):
- Изменить тип поля (
long -> String). - Переименовать поле (Flink видит это как удаление+добавление, но потеряет данные).
- Изменить класс на не-POJO (например, добавить final field без default value).
- Поменять
Orderна наследника или интерфейс.
При попытке restore с несовместимым классом джоб упадёт с StateMigrationException.
Переименование полей в POJO — это операция, которая молча теряет данные старого поля. Если переименование критично, делайте это через две версии: сначала добавить новое поле и копировать данные через ProcessFunction, потом удалить старое.
Avro: золотой стандарт evolution
Если у вас сложная эволюция (часто меняющиеся события), используйте Avro классы для state. Avro spec изначально проектировался для schema evolution, имеет богатые инструменты:
- aliases: новое поле может ссылаться на старое имя.
- default values: значение по умолчанию для новых полей при восстановлении старых записей.
- type promotion: int -> long, float -> double, и т.д.
- union types: nullable через
["null", "string"].
// orders.avsc - версия 1
{
"type": "record",
"name": "Order",
"namespace": "com.acme.events",
"fields": [
{"name": "orderId", "type": "long"},
{"name": "userId", "type": "string"},
{"name": "amount", "type": "double"}
]
}
// Версия 2: добавили currency с default, переименовали amount -> totalAmount через alias
{
"type": "record",
"name": "Order",
"namespace": "com.acme.events",
"fields": [
{"name": "orderId", "type": "long"},
{"name": "userId", "type": "string"},
{"name": "totalAmount", "type": "double", "aliases": ["amount"]},
{"name": "currency", "type": "string", "default": "USD"}
]
}
Avro плагин (avro-maven-plugin или gradle) генерирует Java-класс Order. Flink-у нужно указать AvroSerializer для этого типа:
DataStream<Order> orders = ...;
TypeInformation<Order> typeInfo = TypeInformation.of(new TypeHint<Order>(){});
// Avro classes по default используют AvroSerializer
При restore из savepoint v1 в код v2:
- Старые записи (без currency) получают
"USD"по умолчанию. - Старое поле
amountмапится на новоеtotalAmountчерез alias.
Это и есть зрелая evolution. В крупных проектах с быстро меняющимися событиями Avro — практически дефолтный выбор для state.
Operator UID: критическая роль
Теперь самое главное. Каждый оператор в DataStream API получает operator UID — уникальный идентификатор внутри ExecutionGraph. По UID Flink сопоставляет state в savepoint с операторами в новой версии графа.
DataStream<Order> orders = env.fromSource(...)
.uid("kafka-source-orders"); // <-- UID
DataStream<EnrichedOrder> enriched = orders
.keyBy(o -> o.userId)
.process(new EnrichmentFunction())
.uid("enrichment-by-user"); // <-- UID
DataStream<AggregatedOrder> agg = enriched
.keyBy(o -> o.region)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new SumAggregate())
.uid("region-window-aggregate"); // <-- UID
agg.sinkTo(...)
.uid("iceberg-sink-aggregates"); // <-- UID
Без явного UID Flink генерирует UID по хешу графа — то есть UID меняется при любом изменении до этого оператора в графе. Добавили map() в середине пайплайна — все downstream UID-ы поменялись — все downstream state потеряны.
Правило: проставляйте UID всем stateful операторам с первого дня. Это бесплатно и спасает в будущем.
Если вы не проставили UID на stateful операторах и катите изменение графа (даже добавление безобидного map) — Flink не сможет восстановить state, потому что хеш-UID-ы поменялись. При restore из savepoint с allowNonRestoredState=true джоб запустится, но state пуст. С allowNonRestoredState=false (default) — джоб не стартует. Это часто становится драматичной потерей agg-данных в production.
Что делать, если забыли UID
Допустим, джоб уже в production без UID-ов. Стейт накоплен. Что делать?
Шаг 1: НЕ менять граф. Любое изменение — потеря state.
Шаг 2: Сделать savepoint текущего состояния (с авто-генерированными UID-ами).
Шаг 3: Изучить map UID -> state в savepoint. Используйте Flink State Processor API (см. Flink docs) для чтения metadata savepoint-а — вы увидите, какие UID-ы автогенерированы.
Шаг 4: Добавить явные UID-ы в код, СОВПАДАЮЩИЕ с автогенерированными.
// До: без UID, autogen UID = 5a7f3b2c1d4e9a8b
.process(new EnrichmentFunction())
// После: тот же UID явно
.process(new EnrichmentFunction()).uid("5a7f3b2c1d4e9a8b")
Это сохраняет state как был. Дальше можно постепенно переходить на читаемые UID-ы через специальные миграции (это уже advanced topic, см. State Processor API).
Шаг 5: Не делать так больше. В team conventions проставляйте UID на все .process(), .keyBy().window(), .fromSource(), .sinkTo() — то есть все stateful operations.
Миграция стейта между версиями Flink
Savepoint-ы Flink версионно совместимы. Saved в Flink 1.18 — restore in 2.2 работает. Но есть нюансы:
- Canonical format (default до Flink 2.0): savepoint в backend-агностичном формате. Медленнее, но переносим между RocksDB и HashMap.
- Native format (default с Flink 2.0): savepoint в формате backend-а. Быстрее (incremental), но привязан к backend-у.
Для миграции версий Flink или backend-а — используйте canonical format при создании savepoint:
flink savepoint :jobId :path -type canonical
Или в FlinkDeployment:
flinkConfiguration:
state.savepoints.format: canonical # для миграции
# или native для regular upgrades в той же версии
Между минорными версиями Flink (1.18 -> 1.19 -> 1.20) обычно нет проблем. Major (1.x -> 2.x) требует canonical format и тестирования.
Pre-flight check для production деплоя
Перед каждым production деплоем со state changes:
-
Локально: restore savepoint в dev-кластер с новым кодом. Если не стартует — фикс перед деплоем.
-
Schema validation: для Avro —
mvn avro:compileвалидирует совместимость с предыдущей версией .avsc (если в git есть прошлая версия — добавьте в pom check на compatibility). -
Test against minicluster: интеграционный тест с savepoint предыдущей версии (см. урок 4 этого модуля про CI/CD).
-
UID audit: автоматизированно скриптом грепать все
.process(),.keyBy().window(),.fromSource(), проверять, что у каждого.uid()рядом. Можно сделать ArchUnit-правилом.
# Простой grep audit
grep -rE "\.process\(|\.window\(|\.fromSource\(|\.sinkTo\(" src/main/java | \
grep -v "\.uid("
# Должно быть пусто
- Rollback plan: знать savepoint для отката, путь зарегистрирован в external store.
Ключевые выводы
-
Schema evolution в Flink возможна для POJO (add/remove полей) и Avro (полная поддержка: aliases, defaults, type promotion). KryoSerializer schema evolution НЕ поддерживает.
-
POJO правила: можно добавлять/удалять поля. НЕЛЬЗЯ менять тип, переименовывать, менять иерархию.
-
Avro — золотой стандарт для сложной evolution. Aliases для переименования, defaults для новых полей, type promotion.
-
Operator UID — критичен. Без UID Flink генерирует хеш по графу, любое изменение = потеря state. Проставляйте
.uid()всем stateful операторам ВСЕГДА. -
Восстановление без UID возможно через State Processor API: читаете autogen UID-ы из savepoint metadata, проставляете их явно в код, продолжаете жить.
-
Pre-flight check: restore в dev, valid Avro schema, integration test, UID audit, rollback plan. Без этого production-деплои со state changes — лотерея.
-
Между версиями Flink используйте canonical format savepoint для миграции major-версий. Native — для regular weekly upgrades.